Posted: Sun Feb 03, 2008 7:43 pm
Very interesting.
I have been experimenting with this for a bit and came up with a slightly different
approach. The idea is this:
- Create a set of "worker threads" on startup, which are initially sleeping.
- ParallelFor() wakes up a worker thread through a semaphore and dispatches the work to it.
- Then it waits for the next thread to become available to dispatch work to.
It requires some inter-thread communication and synchronisation to do this, but the
benefit is that the overhead is much smaller than creating the threads each time.
That allows ParallelFor() to dispatch one loop iteration (= one procedure call)
to a thread at a time, which makes it easier to scale up to more processor cores.
Also, the situation where one thread finishes much earlier than the others (= wasted resources) should be less frequent.
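The handshake described above can be sketched in a few lines. This is only a sketch under assumptions, not the code from this post: it uses the native CreateSemaphore()/SignalSemaphore()/WaitSemaphore() commands of newer PureBasic versions (so it should not be combined with the macros of the same name further down), the names Worker, DoneSemaphore, #WorkerCount and #JobCount are made up for illustration, and completion is tracked with one signal per job instead of a counting semaphore. Compile it as thread safe.

```purebasic
EnableExplicit

#WorkerCount = 2   ; hypothetical pool size
#JobCount    = 10  ; hypothetical number of loop iterations

Global StartSemaphore   = CreateSemaphore() ; dispatcher -> worker: "a job is ready"
Global StartedSemaphore = CreateSemaphore() ; worker -> dispatcher: "job value copied"
Global DoneSemaphore    = CreateSemaphore() ; worker -> dispatcher: "job finished"
Global CurrentValue
Global Dim Result(#JobCount)

Procedure Worker(dummy)
  Protected Value
  Repeat
    WaitSemaphore(StartSemaphore)     ; sleep until a job is dispatched
    Value = CurrentValue              ; copy the shared value locally
    SignalSemaphore(StartedSemaphore) ; dispatcher may now prepare the next job
    Result(Value) = Value * Value     ; the actual "loop iteration"
    SignalSemaphore(DoneSemaphore)    ; report completion
  ForEver
EndProcedure

Define i
For i = 1 To #WorkerCount
  CreateThread(@Worker(), 0)
Next i

; Only the hand-over of CurrentValue is serialized; the work itself runs in parallel.
For CurrentValue = 1 To #JobCount
  SignalSemaphore(StartSemaphore)
  WaitSemaphore(StartedSemaphore)
Next CurrentValue

; Wait until every job has reported completion.
For i = 1 To #JobCount
  WaitSemaphore(DoneSemaphore)
Next i

Debug Result(#JobCount) ; 100 once all jobs are done
```

The workers never exit on their own here; they are simply terminated when the program ends.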
Ok, some code now:
This is the common code used by both examples below.
(I will probably add these semaphore commands to PB 4.30 for all OS.)
Code:
DisableDebugger
EnableExplicit

CompilerIf #PB_Compiler_Thread = 0
  CompilerError "In order to use ParallelFor, you have to compile as thread safe."
CompilerEndIf

; Platform specific code
;
CompilerSelect #PB_Compiler_OS

  CompilerCase #PB_OS_Windows

    Macro CreateSemaphore(Initial = 0, Max = 1)
      CreateSemaphore_(#Null, Initial, Max, #Null)
    EndMacro

    Macro FreeSemaphore(Sem)
      CloseHandle_(Sem)
    EndMacro

    Macro SignalSemaphore(Sem, Inc = 1)
      ReleaseSemaphore_(Sem, Inc, #Null)
    EndMacro

    Macro WaitSemaphore(Sem)
      WaitForSingleObject_(Sem, #INFINITE)
    EndMacro

    Procedure GetCPUCount()
      Protected Count, i, ProcessMask, SystemMask
      If GetProcessAffinityMask_(GetCurrentProcess_(), @ProcessMask, @SystemMask)
        For i = 0 To 31
          If ProcessMask & (1<<i)
            Count + 1
          EndIf
        Next i
      EndIf
      If Count = 0
        ProcedureReturn 1
      Else
        ProcedureReturn Count
      EndIf
    EndProcedure

  CompilerCase #PB_OS_Linux

    ; bits/pthreadtypes.h (sizes for 32-bit x86; on x86-64 the mutex is 40 bytes)
    #__SIZEOF_PTHREAD_MUTEX_T = 24
    #__SIZEOF_PTHREAD_COND_T = 48

    Structure pthread_mutex_t
      opaque.b[#__SIZEOF_PTHREAD_MUTEX_T]
    EndStructure

    Structure pthread_cond_t
      opaque.b[#__SIZEOF_PTHREAD_COND_T]
    EndStructure
    Structure Semaphore
      Count.l
      Max.l
      Mutex.pthread_mutex_t
      Cond.pthread_cond_t
      Wakeups.l ; wake-up tokens handed out by SignalSemaphore() and not yet consumed
    EndStructure

    ImportC "-lpthread"
      pthread_cond_init_(*cond, *attr) As "pthread_cond_init"
      pthread_cond_destroy_(*cond) As "pthread_cond_destroy"
      pthread_cond_broadcast_(*cond) As "pthread_cond_broadcast"
      pthread_cond_wait_(*cond, *mutex) As "pthread_cond_wait"
    EndImport

    Procedure CreateSemaphore(Initial = 0, Max = 1)
      ; must be declared explicitly because EnableExplicit is active
      Protected *Sem.Semaphore = AllocateMemory(SizeOf(Semaphore))
      If *Sem
        *Sem\Count = Initial
        *Sem\Max = Max
        *Sem\Wakeups = 0
        pthread_mutex_init_(@*Sem\Mutex, #Null)
        pthread_cond_init_(@*Sem\Cond, #Null)
      EndIf
      ProcedureReturn *Sem
    EndProcedure

    Procedure FreeSemaphore(*Sem.Semaphore)
      pthread_mutex_destroy_(@*Sem\Mutex)
      pthread_cond_destroy_(@*Sem\Cond)
      FreeMemory(*Sem)
    EndProcedure

    Procedure SignalSemaphore(*Sem.Semaphore, Inc = 1)
      pthread_mutex_lock_(@*Sem\Mutex)
      If *Sem\Count + Inc <= *Sem\Max
        ; While we are below 0 (=locked), we must signal the condition
        ; variable for each increase to release a waiting thread.
        ; Each increase hands out exactly one wake-up token.
        While *Sem\Count < 0 And Inc > 0
          *Sem\Count + 1
          Inc - 1
          *Sem\Wakeups + 1
          pthread_cond_broadcast_(@*Sem\Cond)
        Wend
        ; Once Count >= 0, we just increase the count as no thread is waiting
        *Sem\Count + Inc
      EndIf
      pthread_mutex_unlock_(@*Sem\Mutex)
    EndProcedure

    Procedure WaitSemaphore(*Sem.Semaphore)
      pthread_mutex_lock_(@*Sem\Mutex)
      *Sem\Count - 1
      If *Sem\Count < 0
        ; Wait only if the count fell below 0. The loop guards against spurious
        ; wake-ups and makes sure that one signal releases only one waiter,
        ; even though the condition variable is broadcast to all of them.
        Repeat
          pthread_cond_wait_(@*Sem\Cond, @*Sem\Mutex) ; unlocks the mutex while it waits
        Until *Sem\Wakeups > 0
        *Sem\Wakeups - 1
      EndIf
      pthread_mutex_unlock_(@*Sem\Mutex)
    EndProcedure

    Procedure GetCPUCount()
      ;
      ; Code by remi_meier
      ;
      Protected file.l, count.l, s.s, dir.l
      If FileSize("/proc/cpuinfo") <> -1
        file = ReadFile(#PB_Any, "/proc/cpuinfo")
        If IsFile(file)
          count = 0
          While Not Eof(file)
            s.s = ReadString(file)
            If Left(s, 9) = "processor"
              count + 1
            EndIf
          Wend
          CloseFile(file)
        EndIf
      Else
        dir = ExamineDirectory(#PB_Any, "/proc/acpi/processor", "")
        count = 0
        If IsDirectory(dir)
          While NextDirectoryEntry(dir)
            If Left(DirectoryEntryName(dir), 3) = "CPU"
              count + 1
            EndIf
          Wend
          FinishDirectory(dir)
        EndIf
      EndIf
      ; Fall back to one CPU if detection failed (same as the Windows version),
      ; otherwise no worker thread would be created at all.
      If count < 1
        count = 1
      EndIf
      ProcedureReturn count
    EndProcedure

  CompilerDefault
    CompilerError "Not implemented."

CompilerEndSelect
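On PureBasic versions that ship a native CountCPUs() command, the per-platform GetCPUCount() above can probably be replaced by a single call. A minimal sketch, assuming CountCPUs() and the #PB_System_ProcessCPUs constant are available in your version (they are not part of the code in this post); WorkerCount is just an illustrative name:

```purebasic
EnableExplicit

; Number of CPUs the current process may run on. This corresponds to what the
; affinity-mask loop in the Windows GetCPUCount() counts.
Define WorkerCount = CountCPUs(#PB_System_ProcessCPUs)

; Same fallback as above: never report fewer than one CPU.
If WorkerCount < 1
  WorkerCount = 1
EndIf

Debug "Worker threads to create: " + Str(WorkerCount)
```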
First example: ParallelFor() with the changed system:
Code:
XIncludeFile "parallel_common.pb"

Prototype ParallelLoopFunc(Value, *dataPtr)

Global StartSemaphore, StartedSemaphore, RunningSemaphore
Global CurrentValue, *CurrentData, CurrentLoop.ParallelLoopFunc
Global ProcessorCount

Procedure WorkerThread(dummy)
  Protected Value, *dataPtr, Loop.ParallelLoopFunc
  Repeat
    WaitSemaphore(StartSemaphore)     ; wait for the "wakeup" semaphore
    WaitSemaphore(RunningSemaphore)   ; this one should be immediately available (allows us to know later when the work is complete)
    Value = CurrentValue              ; copy the values locally
    *dataPtr = *CurrentData
    Loop = CurrentLoop
    SignalSemaphore(StartedSemaphore) ; signal the "startup-complete" semaphore so the next thread can be started
    Loop(Value, *dataPtr)             ; execute loop
    SignalSemaphore(RunningSemaphore) ; signal completion of the job
  ForEver
EndProcedure

Procedure ParallelFor(i1, i2, Loop.ParallelLoopFunc, *dataPtr = 0)
  Protected i
  *CurrentData = *dataPtr
  CurrentLoop = Loop

  ; release one thread at a time so they can access the "CurrentValue"
  ; only the thread startup is serialized. the Loop() is run in parallel
  ;
  For CurrentValue = i1 To i2
    SignalSemaphore(StartSemaphore)
    WaitSemaphore(StartedSemaphore)
  Next CurrentValue

  ; wait for all worker threads to complete their work
  ;
  For i = 1 To ProcessorCount
    WaitSemaphore(RunningSemaphore)
  Next i

  ; reset the running count semaphore back to the start value
  ;
  SignalSemaphore(RunningSemaphore, ProcessorCount)
EndProcedure

; Initialize
;
ProcessorCount = GetCPUCount()
StartSemaphore = CreateSemaphore(0, 1)
StartedSemaphore = CreateSemaphore(0, 1)
RunningSemaphore = CreateSemaphore(ProcessorCount, ProcessorCount)

Define i
For i = 1 To ProcessorCount
  CreateThread(@WorkerThread(), 0)
Next i

; --------------------------------------------------------------------
; --------------------------------------------------------------------
;- EXAMPLE

DisableExplicit

maxX = 1400
maxY = 1050
Global Dim a.d(maxX, maxY)

Delay(100)
time = ElapsedMilliseconds()
counter = 0

Procedure InnerLoop(y.l, maxX.l)
  ; access has to be synchronized
  For x = 0 To maxX
    c.l = (Int(y) ! Int(x)) & $FF
    a(x, y) = Pow(c, x)
  Next
EndProcedure

Repeat
  ParallelFor(0, maxY, @InnerLoop(), maxX)

  ; Serial version for comparison:
  ; For y = 0 To maxY
  ;   For x = 0 To maxX
  ;     c.l = (Int(y) ! Int(x)) & $FF
  ;     a(x, y) = Pow(c, x)
  ;   Next
  ; Next

  counter + 1
Until ElapsedMilliseconds() - time > 2000

MessageRequester("Done", "Runs: "+Str(counter))
With some tricks, it's even possible to execute a generic procedure
call in a separate thread. Let's call this a "parallel procedure call".
This code scales very well: if there is only one processor available,
the procedure is called directly without any overhead, and if there
are multiple processors/cores, it uses as many threads as there
are available cores.
It's also very flexible, as you can mix different procedures in the
"parallel" loop without problems.
Code:
XIncludeFile "parallel_common.pb"

Global StartStackOffset, EndStackOffset, StackCallSize
Global ParallelFunction, ParallelReturn
Global StartSemaphore, StartedSemaphore, RunningSemaphore
Global ProcessorCount, ParallelMutex

Procedure WorkerThread(dummy)
  Protected StackOffset
  Repeat
    WaitSemaphore(StartSemaphore)   ; wait for "wakeup"
    WaitSemaphore(RunningSemaphore) ; should be available immediately (marks this thread as "working")

    ; copy the stack frame from the "caller" to this thread
    ; (the inline assembly here is 32-bit x86 only)
    !MOV [p.v_StackOffset], dword esp
    CopyMemory(EndStackOffset, StackOffset-StackCallSize, StackCallSize)
    !SUB esp, dword [v_StackCallSize]

    ; save the function pointer on the local stack as well
    !PUSH dword [v_ParallelFunction]

    SignalSemaphore(StartedSemaphore) ; allow next thread to start up

    !POP eax  ; get function pointer
    !CALL eax ; call the function (must be stdcall!)

    SignalSemaphore(RunningSemaphore) ; job done
  ForEver
EndProcedure

; This is the "stub"-code that is called by the CallParallel() instead of the
; real function (through a prototype). It must determine the stack frame size
; and store the needed information in the global variables.
; Then it signals the "wakeup"-semaphore and waits for the "started" one
; before returning.
;
Goto EndParallelStub
ParallelStub:
  !POP dword [v_ParallelReturn]
  !MOV [v_EndStackOffset], esp
  StackCallSize = StartStackOffset - EndStackOffset
  SignalSemaphore(StartSemaphore)
  WaitSemaphore(StartedSemaphore)
  !PUSH dword [v_ParallelReturn]
  !RET ; The stubs are PrototypeC, so no stack cleanup done here
EndParallelStub:

; To allow the CallParallel() to work, we need a prototype with the same
; arguments as the real procedure but which points to the stub code
; above
Macro DeclareParallel(Name, Arguments)
  PrototypeC Name#_Prototype#Arguments ; must be prototype C and the procedure must NOT be procedureC!
  Global Name#_Caller.Name#_Prototype = ?ParallelStub
EndMacro

; Call a function in parallel (only if multiple processors are available)
;
Macro CallParallel(Name, Arguments)
  If ProcessorCount > 1 And (__ParallelLocked Or TryLockMutex(ParallelMutex))
    __ParallelLocked = 1 ; MUST NOT be global, so every thread gets its own copy of this variable!
    ParallelFunction = @Name()
    !MOV [v_StartStackOffset], esp
    Name#_Caller#Arguments
  Else
    Name#Arguments
  EndIf
EndMacro

; Waits for all running jobs to complete
;
Procedure __FinishParallel()
  Protected i
  For i = 1 To ProcessorCount
    WaitSemaphore(RunningSemaphore)
  Next i
  SignalSemaphore(RunningSemaphore, ProcessorCount)
EndProcedure

; Wrapper for the above function to also unlock the
; mutex protection against multiple thread access
;
Macro FinishParallel()
  If __ParallelLocked
    __FinishParallel()
    __ParallelLocked = 0
    UnlockMutex(ParallelMutex)
  EndIf
EndMacro

; Initializes the parallel code.
; 'MinThreads' can be used to set the number of worker threads higher than
; the actual processor/core count. (for testing mainly)
;
Procedure InitParallel(MinThreads = 1)
  Protected i
  ProcessorCount = GetCPUCount()
  If ProcessorCount < MinThreads
    ProcessorCount = MinThreads
  EndIf
  If ProcessorCount > 1
    ParallelMutex = CreateMutex()
    StartSemaphore = CreateSemaphore(0, 1)
    StartedSemaphore = CreateSemaphore(0, 1)
    RunningSemaphore = CreateSemaphore(ProcessorCount, ProcessorCount)
    For i = 1 To ProcessorCount
      CreateThread(@WorkerThread(), 0)
    Next i
  EndIf
EndProcedure

; Usage:
;
; InitParallel([MinThreads])
; - startup the parallel call system
;
; DeclareParallel(<procedurename>, <arguments>)
; - declare a procedure for parallel calls.
; - <arguments> must include the ()
; - example: DeclareParallel(proc, (a.l, b.l))
; NOTE: The procedure may not have string parameters, as these would
; not be copied correctly to the thread before execution.
;
; CallParallel(<procedurename>, <arguments>)
; - calls a procedure in parallel
; - <arguments> must be in () again
; - example: CallParallel(proc, (1, 3))
;
; FinishParallel()
; - waits for any running parallel calls to finish
; NOTE: This must be called after a parallel loop, or another thread
; will not be able to execute parallel calls again
; (they will all be done sequentially due to the mutex protection)
; --------------------------------------------------------------------
; --------------------------------------------------------------------
;- EXAMPLE

DisableExplicit

;InitParallel(2)
InitParallel() ; initialize

maxX = 1400
maxY = 1050
Global Dim a.d(maxX, maxY)
Global Dim b.d(maxX, maxY)

; Serial reference run
Delay(100)
time = ElapsedMilliseconds()
counter = 0
Repeat
  For y = 0 To maxY
    For x = 0 To maxX
      c.l = (Int(y) ! Int(x)) & $FF
      a(x, y) = Pow(c, x)
    Next
  Next
  counter + 1
Until ElapsedMilliseconds() - time > 2000

DeclareParallel(InnerLoop, (y.l, maxX.l)) ; declare the parallel call

Procedure InnerLoop(y.l, maxX.l)
  ; access has to be synchronized
  For x = 0 To maxX
    c.l = (Int(y) ! Int(x)) & $FF
    b(x, y) = Pow(c, x)
  Next
EndProcedure

; Parallel run
Delay(100)
time = ElapsedMilliseconds()
counter2 = 0
Repeat
  For y = 0 To maxY
    CallParallel(InnerLoop, (y, maxX)) ; call in parallel
  Next
  FinishParallel() ; wait for all calls to complete
  counter2 + 1
Until ElapsedMilliseconds() - time > 2000

; Check that both runs produced the same data
equal = 1
For y = 0 To maxY
  For x = 0 To maxX
    If a(x, y) <> b(x, y)
      equal = 0
      Break 2
    EndIf
  Next
Next

MessageRequester("Done", "Runs (serial): "+Str(counter)+Chr(13)+"Runs (parallel): "+Str(counter2)+Chr(13)+"Results equal?: "+Str(equal))
This is fun to experiment with. Who knows, maybe something like this
could be added natively to PB. It would make a very cool feature, imho.
It could look somewhat like this:
Code:
For i = 1 To 1000
  Parallel MyProcedure(a, b)
Next i
WaitParallel() ; synchronize again
Btw, I have no dual-core system available yet (I will get a quad-core soon),
so I am interested in some results from people who do.
(I get good results even on a single core when putting a Delay() in the procedure,
which suggests that the parallelism is working, but of course that is no real test.)