Thread Pool

Hier könnt Ihr gute, von Euch geschriebene Codes posten. Sie müssen auf jeden Fall funktionieren und sollten möglichst effizient, elegant und beispielhaft oder einfach nur cool sein.
Christian+
Beiträge: 213
Registriert: 13.07.2008 10:05
Computerausstattung: Windows 8.1 Pro
AMD Phenom II X4 955 @ 3.2 GHz
4GB RAM
NVIDIA GeForce GTX 660

Re: Thread Pool

Beitrag von Christian+ »

@STARGÅTE
Vielen Danke für das Feedback.
Den Namen werde ich umstellen war da selbst nicht so zufrieden.
Das mit dem Schließen ist klar dass einen Großteil des Vorteils wieder zunichtemacht.
Ich arbeite schon daran das zu verbessern werde deine Vorschläge da mal versuchen mit reinzunehmen vielleicht finde ich dann endlich eine gute und auch schnelle Lösung für das Problem.

@Nino
Ein Beispiel kommt noch aber erst mal will ich einige Code Verbesserungen umsetzen.
Windows 8.1 Pro 64Bit | AMD Phenom II X4 955 @ 3.2 GHz | 4GB RAM | NVIDIA GeForce GTX 660
Christian+
Beiträge: 213
Registriert: 13.07.2008 10:05
Computerausstattung: Windows 8.1 Pro
AMD Phenom II X4 955 @ 3.2 GHz
4GB RAM
NVIDIA GeForce GTX 660

Re: Thread Pool

Beitrag von Christian+ »

So habe jetzt mal eine neue Version erstellt allerdings noch nicht gründlich getestet hoffe sie ist fehlerfrei.
Stellt ThreadPools zu Verfügung und darauf aufsetzend TaskGroups die angelegt werden können und über eine Wait Funktionen verfügen.

Code: Alles auswählen

; -------------------------------------------------------------------------------
; -------------------------------------------------------------------------------
; ThreadPool.pb
; 28-02-2012
; Christian+
; -------------------------------------------------------------------------------
EnableExplicit

; -------------------------------------------------------------------------------
; TPoolTaskData Structure
; -------------------------------------------------------------------------------
Structure TPoolTaskData
  *Task
  *Parameter
  Semaphore.i
EndStructure

; -------------------------------------------------------------------------------
; TPoolData Structure
; -------------------------------------------------------------------------------
Structure TPoolData
  Semaphore.i
  Mutex.i
  List Tasks.TPoolTaskData()
  List Threads.i()
EndStructure

; -------------------------------------------------------------------------------
; ThreadPoolThread(*TPool.TPoolData)
; -------------------------------------------------------------------------------
; Die Procedure für die Threads zum Ausführen der Tasks.
; -------------------------------------------------------------------------------
; *TPool.TPoolData = Zeiger auf einen Thread Pool (= TPoolData Strcuture)
; -------------------------------------------------------------------------------
Procedure ThreadPoolThread(*TPool.TPoolData)
  Protected *Task, *Parameter
  Protected Semaphore.i
  Repeat
    ;Auf Task Warten
    WaitSemaphore(*TPool\Semaphore)
    ;Task aus der Liste nehmem
    LockMutex(*TPool\Mutex)
      FirstElement(*TPool\Tasks())
      Semaphore = *TPool\Tasks()\Semaphore
      *Task = *TPool\Tasks()\Task
      *Parameter = *TPool\Tasks()\Parameter
      DeleteElement(*TPool\Tasks())
    UnlockMutex(*TPool\Mutex)
    ;Task ausführen
    If *Task = 0 And *Parameter = 0
      Break
    EndIf
    CallFunctionFast(*Task, *Parameter)
    ;Signal dass Task fertig
    If Semaphore
      SignalSemaphore(Semaphore)
    EndIf
  ForEver
EndProcedure

; -------------------------------------------------------------------------------
; CreateThreadPool(*TPool.TPoolData, Number.i)
; -------------------------------------------------------------------------------
; Legt einen neuen ThreadPool an.
; -------------------------------------------------------------------------------
; Number = Anzahl an Threads die verwendet werden soll.
; -------------------------------------------------------------------------------
Procedure.i CreateThreadPool(Number.i)
  Protected *TPool.TPoolData = AllocateMemory(SizeOf(TPoolData))
  Protected i.i
  InitializeStructure(*TPool, TPoolData)
  *TPool\Semaphore = CreateSemaphore()
  *TPool\Mutex = CreateMutex()
  ;Alle Threads anlegen
  For i = 1 To Number
    AddElement(*TPool\Threads())
    *TPool\Threads() = CreateThread(@ThreadPoolThread(), *TPool)
  Next
  ProcedureReturn *TPool
EndProcedure

; -------------------------------------------------------------------------------
; AddThreadPoolTask(*TPool.TPoolData, *Task, *Parameter = 0, Semaphore.i = 0)
; -------------------------------------------------------------------------------
; Übergibt die Angegebene Methode an den Thread Pool zum im Hintergrund
; abarbeiten. Ähnlich wie CreateThread() die Proceduren unterliegen
; auch den gleichen Einschränkungen.
; -------------------------------------------------------------------------------
; *TPool.TPoolData = Zeiger auf einen Thread Pool (= TPoolData Strcuture)
; *Task = Zeiger auf die Procedure die als Task ausgeführt werden soll.
; *Parameter = Optionaler Zeiger auf einen Parameter.
; Semaphore = Optionales Semaphore das Infomiert wird wenn der Task fertig ist.
; -------------------------------------------------------------------------------
Procedure AddThreadPoolTask(*TPool.TPoolData, *Task, *Parameter = 0, Semaphore.i = 0)
  ;Task hinzufügen
  LockMutex(*TPool\Mutex)
    LastElement(*TPool\Tasks())
    AddElement(*TPool\Tasks())
    *TPool\Tasks()\Task = *Task
    *TPool\Tasks()\Parameter = *Parameter
    *TPool\Tasks()\Semaphore = Semaphore
  UnlockMutex(*TPool\Mutex)
  ;Signal dass neuer Task verfügbar
  SignalSemaphore(*TPool\Semaphore)
EndProcedure

; -------------------------------------------------------------------------------
; CloseThreadPool(*TPool.TPoolData)
; -------------------------------------------------------------------------------
; Beendet einen ThreadPool und wartet dabei bis die Warteschlage abgearbeitet ist.
; -------------------------------------------------------------------------------
; *TPool.TPoolData = Zeiger auf einen Thread Pool (= TPoolData Strcuture)
; -------------------------------------------------------------------------------
Procedure CloseThreadPool(*TPool.TPoolData)
  Protected i.i, Size.i
  Size = ListSize(*TPool\Threads())
  ;Alle Threads beenden
  For i = 1 To Size
    AddThreadPoolTask(*TPool, 0, 0)
  Next
  ;Auf alle Threads warten
  For i = 1 To Size
    WaitThread(*TPool\Threads())
    DeleteElement(*TPool\Threads(), 1)
  Next
  FreeSemaphore(*TPool\Semaphore)
  FreeMutex(*TPool\Mutex)
  FreeList(*TPool\Tasks())
  FreeList(*TPool\Threads())
  FreeMemory(*TPool)
EndProcedure

; -------------------------------------------------------------------------------
; TaskGroupData Structure
; -------------------------------------------------------------------------------
Structure TaskGroupData
  *TPool.TPoolData
  TaskMutex.i
  WaitMutex.i
  TaskCount.i
  WaitCount.i
  TaskSemaphore.i
EndStructure   

; -------------------------------------------------------------------------------
; CreateTaskGroup(*TPool.TPoolData)
; -------------------------------------------------------------------------------
; Erstellt eine Task Group.
; -------------------------------------------------------------------------------
; *TPool.TPoolData = Zeiger auf einen Thread Pool (= TPoolData Strcuture)
; -------------------------------------------------------------------------------
Procedure.i CreateTaskGroup(*TPool.TPoolData)
  Protected *TaskGroup.TaskGroupData = AllocateMemory(SizeOf(TaskGroupData))
  If *TPool <> 0
    *TaskGroup\TPool = *TPool
    *TaskGroup\TaskMutex = CreateMutex()
    *TaskGroup\WaitMutex = CreateMutex()
    *TaskGroup\TaskCount = 0
    *TaskGroup\WaitCount = 0
    *TaskGroup\TaskSemaphore = CreateSemaphore()
    ProcedureReturn *TaskGroup
  EndIf
EndProcedure

; -------------------------------------------------------------------------------
; AddTaskGroupTask(*TaskGroup.TaskGroupData, *Task, *Parameter = 0, *Param3 = 0)
; -------------------------------------------------------------------------------
; Übergibt die Angegebene Methode an die Task Group zum im Hintergrund
; abarbeiten.
; -------------------------------------------------------------------------------
; *TaskGroup.TaskGroupData = Zeiger auf eine Task Group (= TaskGroupData Strcuture).
; *Task = Zeiger auf die Procedure die als Task ausgeführt werden soll.
; *Parameter = Optionaler Zeiger auf einen Parameter.
; -------------------------------------------------------------------------------
Procedure AddTaskGroupTask(*TaskGroup.TaskGroupData, *Task, *Parameter = 0)
  LockMutex(*TaskGroup\TaskMutex)
  If *TaskGroup\TPool
    *TaskGroup\TaskCount + 1
    AddThreadPoolTask(*TaskGroup\TPool, *Task, *Parameter, *TaskGroup\TaskSemaphore)
  EndIf
  UnlockMutex(*TaskGroup\TaskMutex)
EndProcedure

; -------------------------------------------------------------------------------
; WaitTaskGroup(*TaskGroup.TaskGroupData)
; -------------------------------------------------------------------------------
; Wartet auf eine Task Group.
; -------------------------------------------------------------------------------
; *TaskGroup.TaskGroupData = Zeiger auf eine Task Group (= TaskGroupData Strcuture).
; -------------------------------------------------------------------------------
Procedure WaitTaskGroup(*TaskGroup.TaskGroupData)
  Repeat
    LockMutex(*TaskGroup\WaitMutex)
    LockMutex(*TaskGroup\TaskMutex)
    If *TaskGroup\WaitCount = *TaskGroup\TaskCount
      UnlockMutex(*TaskGroup\TaskMutex)
      UnlockMutex(*TaskGroup\WaitMutex)
      Break
    EndIf
    UnlockMutex(*TaskGroup\TaskMutex)
    WaitSemaphore(*TaskGroup\TaskSemaphore)
    *TaskGroup\WaitCount + 1
    UnlockMutex(*TaskGroup\WaitMutex)
  ForEver
EndProcedure

; -------------------------------------------------------------------------------
; CloseTaskGroup()
; -------------------------------------------------------------------------------
; Wartet auf eine Task Group und löscht diese.
; -------------------------------------------------------------------------------
; *TaskGroup.TaskGroupData = Zeiger auf eine Task Group (= TaskGroupData Strcuture).
; -------------------------------------------------------------------------------
Procedure CloseTaskGroup(*TaskGroup.TaskGroupData)
  WaitTaskGroup(*TaskGroup)
  FreeMutex(*TaskGroup\TaskMutex)
  FreeMutex(*TaskGroup\WaitMutex)
  FreeSemaphore(*TaskGroup\TaskSemaphore)
  FreeMemory(*TaskGroup)
EndProcedure

; -------------------------------------------------------------------------------
; -------------------------------------------------------------------------------

; -------------------------------------------------------------------------------
; Beispiel:
; (Nicht sehr sinnvoll aber es sollte ja möglichst kurz sein.)
; -------------------------------------------------------------------------------

Procedure Thread(*p)
  Protected i.i
  For i = 0 To 10000000
  Next
EndProcedure

Define i.i, Time.i
OpenConsole()

Time = ElapsedMilliseconds()
Define Pool.i = CreateThreadPool(4)
Define Group.i = CreateTaskGroup(Pool)
For i = 0 To 50
  AddTaskGroupTask(Group, @Thread(), 0)
Next
WaitTaskGroup(Group)
; ...
; ...
CloseTaskGroup(Group)
CloseThreadPool(Pool)
time = ElapsedMilliseconds() - time
PrintN("Mit TaskGroup: " + Str(time) + "ms")


Time = ElapsedMilliseconds()
Define ThreadPool.i = CreateThreadPool(4)
For i = 0 To 50
  AddThreadPoolTask(ThreadPool, @Thread(), 0)
Next
CloseThreadPool(ThreadPool)
time = ElapsedMilliseconds() - time
PrintN("Mit ThreadPool: " + Str(time) + "ms")


Time.i = ElapsedMilliseconds()
For i = 0 To 50
  Thread(0)
Next
time = ElapsedMilliseconds() - time
PrintN("Ohne Threads: " + Str(time) + "ms")


Input()
End
Zuletzt geändert von Christian+ am 28.02.2012 21:27, insgesamt 3-mal geändert.
Windows 8.1 Pro 64Bit | AMD Phenom II X4 955 @ 3.2 GHz | 4GB RAM | NVIDIA GeForce GTX 660
Nino
Beiträge: 1300
Registriert: 13.05.2010 09:26
Wohnort: Berlin

Re: Thread Pool

Beitrag von Nino »

Hallo Christian+,

im Moment habe ich 2 Frageen (zum neuen Code):

a) CreateThreadPool(Number.i)
; Number = Anzahl an Threads die verwendet werden soll.
=> Wie viele nimmt man da am besten? So viele Threads, wie Kerne vorhanden sind?

b) Muss man die Compiler-Option "Thread-sicheres Executable erstellen" einschalten?

Danke und Grüße, Nino
Benutzeravatar
STARGÅTE
Kommando SG1
Beiträge: 7032
Registriert: 01.11.2005 13:34
Wohnort: Glienicke
Kontaktdaten:

Re: Thread Pool

Beitrag von STARGÅTE »

Hallo Christian+

Der neue Code mit

Code: Alles auswählen

AddThreadPoolTask(*TPool.TPoolData, *Task, Params.i = 0, *Param1 = 0, *Param2 = 0, *Param3 = 0, Semaphore.i = 0)
ist total sinnfrei.
  • Ersten müsste man jetzt (um Semaphore zu ändern) viele Nullen davor schreiben.
  • Zweitens ist es quscht dem Benutzer 3 Parameter anzugeben, weil diese eh nur Pointer sein können.
    In den meisten Fällen muss der Nutzer also eh eine Struktur anlegen und diese übergeben (für Floats, Strings usw.).
    Hier 3 statt wie normal 1 Pointer zu übergeben ist einfach nur quascht.
    Zumal man jetzt auch noch den nervigen Params.i Parameter hat -.-
Bitte ändere das wieder!

Desweiteren kannst du statt CallFunctionFast auch einfach einene Prototype anlegen und diesen dann direkt aufrufen.
PB 6.01 ― Win 10, 21H2 ― Ryzen 9 3900X, 32 GB ― NVIDIA GeForce RTX 3080 ― Vivaldi 6.0 ― www.unionbytes.de
Aktuelles Projekt: Lizard - Skriptsprache für symbolische Berechnungen und mehr
Christian+
Beiträge: 213
Registriert: 13.07.2008 10:05
Computerausstattung: Windows 8.1 Pro
AMD Phenom II X4 955 @ 3.2 GHz
4GB RAM
NVIDIA GeForce GTX 660

Re: Thread Pool

Beitrag von Christian+ »

@Nino

zu a)
Nun das hängt ab davon was du machst. Viele Threads schaden nicht außer es werden zu viele das das verwalten der Threads schon erheblich rechenaufwand ist. Das passiert aber eigentlich nur mit CreateThread in einer Schleife nicht beim Festlegen einer Anzahl für einen ThreadPool meist ist man da bescheiden genug. Nutzen bringen dir nur so viele Threads wie du Kerne hast. Wenn es dir ums Testen geht nimm so viele wie die CPU Kerne hat das ist meist optimal kommt aber auch drauf an was du machst. Ich bau bei mir meist 8 ein mehr Kerne haben die meisten ja nicht. Wenn allerdings in den Tasks Befehle verwendet werden die den Thread zum pausieren/auf etwas zu warten bringen können schadet auch mehr nicht so, dasd immer min. so viele Arbeiten können wie Kerne da sind wobei da dann die Frage ist ob man nicht lieber einen Richtigen Thread für so was verwendet.

zu b)
"Thread-sicheres Executable" sollte man aktivieren den es gibt einige Befehle die wenn man in Threads verwendet dies voraussetzen um korrekt zu funktionieren und da immer aufpassen ist nur was für die wo sich super auskennen und macht nicht unbedingt Sinn.

@STARGÅTE
Das mit den Parametern werde ich auch wieder ändern ich wollte da nur mal probieren.
Das mit dem Prototype anlegen werde ich dann auch mal noch Testen habe das für so was noch nie verwendet mal schauen wie das geht.

Edit: Parameter wieder geändert
Windows 8.1 Pro 64Bit | AMD Phenom II X4 955 @ 3.2 GHz | 4GB RAM | NVIDIA GeForce GTX 660
Benutzeravatar
Danilo
-= Anfänger =-
Beiträge: 2284
Registriert: 29.08.2004 03:07

Re: Thread Pool

Beitrag von Danilo »

Mit dem ThreadPool von Windows 2000 Professional+ geht das mit der Funktion QueueUserWorkItem().
Dann kümmert sich Windows um alles. Ist effizienter als CreateThread, da nicht immer neue Threads
erstellt werden müssen. Ideal wenn man immer wieder viele kleine, kurze Threads laufen lässt, z.B.
Berechnungen, SortArray und sowas.

Code: Alles auswählen

;
; Windows 2000 Professional+
;
#WT_EXECUTELONGFUNCTION = $10

Import "kernel32.lib"
    QueueUserWorkItem(*threadFunction,*argument=0,Flags.l=0)
EndImport

Procedure GetProcessorCount()
    Protected si.SYSTEM_INFO
    GetSystemInfo_(@si)
    ProcedureReturn si\dwNumberOfProcessors
EndProcedure

Procedure t(value)
    Repeat
        a = Random($FFFFFFFF)*Random($FFFFFFFF)*Random($FFFFFFFF)
    ForEver
EndProcedure

MessageRequester("Threads","Starte "+Str(GetProcessorCount())+" Threads")

For i = 1 To GetProcessorCount()
    QueueUserWorkItem(@t())   ;,i,#WT_EXECUTELONGFUNCTION)
Next

Delay(20000)
cya,
...Danilo
"Ein Genie besteht zu 10% aus Inspiration und zu 90% aus Transpiration" - Max Planck
Benutzeravatar
RSBasic
Admin
Beiträge: 8047
Registriert: 05.10.2006 18:55
Wohnort: Gernsbach
Kontaktdaten:

Re: Thread Pool

Beitrag von RSBasic »

@Danilo
:allright:
Aus privaten Gründen habe ich leider nicht mehr so viel Zeit wie früher. Bitte habt Verständnis dafür.
Bild
Bild
Nino
Beiträge: 1300
Registriert: 13.05.2010 09:26
Wohnort: Berlin

Re: Thread Pool

Beitrag von Nino »

@Danilo:
Was genau macht QueueUserWorkItem() mit der Prozedur t()?
Es sieht ja eigentlich so aus, als ob t() wegen der Repeat/ForEver-Schleife nie endet. Das Program endet ja aber doch. :o

Grüße, Nino
Benutzeravatar
RSBasic
Admin
Beiträge: 8047
Registriert: 05.10.2006 18:55
Wohnort: Gernsbach
Kontaktdaten:

Re: Thread Pool

Beitrag von RSBasic »

@Nino
Die Prozedur wird ja im Hintergrund durchgeführt, wie als ob du diese Prozedur in einem PB-Thread auslagerst. D.h. es wird nicht so lange gewartet, bis t() fertig ist, sondern t() wird einfach im Hintergrund durchgeführt, so dass die Anwendung nicht gestoppt wird.
Und wenn die 20 Sekunden vorbei sind, dann wird die Anwendung beendet, weil danach nichts kommt.
Aus privaten Gründen habe ich leider nicht mehr so viel Zeit wie früher. Bitte habt Verständnis dafür.
Bild
Bild
Benutzeravatar
ts-soft
Beiträge: 22292
Registriert: 08.09.2004 00:57
Computerausstattung: Mainboard: MSI 970A-G43
CPU: AMD FX-6300 Six-Core Processor
GraKa: GeForce GTX 750 Ti, 2 GB
Memory: 16 GB DDR3-1600 - Dual Channel
Wohnort: Berlin

Re: Thread Pool

Beitrag von ts-soft »

@Nino
Threads leben und sterben mit dem Process. Nach beenden des Processes gibt es auch keine Zombies mehr :mrgreen:
PureBasic 5.73 LTS | SpiderBasic 2.30 | Windows 10 Pro (x64) | Linux Mint 20.1 (x64)
Nutella hat nur sehr wenig Vitamine. Deswegen muss man davon relativ viel essen.
Bild
Antworten