Systemen parallel abarbeiten zu lassen.
Mir ging es hier darum, die Parallelisierung von Aufgaben möglichst
einfach zu machen.
Die Aufgaben werden durch Threads abgearbeitet. Je nachdem wieviele
CPUs zur Laufzeit des Programmes verfügbar sind, werden so die Jobs
parallelisiert.
Man hat beispielsweise 8 unterschiedliche Berechnungen durchzuführen,
bevor man im Programm fortfahren kann.
Dann erstellt man daraus 8 Jobs (Aufgaben) und lässt sie abarbeiten.
Auf einem System mit 1 CPU und 1 Prozessorkern werden die Aufgaben
der Reihe nach abgearbeitet.
Auf einem System mit beispielsweise 2 QuadCore-CPUs, können dagegen
alle 8 Aufgaben parallel abgearbeitet werden.
Das Thema der Parallelisierung von Aufgaben ist heute sehr wichtig,
und wird in Zukunft noch viel wichtiger werden.
Code: Alles auswählen
;*----------------------------------------------------------------------------------
;*
;* JOBS.pbi
;*
;* Author: Danilo Krahn, 2008/06/19
;*
;* functions for running jobs simultaneously
;* on multiprocessor/multicore systems
;*
;* functions:
;* jobID = AddJob( @myProcedure(), optional_argument.l = 0 ) ; add a job to the queue
;*
;* jobsDone = DoJobs() ; run all jobs in the queue
;*
;* result = JobResult(jobID) ; get the return value of a job
;*
;* ClearJobs() ; clear the job queue
;*
;*
;* compiler options: [X] create threadsafe executable
;*
;*----------------------------------------------------------------------------------
Structure __JOB_INFO
id.l
jobProc.l
jobArg.l
jobResult.l
EndStructure
Global NewList __Jobs__.__JOB_INFO()
Global __Jobs__mutex.l
Procedure __GetNextJob(*job.__JOB_INFO)
Shared __Jobs__mutex
Protected element.l, retval.l
If Not __Jobs__mutex
ProcedureReturn #False
EndIf
LockMutex(__Jobs__mutex)
If *job
element = NextElement(__Jobs__())
If element
*job\id = __Jobs__()\id
*job\jobProc = __Jobs__()\jobProc
*job\jobArg = __Jobs__()\jobArg
*job\jobResult = element
retval = #True
EndIf
EndIf
UnlockMutex(__Jobs__mutex)
ProcedureReturn retval
EndProcedure
Procedure __WorkerThread(id)
Protected element.l, result.l, job.__JOB_INFO
While __GetNextJob(@job)
If job\jobProc
element = job\jobResult
result = CallFunctionFast(job\jobProc,job\jobArg)
If element
PokeL(element+(2*sizeOf(LONG))+OffsetOf(__JOB_INFO\jobResult),result)
EndIf
EndIf
Wend
EndProcedure
Procedure.l JobResult(jobID.l)
Protected result.l
If Not __Jobs__mutex
ProcedureReturn #False
EndIf
LockMutex(__Jobs__mutex)
ForEach __Jobs__()
If __Jobs__()\id = jobID
result = __Jobs__()\jobResult
Break
EndIf
Next
UnlockMutex(__Jobs__mutex)
ProcedureReturn result
EndProcedure
Procedure ClearJobs()
Shared __Jobs__mutex
If Not __Jobs__mutex
__Jobs__mutex = CreateMutex()
If Not __Jobs__mutex
ProcedureReturn #False
EndIf
EndIf
LockMutex(__Jobs__mutex)
ClearList(__Jobs__())
UnlockMutex(__Jobs__mutex)
EndProcedure
Procedure.l AddJob(proc.l,arg.l=0)
Protected retval.l
If Not __Jobs__mutex
__Jobs__mutex = CreateMutex()
If Not __Jobs__mutex
ProcedureReturn #False
EndIf
EndIf
LockMutex(__Jobs__mutex)
LastElement(__Jobs__())
If AddElement(__Jobs__())
__Jobs__()\jobProc = proc
__Jobs__()\jobArg = arg
__Jobs__()\id.l = CountList(__Jobs__())
retval = __Jobs__()\id
EndIf
UnlockMutex(__Jobs__mutex)
ProcedureReturn retval
EndProcedure
Procedure.l DoJobs()
CompilerIf #PB_Compiler_OS = #PB_OS_Windows
Protected si.SYSTEM_INFO
CompilerEndIf
Protected numCPUs.l, job_count.l, i.l, thread_running.l
If Not __Jobs__mutex
__Jobs__mutex = CreateMutex()
If Not __Jobs__mutex
ProcedureReturn #False
EndIf
EndIf
CompilerIf #PB_Compiler_OS = #PB_OS_Windows
GetSystemInfo_(@si)
numCPUs = si\dwNumberOfProcessors
CompilerElse
numCPUs = 4 ; run 4 threads simultanously by default
CompilerEndIf
;Debug "number of CPUs: "+Str(numCPUs)
;numCPUs = 1
Dim threads(numCPUs)
job_count = CountList(__Jobs__())
If job_count = 0
ProcedureReturn 0
ElseIf job_count < numCPUs
numCPUs = job_count
EndIf
;Debug "starting "+Str(numCPUs)+" worker threads."
ResetList(__Jobs__())
For i = 1 To numCPUs
threads(i) = CreateThread(@__WorkerThread(),i)
Next i
; wait until all worker threads are done
Repeat
thread_running = 0
For i = 1 To numCPUs
If IsThread(threads(i))
thread_running = 1
EndIf
Next i
Delay(10)
Until thread_running = 0
ProcedureReturn CountList(__Jobs__())
EndProcedure
;*----------------------------------------------------------------------------------
;*--[ END OF INCLUDE 'JOBS.pbi' ]---------------------------------------------------
;*----------------------------------------------------------------------------------
;*-------------------------------------
;----[ Program ]-----------------------
;*-------------------------------------
EnableExplicit
;XIncludeFile "JOBS.pbi"
Procedure.l MyJob(arg.l)
Protected a.l, b.l
Debug "running job "+Str(arg)
; dummy loop
For a = 1 To 10000000
b=a*2
Next a
ProcedureReturn arg * arg
EndProcedure
Define.l a, b
Dim jobs.l(10)
For a = 1 To 10
jobs(a) = AddJob(@MyJob(),a)
Next a
a = DoJobs()
Debug "------------------------"
Debug Str(a)+" jobs done."
Debug "results:"
For a = 1 To 10
Debug JobResult( jobs(a) )
Next a
ClearJobs()
; IDE Options = PureBasic 4.20 (Windows - x86)
; CursorPosition = 194
; FirstLine = 178
; Folding = --
; EnableThread
; EnableXP
werden, da sonst die Debug-Ausgabe (und noch mehr) durcheinander kommen kann.
Ich hoffe das jemand etwas damit anfangen kann und es vielleicht
etwas hilft die Parallelisierung von bestimmten Aufgaben etwas leichter
zu machen.
Man sollte damit alles mögliche leicht parallelisieren können:
Man möchte 8 Dateien oder 8 Speicherbereiche packen? OK, 8 Jobs
erstellen und den Rechner parallel arbeiten lassen.
Man muß 3 Arrays mit Werten füllen? OK, warum nicht parallel erledigen lassen, wenn möglich?
Die Möglichkeiten sollten groß sein, die Benutzung (hoffentlich) einfach.

EDIT: Kompatibel zu PB-Threads gemacht, die Job-Procedure muß immer
1 Parameter haben, den man optional bei AddJob() angeben kann.