Seite 3 von 4

Re: Thread Pool

Verfasst: 28.02.2012 17:01
von Christian+
@STARGÅTE
Vielen Danke für das Feedback.
Den Namen werde ich umstellen war da selbst nicht so zufrieden.
Das mit dem Schließen ist klar dass einen Großteil des Vorteils wieder zunichtemacht.
Ich arbeite schon daran das zu verbessern werde deine Vorschläge da mal versuchen mit reinzunehmen vielleicht finde ich dann endlich eine gute und auch schnelle Lösung für das Problem.

@Nino
Ein Beispiel kommt noch aber erst mal will ich einige Code Verbesserungen umsetzen.

Re: Thread Pool

Verfasst: 28.02.2012 20:11
von Christian+
So habe jetzt mal eine neue Version erstellt allerdings noch nicht gründlich getestet hoffe sie ist fehlerfrei.
Stellt ThreadPools zu Verfügung und darauf aufsetzend TaskGroups die angelegt werden können und über eine Wait Funktionen verfügen.

Code: Alles auswählen

; -------------------------------------------------------------------------------
; -------------------------------------------------------------------------------
; ThreadPool.pb
; 28-02-2012
; Christian+
; -------------------------------------------------------------------------------
EnableExplicit

; -------------------------------------------------------------------------------
; TPoolTaskData Structure
; -------------------------------------------------------------------------------
Structure TPoolTaskData
  *Task
  *Parameter
  Semaphore.i
EndStructure

; -------------------------------------------------------------------------------
; TPoolData Structure
; -------------------------------------------------------------------------------
Structure TPoolData
  Semaphore.i
  Mutex.i
  List Tasks.TPoolTaskData()
  List Threads.i()
EndStructure

; -------------------------------------------------------------------------------
; ThreadPoolThread(*TPool.TPoolData)
; -------------------------------------------------------------------------------
; Die Procedure für die Threads zum Ausführen der Tasks.
; -------------------------------------------------------------------------------
; *TPool.TPoolData = Zeiger auf einen Thread Pool (= TPoolData Strcuture)
; -------------------------------------------------------------------------------
Procedure ThreadPoolThread(*TPool.TPoolData)
  Protected *Task, *Parameter
  Protected Semaphore.i
  Repeat
    ;Auf Task Warten
    WaitSemaphore(*TPool\Semaphore)
    ;Task aus der Liste nehmem
    LockMutex(*TPool\Mutex)
      FirstElement(*TPool\Tasks())
      Semaphore = *TPool\Tasks()\Semaphore
      *Task = *TPool\Tasks()\Task
      *Parameter = *TPool\Tasks()\Parameter
      DeleteElement(*TPool\Tasks())
    UnlockMutex(*TPool\Mutex)
    ;Task ausführen
    If *Task = 0 And *Parameter = 0
      Break
    EndIf
    CallFunctionFast(*Task, *Parameter)
    ;Signal dass Task fertig
    If Semaphore
      SignalSemaphore(Semaphore)
    EndIf
  ForEver
EndProcedure

; -------------------------------------------------------------------------------
; CreateThreadPool(*TPool.TPoolData, Number.i)
; -------------------------------------------------------------------------------
; Legt einen neuen ThreadPool an.
; -------------------------------------------------------------------------------
; Number = Anzahl an Threads die verwendet werden soll.
; -------------------------------------------------------------------------------
Procedure.i CreateThreadPool(Number.i)
  Protected *TPool.TPoolData = AllocateMemory(SizeOf(TPoolData))
  Protected i.i
  InitializeStructure(*TPool, TPoolData)
  *TPool\Semaphore = CreateSemaphore()
  *TPool\Mutex = CreateMutex()
  ;Alle Threads anlegen
  For i = 1 To Number
    AddElement(*TPool\Threads())
    *TPool\Threads() = CreateThread(@ThreadPoolThread(), *TPool)
  Next
  ProcedureReturn *TPool
EndProcedure

