Monitor mit ConditionVariable als Erweiterung für Mutex

Hier könnt Ihr gute, von Euch geschriebene Codes posten. Sie müssen auf jeden Fall funktionieren und sollten möglichst effizient, elegant und beispielhaft oder einfach nur cool sein.
Benutzeravatar
NicTheQuick
Ein Admin
Beiträge: 8679
Registriert: 29.08.2004 20:20
Computerausstattung: Ryzen 7 5800X, 32 GB DDR4-3200
Ubuntu 22.04.3 LTS
GeForce RTX 3080 Ti
Wohnort: Saarbrücken
Kontaktdaten:

Monitor mit ConditionVariable als Erweiterung für Mutex

Beitrag von NicTheQuick »

Hallo Leute.

Wenn man an Threads denkt, dann fallen einem im selben Rutsch auch immer Mechanismen wie Mutex, Semaphore und Monitore ein. Mutex und Semaphoren sind in PureBasic ja vorhanden, aber Monitore leider nicht. Also habe ich nach dem Beispiel in Wiki (langweilige dt. Version) mal Monitore, oder auch Condition Variables nachgebaut.

Aber erst eine kurze Erläuterung dazu. Wenn man weiß, was Semaphoren sind, dann kann man sich den Unterschied zu Monitoren wie folgt vorstellen (s. hier):
Der größte Unterschied ist, dass Semaphoren die Vergangenheit in ihre Funktion miteinbeziehen. Das rührt daher, dass Semaphoren im Grunde wie ein Zähler implementiert sind. Condition Variablen merken sich im Gegensatz dazu nicht, was in der Vergangenheit geschehen ist. Das bedeutet, dass ein wiederholt Aufruf von 'SignalSemaphore()' jedesmal den Zähler erhöht. Bei Condition Variablen wird ein Aufruf von 'signal()' nur die Threads beeinflussen, die gerade am Warten sind. Jeder Thread, der danach anfängt zu warten, wird von dem vorherigen 'signal()' nichts mehr mitbekommen. Hier also nochmal die wichtigsten Unterschiede zwischen den beiden Ansätzen:
  • Semaphoren können überall im Programm angewandt werden, während Condition Variablen immer nur innerhalb eines Monitor benutzt werden dürfen.
  • Bei Semaphoren blockt ein 'Wait()' nicht immer den Thread (z.B. wenn der Zähler größer als 0 ist). Bei Condition Variablen blockt ein 'Wait()' immer.
  • Bei Semaphoren entblockt ein 'Signal()' einen Thread, falls einer wartet, oder der Zähler wird erhöht. Bei Condition Variablen wird ein geblockter Thread entweder entblockt oder es passiert einfach gar nichts, weil eben kein Thread geblockt ist.
  • Wenn bei Semaphoren ein 'Signal()' einen geblockten Thread entblockt, können Signalgeber und Thread weiter laufen. Bei Condition Variablen läuft nach einem 'Signal()' der signalgebende Thread weiter, der signalisierte Thread läuft erst dann weiter, wenn der signalgebende den Mutex entsperrt.
Hier also der Code plus Beispiel:

Code: Alles auswählen

DeclareModule Monitor
	EnableExplicit
	
	CompilerIf Not #PB_Compiler_Thread
		CompilerError "Please activate the thread safe option!"
	CompilerEndIf
	
	Interface ConditionVariable
		free()
		wait()
		signal()
		broadcast.i()
	EndInterface
	
	Interface Mutex
		free()
		acquire()
		release()
		newConditionVariable.i()
	EndInterface
	
	Declare.i newMutex()
EndDeclareModule

