Page 1 of 1

OOP: Lock-free queue for two threads

Posted: Sun Apr 06, 2008 4:25 pm
by Trond
At least I hope it's safe... But no warranties!

Code: Select all

; Safe for either
;     1 pop/peek-only thread and 1 push-only thread
; or
;     1 thread (no restrictions)
; Note: \Free() is not threadsafe

Interface IQueue
  Push(*Data.l)
  Pop.l()
  Peek.l()
  Free.l()
EndInterface

Structure SQueue
  *Methods.l
  Size.l
  *Buffer.l
  *ReadPtr.Long
  *WritePtr.Long
EndStructure

Procedure CreateQueue(Items.l)
  Protected *Q.SQueue = AllocateMemory(SizeOf(SQueue))
  *Q\Methods = ?IQueue_VTable
  *Q\Size = Items*4
  *Q\Buffer = AllocateMemory(Items*4)
  *Q\ReadPtr = *Q\Buffer
  *Q\WritePtr = *Q\Buffer
  ProcedureReturn *Q
EndProcedure

Procedure IQueue_Push(*t.SQueue, *Dta.l)
  ; Don't write past the read pointer
  While *t\WritePtr+4 = *t\ReadPtr Or (*t\ReadPtr = *t\Buffer And *t\WritePtr+4 - *t\Size = *t\Buffer)
    Delay(1)
  Wend
  *t\WritePtr\l = *Dta
  If *t\WritePtr+4 - *t\Size = *t\Buffer
    *t\WritePtr = *t\Buffer
  Else
    *t\WritePtr+4
  EndIf
EndProcedure

Procedure IQueue_Peek(*t.SQueue)
  ; Don't read past the write pointer
  While *t\ReadPtr = *t\WritePtr
    Delay(1)
  Wend
  ProcedureReturn *t\ReadPtr\l
EndProcedure

Procedure IQueue_Pop(*t.SQueue)
  Protected *Dta.l
  *Dta = IQueue_Peek(*t)
  If *t\ReadPtr+4 - *t\Size = *t\Buffer
    *t\ReadPtr = *t\Buffer
  Else
    *t\ReadPtr+4
  EndIf
  ProcedureReturn *Dta
EndProcedure

Procedure IQueue_Free(*this.SQueue)
  FreeMemory(*this\Buffer)
  FreeMemory(*this) 
EndProcedure 

DataSection
  IQueue_VTable:
    Data.l @IQueue_Push()
    Data.l @IQueue_Pop()
    Data.l @IQueue_Peek()
    Data.l @IQueue_Free()
EndDataSection



;- Example ----------------

Procedure PushStuff(Queue.IQueue)
  Protected I
  For I = 0 To 20000
    Queue\Push(I)
  Next
EndProcedure

Procedure PopStuff(Queue.IQueue)
  Protected I
  OpenFile(0, "c:\out.txt")
  For I = 0 To 20000
    WriteStringN(0, Str(Queue\Pop()))
    ; Queue\Pop()
  Next
  CloseFile(0)
EndProcedure

Global Queue.IQueue = CreateQueue(1024*4)

T1 = CreateThread(@PushStuff(), Queue)
T2 = CreateThread(@PopStuff(), Queue)
WaitThread(T1)
WaitThread(T2)

Queue\Free()

Posted: Sun Apr 06, 2008 6:40 pm
by SFSxOI
OK, very neat. Thank You :)

Posted: Sun Apr 06, 2008 6:51 pm
by Hroudtwolf
Hi and thanks for sharing.

Would Enqueue and Dequeue not be the adequate nomenclature instead of Push and Pop for a queue?

Best regards

Wolf

Posted: Sun Apr 06, 2008 7:07 pm
by Trond
C++ uses push/pop according to Wikipedia, so I just used that.

Posted: Mon Apr 07, 2008 3:22 pm
by Rook Zimbabwe
PUSH and POP are ASM commandset are they not? :D

Posted: Mon Apr 07, 2008 7:45 pm
by Trond
Rook Zimbabwe wrote:PUSH and POP are ASM commandset are they not? :D
Well, yes, but that doesn't have anything to do with this.

I updated the \Free() function to work correctly, as it was leaking the whole buffer...