; -------------------------------------------------------------------------------
; AddThreadPoolTask(*TPool.TPoolData, *Task, *Parameter = 0, Semaphore.i = 0)
; -------------------------------------------------------------------------------
; Übergibt die Angegebene Methode an den Thread Pool zum im Hintergrund
; abarbeiten. Ähnlich wie CreateThread() die Proceduren unterliegen
; auch den gleichen Einschränkungen.
; -------------------------------------------------------------------------------
; *TPool.TPoolData = Zeiger auf einen Thread Pool (= TPoolData Strcuture)
; *Task = Zeiger auf die Procedure die als Task ausgeführt werden soll.
; *Parameter = Optionaler Zeiger auf einen Parameter.
; Semaphore = Optionales Semaphore das Infomiert wird wenn der Task fertig ist.
; -------------------------------------------------------------------------------
Procedure AddThreadPoolTask(*TPool.TPoolData, *Task, *Parameter = 0, Semaphore.i = 0)
  ;Task hinzufügen
  LockMutex(*TPool\Mutex)
    LastElement(*TPool\Tasks())
    AddElement(*TPool\Tasks())
    *TPool\Tasks()\Task = *Task
    *TPool\Tasks()\Parameter = *Parameter
    *TPool\Tasks()\Semaphore = Semaphore
  UnlockMutex(*TPool\Mutex)
  ;Signal dass neuer Task verfügbar
  SignalSemaphore(*TPool\Semaphore)
EndProcedure

; -------------------------------------------------------------------------------
; CloseThreadPool(*TPool.TPoolData)
; -------------------------------------------------------------------------------
; Beendet einen ThreadPool und wartet dabei bis die Warteschlage abgearbeitet ist.
; -------------------------------------------------------------------------------
; *TPool.TPoolData = Zeiger auf einen Thread Pool (= TPoolData Strcuture)
; -------------------------------------------------------------------------------
Procedure CloseThreadPool(*TPool.TPoolData)
  Protected i.i, Size.i
  Size = ListSize(*TPool\Threads())
  ;Alle Threads beenden
  For i = 1 To Size
    AddThreadPoolTask(*TPool, 0, 0)
  Next
  ;Auf alle Threads warten
  For i = 1 To Size
    WaitThread(*TPool\Threads())
    DeleteElement(*TPool\Threads(), 1)
  Next
  FreeSemaphore(*TPool\Semaphore)
  FreeMutex(*TPool\Mutex)
  FreeList(*TPool\Tasks())
  FreeList(*TPool\Threads())
  FreeMemory(*TPool)
EndProcedure

; -------------------------------------------------------------------------------
; TaskGroupData Structure
; -------------------------------------------------------------------------------
Structure TaskGroupData
  *TPool.TPoolData
  TaskMutex.i
  WaitMutex.i
  TaskCount.i
  WaitCount.i
  TaskSemaphore.i
EndStructure   

; -------------------------------------------------------------------------------
; CreateTaskGroup(*TPool.TPoolData)
; -------------------------------------------------------------------------------
; Erstellt eine Task Group.
; -------------------------------------------------------------------------------
; *TPool.TPoolData = Zeiger auf einen Thread Pool (= TPoolData Strcuture)
; -------------------------------------------------------------------------------
Procedure.i CreateTaskGroup(*TPool.TPoolData)
  Protected *TaskGroup.TaskGroupData = AllocateMemory(SizeOf(TaskGroupData))
  If *TPool <> 0
    *TaskGroup\TPool = *TPool
    *TaskGroup\TaskMutex = CreateMutex()
    *TaskGroup\WaitMutex = CreateMutex()
    *TaskGroup\TaskCount = 0
    *TaskGroup\WaitCount = 0
    *TaskGroup\TaskSemaphore = CreateSemaphore()
    ProcedureReturn *TaskGroup
  EndIf
EndProcedure

; -------------------------------------------------------------------------------
; AddTaskGroupTask(*TaskGroup.TaskGroupData, *Task, *Parameter = 0, *Param3 = 0)
; -------------------------------------------------------------------------------
; Übergibt die Angegebene Methode an die Task Group zum im Hintergrund
; abarbeiten.
; -------------------------------------------------------------------------------
; *TaskGroup.TaskGroupData = Zeiger auf eine Task Group (= TaskGroupData Strcuture).
; *Task = Zeiger auf die Procedure die als Task ausgeführt werden soll.
; *Parameter = Optionaler Zeiger auf einen Parameter.
; -------------------------------------------------------------------------------
Procedure AddTaskGroupTask(*TaskGroup.TaskGroupData, *Task, *Parameter = 0)
  LockMutex(*TaskGroup\TaskMutex)
  If *TaskGroup\TPool
    *TaskGroup\TaskCount + 1
    AddThreadPoolTask(*TaskGroup\TPool, *Task, *Parameter, *TaskGroup\TaskSemaphore)
  EndIf
  UnlockMutex(*TaskGroup\TaskMutex)
EndProcedure