Module Monitor
	Structure MutexS
		*vTable
		mutex.i
		held.i
		acquires.i
	EndStructure
	
	Structure ConditionVariableS
		*vTable
		numWaiters.i
		semaphore.i
		StructureUnion
			*mutex.Mutex
			*mutexAttr.MutexS
		EndStructureUnion
		*internalMutex.Mutex
	EndStructure
	
	Procedure.i newMutex()
		Protected *attr.MutexS = AllocateMemory(SizeOf(MutexS))
		If (Not *attr)
			ProcedureReturn #False
		EndIf
		
		With *attr
			\vTable = ?vTable_Mutex
			\acquires = 0
			
			\mutex = CreateMutex()
			If (Not \mutex) : Goto end1 : EndIf
			
			\held = CreateSemaphore()
			If (Not \held) : Goto end2 : EndIf
			
			ProcedureReturn *attr
			
			end2:
			FreeMutex(\mutex)
			
			end1:
			FreeMemory(*attr)
			
			ProcedureReturn #False
		EndWith
	EndProcedure
	
	Procedure free(*attr.MutexS)
		With *attr
			FreeMutex(\mutex)
			FreeSemaphore(\held)
			FreeMemory(*attr)
		EndWith
	EndProcedure
	
	Procedure acquire(*attr.MutexS)
		With *attr
			LockMutex(\mutex)
			\acquires + 1
			SignalSemaphore(\held)
		EndWith
	EndProcedure

	Procedure release(*attr.MutexS)
		With *attr
			WaitSemaphore(\held)
			\acquires - 1
			UnlockMutex(\mutex)
		EndWith
	EndProcedure
	
	Procedure.i newConditionVariable(*mutex.MutexS)
		Protected *attr.ConditionVariableS = AllocateMemory(SizeOf(ConditionVariableS))
		If (Not *attr)
			ProcedureReturn #False
		EndIf
		
		With *attr
			\vTable = ?vTable_ConditionVariable
			\mutex = *mutex
			\numWaiters = 0
			\semaphore = CreateSemaphore(0)
			If (Not \semaphore)
				Goto end1
			EndIf
			\internalMutex = newMutex()
			If (Not \internalMutex)
				Goto end2
			EndIf
			
			ProcedureReturn *attr
			
			end2:
			FreeSemaphore(\semaphore)
			
			end1:
			FreeMemory(*attr)
			
			ProcedureReturn #False
		EndWith
	EndProcedure
	
	DataSection
		vTable_Mutex:
			Data.i @free(), @acquire(), @release(), @newConditionVariable()
	EndDataSection
	
	Procedure free2(*attr.ConditionVariableS)
		With *attr
			FreeMutex(\internalMutex)
			FreeSemaphore(\semaphore)
			FreeMemory(*attr)
		EndWith
	EndProcedure
	
	Procedure wait(*attr.ConditionVariableS)
		With *attr
			If (\mutexAttr\acquires = 0)
				RaiseError(#PB_OnError_IllegalInstruction)
			EndIf
			\internalMutex\acquire()
			
			\numWaiters + 1
			
			\internalMutex\release()
			
			Protected i.i, acquires.i = \mutexAttr\acquires
			For i = 1 To acquires
				\mutex\release()
			Next
			WaitSemaphore(\semaphore)
			For i = 1 To acquires
				\mutex\acquire()
			Next
		EndWith
	EndProcedure
	
	Procedure signal(*attr.ConditionVariableS)
		With *attr
			\internalMutex\acquire()
			If (\numWaiters > 0)
				\numWaiters - 1
				SignalSemaphore(\semaphore)
			EndIf
			\internalMutex\release()
		EndWith
	EndProcedure
	
	Procedure.i broadcast(*attr.ConditionVariableS)
		Protected waiters.i
		With *attr
			\internalMutex\acquire()
			waiters = \numWaiters
			While (\numWaiters > 0)
				\numWaiters - 1
				SignalSemaphore(\semaphore)
			Wend
			\internalMutex\release()
			
			ProcedureReturn waiters
		EndWith
	EndProcedure
	
	DataSection
		vTable_ConditionVariable:
			Data.i @free2(), @wait(), @signal(), @broadcast()
	EndDataSection
EndModule

CompilerIf #PB_Compiler_IsMainFile
	EnableExplicit
	
	;Anzahl lesende Threads
	#READER_THREADS = 10
	
	;Anzahl Werte
	#VALUES = 500000
	#VALUES_PER_LOOP = 50
	
	#DELAY_AFTER_RELEASE = 0
	
	#DEBUG_LEVEL = 0
	
	;Auf 2 setzen um alle Debugs zu sehen
	DebugLevel #DEBUG_LEVEL
	
	Macro CONSOLE_DEBUG(text, dbgLevel = 0)
		Debug text, dbgLevel
		If (dbgLevel <= #DEBUG_LEVEL)
			PrintN("" + text)
		EndIf
	EndMacro
	
	UseModule Monitor
	
	Global NewList stack.i()
	Global *mutex.Mutex = newMutex()
	Global *newData.ConditionVariable = *mutex\newConditionVariable()
	
	Procedure ReaderThread(id.i)
		;Simuliere verschachtelte Locks
		*mutex\acquire()
		*mutex\acquire()
		
		Protected i.i = -1, r.i
		Repeat
			r = FirstElement(stack())
			CONSOLE_DEBUG("Reader " + id + ": FirstElement: " + r, 3)
			If (r)
				i = stack()
				;Wenn der Wert 0 ist, dann breche ab, ohne das Element zu löschen.
				;Auf die Weise können auch alle anderen Threads sich beenden.
				If (i = 0)
					Break
				EndIf
				DeleteElement(stack())
				CONSOLE_DEBUG("Reader " + id + ": new data: " + i, 1)
			Else
				CONSOLE_DEBUG("Reader " + id + ": waiting for new data.", 2)
				*newData\wait()
				CONSOLE_DEBUG("Reader " + id + ": new data is there.", 2)
				Continue
			EndIf
			
		ForEver
		
		*mutex\release()
		*mutex\release()
	EndProcedure
	
	Define.i time = ElapsedMilliseconds()
	OpenConsole("ConditionVariable Test")
	
	Dim threads.i(#READER_THREADS - 1)
	Define i.i
	For i = 0 To #READER_THREADS - 1
		threads(i) = CreateThread(@ReaderThread(), i)
	Next
	
	;Lasse die Readerthreads kurz anlaufen, damit sie sich in ihren waits verfangen.
	Delay(100)
	
	*mutex\acquire()
	
	Define i.i
	For i = #VALUES To 0 Step -1
		
		CONSOLE_DEBUG("        ADD acquire", 3)
		LastElement(stack())
		If AddElement(stack())
			CONSOLE_DEBUG("        ADD " + i, 2)
			stack() = i
		EndIf
		
		
		CONSOLE_DEBUG("        ADD release", 3)
		If (i % #VALUES_PER_LOOP = 0)
			;Signalisiere einen Thread, damit er Daten empfangen kann.
			*newData\signal()
			*mutex\release()
			CompilerIf #DELAY_AFTER_RELEASE
				Delay(#DELAY_AFTER_RELEASE)
			CompilerEndIf
			*mutex\acquire()
		EndIf
		;Delay(250)
	Next
	*mutex\release()
	
		
	;Signalisiere alle Threads, die noch warten, damit sie die 0 lesen und sich sauber beenden
	*newData\broadcast()
	
	For i = 0 To #READER_THREADS - 1
		WaitThread(threads(i))
	Next
	
	CONSOLE_DEBUG("ENDE")
	time = ElapsedMilliseconds() - time
	CONSOLE_DEBUG("Zeit: " + time + " ms")
	Input()
	CloseConsole()
CompilerEndIf
Versionen:
11.03.2014: Neue Version
Bild
Benutzeravatar
ts-soft
Beiträge: 22292
Registriert: 08.09.2004 00:57
Computerausstattung: Mainboard: MSI 970A-G43
CPU: AMD FX-6300 Six-Core Processor
GraKa: GeForce GTX 750 Ti, 2 GB
Memory: 16 GB DDR3-1600 - Dual Channel
Wohnort: Berlin

Re: ConditionVariable bzw. Monitor als Erweiterung für Mutex

Beitrag von ts-soft »

:allright: sieht erstmal gut aus, muß mich damit aber noch ein bissel auseinandersetzen :wink:

Den ersten 3 Zeilen, entnehme ich, das Dir nicht bewußt ist, das ein EnableExplicit deklariert innerhalb
eines Modules, sich nur dort auswirkt, somit also keinen Einfluß darauf nimmt, was im nutzenden Source
eingestellt ist!
Im moment hast für das Example 2x EnableExplicit und für das Modul gar nicht.

Gruß
Thomas

// edit
Nachdem ich das EnableExplicit korrekt eingefügt habe, gibt es:
[Error] Programm abgebrochen. (durch eine externe Library)
Ich schätze mal, da bringt der Debugger von PB irgendwas zum nichtfunktionieren.

// edit2
Dieser Fehler ist jetzt wieder verschwunden? Merkwürdig.
PureBasic 5.73 LTS | SpiderBasic 2.30 | Windows 10 Pro (x64) | Linux Mint 20.1 (x64)
Nutella hat nur sehr wenig Vitamine. Deswegen muss man davon relativ viel essen.
Bild
Benutzeravatar
NicTheQuick
Ein Admin
Beiträge: 8679
Registriert: 29.08.2004 20:20
Computerausstattung: Ryzen 7 5800X, 32 GB DDR4-3200
Ubuntu 22.04.3 LTS
GeForce RTX 3080 Ti
Wohnort: Saarbrücken
Kontaktdaten:

Re: ConditionVariable bzw. Monitor als Erweiterung für Mutex

Beitrag von NicTheQuick »

Okay, das mit EnableExplicit war mir nicht bewusst. :D Liegt wohl daran, dass ich das schon automatisch im Kopf mache. :wink:

Aber dieser Fehler mit der externen Library ist bei mir auch schon einmal aufgetreten. Vielleicht passiert das nur sporadisch, aber selbst das dürfte nicht sein. Womöglich muss ich doch noch ein bisschen debuggen, oder darauf warten, dass Fred mein Feature Request durchsetzt, denn ich brauche Condition Variables öfter als Semaphoren. Und nativ wären sie auch schneller als dieser Wordkaround.

Danke jedenfalls schon mal für's Feedback.
Bild
Benutzeravatar
NicTheQuick
Ein Admin
Beiträge: 8679
Registriert: 29.08.2004 20:20
Computerausstattung: Ryzen 7 5800X, 32 GB DDR4-3200
Ubuntu 22.04.3 LTS
GeForce RTX 3080 Ti
Wohnort: Saarbrücken
Kontaktdaten:

Re: ConditionVariable bzw. Monitor als Erweiterung für Mutex

Beitrag von NicTheQuick »

Okay, ich habe es nochmal eingehend getestet. Es gibt keinen Fehler. Ich vermute deswegen einfach mal, dass du ThreadSafe vergessen hast. Dafür habe ich jetzt einen CompilerError eingebaut, falls man die Option nicht aktiviert hat.

Außerdem habe ich das Beispiel etwas aufgebohrt. Damit kann man jetzt sehen, wann es sich lohnt mehr Threads zu nehmen.

Beispiel:
Wenn #VALUES_PER_LOOP = 1 ist, dann wird nach jedem Hinzufügen eines Elements ein ReaderThread aufgeweckt, der dann, falls er dran kommt, dieses Element liest und ausgibt. Hier ist es am optimalsten, wenn man tatsächlich nur einen Thread nutzt.
Setzt man #VALUES_PER_LOOP auf einen höheren Wert, macht es durchaus Sinn mehrere ReaderThreads zu nutzen, da in diesem Fall einer zu langsam sein kann.
Bild
Antworten