Posted: Mon Apr 07, 2008 7:55 pm
by Trond
This version would be slower, but it should work with any number of threads:

Code: Select all

; Safe for any number of threads
; Note: \Free() is not threadsafe

Interface IQueue
  Push(*Data.l)
  Pop.l()
  Peek.l()
  Free.l()
EndInterface

Structure SQueue
  *Methods.l
  Size.l
  *Buffer.l
  *ReadPtr.Long
  *WritePtr.Long
  WriteMutex.l
  ReadMutex.l
EndStructure

Procedure CreateQueue(Items.l)
  Protected *Q.SQueue = AllocateMemory(SizeOf(SQueue))
  *Q\Methods = ?IQueue_VTable
  *Q\Size = Items*4
  *Q\Buffer = AllocateMemory(Items*4)
  *Q\ReadPtr = *Q\Buffer
  *Q\WritePtr = *Q\Buffer
  *Q\WriteMutex = CreateMutex()
  *Q\ReadMutex = CreateMutex()
  ProcedureReturn *Q
EndProcedure

Procedure IQueue_Push(*t.SQueue, *Dta.l)
  ; Don't write past the read pointer
  LockMutex(*t\WriteMutex)
  While *t\WritePtr+4 = *t\ReadPtr Or (*t\ReadPtr = *t\Buffer And *t\WritePtr+4 - *t\Size = *t\Buffer)
    Delay(1)
  Wend
  *t\WritePtr\l = *Dta
  If *t\WritePtr+4 - *t\Size = *t\Buffer
    *t\WritePtr = *t\Buffer
  Else
    *t\WritePtr+4
  EndIf
  UnlockMutex(*t\WriteMutex)
EndProcedure

Procedure IQueue_Peek(*t.SQueue)
  ; Don't read past the write pointer
  LockMutex(*t\ReadMutex)
  While *t\ReadPtr = *t\WritePtr
    Delay(1)
  Wend
  UnlockMutex(*t\ReadMutex)
  ProcedureReturn *t\ReadPtr\l
EndProcedure

Procedure IQueue_Pop(*t.SQueue)
  Protected *Dta.l
  LockMutex(*t\ReadMutex)
  While *t\ReadPtr = *t\WritePtr
    Delay(1)
  Wend
  *Dta = *t\ReadPtr\l
  If *t\ReadPtr+4 - *t\Size = *t\Buffer
    *t\ReadPtr = *t\Buffer
  Else
    *t\ReadPtr+4
  EndIf
  UnlockMutex(*t\ReadMutex)
  ProcedureReturn *Dta
EndProcedure

Procedure IQueue_Free(*this.SQueue)
  FreeMutex(*this\WriteMutex)
  FreeMutex(*this\ReadMutex)
  FreeMemory(*this\Buffer)
  FreeMemory(*this) 
EndProcedure 

DataSection
  IQueue_VTable:
    Data.l @IQueue_Push()
    Data.l @IQueue_Pop()
    Data.l @IQueue_Peek()
    Data.l @IQueue_Free()
EndDataSection



;- Example ----------------

Procedure PushStuff(Queue.IQueue)
  Protected I
  For I = 0 To 20000
    Queue\Push(I)
  Next
EndProcedure

Procedure PopStuff(Queue.IQueue)
  Protected I
  For I = 0 To 20000
    WriteStringN(0, Str(Queue\Pop()))
    ; Queue\Pop()
  Next
EndProcedure

Global Queue.IQueue = CreateQueue(1024*4)

OpenFile(0, "c:\out.txt")

T1 = CreateThread(@PushStuff(), Queue)
T1 = CreateThread(@PushStuff(), Queue)
T2 = CreateThread(@PopStuff(), Queue)
T2 = CreateThread(@PopStuff(), Queue)
WaitThread(T1)
WaitThread(T2)

CloseFile(0)

Queue\Free()

Posted: Sun Apr 20, 2008 1:23 pm
by mk-soft
Is not running without compileroptions threadsafe. why?

I have write Push and Pop Object with my OOP-PreCompiler. That's right :wink:

Stack.pb include

Code: Select all