; -------------------------------------------------------------------------------
; WaitTaskGroup(*TaskGroup.TaskGroupData)
; -------------------------------------------------------------------------------
; Wartet auf eine Task Group.
; -------------------------------------------------------------------------------
; *TaskGroup.TaskGroupData = Zeiger auf eine Task Group (= TaskGroupData Strcuture).
; -------------------------------------------------------------------------------
Procedure WaitTaskGroup(*TaskGroup.TaskGroupData)
  Repeat
    LockMutex(*TaskGroup\WaitMutex)
    LockMutex(*TaskGroup\TaskMutex)
    If *TaskGroup\WaitCount = *TaskGroup\TaskCount
      UnlockMutex(*TaskGroup\TaskMutex)
      UnlockMutex(*TaskGroup\WaitMutex)
      Break
    EndIf
    UnlockMutex(*TaskGroup\TaskMutex)
    WaitSemaphore(*TaskGroup\TaskSemaphore)
    *TaskGroup\WaitCount + 1
    UnlockMutex(*TaskGroup\WaitMutex)
  ForEver
EndProcedure

; -------------------------------------------------------------------------------
; CloseTaskGroup()
; -------------------------------------------------------------------------------
; Wartet auf eine Task Group und löscht diese.
; -------------------------------------------------------------------------------
; *TaskGroup.TaskGroupData = Zeiger auf eine Task Group (= TaskGroupData Strcuture).
; -------------------------------------------------------------------------------
Procedure CloseTaskGroup(*TaskGroup.TaskGroupData)
  WaitTaskGroup(*TaskGroup)
  FreeMutex(*TaskGroup\TaskMutex)
  FreeMutex(*TaskGroup\WaitMutex)
  FreeSemaphore(*TaskGroup\TaskSemaphore)
  FreeMemory(*TaskGroup)
EndProcedure

; -------------------------------------------------------------------------------
; -------------------------------------------------------------------------------

; -------------------------------------------------------------------------------
; Beispiel:
; (Nicht sehr sinnvoll aber es sollte ja möglichst kurz sein.)
; -------------------------------------------------------------------------------

Procedure Thread(*p)
  Protected i.i
  For i = 0 To 10000000
  Next
EndProcedure

Define i.i, Time.i
OpenConsole()

Time = ElapsedMilliseconds()
Define Pool.i = CreateThreadPool(4)
Define Group.i = CreateTaskGroup(Pool)
For i = 0 To 50
  AddTaskGroupTask(Group, @Thread(), 0)
Next
WaitTaskGroup(Group)
; ...
; ...
CloseTaskGroup(Group)
CloseThreadPool(Pool)
time = ElapsedMilliseconds() - time
PrintN("Mit TaskGroup: " + Str(time) + "ms")


Time = ElapsedMilliseconds()
Define ThreadPool.i = CreateThreadPool(4)
For i = 0 To 50
  AddThreadPoolTask(ThreadPool, @Thread(), 0)
Next
CloseThreadPool(ThreadPool)
time = ElapsedMilliseconds() - time
PrintN("Mit ThreadPool: " + Str(time) + "ms")


Time.i = ElapsedMilliseconds()
For i = 0 To 50
  Thread(0)
Next
time = ElapsedMilliseconds() - time
PrintN("Ohne Threads: " + Str(time) + "ms")


Input()
End

Re: Thread Pool

Verfasst: 28.02.2012 20:37
von Nino
Hallo Christian+,

im Moment habe ich 2 Frageen (zum neuen Code):

a) CreateThreadPool(Number.i)
; Number = Anzahl an Threads die verwendet werden soll.
=> Wie viele nimmt man da am besten? So viele Threads, wie Kerne vorhanden sind?

b) Muss man die Compiler-Option "Thread-sicheres Executable erstellen" einschalten?

Danke und Grüße, Nino

Re: Thread Pool

Verfasst: 28.02.2012 20:56
von STARGÅTE
Hallo Christian+

Der neue Code mit

Code: Alles auswählen

AddThreadPoolTask(*TPool.TPoolData, *Task, Params.i = 0, *Param1 = 0, *Param2 = 0, *Param3 = 0, Semaphore.i = 0)
ist total sinnfrei.
  • Ersten müsste man jetzt (um Semaphore zu ändern) viele Nullen davor schreiben.
  • Zweitens ist es quscht dem Benutzer 3 Parameter anzugeben, weil diese eh nur Pointer sein können.
    In den meisten Fällen muss der Nutzer also eh eine Struktur anlegen und diese übergeben (für Floats, Strings usw.).
    Hier 3 statt wie normal 1 Pointer zu übergeben ist einfach nur quascht.
    Zumal man jetzt auch noch den nervigen Params.i Parameter hat -.-
