Seite 1 von 1

Array für ungebremste Threads

Verfasst: 06.09.2013 17:28
von Lambda
Ich möchte die Warteschleife für Ausgang sowie Eingang für Netzwerkdaten beschleunigen. Bisher gesichert durch Mutex Objekte, wobei es sein kann, dass der Besitz wieder ergriffen wird, bevor die Warteschlange zum Zug kommt.

Da der Stapel jeweils ein Array darstellt, könnten beide Threads ungebremst darauf zugreifen, solange meines Wissen nicht beide gleichzeitig schreiben möchten.

Code: Alles auswählen

If \Queue(x)\Active
 .. hau raus...
 \Queue(x)\Active = #False
EndIf
Der Auftraggeber handhabt neue Elemente auf ähnliche Art:

Code: Alles auswählen

    For i=0 To 512
      If \Queue(i)\Active = #False
        \Queue(i)\Data     = *Data
             ...hau rein..
        \Queue(i)\Active   = #True
        Break
      EndIf
    Next
Die kritische stelle wäre "\Queue(i)\Active" da das der einzige Wert ist, welcher von beiden beschrieben/gelesen wird. Glaube nicht das es zur Kollision führen könnte, da beide erst lesen "If \Queue(i)\Active" und mit dem jeweils umgekehrten Zustand arbeiten.

Re: Array für ungebremste Threads

Verfasst: 06.09.2013 17:34
von STARGÅTE
Für soetwas gibt es CreateSemaphore,
dann brauchst du kein \Active

Re: Array für ungebremste Threads

Verfasst: 06.09.2013 17:37
von NicTheQuick
Nutze doch Semaphoren um herauszufinden, ob Platz im Array ist oder nicht. Dann bist du auf der sicheren Seite. Voraussetzung dafür ist aber, dass du das FIFO-Prinzip anwendest, also der Auftraggeber schreibt die Aufträge mit aufsteigendem Index in das Array und der andere Thread arbeitet sie auch in der selben Reihenfolge wieder ab.

Re: Array für ungebremste Threads

Verfasst: 06.09.2013 21:50
von Lambda
Der Sinn dahinter wäre, Mutex'los zu kommunizieren. Mit Semaphore müsste man die Ressource doch sperren? So wären es bspw. 512 Slots die belegt werden können, je nach Belastung würde es nicht alle benötigen da der Queue-Thread auch beim hinzufügen einer Ressource durch Empfänger-Thread, andere ausstehende verarbeitet.

Kritisch ist das ganze beim Server, der von allen Clienten empfängt, und pro Paket an alle senden muss. Wäre es evlt. effizienter pro Nutzer einen Queue(Sender)-Thread anzulegen?

Re: Array für ungebremste Threads

Verfasst: 06.09.2013 22:18
von STARGÅTE
Verstehe dein Anliegen, aber das Problem an deiner Variante ist (ohne auf das Shared-Read einzugehen), du verlierst die Reienfolge deiner Objekte und riskierst sogar, dass einige garnicht mehr verarbeitet werden.

Beispiel:
Sender belegt Slot 0, Slot 1 und Slot 2,
währenddessen liegt Empfänger auch Slot 0, Slot 1 und Slot 2 (u.U. etwas langsammer)
Sender belegt nun weiter Slot 3, nun wird der Empfänger mit Slot 0 fertig,
also wird Slot 0 vom Sendet belegt und dann erst Slot 4.
Der Empfänger verarbeitet nun weiter: Slot 1, Slot 2, Slot 3 und Slot 4 und kommt dann erst wieder zu Slot 0
Somit ist die Reihenfolge eine andere!

Dann nutze lieber Semaphores und eine FIFO Liste, und mit Mutex sperrst du immer nur kurz,
wenn der eine FirstElement() : DeleteElement() aufruft, bzw der andere LastElement() und AddElement()

Re: Array für ungebremste Threads

Verfasst: 06.09.2013 22:46
von NicTheQuick
Oder man nimmt einfach gar keine LinkedList, sondern eben ein Array mit fester Größe und einen Index zum Lesen und einen zum Schreiben. Der "Lesen"-Index darf dabei nie größer werden als der "Schreiben"-Index und umgekehrt das gleiche. Dann muss man auch nur die Indices mit Mutexen sperren und nicht das ganze Array ansich. Somit wird nie blockiert, außer der Puffer wird tatsächlich mal voll. Dann muss der "Schreiben"-Thread eben warten bis der "Lesen"-Thread weiter gearbeitet hat.

Re: Array für ungebremste Threads

Verfasst: 06.09.2013 23:35
von NicTheQuick
Hier mal ein Beispiel für einen Threadsicheren Fifo-Puffer:

Code: Alles auswählen

EnableExplicit

DeclareModule FifoBuffer
	Declare.i create(size.i)
	Declare free(*fifo)
	Declare pushBack(*fifo, *data)
	Declare.i getNext(*fifo)
EndDeclareModule

Module FifoBuffer

	Structure FifoElement
		*p[0]
	EndStructure

	Structure FifoBuffer
		size.i
		iRead.i
		iWrite.i
		writeSemaphore.i
		readSemaphore.i
		writeLock.i
		readLock.i
		*buffer.FifoElement
	EndStructure
	
	Procedure.i create(size.i)
		Protected *fifo.FifoBuffer = AllocateMemory(SizeOf(FifoBuffer))
		
		If (size > 2147483647)
			ProcedureReturn #False
		EndIf
		
		If (Not *fifo)
			ProcedureReturn #False
		EndIf
		
		With *fifo
			\size					= size
			\iWrite				= 0
			\iRead				= 0
			\readSemaphore		= CreateSemaphore(0)
			\writeSemaphore	= CreateSemaphore(size)
			\readLock			= CreateMutex()
			\writeLock			= CreateMutex()
			\buffer				= AllocateMemory(size * SizeOf(Integer))
			If (Not \buffer)
				FreeMemory(*fifo)
				ProcedureReturn #False
			EndIf
		EndWith
		
		ProcedureReturn *fifo
	EndProcedure
	
	Procedure free(*fifo.FifoBuffer)
		With *fifo
			FreeSemaphore(\readSemaphore)
			FreeSemaphore(\writeSemaphore)
			FreeMutex(\readLock)
			FreeMutex(\writeLock)
			FreeMemory(\buffer)
		EndWith
		FreeMemory(*fifo)
	EndProcedure
	
	Procedure pushBack(*fifo.FifoBuffer, *data)
		With *fifo
			LockMutex(\writeLock)
				WaitSemaphore(\writeSemaphore)
				Debug "Push " + Str(*data) + " back (iWrite=" + Str(\iWrite) + ")"
				\buffer\p[\iWrite] = *data
				\iWrite = (\iWrite + 1) % \size
				SignalSemaphore(\readSemaphore)
			UnlockMutex(\writeLock)
		EndWith
	EndProcedure
	
	Procedure.i getNext(*fifo.FifoBuffer)
		Protected *data
		With *fifo
			LockMutex(\readLock)
				WaitSemaphore(\readSemaphore)
				*data = \buffer\p[\iRead]
				\iRead = (\iRead + 1) % \size
				Debug "Got " + Str(*data) + "(iRead=" + Str(\iRead) + ")"
				SignalSemaphore(\writeSemaphore)
			UnlockMutex(\readLock)
		EndWith
		
		ProcedureReturn *data
	EndProcedure
EndModule

; E X A M P L E
Procedure WriterThread(*fifo)
	Protected i.i
	For i = 1 To 22
		FifoBuffer::pushBack(*fifo, i)
	Next
	FifoBuffer::pushBack(*fifo, 0)
EndProcedure

Procedure ReaderThread(*fifo)
	Protected i.i
	Repeat
		i = FifoBuffer::getNext(*fifo)
	Until i = 0
EndProcedure

Define *fifo = FifoBuffer::create(10)
Define *t1, *t2

*t2 = CreateThread(@WriterThread(), *fifo)
;Erstmal den Puffer vollschreiben lassen und 10 ms warten
Delay(10)
*t1 = CreateThread(@ReaderThread(), *fifo)

WaitThread(*t1)
WaitThread(*t2)

FifoBuffer::free(*fifo)

Re: Array für ungebremste Threads

Verfasst: 06.09.2013 23:55
von Lambda
Da plagt mich doch eine andere Frage. Lässt sich sowas im eigenen Netzwerk überhaupt zuverlässig testen? Pro verbundenen Client wird die Übertragung langsamer (wenn jeder in Bewegung ist - Position über UDP), bzw. der "Spieler" zieht bei jeweils anderen Clienten nach als würde der Stack nacharbeiten, das ist aber nicht der Fall. (Stapel erhöht sich nicht mehr als das erste Element)

Evlt. überlastet da alles an der selben Maschine sowie Port? Solange sich nur ein Client bewegt gibt es keine Verzögerung.

Re: Array für ungebremste Threads

Verfasst: 07.09.2013 00:28
von PMV
http://www.purebasic.fr/english/viewtop ... 74#p424174

-> CreatePointerQueueUnsave()
Eine klassische Queue, welche von mehreren Thread
beschrieben, aber nur von einem Thread gelesen werden
darf. Nützlich, wenn man keine maximale Anzahl haben
möchte, wie bei Nics Beispiel der Fall ist.

MFG PMG