;-TOP
; Kommentar     : Stack Function (Threadsafe)
; Author        : mk-soft
; Second Author : 
; Datei         : Stack.pb
; Version       : 1.01
; Erstellt      : 
; GeƤndert      :
; 
; Compilermode  :
;
; ***************************************************************************************


; ***************************************************************************************

EnableExplicit

#maxstack = 100

Class Stack Extends BaseClass
  mutex.l             ; Mutex handle
  size.l              ; Size of Data
  pstack.l            ; Stackpostion
  stack.l[#maxstack]  ; Array of Pointer to Data
EndClass

Method Stack_Init(*this.Stack, Size.l)
  *this\mutex = CreateMutex()
  *this\size = size
EndMethod

Method Stack_Push(*this.Stack, *pdata)
  LockMutex(*this\mutex)
  If *this\pstack < #maxstack
    *this\pstack + 1
    *this\stack[*this\pstack] = AllocateMemory(*this\size)
    If *this\stack[*this\pstack] 
      CopyMemory(*pdata, *this\stack[*this\pstack], *this\size)
      UnlockMutex(*this\mutex)
      ProcedureReturn #True
    Else
      *this\pstack - 1
      UnlockMutex(*this\mutex)
      ProcedureReturn #False
    EndIf
  Else
    UnlockMutex(*this\mutex)
    ProcedureReturn #False
  EndIf
EndMethod

Method Stack_Pop(*this.Stack, *pdata)
  LockMutex(*this\mutex)
  If *this\pstack > 0
    CopyMemory(*this\stack[*this\pstack], *pdata, *this\size)
    FreeMemory(*this\stack[*this\pstack])
    *this\pstack - 1
    UnlockMutex(*this\mutex)
    ProcedureReturn #True
  Else
    UnlockMutex(*this\mutex)
    ProcedureReturn #False
  EndIf
EndMethod

Method Stack_Peek(*this.Stack, *pdata)
  LockMutex(*this\mutex)
  If *this\pstack > 0
    CopyMemory(*this\stack[*this\pstack], *pdata, *this\size)
    UnlockMutex(*this\mutex)
    ProcedureReturn #True
  Else
    UnlockMutex(*this\mutex)
    ProcedureReturn #False
  EndIf

EndMethod

Method Stack_Free(*this.Stack)
  Protected i
  LockMutex(*this\mutex)
  For i = 1 To *this\pstack
    FreeMemory(*this\stack[i])
  Next
  *this\pstack = 0
  UnlockMutex(*this\mutex)
  ProcedureReturn #True  
EndMethod

Method Overwrite Stack_Release(*this.Stack)
  Protected *self.IStack = *this
  ; Decrease internal refcounter
  If *this\__Ref > 1 
    *this\__Ref - 1
    ProcedureReturn *this\__Ref
  Else
    *self\free()
    If *this\mutex
      FreeMutex(*this\mutex)
    EndIf
    ; Release Object
    ProcedureReturn DeleteObject(*this)
  EndIf

EndMethod

DisableExplicit

; ***************************************************************************************
Sample

Code: Select all

; Stack Sample

IncludeFile "Stack.pb"

Global exit

Define.IStack *Stack

Procedure WriteThread(*Stack.IStack)
  Protected value
  
  For value = 1 To 20
    *stack\Push(@value)
    Delay(500)
  Next
  
EndProcedure

Procedure ReadThread(*Stack.IStack)
  Protected Value
  Repeat
    If *Stack\Pop(@value)
      Debug Value
    EndIf
    Delay(100)
  Until Exit
EndProcedure


*stack.IStack = NewObject(Stack)
*stack\Init(SizeOf(long))

hThread0 = CreateThread(@WriteThread(), *Stack)
hThread1 = CreateThread(@WriteThread(), *Stack)
Delay(2000)
hThread2 = CreateThread(@ReadThread(), *Stack)

WaitThread(hThread0)
WaitThread(hThread1)
Exit = 1
WaitThread(hThread2)

GT :wink:

Posted: Sun Apr 20, 2008 5:26 pm
by Trond
mk-soft wrote:Is not running without compileroptions threadsafe. why?
Because of the file/string manipulations: WriteStringN(0, Str(Queue\Pop())). That must be protected with mutexes if you don't use threadsafe.