Page 1 of 1

Threadsafe Queue without strict blocking is unstable?

Posted: Thu Sep 15, 2011 10:36 pm
by PMV
situation:
I have one thread that generates data and another thread (the main thread)
that consumes that data. To move data from the first thread to the other,
I have made a queue protected by a mutex. These threads are used inside a
game, so they need to be as fast as possible and must not waste time waiting
for each other.

problem:
Because this queue is heavily used, the mutex blocks a lot at peak load.
As I understand it, collisions inside a queue can only happen when there is
one element left or none. If the queue is filled (more than one element), the
writer and reader don't collide when calling dequeue() and enqueue() at the
same time, because dequeue() only changes the first entry and enqueue() only
the last one. But it looks like my code gets unstable when I use it like
that.

known (not a) problem:
Because the element counter of the queue is read and written at the same time
by both threads, it would need full mutex protection to give the right
number at any time. But that's OK: I only use it to fetch the stored elements
in one pass, and I check the result of dequeue(), so I know whether there
really was an element or not. If the counter drops below zero, the queue
corrects it by itself when a new element arrives. If the counter gets
too high, there will only be elements left over until new ones arrive, but that's
also OK for now.

code with example:
Don't forget to enable the "Threadsafe" option in the compiler.
But you may have to run this code for ages to
get an error, so there is no need to run it. :)

Code: Select all

; -:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-
;
;
;          PointerQueue - include
;
;
; -:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-
;
; description:
; little synchronized queue to save one pointer per element
;
; {|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}
;
; © 2008-2011 by PMV 
;
; {|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}


; Object-style interface of the queue. The methods are dispatched through the
; address table at the PointerQueue_VTable label, so the order here has to
; match the order of the Data.i entries there.
Interface PointerQueue
  Enqueue.i(*Pointer)  ; append a pointer; #False if *Pointer is 0 or allocation fails
  Get.i()              ; stored pointer of the first element, without removing it
  Dequeue.i()          ; remove the first element and return its stored pointer
  Free()               ; release all elements, the mutex and the queue itself
  Count.i()            ; current value of the Num counter
EndInterface

; One node of the singly linked list that backs the queue.
Structure PointerQueue_ElementStruc
  *Pointer  ;saved pointer from the element
  *Next.PointerQueue_ElementStruc  ;following node, 0 for the last node
EndStructure

; Instance data of a queue. VTable is the first field so that a pointer to
; this structure can be used as a PointerQueue interface object.
Structure PointerQueue_Struc
  VTable.i ;pointer to the functions
  
  ;Queue data
  *_First.PointerQueue_ElementStruc  ;head node (next one to dequeue), 0 if empty
  *_Last.PointerQueue_ElementStruc   ;tail node (last one enqueued), 0 if empty
  Num.i         ;element counter; partly updated without the mutex in Dequeue
  FreeMemory.i  ;if non-zero, Free() also releases the stored pointers
  Mutex.i       ;mutex handle from CreateMutex()
  ; ---
EndStructure

; Appends *Pointer to the end of the queue.
;
; *PointerQueue : queue instance
; *Pointer      : value to store; 0 is rejected
; Returns *Pointer on success, #False if *Pointer is 0 or the node could not
; be allocated.
Procedure.i PointerQueue_Enqueue(*PointerQueue.PointerQueue_Struc, *Pointer)
  Protected *New.PointerQueue_ElementStruc
  If *Pointer
    *New = AllocateMemory(SizeOf(PointerQueue_ElementStruc))
    If *New
      *New\Pointer = *Pointer
      *New\Next = #Null

      ; The node is fully initialised before it becomes reachable.
      LockMutex(*PointerQueue\Mutex)
        If *PointerQueue\_Last
          *PointerQueue\_Last\Next = *New
          *PointerQueue\_Last = *New
          *PointerQueue\Num + 1
        Else
          *PointerQueue\_First = *New
          *PointerQueue\_Last = *New
          *PointerQueue\Num = 1
        EndIf
      UnlockMutex(*PointerQueue\Mutex)
      
      ; Do not read *New after the unlock: the reading thread may already
      ; have dequeued and freed the node. Return the caller's value instead.
      ProcedureReturn *Pointer
    EndIf
  EndIf
  ProcedureReturn #False
EndProcedure

; Removes the first element and returns its stored pointer (0 if _First is 0).
;
; The mutex is only taken when _First and _Last are observed to be equal
; (a single element). Otherwise _First and Num are changed without the
; mutex, so Num can race with the increment in Enqueue.
; NOTE(review): the unlocked path presumably relies on only one thread ever
; calling Dequeue - confirm against callers.
Procedure.i PointerQueue_Dequeue(*PointerQueue.PointerQueue_Struc)
  Protected *Pointer
  Protected *Del.PointerQueue_ElementStruc = *PointerQueue\_First
  
  If *Del
    
    ; Unlocked check: does the queue look like it holds exactly one element?
    If *PointerQueue\_Last = *PointerQueue\_First
      LockMutex(*PointerQueue\Mutex)
        ; Re-check under the lock: the writer may have appended in between.
        If *Del = *PointerQueue\_Last
          ; Still the only element: the queue becomes empty.
          *PointerQueue\_Last = #False
          *PointerQueue\Num = 0
        Else
          *PointerQueue\Num - 1
        EndIf
        *PointerQueue\_First = *Del\Next
      UnlockMutex(*PointerQueue\Mutex)
    Else
      ; More than one element was observed: advance the head without locking.
      *PointerQueue\_First = *Del\Next
      *PointerQueue\Num - 1
    EndIf
    ; The node is unlinked now; hand out its value and release the node only.
    *Pointer = *Del\Pointer
    FreeMemory(*Del)
  EndIf
  
  ProcedureReturn *Pointer
EndProcedure

; Peeks at the head of the queue: returns the stored pointer of the first
; element without removing it, or 0 when there is no first element.
Procedure.i PointerQueue_Get(*PointerQueue.PointerQueue_Struc)
  Protected *Head.PointerQueue_ElementStruc = *PointerQueue\_First

  If *Head = #Null
    ProcedureReturn #Null
  EndIf

  ProcedureReturn *Head\Pointer
EndProcedure

; Destroys the queue: releases every remaining node (and the stored pointer
; as well when the FreeMemory flag is set), then the mutex and finally the
; queue structure itself. No locking is done here.
Procedure PointerQueue_Free(*PointerQueue.PointerQueue_Struc)
  Protected *Node.PointerQueue_ElementStruc = *PointerQueue\_First
  Protected *Following.PointerQueue_ElementStruc

  While *Node
    *Following = *Node\Next
    If *PointerQueue\FreeMemory
      FreeMemory(*Node\Pointer)
    EndIf
    FreeMemory(*Node)
    *Node = *Following
  Wend
  *PointerQueue\_First = #Null

  FreeMutex(*PointerQueue\Mutex)
  FreeMemory(*PointerQueue)
EndProcedure

; Returns the current value of the Num counter. The counter is read without
; taking the mutex.
Procedure PointerQueue_Count(*PointerQueue.PointerQueue_Struc)
  Protected Stored.i = *PointerQueue\Num

  ProcedureReturn Stored
EndProcedure

; Creates a new queue and returns it (usable as a PointerQueue interface),
; or #False if the memory or the mutex could not be created.
;
; FreeMemory : stored in the queue; when non-zero, Free() also releases the
;              stored pointers of the remaining elements.
;
; Note: the element counter of this queue will not always hold the right
; number of elements!
Procedure.i CreatePointerQueue(FreeMemory.i = #False)
  Protected *Created.PointerQueue_Struc

  *Created = AllocateMemory(SizeOf(PointerQueue_Struc))
  If *Created = #Null
    ProcedureReturn #False
  EndIf

  *Created\VTable = ?PointerQueue_VTable
  *Created\FreeMemory = FreeMemory
  *Created\Mutex = CreateMutex()

  If *Created\Mutex = #Null
    ; without a mutex the queue is unusable - give the memory back
    FreeMemory(*Created)
    ProcedureReturn #False
  EndIf

  ProcedureReturn *Created
EndProcedure

; Function table used by CreatePointerQueue(). The entries must stay in the
; same order as the methods of the PointerQueue interface.
DataSection
  PointerQueue_VTable:
  Data.i @PointerQueue_Enqueue()  ; Enqueue()
  Data.i @PointerQueue_Get()      ; Get()
  Data.i @PointerQueue_Dequeue()  ; Dequeue()
  Data.i @PointerQueue_Free()     ; Free()
  Data.i @PointerQueue_Count()    ; Count()
EndDataSection



; example
; Shared state of the example: the queue (created with FreeMemory = #True),
; the stop flag polled by all threads, and the payload type.
Global *Queue.PointerQueue = CreatePointerQueue(#True)
Global EndProgram.i = #False
Structure SomeData
  i.i     ; running number assigned by the writer
  Time.i  ; ElapsedMilliseconds() at creation
EndStructure


; Consumer loop: reads the counter once per pass, tries to dequeue that many
; elements, prints a random sample of them and frees every payload. Sleeps up
; to 20 ms between passes and stops once EndProgram is set.
Procedure ReadThread(Dummy.i)
  Protected *Item.SomeData
  Protected Last.i

  Repeat
    Last = *Queue\Count() - 1
    Debug "Count: " + Str(Last)

    For i = 0 To Last
      *Item = *Queue\Dequeue()
      If *Item = #Null
        ; counter was ahead of the real content - nothing to process
        Continue
      EndIf

      If Random(Last) = 0
        Debug "Data: " + Str(*Item\i) + ". " + Str(*Item\Time)
      EndIf
      FreeMemory(*Item)
    Next

    Delay(Random(20))
  Until EndProgram
EndProcedure

; Producer loop: allocates a SomeData record, fills it and enqueues it until
; EndProgram is set. Roughly every 11th pass it sleeps for up to 2 ms.
;
; A failed allocation is skipped instead of being written through a null
; pointer, and a record that could not be enqueued is freed again.
Procedure WriteThread(Dummy.i)
  Protected *SomeData.SomeData
  Protected i.i
  
  Repeat
    *SomeData = AllocateMemory(SizeOf(SomeData))
    If *SomeData
      *SomeData\i = i
      *SomeData\Time = ElapsedMilliseconds()
      ; After a successful Enqueue the record belongs to the reader and must
      ; not be touched here any more.
      If *Queue\Enqueue(*SomeData) = #False
        FreeMemory(*SomeData)
      EndIf
    EndIf
    If Random(10) = 0
      Delay(Random(2))
    EndIf
    i + 1
  Until EndProgram
EndProcedure

; Timer thread: waits Time milliseconds, then sets the global EndProgram flag
; that the reader and writer loops poll.
Procedure WaitTime(Time.i)
  Delay(Time)
  EndProgram = #True
EndProcedure

; Start one writer thread and a 10 second timer thread, run the reader on
; the main thread, then wait for both threads to finish.
Define Thread1.i = CreateThread(@WriteThread(), #Null)
Define Thread2.i = CreateThread(@WaitTime(), 10000)
ReadThread(#Null)
WaitThread(Thread1)
WaitThread(Thread2)
Debug "End"
question:
Can someone explain why it gets unstable? Is there anything that is not atomic
although I think it is? Could PB generate ASM code that causes problems
when it is called from 2 threads? Does anyone here have some internal information?
I would be glad if anyone could help here.

edit:
After many more tests, it seems that since my last changes to this queue
it is stable and the error comes from somewhere else. But if that is so, I would
be glad if anyone could confirm it, too. :D

Thank you for any try. :)

MFG PMV

Re: Threadsafe Queue without strict blocking is unstable

Posted: Fri Sep 16, 2011 12:53 am
by idle
I'm not really sure what trouble you're having, but if it's a matter of performance,
maybe you should use a semaphore to do the dequeue so you can safely lock your mutex.

Re: Threadsafe Queue without strict blocking is unstable

Posted: Fri Sep 16, 2011 1:54 am
by idle
Don't know if this is 100% safe or stable, what do you think?

Code: Select all

; Shared state of the example.
Global NewList myqueue()                 ; list that holds the queued values
Global dequeueSeme = CreateSemaphore()   ; signalled by queue() and WaitTime()
Global queueSeme = CreateSemaphore(1)    ; created with an initial count of 1
Global mutex = CreateMutex()
Global  EndProgram                       ; stop flag polled by the threads

; Payload record produced by WriteThread().
Structure SomeData
  i.i     ; running number assigned by the writer
  Time.i  ; ElapsedMilliseconds() at creation
EndStructure

; Consumer thread. Each signal on dequeueSeme stands for one queued item (or
; for the final wake-up sent by WaitTime()). Per signal, at most one item is
; taken from the list while holding the mutex; it is printed and freed
; outside the lock so the writer is blocked as briefly as possible.
;
; The list is only touched while "mutex" is held, because a PureBasic list
; must not be modified by two threads at once. queueSeme is no longer used by
; these two procedures: it was signalled more often than it was waited on, so
; it could not act as a lock.
Procedure dequeue(void)
  Protected *SomeData.SomeData
  Protected count.i

  Repeat
    WaitSemaphore(dequeueSeme)

    *SomeData = #Null
    LockMutex(mutex)
      ; number of items in the list including the one taken now
      count = ListSize(myqueue())
      If FirstElement(myqueue())
        *SomeData = myqueue()
        DeleteElement(myqueue())
      EndIf
    UnlockMutex(mutex)

    If *SomeData
      Debug "Data: " + Str(*SomeData\i) + ". " + Str(*SomeData\Time) + " " + Str(count)
      ; the record was allocated by the writer and is consumed here
      FreeMemory(*SomeData)
    EndIf
  Until EndProgram

EndProcedure

; Appends item to the end of the list and wakes the consumer once.
; The semaphore is only signalled when the element was really added, so the
; semaphore count never exceeds the number of queued items.
Procedure queue(item.i)
  Protected added.i = #False

  LockMutex(mutex)
    LastElement(myqueue())
    If AddElement(myqueue())
      myqueue() = item
      added = #True
    EndIf
  UnlockMutex(mutex)

  If added
    SignalSemaphore(dequeueSeme)
  EndIf
EndProcedure

;--------------------------------------------

; Producer loop: allocates a SomeData record, fills it and passes it to
; queue() until EndProgram is set; roughly every 11th pass it sleeps for up
; to 20 ms. Prints the number of produced items when it ends.
Procedure WriteThread(Dummy.i)
  Protected *Item.SomeData
  Protected Produced.i

  Repeat
    *Item = AllocateMemory(SizeOf(SomeData))
    With *Item
      \i = Produced
      \Time = ElapsedMilliseconds()
    EndWith
    queue(*Item)

    If Random(10) = 0
      Delay(Random(20))
    EndIf
    Produced + 1
  Until EndProgram = #True

  Debug "Items added " + Str(Produced)
EndProcedure

; Timer thread: waits Time milliseconds, sets the global EndProgram flag and
; signals dequeueSeme once so that a consumer blocked in WaitSemaphore()
; wakes up and can see the flag.
Procedure WaitTime(Time.i)
  Delay(Time)
  EndProgram = #True
  SignalSemaphore(dequeueSeme)
EndProcedure

; Start the consumer, the producer and a 10 second timer thread, then wait
; for all three to finish.
Define thread0.i = CreateThread(@dequeue(),#Null)
Define Thread1.i = CreateThread(@WriteThread(), #Null)
Define Thread2.i = CreateThread(@WaitTime(), 10000)

WaitThread(Thread0)
WaitThread(Thread1)
WaitThread(Thread2)
Debug "End"

Re: Threadsafe Queue without strict blocking is unstable

Posted: Sat Sep 17, 2011 3:09 am
by PMV
Thanks for your try, idle, but I want to have a version without strict blocking,
like the one I posted. With yours, enqueue() and dequeue() will always
wait for each other to finish ... in my version, they run completely
in parallel as long as there is more than one element in the queue. :D
And because of that, linked lists (NewList) can't be used. They can't be accessed
from 2 threads at the same time.


I have now tested a little more and it seems to be stable now with the
version I posted. So it would be helpful, too, if someone could
confirm that there should be no problem with it. :D
Because then I would know that the bug must come from somewhere else.

MFG PMV

Re: Threadsafe Queue without strict blocking is unstable?

Posted: Fri Sep 06, 2013 11:12 pm
by PMV
I had completely forgotten this thread ...
Of course the semaphore is the only solution.
But with that, I miss the possibility to read the
counter of a semaphore ... so I have made a stupid
count function that has to walk through all elements and
even needs to lock the queue. Not really useful. :lol:

But because it may be helpful for someone else, here
is my complete PointerQueue include
with 3 variants:
1. single-thread use (CreatePointerQueue())
2. full multi-thread use (CreatePointerQueueSave())
3. multiple writers but only one reader (CreatePointerQueueUnsave())

Code: Select all

; -:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-
;
;
;          PointerQueue - include
;
;
; -:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-:-
;
; description:
; little synchronized queue to save one pointer per element
;
; {|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}
;
; © 2008-2013 by PMV 
;
; {|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}-{|}

; Object-style interface shared by all three queue variants. The method order
; has to match the Data.i entries of the three VTables. In
; PointerQueue3_VTable the slots of the three *String methods are 0, so they
; must not be called on a queue from CreatePointerQueueUnsave().
Interface PointerQueue
  Enqueue.i(*Pointer)        ; append a pointer; #False on failure
  EnqueueString.i(String.s)  ; copy the string into new memory and append it
  Get.i()                    ; stored pointer of the first element, not removed
  GetString.s()              ; first element read as a string, not removed
  Dequeue.i()                ; remove the first element, return its pointer
  DequeueString.s()          ; remove the first element, return it as a string
  Free()                     ; release all elements and the queue itself
  Count.i()                  ; number of elements
EndInterface

; One node of the singly linked list that backs the queue.
Structure PointerQueue_ElementStruc
  *Pointer  ;saved pointer from the element
  *Next.PointerQueue_ElementStruc  ;following node, 0 for the last node
EndStructure

; Instance data of a queue. VTable is the first field so that a pointer to
; this structure can be used as a PointerQueue interface object.
Structure PointerQueue_Struc
  VTable.i ;pointer to the functions
  
  ;Queue data
  *_First.PointerQueue_ElementStruc  ;head node (next one to dequeue), 0 if empty
  *_Last.PointerQueue_ElementStruc   ;tail node (last one enqueued), 0 if empty
  ; Num and Semaphore share the same memory: the Enqueue/Dequeue and
  ; Enqueue2/Dequeue2 procedures maintain Num, while Enqueue3/Dequeue3 use
  ; the field as a semaphore handle instead.
  StructureUnion
    Num.i
    Semaphore.i
  EndStructureUnion
  FreeMemory.i  ;if non-zero, DequeueString() and Free() release stored pointers
  Mutex.i       ;mutex handle; set by the two multi-thread constructors
  ; ---
EndStructure

; Single-thread variant (no locking).
; Appends *Pointer to the end of the queue and keeps the Num counter up to
; date. Returns the stored pointer, or #False if *Pointer is 0 or the node
; could not be allocated.
Procedure.i PointerQueue_Enqueue(*PointerQueue.PointerQueue_Struc, *Pointer)
  Protected *Node.PointerQueue_ElementStruc
  Protected *Tail.PointerQueue_ElementStruc

  If *Pointer = #Null
    ProcedureReturn #False
  EndIf

  *Node = AllocateMemory(SizeOf(PointerQueue_ElementStruc))
  If *Node = #Null
    ProcedureReturn #False
  EndIf
  *Node\Pointer = *Pointer

  *Tail = *PointerQueue\_Last
  If *Tail = #Null
    ; empty queue: the new node is head and tail at once
    *PointerQueue\_First = *Node
    *PointerQueue\Num = 1
  Else
    *Tail\Next = *Node
    *PointerQueue\Num + 1
  EndIf
  *PointerQueue\_Last = *Node

  ProcedureReturn *Node\Pointer
EndProcedure

; mutex protected procedure for multi-thread use
;
; Appends *Pointer to the end of the queue while holding the queue mutex.
; Returns *Pointer on success, #False if *Pointer is 0 or the node could not
; be allocated.
Procedure.i PointerQueue_Enqueue2(*PointerQueue.PointerQueue_Struc, *Pointer)
  Protected *New.PointerQueue_ElementStruc
  If *Pointer
    *New = AllocateMemory(SizeOf(PointerQueue_ElementStruc))
    If *New
      *New\Pointer = *Pointer
      *New\Next = #Null

      LockMutex(*PointerQueue\Mutex)
        If *PointerQueue\_Last
          *PointerQueue\_Last\Next = *New
          *PointerQueue\_Last = *New
          *PointerQueue\Num + 1
        Else
          *PointerQueue\_First = *New
          *PointerQueue\_Last = *New
          *PointerQueue\Num = 1
        EndIf
      UnlockMutex(*PointerQueue\Mutex)
      
      ; Do not read *New after the unlock: another thread may already have
      ; dequeued and freed the node. Return the caller's value instead.
      ProcedureReturn *Pointer
    EndIf
  EndIf
  ProcedureReturn #False
EndProcedure

; mutex protected procedure for multi-thread use with a single reader
; single reader means: only one thread calls Dequeue() and Get() on that queue!
;
; Links the new node in under the mutex and then signals the semaphore once,
; so the semaphore count never exceeds the number of linked nodes.
; Returns *Pointer on success, #False if *Pointer is 0 or the node could not
; be allocated.
Procedure.i PointerQueue_Enqueue3(*PointerQueue.PointerQueue_Struc, *Pointer)
  Protected *New.PointerQueue_ElementStruc
  If *Pointer
    *New = AllocateMemory(SizeOf(PointerQueue_ElementStruc))
    If *New
      *New\Pointer = *Pointer
      *New\Next = #Null

      LockMutex(*PointerQueue\Mutex)
        If *PointerQueue\_Last
          *PointerQueue\_Last\Next = *New
          *PointerQueue\_Last = *New
        Else
          *PointerQueue\_First = *New
          *PointerQueue\_Last = *New
        EndIf
      UnlockMutex(*PointerQueue\Mutex)
      SignalSemaphore(*PointerQueue\Semaphore)
      
      ; Do not read *New any more: the reader may already have dequeued and
      ; freed the node. Return the caller's value instead.
      ProcedureReturn *Pointer
    EndIf
  EndIf
  ProcedureReturn #False
EndProcedure

; Single-thread variant (no locking).
; Unlinks the first node, decrements Num, frees the node and returns the
; pointer it stored. Returns 0 when the queue has no first element.
Procedure.i PointerQueue_Dequeue(*PointerQueue.PointerQueue_Struc)
  Protected *Head.PointerQueue_ElementStruc = *PointerQueue\_First
  Protected *Payload

  If *Head = #Null
    ProcedureReturn *Payload
  EndIf

  *PointerQueue\Num - 1
  *Payload = *Head\Pointer
  *PointerQueue\_First = *Head\Next

  ; removing the only node also clears the tail
  If *PointerQueue\_Last = *Head
    *PointerQueue\_Last = #False
  EndIf

  FreeMemory(*Head)
  ProcedureReturn *Payload
EndProcedure

; Multi-thread variant: the whole operation runs while the queue mutex is held.
; Unlinks the first node, decrements Num, frees the node and returns the
; pointer it stored. Returns 0 when the queue has no first element.
Procedure.i PointerQueue_Dequeue2(*PointerQueue.PointerQueue_Struc)
  Protected *Head.PointerQueue_ElementStruc
  Protected *Payload

  LockMutex(*PointerQueue\Mutex)

  *Head = *PointerQueue\_First
  If *Head
    *PointerQueue\Num - 1
    *Payload = *Head\Pointer
    *PointerQueue\_First = *Head\Next

    ; removing the only node also clears the tail
    If *PointerQueue\_Last = *Head
      *PointerQueue\_Last = #False
    EndIf

    FreeMemory(*Head)
  EndIf

  UnlockMutex(*PointerQueue\Mutex)

  ProcedureReturn *Payload
EndProcedure

; mutex protected procedure for multi-thread use with a single reader
; single reader means: only one thread calls Dequeue() and Get() on that queue!
;
; Removes the first element and returns its stored pointer, or 0 when
; TrySemaphore() does not succeed. The mutex is only taken when _First and
; _Last are observed to be equal (a single element); otherwise the head is
; advanced without locking.
; NOTE(review): the unlocked path presumably relies on the writer storing
; Next before _Last and on those stores becoming visible in that order -
; confirm for the target CPU.
Procedure.i PointerQueue_Dequeue3(*PointerQueue.PointerQueue_Struc)
  Protected *Pointer
  Protected *Del.PointerQueue_ElementStruc
  
  ; Enqueue3 signals once per linked node, so a successful TrySemaphore()
  ; means at least one node is available.
  If TrySemaphore(*PointerQueue\Semaphore)
    *Del = *PointerQueue\_First
    If *Del
      ; Unlocked check: does the queue look like it holds exactly one element?
      If *PointerQueue\_Last = *PointerQueue\_First
        LockMutex(*PointerQueue\Mutex)
          ; Re-check under the lock: a writer may have appended in between.
          If *Del = *PointerQueue\_Last
            *PointerQueue\_Last = #False
          EndIf
          *PointerQueue\_First = *Del\Next
        UnlockMutex(*PointerQueue\Mutex)
      Else
        ; More than one element was observed: writers only touch the tail.
        *PointerQueue\_First = *Del\Next
      EndIf
      ; The node is unlinked now; hand out its value and release the node only.
      *Pointer = *Del\Pointer
      FreeMemory(*Del)
    EndIf
  EndIf
  
  ProcedureReturn *Pointer
EndProcedure

; Variant without locking.
; Returns the stored pointer of the first element without removing it, or 0
; when there is no first element.
Procedure.i PointerQueue_Get(*PointerQueue.PointerQueue_Struc)
  Protected *Head.PointerQueue_ElementStruc = *PointerQueue\_First

  If *Head = #Null
    ProcedureReturn #Null
  EndIf

  ProcedureReturn *Head\Pointer
EndProcedure

; Multi-thread variant: the head is read while the queue mutex is held.
; Returns the stored pointer of the first element without removing it, or 0
; when there is no first element.
Procedure.i PointerQueue_Get2(*PointerQueue.PointerQueue_Struc)
  Protected *Head.PointerQueue_ElementStruc
  Protected *Payload

  LockMutex(*PointerQueue\Mutex)
  *Head = *PointerQueue\_First
  If *Head <> #Null
    *Payload = *Head\Pointer
  EndIf
  UnlockMutex(*PointerQueue\Mutex)

  ProcedureReturn *Payload
EndProcedure

; Single-thread variant.
; Reads the first element as a string via PeekS() without removing it.
; Returns "" when PointerQueue_Get() yields 0.
Procedure.s PointerQueue_GetString(*PointerQueue.PointerQueue_Struc)
  Protected Text.s = ""
  Protected *Stored = PointerQueue_Get(*PointerQueue)

  If *Stored
    Text = PeekS(*Stored)
  EndIf

  ProcedureReturn Text
EndProcedure

; Multi-thread variant: the pointer is fetched with PointerQueue_Get2().
; Reads the first element as a string via PeekS() without removing it.
; Returns "" when PointerQueue_Get2() yields 0.
Procedure.s PointerQueue_GetString2(*PointerQueue.PointerQueue_Struc)
  Protected Text.s = ""
  Protected *Stored = PointerQueue_Get2(*PointerQueue)

  If *Stored
    Text = PeekS(*Stored)
  EndIf

  ProcedureReturn Text
EndProcedure

; normal procedure for single-thread use
;
; Copies String (including its terminating null character) into newly
; allocated memory and appends that memory to the queue.
; Returns the pointer to the copy, or #False if the copy could not be
; allocated or enqueued (the copy is freed again in that case).
Procedure.i PointerQueue_EnqueueString(*PointerQueue.PointerQueue_Struc, String.s)
  Protected *Pointer = AllocateMemory((Len(String) + 1) * SizeOf(CHARACTER))
  ; Result holds a pointer, so it must be pointer-sized (.i). A .l would cut
  ; it down to 32 bits on x64 and could even make a success look like #False.
  Protected Result.i

  If *Pointer
    PokeS(*Pointer, String)
    Result = PointerQueue_Enqueue(*PointerQueue, *Pointer)
    If Result
      ProcedureReturn Result
    Else
      FreeMemory(*Pointer)
    EndIf
  EndIf
  
  ProcedureReturn #False
EndProcedure

; mutex protected procedure for multi-thread use
;
; Copies String (including its terminating null character) into newly
; allocated memory and appends that memory with PointerQueue_Enqueue2().
; Returns the pointer to the copy, or #False if the copy could not be
; allocated or enqueued (the copy is freed again in that case).
Procedure.i PointerQueue_EnqueueString2(*PointerQueue.PointerQueue_Struc, String.s)
  Protected *Pointer = AllocateMemory((Len(String) + 1) * SizeOf(CHARACTER))
  ; Result holds a pointer, so it must be pointer-sized (.i). A .l would cut
  ; it down to 32 bits on x64 and could even make a success look like #False.
  Protected Result.i

  If *Pointer
    PokeS(*Pointer, String)
    Result = PointerQueue_Enqueue2(*PointerQueue, *Pointer)
    If Result
      ProcedureReturn Result
    Else
      FreeMemory(*Pointer)
    EndIf
  EndIf
  
  ProcedureReturn #False
EndProcedure

; Single-thread variant.
; Removes the first element and returns it as a string ("" when
; PointerQueue_Dequeue() yields 0). The stored memory is released only when
; the queue's FreeMemory flag is set.
Procedure.s PointerQueue_DequeueString(*PointerQueue.PointerQueue_Struc)
  Protected Text.s = ""
  Protected *Stored = PointerQueue_Dequeue(*PointerQueue)

  If *Stored = #Null
    ProcedureReturn Text
  EndIf

  Text = PeekS(*Stored)
  If *PointerQueue\FreeMemory
    FreeMemory(*Stored)
  EndIf

  ProcedureReturn Text
EndProcedure

; Multi-thread variant: the element is removed with PointerQueue_Dequeue2().
; Returns the removed element as a string ("" when PointerQueue_Dequeue2()
; yields 0). The stored memory is released only when the queue's FreeMemory
; flag is set.
Procedure.s PointerQueue_DequeueString2(*PointerQueue.PointerQueue_Struc)
  Protected Text.s = ""
  Protected *Stored = PointerQueue_Dequeue2(*PointerQueue)

  If *Stored = #Null
    ProcedureReturn Text
  EndIf

  Text = PeekS(*Stored)
  If *PointerQueue\FreeMemory
    FreeMemory(*Stored)
  EndIf

  ProcedureReturn Text
EndProcedure

; Destroys a queue of any of the three variants: releases every remaining
; node (and its stored pointer when the FreeMemory flag is set), the mutex if
; there is one, the semaphore if the queue uses PointerQueue3_VTable, and
; finally the queue structure itself. No locking is done here.
Procedure PointerQueue_Free(*PointerQueue.PointerQueue_Struc)
  Protected *Node.PointerQueue_ElementStruc = *PointerQueue\_First
  Protected *Following.PointerQueue_ElementStruc

  While *Node
    *Following = *Node\Next
    If *PointerQueue\FreeMemory
      FreeMemory(*Node\Pointer)
    EndIf
    FreeMemory(*Node)
    *Node = *Following
  Wend
  *PointerQueue\_First = #Null

  If *PointerQueue\Mutex
    FreeMutex(*PointerQueue\Mutex)
  EndIf

  ; Semaphore shares its memory with Num, so it is only a valid handle for
  ; the queue variant that uses PointerQueue3_VTable.
  If *PointerQueue\VTable = ?PointerQueue3_VTable
    If *PointerQueue\Semaphore
      FreeSemaphore(*PointerQueue\Semaphore)
    EndIf
  EndIf

  FreeMemory(*PointerQueue)
EndProcedure

; Returns the current value of the Num counter (read without locking).
Procedure PointerQueue_Count(*PointerQueue.PointerQueue_Struc)
  Protected Stored.i = *PointerQueue\Num

  ProcedureReturn Stored
EndProcedure

; Counts the elements by walking the whole list while holding the mutex.
; NOTE(review): Dequeue3 can unlink and free head nodes without taking the
; mutex, so this presumably has to be called from the reading thread only -
; confirm against callers.
Procedure PointerQueue_Count2(*PointerQueue.PointerQueue_Struc)
  Protected *Cursor.PointerQueue_ElementStruc
  Protected Total.i

  LockMutex(*PointerQueue\Mutex)
  *Cursor = *PointerQueue\_First
  While *Cursor
    *Cursor = *Cursor\Next
    Total + 1
  Wend
  UnlockMutex(*PointerQueue\Mutex)

  ProcedureReturn Total
EndProcedure


; Creates a queue for single-thread use. It is NOT threadsafe!
;
; FreeMemory : when #True, the memory behind the stored pointers is released
;              automatically by DequeueString() and Free().
; Returns the new queue, or #False if it could not be allocated.
Procedure.i CreatePointerQueue(FreeMemory.i = #False)
  Protected *Created.PointerQueue_Struc

  *Created = AllocateMemory(SizeOf(PointerQueue_Struc))
  If *Created = #Null
    ProcedureReturn #False
  EndIf

  *Created\VTable = ?PointerQueue_VTable
  *Created\FreeMemory = FreeMemory

  ProcedureReturn *Created
EndProcedure

; Creates a queue for multi-thread use; its methods take the queue mutex.
;
; FreeMemory : when #True, the memory behind the stored pointers is released
;              automatically by DequeueString() and Free().
; Returns the new queue, or #False if the memory or the mutex could not be
; created.
Procedure.i CreatePointerQueueSave(FreeMemory.i = #False)
  Protected *Created.PointerQueue_Struc

  *Created = AllocateMemory(SizeOf(PointerQueue_Struc))
  If *Created = #Null
    ProcedureReturn #False
  EndIf

  *Created\VTable = ?PointerQueue2_VTable
  *Created\FreeMemory = FreeMemory
  *Created\Mutex = CreateMutex()

  If *Created\Mutex = #Null
    ; without a mutex the queue is unusable - give the memory back
    FreeMemory(*Created)
    ProcedureReturn #False
  EndIf

  ProcedureReturn *Created
EndProcedure

; This queue can be used in multiple threads, but only one reading thread is
; allowed. This means only one thread is allowed to call Dequeue() and Get()!
; Anyway, the call of Enqueue() is threadsafe. This queue has only the following
; procedures: Enqueue(), Dequeue(), Get(), Free() and Count()
; But Count() will truly go through all elements to get the number of elements!
;
; if FreeMemory = #True, the memory from the saved pointer of the element will be
;   automatically freed by calling Free()
;
; Returns the new queue, or #False if the memory, the mutex or the semaphore
; could not be created. On a partial failure everything that was created is
; released again.
Procedure.i CreatePointerQueueUnsave(FreeMemory.i = #False)
  Protected *PointerQueue.PointerQueue_Struc = AllocateMemory(SizeOf(PointerQueue_Struc))
  
  If *PointerQueue
    *PointerQueue\VTable = ?PointerQueue3_VTable
    
    *PointerQueue\FreeMemory = FreeMemory
    *PointerQueue\Mutex = CreateMutex()
    *PointerQueue\Semaphore = CreateSemaphore()
    
    ; Both objects are required: Enqueue3/Dequeue3 use the semaphore on
    ; every call, so a queue without it must not be handed out.
    If *PointerQueue\Mutex And *PointerQueue\Semaphore
      ProcedureReturn *PointerQueue
    EndIf
    
    If *PointerQueue\Mutex
      FreeMutex(*PointerQueue\Mutex)
    EndIf
    If *PointerQueue\Semaphore
      FreeSemaphore(*PointerQueue\Semaphore)
    EndIf
    FreeMemory(*PointerQueue)
  EndIf
  
  ProcedureReturn #False
EndProcedure

; Function tables of the three queue variants. Every table lists its entries
; in the order of the PointerQueue interface methods:
; Enqueue, EnqueueString, Get, GetString, Dequeue, DequeueString, Free, Count
DataSection
  ; single-thread queue (CreatePointerQueue)
  PointerQueue_VTable:
  Data.i @PointerQueue_Enqueue()
  Data.i @PointerQueue_EnqueueString()
  Data.i @PointerQueue_Get()
  Data.i @PointerQueue_GetString()
  Data.i @PointerQueue_Dequeue()
  Data.i @PointerQueue_DequeueString()
  Data.i @PointerQueue_Free()
  Data.i @PointerQueue_Count()
  
  ; fully mutex protected queue (CreatePointerQueueSave)
  PointerQueue2_VTable:
  Data.i @PointerQueue_Enqueue2()
  Data.i @PointerQueue_EnqueueString2()
  Data.i @PointerQueue_Get2()
  Data.i @PointerQueue_GetString2()
  Data.i @PointerQueue_Dequeue2()
  Data.i @PointerQueue_DequeueString2()
  Data.i @PointerQueue_Free()
  Data.i @PointerQueue_Count()
  
  ; multiple writers / single reader queue (CreatePointerQueueUnsave)
  ; The three string methods have no implementation here: their slots are 0
  ; and must not be called through the interface.
  PointerQueue3_VTable:
  Data.i @PointerQueue_Enqueue3()
  Data.i 0
  Data.i @PointerQueue_Get()
  Data.i 0
  Data.i @PointerQueue_Dequeue3()
  Data.i 0
  Data.i @PointerQueue_Free()
  Data.i @PointerQueue_Count2()
EndDataSection


; example
; don't forget threadsafe-compiler-option!
CompilerIf #PB_Compiler_IsMainFile
  ; Shared state of the example: a multiple-writer / single-reader queue
  ; (created with FreeMemory = #False), the stop flag polled by all threads,
  ; and the payload type.
  EnableExplicit
  Global *Queue.PointerQueue = CreatePointerQueueUnsave(#False)
  Global EndProgram.i = #False
  Structure SomeData
    i.i     ; running number assigned by the writer
    Time.i  ; ElapsedMilliseconds() at creation
  EndStructure
  
  
  ; Consumer loop: reads the element count once per pass, tries to dequeue
  ; that many elements, prints a random sample of them and frees every
  ; payload. Sleeps up to 20 ms between passes and stops once EndProgram is set.
  Procedure ReadThread(Dummy.i)
    Protected *Item.SomeData
    Protected Last.i
    Protected Index.i

    Repeat
      Last = *Queue\Count() - 1
      Debug "Count: " + Str(Last)

      For Index = 0 To Last
        *Item = *Queue\Dequeue()
        If *Item = #Null
          ; nothing was delivered for this slot
          Continue
        EndIf

        If Random(Last) = 0
          Debug "Data: " + Str(*Item\i) + ". " + Str(*Item\Time)
        EndIf
        FreeMemory(*Item)
      Next

      Delay(Random(20))
    Until EndProgram
  EndProcedure
  
  ; Producer loop: allocates a SomeData record, fills it and enqueues it
  ; until EndProgram is set. Roughly every 11th pass it sleeps for up to 2 ms.
  ;
  ; A failed allocation is skipped instead of being written through a null
  ; pointer, and a record that could not be enqueued is freed again.
  Procedure WriteThread(Dummy.i)
    Protected *SomeData.SomeData
    Protected i.i
    
    Repeat
      *SomeData = AllocateMemory(SizeOf(SomeData))
      If *SomeData
        *SomeData\i = i
        *SomeData\Time = ElapsedMilliseconds()
        ; After a successful Enqueue the record belongs to the reader and
        ; must not be touched here any more.
        If *Queue\Enqueue(*SomeData) = #False
          FreeMemory(*SomeData)
        EndIf
      EndIf
      If Random(10) = 0
        Delay(Random(2))
      EndIf
      i + 1
    Until EndProgram
  EndProcedure
  
  ; Timer thread: waits Time milliseconds, then sets the global EndProgram
  ; flag that the reader and writer loops poll.
  Procedure WaitTime(Time.i)
    Delay(Time)
    EndProgram = #True
  EndProcedure
  
  ; Start two writer threads and a 10 second timer thread, run the single
  ; reader on the main thread, then wait for all three threads to finish.
  Define Thread1.i = CreateThread(@WriteThread(), #Null)
  Define Thread2.i = CreateThread(@WaitTime(), 10000)
  Define Thread3.i = CreateThread(@WriteThread(), #Null)
  ReadThread(#Null)
  WaitThread(Thread1)
  WaitThread(Thread2)
  WaitThread(Thread3)
  Debug "End"
CompilerEndIf
MFG PMV