Wenn man an Threads denkt, dann fallen einem im selben Rutsch auch immer Mechanismen wie Mutex, Semaphore und Monitore ein. Mutex und Semaphoren sind in PureBasic ja vorhanden, aber Monitore leider nicht. Also habe ich nach dem Beispiel in Wiki (langweilige dt. Version) mal Monitore, oder auch Condition Variables nachgebaut.
Aber erst eine kurze Erläuterung dazu. Wenn man weiß, was Semaphoren sind, dann kann man sich den Unterschied zu Monitoren wie folgt vorstellen (s. hier):
Der größte Unterschied ist, dass Semaphoren die Vergangenheit in ihre Funktion miteinbeziehen. Das rührt daher, dass Semaphoren im Grunde wie ein Zähler implementiert sind. Condition Variablen merken sich im Gegensatz dazu nicht, was in der Vergangenheit geschehen ist. Das bedeutet, dass ein wiederholt Aufruf von 'SignalSemaphore()' jedesmal den Zähler erhöht. Bei Condition Variablen wird ein Aufruf von 'signal()' nur die Threads beeinflussen, die gerade am Warten sind. Jeder Thread, der danach anfängt zu warten, wird von dem vorherigen 'signal()' nichts mehr mitbekommen. Hier also nochmal die wichtigsten Unterschiede zwischen den beiden Ansätzen:
- Semaphoren können überall im Programm angewandt werden, während Condition Variablen immer nur innerhalb eines Monitor benutzt werden dürfen.
- Bei Semaphoren blockt ein 'Wait()' nicht immer den Thread (z.B. wenn der Zähler größer als 0 ist). Bei Condition Variablen blockt ein 'Wait()' immer.
- Bei Semaphoren entblockt ein 'Signal()' einen Thread, falls einer wartet, oder der Zähler wird erhöht. Bei Condition Variablen wird ein geblockter Thread entweder entblockt oder es passiert einfach gar nichts, weil eben kein Thread geblockt ist.
- Wenn bei Semaphoren ein 'Signal()' einen geblockten Thread entblockt, können Signalgeber und Thread weiter laufen. Bei Condition Variablen läuft nach einem 'Signal()' der signalgebende Thread weiter, der signalisierte Thread läuft erst dann weiter, wenn der signalgebende den Mutex entsperrt.
Code: Alles auswählen
DeclareModule Monitor
EnableExplicit
CompilerIf Not #PB_Compiler_Thread
CompilerError "Please activate the thread safe option!"
CompilerEndIf
Interface ConditionVariable
free()
wait()
signal()
broadcast.i()
EndInterface
Interface Mutex
free()
acquire()
release()
newConditionVariable.i()
EndInterface
Declare.i newMutex()
EndDeclareModule
Module Monitor
Structure MutexS
*vTable
mutex.i
held.i
acquires.i
EndStructure
Structure ConditionVariableS
*vTable
numWaiters.i
semaphore.i
StructureUnion
*mutex.Mutex
*mutexAttr.MutexS
EndStructureUnion
*internalMutex.Mutex
EndStructure
Procedure.i newMutex()
Protected *attr.MutexS = AllocateMemory(SizeOf(MutexS))
If (Not *attr)
ProcedureReturn #False
EndIf
With *attr
\vTable = ?vTable_Mutex
\acquires = 0
\mutex = CreateMutex()
If (Not \mutex) : Goto end1 : EndIf
\held = CreateSemaphore()
If (Not \held) : Goto end2 : EndIf
ProcedureReturn *attr
end2:
FreeMutex(\mutex)
end1:
FreeMemory(*attr)
ProcedureReturn #False
EndWith
EndProcedure
Procedure free(*attr.MutexS)
With *attr
FreeMutex(\mutex)
FreeSemaphore(\held)
FreeMemory(*attr)
EndWith
EndProcedure
Procedure acquire(*attr.MutexS)
With *attr
LockMutex(\mutex)
\acquires + 1
SignalSemaphore(\held)
EndWith
EndProcedure
Procedure release(*attr.MutexS)
With *attr
WaitSemaphore(\held)
\acquires - 1
UnlockMutex(\mutex)
EndWith
EndProcedure
Procedure.i newConditionVariable(*mutex.MutexS)
Protected *attr.ConditionVariableS = AllocateMemory(SizeOf(ConditionVariableS))
If (Not *attr)
ProcedureReturn #False
EndIf
With *attr
\vTable = ?vTable_ConditionVariable
\mutex = *mutex
\numWaiters = 0
\semaphore = CreateSemaphore(0)
If (Not \semaphore)
Goto end1
EndIf
\internalMutex = newMutex()
If (Not \internalMutex)
Goto end2
EndIf
ProcedureReturn *attr
end2:
FreeSemaphore(\semaphore)
end1:
FreeMemory(*attr)
ProcedureReturn #False
EndWith
EndProcedure
DataSection
vTable_Mutex:
Data.i @free(), @acquire(), @release(), @newConditionVariable()
EndDataSection
Procedure free2(*attr.ConditionVariableS)
With *attr
FreeMutex(\internalMutex)
FreeSemaphore(\semaphore)
FreeMemory(*attr)
EndWith
EndProcedure
Procedure wait(*attr.ConditionVariableS)
With *attr
If (\mutexAttr\acquires = 0)
RaiseError(#PB_OnError_IllegalInstruction)
EndIf
\internalMutex\acquire()
\numWaiters + 1
\internalMutex\release()
Protected i.i, acquires.i = \mutexAttr\acquires
For i = 1 To acquires
\mutex\release()
Next
WaitSemaphore(\semaphore)
For i = 1 To acquires
\mutex\acquire()
Next
EndWith
EndProcedure
Procedure signal(*attr.ConditionVariableS)
With *attr
\internalMutex\acquire()
If (\numWaiters > 0)
\numWaiters - 1
SignalSemaphore(\semaphore)
EndIf
\internalMutex\release()
EndWith
EndProcedure
Procedure.i broadcast(*attr.ConditionVariableS)
Protected waiters.i
With *attr
\internalMutex\acquire()
waiters = \numWaiters
While (\numWaiters > 0)
\numWaiters - 1
SignalSemaphore(\semaphore)
Wend
\internalMutex\release()
ProcedureReturn waiters
EndWith
EndProcedure
DataSection
vTable_ConditionVariable:
Data.i @free2(), @wait(), @signal(), @broadcast()
EndDataSection
EndModule
CompilerIf #PB_Compiler_IsMainFile
EnableExplicit
;Anzahl lesende Threads
#READER_THREADS = 10
;Anzahl Werte
#VALUES = 500000
#VALUES_PER_LOOP = 50
#DELAY_AFTER_RELEASE = 0
#DEBUG_LEVEL = 0
;Auf 2 setzen um alle Debugs zu sehen
DebugLevel #DEBUG_LEVEL
Macro CONSOLE_DEBUG(text, dbgLevel = 0)
Debug text, dbgLevel
If (dbgLevel <= #DEBUG_LEVEL)
PrintN("" + text)
EndIf
EndMacro
UseModule Monitor
Global NewList stack.i()
Global *mutex.Mutex = newMutex()
Global *newData.ConditionVariable = *mutex\newConditionVariable()
Procedure ReaderThread(id.i)
;Simuliere verschachtelte Locks
*mutex\acquire()
*mutex\acquire()
Protected i.i = -1, r.i
Repeat
r = FirstElement(stack())
CONSOLE_DEBUG("Reader " + id + ": FirstElement: " + r, 3)
If (r)
i = stack()
;Wenn der Wert 0 ist, dann breche ab, ohne das Element zu löschen.
;Auf die Weise können auch alle anderen Threads sich beenden.
If (i = 0)
Break
EndIf
DeleteElement(stack())
CONSOLE_DEBUG("Reader " + id + ": new data: " + i, 1)
Else
CONSOLE_DEBUG("Reader " + id + ": waiting for new data.", 2)
*newData\wait()
CONSOLE_DEBUG("Reader " + id + ": new data is there.", 2)
Continue
EndIf
ForEver
*mutex\release()
*mutex\release()
EndProcedure
Define.i time = ElapsedMilliseconds()
OpenConsole("ConditionVariable Test")
Dim threads.i(#READER_THREADS - 1)
Define i.i
For i = 0 To #READER_THREADS - 1
threads(i) = CreateThread(@ReaderThread(), i)
Next
;Lasse die Readerthreads kurz anlaufen, damit sie sich in ihren waits verfangen.
Delay(100)
*mutex\acquire()
Define i.i
For i = #VALUES To 0 Step -1
CONSOLE_DEBUG(" ADD acquire", 3)
LastElement(stack())
If AddElement(stack())
CONSOLE_DEBUG(" ADD " + i, 2)
stack() = i
EndIf
CONSOLE_DEBUG(" ADD release", 3)
If (i % #VALUES_PER_LOOP = 0)
;Signalisiere einen Thread, damit er Daten empfangen kann.
*newData\signal()
*mutex\release()
CompilerIf #DELAY_AFTER_RELEASE
Delay(#DELAY_AFTER_RELEASE)
CompilerEndIf
*mutex\acquire()
EndIf
;Delay(250)
Next
*mutex\release()
;Signalisiere alle Threads, die noch warten, damit sie die 0 lesen und sich sauber beenden
*newData\broadcast()
For i = 0 To #READER_THREADS - 1
WaitThread(threads(i))
Next
CONSOLE_DEBUG("ENDE")
time = ElapsedMilliseconds() - time
CONSOLE_DEBUG("Zeit: " + time + " ms")
Input()
CloseConsole()
CompilerEndIf
11.03.2014: Neue Version