Threadsafe Queue without strict blocking is unstable?
Posted: Thu Sep 15, 2011 10:36 pm
situation:
I have one thread who generates data and another thread (mainthread),
that consumes that data. To shift data from the first thread to the other,
i have made a queue saved by a mutex. This threads are used inside of a
game, so they need to be fast as possible, don't waisting time on waiting
for each other.
problem:
Because this queue is heavily used, the mutex is blocking much on peak.
As i understand, inside of a queue there is only collisions, if there is
one element left or nothing. If the queue is full, the writer and reader
don't collide by calling dequeue() and enqueue() at the same time.
Because dequeue() will only change the first entry, and enqueue() only
the last one. But it looks like my code gets unstable when i use it like
that.
known (not) problem:
Because the counter of the queue element is read and written at same time
on both threads, it would need full mutexprotection to get the right
number at any time. But thats ok, i only use it to get the stored elements
in one step and check against the result on dequeue(), so i know if there
was really on element or not. If the counter gets under zero, it will correct
by the queue itself, when there comes a new element. If the counter gets
to high, there will only be elements left till there are new ones, but thats
also ok for now.
code with example:
Don't forget to set "Threadsafe" option in compiler.
But it could be that you need to run this code for ages, to
get an error. So there is no need to run it.
question:
Can some one explain, why it gets unstable? Is there anything not atomic,
that i think it is? Could PB generate ASM-Code, where it gets problems
if it is called from 2 threads? Anyone here with some internal informations?
Would be glad, if anyone could help here.
edit:
after much more tests, it seems that after my last changes on this queue,
it is stable and the error comes from some where else. But if it is, i would
be glad, if anyone could confirm that, too.
Thank you for any try.
MFG PMV
I have one thread who generates data and another thread (mainthread),
that consumes that data. To shift data from the first thread to the other,
i have made a queue saved by a mutex. This threads are used inside of a
game, so they need to be fast as possible, don't waisting time on waiting
for each other.
problem:
Because this queue is heavily used, the mutex is blocking much on peak.
As i understand, inside of a queue there is only collisions, if there is
one element left or nothing. If the queue is full, the writer and reader
don't collide by calling dequeue() and enqueue() at the same time.
Because dequeue() will only change the first entry, and enqueue() only
the last one. But it looks like my code gets unstable when i use it like
that.
known (not) problem:
Because the counter of the queue element is read and written at same time
on both threads, it would need full mutexprotection to get the right
number at any time. But thats ok, i only use it to get the stored elements
in one step and check against the result on dequeue(), so i know if there
was really on element or not. If the counter gets under zero, it will correct
by the queue itself, when there comes a new element. If the counter gets
to high, there will only be elements left till there are new ones, but thats
also ok for now.
code with example:
Don't forget to set "Threadsafe" option in compiler.
But it could be that you need to run this code for ages, to
get an error. So there is no need to run it.

Code: Select all
; -:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-
;
;
; PointerQueue - include
;
;
; -:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-
;
; description:
; little synchronized queue to save one pointer per element
;
; {|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}
;
; © 2008-2011 by PMV
;
; {|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}
Interface PointerQueue
Enqueue.i(*Pointer)
Get.i()
Dequeue.i()
Free()
Count.i()
EndInterface
Structure PointerQueue_ElementStruc
*Pointer ;saved pointer from the element
*Next.PointerQueue_ElementStruc
EndStructure
Structure PointerQueue_Struc
VTable.i ;pointer to the functions
;Queue data
*_First.PointerQueue_ElementStruc
*_Last.PointerQueue_ElementStruc
Num.i
FreeMemory.i
Mutex.i
; ---
EndStructure
Procedure.i PointerQueue_Enqueue(*PointerQueue.PointerQueue_Struc, *Pointer)
Protected *New.PointerQueue_ElementStruc
If *Pointer
*New = AllocateMemory(SizeOf(PointerQueue_ElementStruc))
If *New
*New\Pointer = *Pointer
LockMutex(*PointerQueue\Mutex)
If *PointerQueue\_Last
*PointerQueue\_Last\Next = *New
*PointerQueue\_Last = *New
*PointerQueue\Num + 1
Else
*PointerQueue\_First = *New
*PointerQueue\_Last = *New
*PointerQueue\Num = 1
EndIf
UnlockMutex(*PointerQueue\Mutex)
ProcedureReturn *New\Pointer
EndIf
EndIf
ProcedureReturn #False
EndProcedure
Procedure.i PointerQueue_Dequeue(*PointerQueue.PointerQueue_Struc)
Protected *Pointer
Protected *Del.PointerQueue_ElementStruc = *PointerQueue\_First
If *Del
If *PointerQueue\_Last = *PointerQueue\_First
LockMutex(*PointerQueue\Mutex)
If *Del = *PointerQueue\_Last
*PointerQueue\_Last = #False
*PointerQueue\Num = 0
Else
*PointerQueue\Num - 1
EndIf
*PointerQueue\_First = *Del\Next
UnlockMutex(*PointerQueue\Mutex)
Else
*PointerQueue\_First = *Del\Next
*PointerQueue\Num - 1
EndIf
*Pointer = *Del\Pointer
FreeMemory(*Del)
EndIf
ProcedureReturn *Pointer
EndProcedure
Procedure.i PointerQueue_Get(*PointerQueue.PointerQueue_Struc)
Protected *Pointer
If *PointerQueue\_First
*Pointer = *PointerQueue\_First\Pointer
EndIf
ProcedureReturn *Pointer
EndProcedure
Procedure PointerQueue_Free(*PointerQueue.PointerQueue_Struc)
Protected *Del.PointerQueue_ElementStruc
While *PointerQueue\_First
*Del = *PointerQueue\_First
*PointerQueue\_First = *PointerQueue\_First\Next
If *PointerQueue\FreeMemory : FreeMemory(*Del\Pointer) : EndIf
FreeMemory(*Del)
Wend
FreeMutex(*PointerQueue\Mutex)
FreeMemory(*PointerQueue)
EndProcedure
Procedure PointerQueue_Count(*PointerQueue.PointerQueue_Struc)
ProcedureReturn *PointerQueue\Num
EndProcedure
;the counter will not have the right number of elements!
Procedure.i CreatePointerQueue(FreeMemory.i = #False)
Protected *PointerQueue.PointerQueue_Struc = AllocateMemory(SizeOf(PointerQueue_Struc))
If *PointerQueue
*PointerQueue\VTable = ?PointerQueue_VTable
*PointerQueue\FreeMemory = FreeMemory
*PointerQueue\Mutex = CreateMutex()
If *PointerQueue\Mutex
ProcedureReturn *PointerQueue
Else
FreeMemory(*PointerQueue)
EndIf
EndIf
ProcedureReturn #False
EndProcedure
DataSection
PointerQueue_VTable:
Data.i @PointerQueue_Enqueue()
Data.i @PointerQueue_Get()
Data.i @PointerQueue_Dequeue()
Data.i @PointerQueue_Free()
Data.i @PointerQueue_Count()
EndDataSection
; example
Global *Queue.PointerQueue = CreatePointerQueue(#True)
Global EndProgram.i = #False
Structure SomeData
i.i
Time.i
EndStructure
Procedure ReadThread(Dummy.i)
Protected *SomeData.SomeData
Protected Count.i
Repeat
Count = *Queue\Count() - 1
Debug "Count: " + Str(Count)
For i = 0 To Count
*SomeData = *Queue\Dequeue()
If *SomeData
If Random(Count) = 0
Debug "Data: " + Str(*SomeData\i) + ". " + Str(*SomeData\Time)
EndIf
FreeMemory(*SomeData)
EndIf
Next
Delay(Random(20))
Until EndProgram
EndProcedure
Procedure WriteThread(Dummy.i)
Protected *SomeData.SomeData
Protected i.i
Repeat
*SomeData = AllocateMemory(SizeOf(SomeData))
*SomeData\i = i
*SomeData\Time = ElapsedMilliseconds()
*Queue\Enqueue(*SomeData)
If Random(10) = 0
Delay(Random(2))
EndIf
i + 1
Until EndProgram
EndProcedure
Procedure WaitTime(Time.i)
Delay(Time)
EndProgram = #True
EndProcedure
Define Thread1.i = CreateThread(@WriteThread(), #Null)
Define Thread2.i = CreateThread(@WaitTime(), 10000)
ReadThread(#Null)
WaitThread(Thread1)
WaitThread(Thread2)
Debug "End"
Can some one explain, why it gets unstable? Is there anything not atomic,
that i think it is? Could PB generate ASM-Code, where it gets problems
if it is called from 2 threads? Anyone here with some internal informations?
Would be glad, if anyone could help here.
edit:
after much more tests, it seems that after my last changes on this queue,
it is stable and the error comes from some where else. But if it is, i would
be glad, if anyone could confirm that, too.

Thank you for any try.

MFG PMV