Bitte ändere das wieder!

Desweiteren kannst du statt CallFunctionFast auch einfach einene Prototype anlegen und diesen dann direkt aufrufen.

Re: Thread Pool

Verfasst: 28.02.2012 21:14
von Christian+
@Nino

zu a)
Nun das hängt ab davon was du machst. Viele Threads schaden nicht außer es werden zu viele das das verwalten der Threads schon erheblich rechenaufwand ist. Das passiert aber eigentlich nur mit CreateThread in einer Schleife nicht beim Festlegen einer Anzahl für einen ThreadPool meist ist man da bescheiden genug. Nutzen bringen dir nur so viele Threads wie du Kerne hast. Wenn es dir ums Testen geht nimm so viele wie die CPU Kerne hat das ist meist optimal kommt aber auch drauf an was du machst. Ich bau bei mir meist 8 ein mehr Kerne haben die meisten ja nicht. Wenn allerdings in den Tasks Befehle verwendet werden die den Thread zum pausieren/auf etwas zu warten bringen können schadet auch mehr nicht so, dasd immer min. so viele Arbeiten können wie Kerne da sind wobei da dann die Frage ist ob man nicht lieber einen Richtigen Thread für so was verwendet.

zu b)
"Thread-sicheres Executable" sollte man aktivieren den es gibt einige Befehle die wenn man in Threads verwendet dies voraussetzen um korrekt zu funktionieren und da immer aufpassen ist nur was für die wo sich super auskennen und macht nicht unbedingt Sinn.

@STARGÅTE
Das mit den Parametern werde ich auch wieder ändern ich wollte da nur mal probieren.
Das mit dem Prototype anlegen werde ich dann auch mal noch Testen habe das für so was noch nie verwendet mal schauen wie das geht.

Edit: Parameter wieder geändert

Re: Thread Pool

Verfasst: 28.02.2012 21:49
von Danilo
Mit dem ThreadPool von Windows 2000 Professional+ geht das mit der Funktion QueueUserWorkItem().
Dann kümmert sich Windows um alles. Ist effizienter als CreateThread, da nicht immer neue Threads
erstellt werden müssen. Ideal wenn man immer wieder viele kleine, kurze Threads laufen lässt, z.B.
Berechnungen, SortArray und sowas.

Code: Alles auswählen

;
; Windows 2000 Professional+
;
#WT_EXECUTELONGFUNCTION = $10

Import "kernel32.lib"
    QueueUserWorkItem(*threadFunction,*argument=0,Flags.l=0)
EndImport

Procedure GetProcessorCount()
    Protected si.SYSTEM_INFO
    GetSystemInfo_(@si)
    ProcedureReturn si\dwNumberOfProcessors
EndProcedure

Procedure t(value)
    Repeat
        a = Random($FFFFFFFF)*Random($FFFFFFFF)*Random($FFFFFFFF)
    ForEver
EndProcedure

MessageRequester("Threads","Starte "+Str(GetProcessorCount())+" Threads")

For i = 1 To GetProcessorCount()
    QueueUserWorkItem(@t())   ;,i,#WT_EXECUTELONGFUNCTION)
Next

Delay(20000)

Re: Thread Pool

Verfasst: 28.02.2012 21:59
von RSBasic
@Danilo
:allright:

Re: Thread Pool

Verfasst: 28.02.2012 22:24
von Nino
@Danilo:
Was genau macht QueueUserWorkItem() mit der Prozedur t()?
Es sieht ja eigentlich so aus, als ob t() wegen der Repeat/ForEver-Schleife nie endet. Das Program endet ja aber doch. :o

Grüße, Nino

Re: Thread Pool

Verfasst: 28.02.2012 22:30
von RSBasic
@Nino
Die Prozedur wird ja im Hintergrund durchgeführt, wie als ob du diese Prozedur in einem PB-Thread auslagerst. D.h. es wird nicht so lange gewartet, bis t() fertig ist, sondern t() wird einfach im Hintergrund durchgeführt, so dass die Anwendung nicht gestoppt wird.
Und wenn die 20 Sekunden vorbei sind, dann wird die Anwendung beendet, weil danach nichts kommt.

Re: Thread Pool

Verfasst: 28.02.2012 22:31
von ts-soft
@Nino
Threads leben und sterben mit dem Process. Nach beenden des Processes gibt es auch keine Zombies mehr :mrgreen: