JobQueue - MultiThread

Hier könnt Ihr gute, von Euch geschriebene Codes posten. Sie müssen auf jeden Fall funktionieren und sollten möglichst effizient, elegant und beispielhaft oder einfach nur cool sein.
Benutzeravatar
cxAlex
Beiträge: 2111
Registriert: 26.06.2008 10:42

JobQueue - MultiThread

Beitrag von cxAlex »

In Zeiten von Doppel- und Vierkernprozessoren werden Multithread Anwendungen immer interessanter, umd die ganze Rechenleistung so eines CPU auszunutzen. Doch die Programmierung mit Threads hat ihre Tücken, schnell passieren Speicherkollisionen, usw.

Darum hab ich eine JobQueue geschrieben die auf beliebig vielen Threads basieren kann. Doch nicht nur für parallele Verarbeitung mehrerer Proceduren ist die JobQueue gut. Serveranwendungen können z.B. den Input der Clients verarbeiten, die Reaktionen (Antworten/Daten senden/usw) auf die JobQueue pushen und ohne Verzögerung wieder weiterarbeiten. (Master - Slave)

Features:
  • FirstIn - FirstOut Prinzip: Was zuerst gepusht wird, wird zuerst bearbeitet
  • Initialisierbar mit x Threads
  • Jobs in 3 Prioritätsklassen teilbar
  • Rückgabewerte werden gecached und können später abgerufen werden
  • Sehr Perfomat
Democode (auch beim Download enthalten):

Die parallele Berechnung ist auf meinen Quad Core ca. 3,8 - 3,9x schneller als die serielle. Ohne Debugger ausfüren, sonst dauert das ewig ^^.

Code: Alles auswählen

XIncludeFile "JobQueue_MultiThread.pbi"
XIncludeFile "HighResTimer.pbi"

Procedure DemoProg(Para)
  ; Seeeeeeehr Recheneintensive Berechnung
  For i = 1 To 1000000
    For j = 1 To 1000
      Para + j/i
    Next
  Next
  ProcedureReturn Para
EndProcedure

#RealTimeTest = 0 ; Für Echtzeittest auf 1 setzen. ACHTUNG: Möglicherweise reagiert der PC einige Sekunden lang nicht mehr.

; Parallele Berechnung
t = GetTimeMS()
Jobs = JobQueue_Init(4) ; Initialisierung mit 4 Threads (Quad Core)
If #RealTimeTest ; Echtzeit Test
  Job1 = JobQueue_Add(Jobs, @DemoProg(), 1, #Job_RealTimePriority)
  Job2 = JobQueue_Add(Jobs, @DemoProg(), 23, #Job_RealTimePriority)
  Job3 = JobQueue_Add(Jobs, @DemoProg(), 45, #Job_RealTimePriority)
  Job4 = JobQueue_Add(Jobs, @DemoProg(), 67, #Job_RealTimePriority)
Else ; Normale Priorität
  Job1 = JobQueue_Add(Jobs, @DemoProg(), 1)
  Job2 = JobQueue_Add(Jobs, @DemoProg(), 23)
  Job3 = JobQueue_Add(Jobs, @DemoProg(), 45)
  Job4 = JobQueue_Add(Jobs, @DemoProg(), 67)
EndIf

While JobQueue_Count(Jobs)
  Delay(1)
Wend


Result1_1 = JobQueue_JobReturnValue(Jobs, Job1)
Result1_2 = JobQueue_JobReturnValue(Jobs, Job2)
Result1_3 = JobQueue_JobReturnValue(Jobs, Job3)
Result1_4 = JobQueue_JobReturnValue(Jobs, Job4)
; Ein bisschen Statistik
Worker_1 = JobQueue_JobWorker(Jobs, Job1)
Worker_2 = JobQueue_JobWorker(Jobs, Job2)
Worker_3 = JobQueue_JobWorker(Jobs, Job3)
Worker_4 = JobQueue_JobWorker(Jobs, Job4)

JobQueue_Free(Jobs)
t1 = GetTimeMS()-t

; Serielle Berechnung
t = GetTimeMS()
Result2_1 = DemoProg(1)
Result2_2 = DemoProg(23)
Result2_3 = DemoProg(45)
Result2_4 = DemoProg(67)
t2 = GetTimeMS()-t

; Auswertung
msg.s = "Parallel: " + Str(t1) + " ms" + Chr(13)
msg + "Result1: " + Str(Result1_1) + " ,Worker Thread: " + Str(Worker_1) + Chr(13)
msg + "Result2: " + Str(Result1_2) + " ,Worker Thread: " + Str(Worker_2) + Chr(13)
msg + "Result3: " + Str(Result1_3) + " ,Worker Thread: " + Str(Worker_3) + Chr(13)
msg + "Result4: " + Str(Result1_4) + " ,Worker Thread: " + Str(Worker_4) + Chr(13)
msg + Chr(13) + "Seriell: " + Str(t2) + " ms" + Chr(13)
msg + "Result1: " + Str(Result2_1) + Chr(13)
msg + "Result2: " + Str(Result2_2) + Chr(13)
msg + "Result3: " + Str(Result2_3) + Chr(13)
msg + "Result4: " + Str(Result2_4) + Chr(13)
msg + Chr(13) + "Parallel: " + StrF(t2/t1, 2) + "x schneller"

MessageRequester("Test", msg)
Download:

http://files.cxalex.bplaced.net/secure. ... thread.zip
Zuletzt geändert von cxAlex am 09.02.2011 11:45, insgesamt 4-mal geändert.
Projekte: IO.pbi, vcpu
Pausierte Projekte: Easy Network Manager, µC Emulator
Aufgegebene Projekte: ECluster

Bild

PB 5.1 x64/x86; OS: Win7 x64/Ubuntu 10.x x86
Benutzeravatar
Rings
Beiträge: 977
Registriert: 29.08.2004 08:48

Beitrag von Rings »

Ich mag das.Gute Arbeit !!!

Interessant ist auch der Artikel von Freak:
http://www.purebasic.fr/blog/
Rings hat geschrieben:ziert sich nich beim zitieren
Benutzeravatar
cxAlex
Beiträge: 2111
Registriert: 26.06.2008 10:42

Beitrag von cxAlex »

Freut mich das dir meine Arbeit gefällt! Den Artikel hab ich schon gelesen, sehr interessant, war auch ein Grund dafür das zu programmieren.
Projekte: IO.pbi, vcpu
Pausierte Projekte: Easy Network Manager, µC Emulator
Aufgegebene Projekte: ECluster

Bild

PB 5.1 x64/x86; OS: Win7 x64/Ubuntu 10.x x86
Benutzeravatar
cxAlex
Beiträge: 2111
Registriert: 26.06.2008 10:42

Beitrag von cxAlex »

Update:
  • JobQueue_JobInProgress(): Ermitteln ob ein Job gerade in Bearbeitung ist.
  • JobQueue_JobWorker(): Ermitteln welcher Thread den Job bearbeitet hat
  • #Job_RealTimePriority: Job mit Echtzeit-Priorität ausführen. Auf meinem QuadCore erreiche ich damit 3.99-4.00x fache Geschwidigkeit gegenüber Seriell.
Democode & Download im 1. Post
Projekte: IO.pbi, vcpu
Pausierte Projekte: Easy Network Manager, µC Emulator
Aufgegebene Projekte: ECluster

Bild

PB 5.1 x64/x86; OS: Win7 x64/Ubuntu 10.x x86
Benutzeravatar
cxAlex
Beiträge: 2111
Registriert: 26.06.2008 10:42

Beitrag von cxAlex »

Update:
  • JobQueue_AddThread(): Fügt einen Worker Thread hinzu
  • JobQueue_RemoveThread(): Entfernt einen Worker Thread
  • Kleinen Bug im ReturnBuffer behoben durch den eine Speicherkollision entstehen konnte
  • Messaging - System integriert: Threads können nun einfach miteinander kommunizieren/gesteuert werden (Lastverteilung usw. geplant)
Download & Democode im 1. Post
Projekte: IO.pbi, vcpu
Pausierte Projekte: Easy Network Manager, µC Emulator
Aufgegebene Projekte: ECluster

Bild

PB 5.1 x64/x86; OS: Win7 x64/Ubuntu 10.x x86
Benutzeravatar
cxAlex
Beiträge: 2111
Registriert: 26.06.2008 10:42

Beitrag von cxAlex »

Update:
  • JobQueue_SetWorkerPriority(): Setzt die Priorität eines Worker Threads
  • JobQueue_PauseWorker(): Pausiert einen Worker x Millisekunden
  • JobQueue_CountWorkerJobs(): Ermittelt die Anzahl der erledigten Jobs des Workers
Download & Democode im 1. Post
Projekte: IO.pbi, vcpu
Pausierte Projekte: Easy Network Manager, µC Emulator
Aufgegebene Projekte: ECluster

Bild

PB 5.1 x64/x86; OS: Win7 x64/Ubuntu 10.x x86
Benutzeravatar
cxAlex
Beiträge: 2111
Registriert: 26.06.2008 10:42

Beitrag von cxAlex »

Update:
  • JobQueue_Add(): Job kann nun optinal einen Parent Job besitzen.
  • JobQueue_JobWorker(): Intern geändert, bei vielen Workern schneller.
  • Neues Beispiel
Kleine Erklärung dazu:
Was passiert wenn ein Job mit einem Parent zur Ausführung kommt? Dann wird geprüft ob der Parent - Job schon erledigt ist. Ist dies der Fall passiert nix weiter und der Job wird ausgeführt. Ist der Parent noch nicht erledigt, wird der Job aus der Ausführung genommen, dem Parent als Child angehängt und der Worker kann sich um den nächsten Job kümmern. Jeder Parent kann beliebig viele Childs haben. Ist der Parent dann fertig werden all seine Childs wieder in die Ausführung aufgenommen und können wieder parallel weiterverarbeitet werden.

Dies ermöglich es einfach, Ablaufe zeitlich zu Synchronisieren ohne auf parallele Verarbeitung zu verzichten, bzw. die ganze Wartezeit an Synchronisationspunkten zu verblasen.
freak im PB Blog hat geschrieben:It is hard to do it without introducing hidden bugs, and as i found out myself it is even harder to do it in a way that fully utilizes the available resources and does not spend all its time waiting on synchronisation points.
zum 3. Punkt, ein neues Codebeispiel (auch im Download):
(Unbedingt ThreadSafe aktivieren da wir hier mit Strings spielen)

Code: Alles auswählen

XIncludeFile "JobQueue_MultiThread.pbi"
XIncludeFile "HighResTimer.pbi"

Procedure.s DemoProg2(Para)
  ; Hier könnt der Speicher aus einer Datei mit übergebender FileID kommen, ist auch egal.
  *Mem = AllocateMemory(1024*1024) ; 1 Mb Memory kommen schnell mal vor
  Result.s = MD5Fingerprint(*Mem, 1024*1024)
  FreeMemory(*Mem)
  ProcedureReturn Result
EndProcedure

t = GetTimeMS()
; Jobs = JobQueue_Init(16)
; Jobs = JobQueue_Init(10)
; Jobs = JobQueue_Init(8)
Jobs = JobQueue_Init(4) ; Initialisierung mit 4 Threads (Quad Core)

For i = 1 To 1000
  LastJob = JobQueue_Add(Jobs, @DemoProg2(), i)
Next
While JobQueue_Count(Jobs)
  Delay(10)
Wend
t1 = GetTimeMS()-t

Result1.s = PeekS(JobQueue_JobReturnValue(Jobs, LastJob))
JobQueue_Free(Jobs)

t = GetTimeMS()
For i = 1 To 1000
  Result2.s = DemoProg2(i)
Next
t2 = GetTimeMS()-t

MessageRequester("Test",Str(t1)+"  "+Result1+Chr(13)+Str(t2)+"  "+Result2)

1000 MD5 Berechnungen à 1 MB, kann schnell mal gebraucht werden, kleine Fotosammlung usw.

Auch hier ist parallel 3,2 - 3,5 mal schneller als seriell, was bei mehr Berechnungen schnell ins Gewicht fallen kann. Wobei interessanter Weise 8,10 und 16 Threads etwas schneller sind als 4 Threads auf meinen Quad Core. Kann das jemand auf einem Dual/Quad/OctaCore mal mit ein paar verschiedenen Anzahlen von Threads probieren und posten was am schnellsten/langsamsten war? Auch sonst währe ich über Bugreports, Verbesserungsvorschläge sehr froh.

Das würde mir bei der Optimierung des Codes helfen. Danke.


Download & Democode im 1. Post.
Projekte: IO.pbi, vcpu
Pausierte Projekte: Easy Network Manager, µC Emulator
Aufgegebene Projekte: ECluster

Bild

PB 5.1 x64/x86; OS: Win7 x64/Ubuntu 10.x x86
freak
PureBasic Team
Beiträge: 766
Registriert: 29.08.2004 00:20
Wohnort: Stuttgart

Beitrag von freak »

> Wobei interessanter Weise 8,10 und 16 Threads etwas schneller sind als 4 Threads auf meinen Quad Core.

Ich denke mal das hier Speicher-Effekte mit eine Rolle spielen. Während ein Job auf die Allokation von Speicher wartet, oder auf das nachladen einer Seite bei einem eventuellen Pagefault kann ein anderer Worker in der Zeit was arbeiten.

Bei mir hängt das Verhalten deines Tests stark von der Größe des verwendeten Speicherblocks ab. Mit 1Mb und weniger sind 4 Worker am schnellsten. Bei 2Mb und mehr sind 8 besser. Zwischen 8, 10 und 16 gibt es kaum einen unterschied (Intel QuadCore mit 8Gb Speicher). Ich denke mal wenn der Speicherbereich zu klein ist (also zu wenig Arbeit pro Job) dann sind die Jobs zu schnell fertig und "streiten" sich um den Mutex. Da sind dann weniger Threads sich gegenseitig nicht so sehr im Weg.

Das ist eigentlich der interessante Punkt bei so einem System: Wie klein können die Arbeitseinheiten werden ohne das der Overhead des JobManagers zu sehr ins Gewicht fällt. Je kleiner die Einheiten sein können desto universeller ist das System einsetzbar (vorrausgesetzt man will das überhaupt. Wenn man nur größere Jobs hat braucht man das nicht). Dein Code ist vom Prinzip her dem was ich für PB geschrieben habe ziemlich ähnlich nur das ich mich stark auf das Verarbeiten "kleiner" Jobs konzentriert habe weil das Ziel war Dinge wie Sortieren zu parallelisieren und das hauptsächlich in kleinen Einheiten passiert.

Noch ein paar Tips:

Wie du im 2. Beispiel Strings zurückgibst ist ein Problem. Die Rückgabe von Strings per ProcedureReturn erfolgt über den StringBuffer des Threads. Wenn du da später ein PeekS drauf machst steht da eventuell was ganz anderes drin. Das funktioniert hier nur weil alle Jobs das gleiche machen (Ich bin mir auch momentan gar nicht ganz sicher ob das Aufrufen einer String-Prozedur über einen Nicht-String Prototyp überhaupt Stack-Technisch korrekt ist). Am besten rufst du String-Prozeduren über einen separaten Prototyp auf und speicherst die Rückgabe explizit in der Info Struktur (vielleicht über ein Flag bei JobQueue_Add() oder so). Das sollte dann zuverlässiger sein.

Bei der Verwendung des Speichers lässt sich eventuell ein bischen optimieren: Dein Stack Code zum Beispiel resized bei jedem Push/Pop den Speicher. Ich würde das in größeren Schritten machen. Es ist erstaunlich wie stark die Zahl der reallocate Aufrufe zurück geht wenn man die Schritte nur leicht größer macht weil der Stack ja nicht monoton wächst sondern Push/Pop durch die Threads gemischt sind.

Du kannst bei einigen der readonly Zugriffe auf die Strukturen den Mutex Lock weglassen. Zum Beispiel bei JobQueue_JobFinished(). Da hier nur 1 einfacher Wert gelesen wird kann es nicht zu Problemen mit dem Thread kommen. Je weniger der Mutex verwendet desto besser für die Parallelität. Für mein System bin ich immernoch am überlegen ob ich nicht irgendwie ganz auf einen zentralen Mutex verzichten kann weil der doch ein Bottleneck für die Skalierbarkeit ist. Das bedeutet dann aber das die Jobs dezentral verwaltet werden müssen und da habe ich noch keine gute Lösung dafür.
Benutzeravatar
cxAlex
Beiträge: 2111
Registriert: 26.06.2008 10:42

Beitrag von cxAlex »

Thx, werd mal einiges davon umsetzen.
freak hat geschrieben: Du kannst bei einigen der readonly Zugriffe auf die Strukturen den Mutex Lock weglassen. Zum Beispiel bei JobQueue_JobFinished()
K, war mir nicht sicher ob ich die Mutex auch locken muss wenn ich nur lesen und nichts schreibe. Habs zur Sicherheit trozdem gelockt, das kann ja jetzt wohl raus...
Projekte: IO.pbi, vcpu
Pausierte Projekte: Easy Network Manager, µC Emulator
Aufgegebene Projekte: ECluster

Bild

PB 5.1 x64/x86; OS: Win7 x64/Ubuntu 10.x x86
Benutzeravatar
milan1612
Beiträge: 810
Registriert: 15.04.2007 17:58

Beitrag von milan1612 »

[ot]
Sorry für Offtopic, aber ich wollt mich einfach mal bei dir bedanken Alex.
Deine Codes in hier sind eine echte Bereicherung fürs Board, ich hab immer
wieder Spaß daran sie mir durchzusehen - du schreibst echt gut!
Bitte weiter so... :allright:
[/ot]
Bin nur noch sehr selten hier, bitte nur noch per PN kontaktieren
Antworten