Threadsafe BufferQueue

Hier könnt Ihr gute, von Euch geschriebene Codes posten. Sie müssen auf jeden Fall funktionieren und sollten möglichst effizient, elegant und beispielhaft oder einfach nur cool sein.
Benutzeravatar
NicTheQuick
Ein Admin
Beiträge: 8808
Registriert: 29.08.2004 20:20
Computerausstattung: Ryzen 7 5800X, 64 GB DDR4-3200
Ubuntu 24.04.2 LTS
GeForce RTX 3080 Ti
Wohnort: Saarbrücken

Threadsafe BufferQueue

Beitrag von NicTheQuick »

Hi Leute,

für mein aktuelles Projekt benötigte ich eine FIFO-BufferQueue. Sie funktioniert wie folgt. Es gibt zunächst folgendes Interface:

Code: Alles auswählen

Interface BufferQueue
	free.i()
	
	popAscii.a()
	popByte.b()
	popUnicode.u()
	popWord.w()
	popCharacter.c()
	popLong.l()
	popFloat.f()
	popDouble.d()
	popQuad.q()
	popInteger.i()
	popString.s(format.i = -1)
	
	popMemory.i(*buffer, length.i)
	
	pushAscii(a.a)
	pushByte(b.b)
	pushUnicode(u.u)
	pushWord(w.w)
	pushCharacter(c.c)
	pushLong(l.l)
	pushFloat(f.f)
	pushDouble(d.d)
	pushQuad(q.q)
	pushInteger(i.i)
	pushString(s.s, format.i = -1)
	
	pushMemory(*buffer, length.i)
EndInterface
Mit 'free()' kann man das Objekt freigeben.
Mit allen 'pushXXX()'-Methoden kann man Zahlenwerte oder einen ganzen Speicherbereich in die Warteschlange kopieren. Intern ist 'pushMemory()' die Hauptfunktion, alle anderen 'pushXXX()' nutzen dieses lediglich nur noch.
Mit allen 'popXXX()'-Methoden kann man diese Zahlenwerte wieder aus der Warteschlange extrahieren. Sind noch nicht genügend Bytes in der Warteschlange, wartet die 'popXXX()'-Methode bis genügend Bytes da sind. Wie die 'pushXXX()'-Methoden wird hier eigentlich immer 'popMemory()' benutzt, alle anderen 'popXXX()'-Methoden wrappen diese nur.

Da diese Methoden alle auf Byte-Basis arbeiten, kann man zum Beispiel mit 'pushLong()' eine 32-Bit-Variable in die Warteschlange kopieren und dann mit 'popWord()' wieder zwei einzelne Words heraus kopieren.

Beim Initialisieren der Warteschlange mit 'newBufferQueue()' kann man einen Parameter mitgeben, der die Größe der einzelnen internen Blöcke angibt. Ist ein Block voll, weil man oft genug 'pushXXX()' benutzt hat ohne 'popXXX()' zu nutzen, dann wird ein neuer Block erstellt. Allerdings ist diese Blockgröße immer noch dynamisch. Hat man zum Beispiel festgelegt, dass ein einzelner Block 64 Bytes groß ist und schreibt man dann mit 'pushMemory()' einen Block in die Warteschlange, der größer als 64 Bytes ist, dann wird ein neuer Block angelegt in der Größe des zu schreibenden Speicherbereiches, damit nicht mehrere Blöcke erstellt werden müssen.

Natürlich sind alle Methoden Threadsafe, sodass man jede Methode von verschiedenen Threads aus aufrufen kann.

Damit der folgende Code funktioniert, benötigt man mein Monitor-Module!

Code: Alles auswählen

XIncludeFile #PB_Compiler_FilePath + "/Monitor.pbi"

EnableExplicit

DeclareModule BufferQueue
	EnableExplicit
	
	DebugLevel 0
	
	CompilerIf Not #PB_Compiler_Thread
		CompilerError "Please activate the thread safe option!"
	CompilerEndIf
	
	#STRING_BUFFER_SIZE = 64
	
	Interface BufferQueue
		free.i()
		
		popAscii.a()
		popByte.b()
		popUnicode.u()
		popWord.w()
		popCharacter.c()
		popLong.l()
		popFloat.f()
		popDouble.d()
		popQuad.q()
		popInteger.i()
		popString.s(format.i = -1)
		
		popMemory.i(*buffer, length.i)
		
		pushAscii(a.a)
		pushByte(b.b)
		pushUnicode(u.u)
		pushWord(w.w)
		pushCharacter(c.c)
		pushLong(l.l)
		pushFloat(f.f)
		pushDouble(d.d)
		pushQuad(q.q)
		pushInteger(i.i)
		pushString(s.s, format.i = -1)
		
		pushMemory(*buffer, length.i)
	EndInterface
	
	Declare.i newBufferQueue(size.i = 4096)
EndDeclareModule

Module BufferQueue
	UseModule Monitor
	
	Structure BufferBlockS
		;Pointer zum nächsten Bufferblock
		*next.BufferBlockS
		
		;Pointer zum Buffer
		*buffer
		
		;Größe des Buffers
		size.i
		
		;Tatsächlich genutzter Buffer
		iWrite.i
		
		;Position im ersten Block, von der als nächstes gelesen werden kann.
		iRead.i
		
		;Lock um iWrite und iRead zu schützen
		*mutex.Mutex
		
		;Monitor, wenn neue Daten anstehen
		*newDataCond.ConditionVariable
		
	EndStructure
	
	Structure BufferQueueS
		*vTable
		;Lock für neue Blocks oder Löschen von Blocks
		*blockLock.Mutex
		
		;Monitor, wenn ein neuer Block erstellt wird
		*newBlockCond.ConditionVariable

		;Standardgröße für einen neuen Block
		defaultSize.i
		
		;Pointer zum ersten Block (Leseblock)
		*first.BufferBlockS
		
		;Pointer zum letzten Block (Schreibblock)
		*last.BufferBlockS
		
		*readLock.Mutex
	EndStructure
	
	;{ BufferBlock
	Procedure.i BB_new(size.i)
		Protected *attr.BufferBlockS = AllocateMemory(SizeOf(BufferBlockS))
		If (Not *attr)
			ProcedureReturn #False
		EndIf
		
		With *attr
			\buffer = AllocateMemory(size)
			If (Not \buffer) : Goto end1 : EndIf
			
			\size = size
			\iWrite = 0
			\iRead = 0
			
			\mutex = newMutex()
			If (Not \mutex) : Goto end2 : EndIf
			
			\newDataCond = \mutex\newConditionVariable()
			If (Not \newDataCond) : Goto end3 : EndIf
			
			ProcedureReturn *attr
			
			end3:
			\mutex\free()
			
			end2:
			FreeMemory(\buffer)
			
			end1:
			FreeMemory(*attr)
			
			ProcedureReturn #False
		EndWith
	EndProcedure
	
	Procedure BB_free(*attr.BufferBlockS)
		With *attr
			\newDataCond\free()
			\mutex\free()
			FreeMemory(*attr)
		EndWith
	EndProcedure
	;}
	
	;{ BufferQueue
	Procedure.i newBufferQueue(defaultSize.i = 4096)
		If (defaultSize < 8)
			defaultSize = 8
		EndIf	
		Protected *attr.BufferQueueS = AllocateMemory(SizeOf(BufferBlockS))
		If (Not *attr)
			ProcedureReturn #False
		EndIf
		
		With *attr
			\vTable = ?vTable_BufferQueue
			\defaultSize = defaultSize
			\first = 0
			\last = 0
			;\iWrite = 0
			;\iRead = 0
			;\availableBytes = 0
			
			\blockLock = newMutex()
			If (Not \blockLock) : Goto end1 : EndIf
			
			\newBlockCond = \blockLock\newConditionVariable()
			If (Not \newBlockCond) : Goto end2 : EndIf
			
			\readLock = newMutex()
			If (Not \readLock) : Goto end3 : EndIf
			
			ProcedureReturn *attr
			
			end3:
			\newBlockCond\free()
			
			end2:
			\blockLock\free()
			
			end1:
			FreeMemory(*attr)
			
			ProcedureReturn #False
		EndWith
	EndProcedure
	
	Procedure free(*attr.BufferQueueS)
		With *attr
			;Iterate over blocks and free them
			Protected *bb.BufferBlockS = \first
			While *bb
				BB_free(*bb)
				*bb = *bb\next
			Wend
			
			\readLock\free()
			\newBlockCond\free()
			\blockLock\free()
		EndWith
	EndProcedure
	
	Procedure.i add(*attr.BufferQueueS, size.i = 0)
		With *attr
			If (size <= 0)
				size = \defaultSize
			EndIf
			If (size < 8)
				size = 8
			EndIf
			\blockLock\acquire()
			Protected *bb.BufferBlockS = BB_new(size)
			If (Not *bb)
				\blockLock\release()
				ProcedureReturn #False
			EndIf
			If (\last)	;Gib dem aktuell noch letzten Block den Pointer vom neuen
				\last\next = *bb
			Else	;Wenn noch gar nichts da war, dann setze den neuen Block als ersten
				\first = *bb
			EndIf
			;Setze den neuen auf jeden Fall als letzten Block
			\last = *bb
			\newBlockCond\signal()
			\blockLock\release()
		EndWith
		
		ProcedureReturn *bb
	EndProcedure
	
	Macro popType(SMALL, TYPE)
		Procedure.SMALL	pop#TYPE(*this.BufferQueue)
			Protected value.SMALL
			*this\popMemory(@value, SizeOf(TYPE))
			ProcedureReturn value
		EndProcedure
	EndMacro
	
	popType(a, Ascii)
	popType(b, Byte)
	popType(u, Unicode)
	popType(w, Word)
	popType(c, Character)
	popType(l, Long)
	popType(f, Float)
	popType(d, Double)
	popType(q, Quad)
	popType(i, Integer)
	
	Procedure.i popMemory(*attr.BufferQueueS, *buffer, length.i)
		With *attr
			\readLock\acquire()
			Debug "    pop readLock acquired", 5
			
			\blockLock\acquire()
			While (Not \first)
				Debug "    pop wait 1", 5
				\newBlockCond\wait()
				Debug "    pop wait 2", 5
			Wend
			
			Protected *bb.BufferBlockS = \first
			
			*bb\mutex\acquire()
			Debug "    pop block mutex acquired", 5
			\blockLock\release()
			Debug "    pop blockLock released", 5
			
			Debug "    pop state: iWrite=" + *bb\iWrite + " iRead=" + *bb\iRead + " length=" + length, 5
			Protected pos.i = 0
			While pos < length
				Protected max.i = *bb\iWrite - *bb\iRead
				If (length - pos < max) : max = length - pos : EndIf
				
				If (max = 0)
					;Es existiert noch kein Folgeblock, also warte bis neue Daten kommen
					If (Not *bb\next)
						Debug "    pop waiting for new data 1", 5
						*bb\newDataCond\wait()
						Debug "    pop waiting for new data arrived", 5
					Else
						Debug "    pop change to next block", 5
						Protected *previous.BufferBlockS = *bb
						*bb = *bb\next
						*bb\mutex\acquire()
						Debug "    pop next block mutex acquired", 5
						\first = *bb
						*previous\mutex\release()
						Debug "    pop old block mtex released", 5
						BB_free(*previous)
					EndIf
				Else
					;Kopiere, was geht
					Debug "    pop copy what is possible", 5
					CopyMemory(*bb\buffer + *bb\iRead, *buffer + pos, max)
					*bb\iRead + max
					pos + max
				EndIf
			Wend
			
			Debug "    pop block mutex released", 5
			*bb\mutex\release()
			
			Debug "    pop readLock realeased", 5
			\readLock\release()
			
			ProcedureReturn length
		EndWith
	EndProcedure
	
	Procedure.s popString(*attr.BufferQueueS, format.i = -1)
		Protected s.s = Space(#STRING_BUFFER_SIZE)
		Protected *c.Character = @s, diff.i, i.i
		
		With *attr
			\readLock\acquire()
			
			If (format = -1)
				CompilerIf #PB_Compiler_Unicode
					format = #PB_Unicode
				CompilerElse
					format = #PB_Ascii
				CompilerEndIf
			EndIf
			
			If format = #PB_Unicode
				Repeat
					*c\c = popUnicode(*attr)
					If (*c\c = 0) : Break : EndIf
					*c + SizeOf(Character)
					i + 1
					If (i % #STRING_BUFFER_SIZE = 0)
						diff = *c - @s
						s + Space(#STRING_BUFFER_SIZE)
						*c = @s + diff
					EndIf
				ForEver
			ElseIf format = #PB_Ascii
				Repeat
					*c\c = popAscii(*attr)
					If (*c\c = 0) : Break : EndIf
					*c + SizeOf(Character)
					i + 1
					If (i % #STRING_BUFFER_SIZE = 0)
						diff = *c - @s
						s + Space(#STRING_BUFFER_SIZE)
						*c = @s + diff
					EndIf
				ForEver
			Else
			EndIf
			
			\readLock\release()
			
			ProcedureReturn s
		EndWith
	EndProcedure
	
	Macro pushType(SMALL, TYPE)
		Procedure	push#TYPE(*this.BufferQueue, SMALL.SMALL)
			ProcedureReturn *this\pushMemory(@SMALL, SizeOf(TYPE))
		EndProcedure
	EndMacro
	
	pushType(a, Ascii)
	pushType(b, Byte)
	pushType(u, Unicode)
	pushType(w, Word)
	pushType(c, Character)
	pushType(l, Long)
	pushType(f, Float)
	pushType(d, Double)
	pushType(q, Quad)
	pushType(i, Integer)
	
	Procedure pushMemory(*attr.BufferQueueS, *buffer, length.i)
		Protected *bb.BufferBlockS
		
		With *attr
			Protected newSize.i = \defaultSize
			If (length > newSize)
				newSize = length
			EndIf
			
			\blockLock\acquire()
			Debug "push blockLock acquired", 5
			;Wenn kein Block vorhanden ist, erstelle einen
			If (Not \last)
				*bb = add(*attr, newSize)
				If (Not *bb)
					\blockLock\release()
					ProcedureReturn #False
				EndIf
				Debug "push new Block created", 5
				;\newBlockCond\signal()
				
				Debug "push new block mutex acquired", 5
				*bb\mutex\acquire()
			Else
				\last\mutex\acquire()
				Debug "push last block mutex acquired", 5
				;Wenn der restliche Platz im letzten Block nicht für den ganzen zu kopierenden Buffer ausreicht, erstelle einen neuen
				Debug "push size=" + \last\size + " iWrite=" + \last\iWrite + " length=" + length, 5
				If (\last\size - \last\iWrite < length)
					Protected *last.BufferBlockS = \last
					Debug "push add new Block", 5
					*bb = add(*attr, newSize)
					If (Not *bb)
						Debug "push last block mutex released", 5
						*last\mutex\release()
						Debug "push blockLock released", 5
						\blockLock\release()
						ProcedureReturn #False
					EndIf
					;Signalisiere einem wartenden Thread, dass ein neuer Block vorhanden ist
					;\newBlockCond\signal()
					
					;Signalisiere einem wartenden Thread, dass er jetzt von dem zweitletzten Block lesen kann
					Debug "push signal new data", 5
					*last\newDataCond\signal()
					
					*bb\mutex\acquire()
					
					Debug "push last block mutex release", 5
					*last\mutex\release()
					
				Else
					*bb = \last
				EndIf
			EndIf
			
			Debug "push blockLock released", 5
			\blockLock\release()
			
			CopyMemory(*buffer, *bb\buffer + *bb\iWrite, length)
			*bb\iWrite + length
			
			*bb\newDataCond\signal()
			*bb\mutex\release()
		EndWith
	EndProcedure
	
	Procedure pushString(*this.BufferQueue, s.s, format.i = -1)
		Protected length.i
		
		If (format = -1)
			length = StringByteLength(s) + SizeOf(Character)
			*this\pushMemory(@s, length)
		Else
			CompilerIf #PB_Compiler_Unicode
				If (format = #PB_Unicode)
					*this\pushMemory(@s, Len(s) + SizeOf(Unicode))
				Else
					length = Len(s)
					Protected *mem = AllocateMemory(2 * length + 1)
					length = PokeS(*mem, s, -1, #PB_UTF8) + 1
					*this\pushMemory(*mem, length)
					FreeMemory(*mem)
				EndIf
			CompilerElse
				If (format = #PB_Unicode)
					length = Len(s)
					Protected *mem = AllocateMemory(SizeOf(Unicode) * (length + 1))
					length = PokeS(*mem, s, -1, #PB_Unicode) + SizeOf(Unicode)
					*this\pushMemory(*mem, length)
					FreeMemory(*mem)
				Else
					*this\pushMemory(@s, Len(s) + 1)
				EndIf
			CompilerEndIf
		EndIf
	EndProcedure
	
	DataSection
		vTable_BufferQueue:
			Data.i	@free(),
					@popAscii(), @popByte(), @popUnicode(), @popWord(),
					@popCharacter(), @popLong(), @popFloat(), @popDouble(),
					@popQuad(), @popInteger(), @popString(), @popMemory(),
					@pushAscii(), @pushByte(), @pushUnicode(), @pushWord(),
					@pushCharacter(), @pushLong(), @pushFloat(), @pushDouble(),
					@pushQuad(), @pushInteger(), @pushString(), @pushMemory()
	EndDataSection
	
	;}
EndModule

CompilerIf #PB_Compiler_IsMainFile
	UseModule BufferQueue
	
	Procedure WriterThread(*bq.BufferQueue)
		Protected i.q
		For i = 1 To 4
			Debug "PUSH " + i
			*bq\pushQuad(i)
			Debug "PUSHED " + i
			;Delay(10)
		Next
		*bq\pushQuad(0)
		
		Protected c.c
		For i = 1 To 10
			c = Random(90, 65)
			Debug "PUSH " + c
			*bq\pushCharacter(c)
			Debug "PUSHED " + c
		Next
		Debug "PUSH 0"
		*bq\pushCharacter(0)
		Debug "PUSHED 0"
		*bq\pushString("Hallo STARGÅTE")
		CompilerIf Not #PB_Compiler_Unicode
			*bq\pushAscii(0)
			Debug "!! ACTIVATE UNICODE TO RETRIEVE THE CORRECT TEXT"
		CompilerEndIf
	EndProcedure
	
	Procedure ReaderThread(*bq.BufferQueue)
		Protected i.q
		
		Repeat
			Debug "    POP"
			i = *bq\popQuad()
			Debug "    POPPED " + i
		Until i = 0
		
		Protected s.s
		Debug "    POP String"
		s = *bq\popString()
		Debug "    POPPED '" + s + "'"
		Debug "    POP String"
		s = *bq\popString(#PB_Unicode)
		Debug "    POPPED '" + s + "'"
	EndProcedure
	
	Define *bq.BufferQueue = newBufferQueue()
	
	Define reader.i = CreateThread(@ReaderThread(), *bq)
	
	Delay(100)
	Define writer.i = CreateThread(@WriterThread(), *bq)
	
	WaitThread(writer)
	WaitThread(reader)
	Debug "ENDE"
	
CompilerEndIf
Versionen
12.03.2014a: Byte, Unicode und Character hinzugefügt.
12.03.2014b: Strings hinzugefügt.
SBond
Beiträge: 266
Registriert: 22.05.2013 20:35

Re: Threadsafe BufferQueue

Beitrag von SBond »

cool :)

ich denke so etwas könnte ich demnächst gut gebrauchen :D
41 6c 73 6f 20 77 65 6e 6e 20 64 75 20 73 6f 20 76 69 65 6c 20 4c 61 6e 67 65 77 65 69 6c 65 20 68 61 73 74 2c 20 64 61 6e 6e 20 6b 61 6e 6e 73 74 20 64 75 20 61 75 63 68 20 67 6c 65 69 63 68 20 7a 75 20 6d 69 72 20 6b 6f 6d 6d 65 6e 20 75 6e 64 20 61 62 77 61 73 63 68 65 6e 2e

:D
Benutzeravatar
STARGÅTE
Kommando SG1
Beiträge: 7031
Registriert: 01.11.2005 13:34
Wohnort: Glienicke
Kontaktdaten:

Re: Threadsafe BufferQueue

Beitrag von STARGÅTE »

Interessant, so kann man das "Problem" mit den Parametern und Rückgabewerten für Threads umgehen, indem man nun einfach seine Sachen hoch-pushed und dann im Thread runter-poped.

Was mir da allerdings noch fehlt würde wäre PushString und PopString, was ja praktisch (mit NUL-Endenden Strings und UTF8) auch machbar wäre.

Und wieso heißt es eigentlich popAscii (unsign) und popWord (sign)? Wäre nicht Byte und Word bzw. Ascii und Unicode sinnvoller?
PB 6.01 ― Win 10, 21H2 ― Ryzen 9 3900X, 32 GB ― NVIDIA GeForce RTX 3080 ― Vivaldi 6.0 ― www.unionbytes.de
Aktuelles Projekt: Lizard - Skriptsprache für symbolische Berechnungen und mehr
Benutzeravatar
NicTheQuick
Ein Admin
Beiträge: 8808
Registriert: 29.08.2004 20:20
Computerausstattung: Ryzen 7 5800X, 64 GB DDR4-3200
Ubuntu 24.04.2 LTS
GeForce RTX 3080 Ti
Wohnort: Saarbrücken

Re: Threadsafe BufferQueue

Beitrag von NicTheQuick »

STARGÅTE hat geschrieben:Und wieso heißt es eigentlich popAscii (unsign) und popWord (sign)? Wäre nicht Byte und Word bzw. Ascii und Unicode sinnvoller?
Guter Einwand. Aber das ist ja schnell erledigt. Kann ich gerne gleich machen. Oder ich baue gleich alle Typen ein, die Purebasic nativ zur Verfügung stellt.
Was mir da allerdings noch fehlt würde wäre PushString und PopString, was ja praktisch (mit NUL-Endenden Strings und UTF8) auch machbar wäre.
Das könnte man auch machen, indem man 'popAscii()' bzw 'popWord()' oft genug aufruft bis ein 0-Byte kommt. Das kann ich gerne hinzufügen und mit einem extra Lock schützen, damit es nicht von anderen einzelnen Aufrufen blockiert werden kann.
Benutzeravatar
NicTheQuick
Ein Admin
Beiträge: 8808
Registriert: 29.08.2004 20:20
Computerausstattung: Ryzen 7 5800X, 64 GB DDR4-3200
Ubuntu 24.04.2 LTS
GeForce RTX 3080 Ti
Wohnort: Saarbrücken

Re: Threadsafe BufferQueue

Beitrag von NicTheQuick »

So, Strings gehen jetzt auch. Sogar inklusive der Konvertierung in verschiedenen Formate ähnlich 'PeekS()' und 'PokeS()'. Das braucht zwar jeweils ein paar Umwege, aber wenn man nicht herum konvertiert, sollte es recht fix gehen.
Antworten