OOP: Lock-free queue for two threads

Share your advanced PureBasic knowledge/code with the community.
Trond
Always Here
Always Here
Posts: 7446
Joined: Mon Sep 22, 2003 6:45 pm
Location: Norway

OOP: Lock-free queue for two threads

Post by Trond »

At least I hope it's safe... But no warranties!

Code: Select all

; Safe for either
;     1 pop/peek-only thread and 1 push-only thread
; or
;     1 thread (no restrictions)
; Note: \Free() is not threadsafe

; Method table of the queue object. The declaration order here must match
; the order of the addresses stored at IQueue_VTable.
Interface IQueue
  Push(*Data.l)   ; Append one long value; waits while the buffer is full
  Pop.l()         ; Remove and return the oldest value; waits while the queue is empty
  Peek.l()        ; Return the oldest value without removing it; waits while the queue is empty
  Free.l()        ; Release the buffer and the queue object itself
EndInterface

; In-memory layout of a queue object. *Methods is the first field because
; the object pointer returned by CreateQueue() is used directly as an IQueue.
Structure SQueue
  *Methods.l        ; Address of IQueue_VTable
  Size.l            ; Buffer size in bytes (Items * 4)
  *Buffer.l         ; Start of the ring buffer
  *ReadPtr.Long     ; Next slot to read; moved only by Pop
  *WritePtr.Long    ; Next slot to write; moved only by Push
EndStructure

; Creates a queue with a ring buffer of Items long-sized slots.
; One slot is always kept empty to tell "full" from "empty", so at most
; Items-1 values can be queued at the same time.
; Returns the new queue (usable as an IQueue), or 0 if Items < 2 or if
; memory could not be allocated.
Procedure CreateQueue(Items.l)
  Protected *Q.SQueue

  ; With fewer than two slots no value could ever be stored and Push()
  ; would wait forever.
  If Items < 2
    ProcedureReturn 0
  EndIf

  *Q = AllocateMemory(SizeOf(SQueue))
  If *Q = 0
    ProcedureReturn 0
  EndIf

  *Q\Buffer = AllocateMemory(Items * SizeOf(Long))
  If *Q\Buffer = 0
    FreeMemory(*Q)   ; don't leak the header when the buffer allocation fails
    ProcedureReturn 0
  EndIf

  *Q\Methods = ?IQueue_VTable
  *Q\Size = Items * SizeOf(Long)
  *Q\ReadPtr = *Q\Buffer
  *Q\WritePtr = *Q\Buffer
  ProcedureReturn *Q
EndProcedure

; Appends *Dta to the queue, sleeping in 1 ms steps while the buffer is full.
; Intended to be called from the single pushing thread only.
Procedure IQueue_Push(*t.SQueue, *Dta.l)
  Protected *Next.Long

  ; Slot the write pointer will move to, wrapped at the end of the buffer.
  *Next = *t\WritePtr + 4
  If *Next = *t\Buffer + *t\Size
    *Next = *t\Buffer
  EndIf

  ; The queue is full when advancing would land on the read pointer.
  While *Next = *t\ReadPtr
    Delay(1)
  Wend

  *t\WritePtr\l = *Dta   ; store the value first...
  *t\WritePtr = *Next    ; ...then make it visible by moving the write pointer
EndProcedure

; Returns the oldest queued value without removing it.
; Sleeps in 1 ms steps until the queue holds at least one value.
Procedure IQueue_Peek(*t.SQueue)
  Repeat
    ; Read and write pointer differ as soon as something has been pushed.
    If *t\ReadPtr <> *t\WritePtr
      ProcedureReturn *t\ReadPtr\l
    EndIf
    Delay(1)
  ForEver
EndProcedure

; Removes and returns the oldest queued value.
; Waits (inside IQueue_Peek) until the queue is non-empty.
Procedure IQueue_Pop(*t.SQueue)
  Protected Value.l = IQueue_Peek(*t)
  Protected *Next.Long = *t\ReadPtr + 4

  ; Wrap around when the end of the buffer has been reached.
  If *Next = *t\Buffer + *t\Size
    *Next = *t\Buffer
  EndIf

  *t\ReadPtr = *Next   ; hands the slot back to the pushing side
  ProcedureReturn Value
EndProcedure

; Releases the ring buffer and then the queue object itself.
; Not thread-safe: no other thread may still be using the queue.
Procedure IQueue_Free(*this.SQueue)
  Protected *Storage = *this\Buffer
  FreeMemory(*Storage)
  FreeMemory(*this)
EndProcedure

; Method addresses for IQueue, listed in the same order as the Interface declaration.
DataSection
  IQueue_VTable:
    Data.l @IQueue_Push()   ; IQueue\Push
    Data.l @IQueue_Pop()    ; IQueue\Pop
    Data.l @IQueue_Peek()   ; IQueue\Peek
    Data.l @IQueue_Free()   ; IQueue\Free
EndDataSection



;- Example ----------------

; Thread body: pushes the numbers 0 to 20000 onto the queue, in order.
Procedure PushStuff(Queue.IQueue)
  Protected Number = 0
  While Number <= 20000
    Queue\Push(Number)
    Number + 1
  Wend
EndProcedure

; Thread body: pops 20001 values and writes them, one per line, to c:\out.txt.
; If the file cannot be opened the values are still popped (and discarded),
; so the pushing thread is never left waiting on a full queue.
Procedure PopStuff(Queue.IQueue)
  Protected I, Value.l
  Protected Opened = OpenFile(0, "c:\out.txt")   ; 0 when the file could not be opened
  For I = 0 To 20000
    Value = Queue\Pop()
    If Opened
      WriteStringN(0, Str(Value))
    EndIf
  Next
  If Opened
    CloseFile(0)
  EndIf
EndProcedure

; Run the example: one pushing and one popping thread share a queue
; created with 1024*4 slots.
Global Queue.IQueue = CreateQueue(1024*4)

Producer = CreateThread(@PushStuff(), Queue)
Consumer = CreateThread(@PopStuff(), Queue)

; Both threads have to be finished before the queue is released,
; because \Free() is not thread-safe.
WaitThread(Producer)
WaitThread(Consumer)

Queue\Free()
Last edited by Trond on Mon Apr 07, 2008 7:44 pm, edited 1 time in total.
SFSxOI
Addict
Addict
Posts: 2970
Joined: Sat Dec 31, 2005 5:24 pm
Location: Where ya would never look.....

Post by SFSxOI »

OK, very neat. Thank You :)
User avatar
Hroudtwolf
Addict
Addict
Posts: 803
Joined: Sat Feb 12, 2005 3:35 am
Location: Germany(Hessen)
Contact:

Post by Hroudtwolf »

Hi and thanks for sharing.

Wouldn't Enqueue and Dequeue be more appropriate names than Push and Pop for a queue?

Best regards

Wolf
Trond
Always Here
Always Here
Posts: 7446
Joined: Mon Sep 22, 2003 6:45 pm
Location: Norway

Post by Trond »

C++ uses push/pop according to Wikipedia, so I just used that.
User avatar
Rook Zimbabwe
Addict
Addict
Posts: 4322
Joined: Tue Jan 02, 2007 8:16 pm
Location: Cypress TX
Contact:

Post by Rook Zimbabwe »

PUSH and POP are part of the ASM instruction set, are they not? :D
Binarily speaking... it takes 10 to Tango!!!

Image
http://www.bluemesapc.com/
Trond
Always Here
Always Here
Posts: 7446
Joined: Mon Sep 22, 2003 6:45 pm
Location: Norway

Post by Trond »

Rook Zimbabwe wrote:PUSH and POP are ASM commandset are they not? :D
Well, yes, but that doesn't have anything to do with this.

I updated the \Free() function to work correctly, as it was leaking the whole buffer...
Trond
Always Here
Always Here
Posts: 7446
Joined: Mon Sep 22, 2003 6:45 pm
Location: Norway

Post by Trond »

This version would be slower, but it should work with any number of threads:

Code: Select all

; Safe for any number of threads
; Note: \Free() is not threadsafe

; Method table of the queue object. The declaration order here must match
; the order of the addresses stored at IQueue_VTable.
Interface IQueue
  Push(*Data.l)   ; Append one long value; waits while the buffer is full
  Pop.l()         ; Remove and return the oldest value; waits while the queue is empty
  Peek.l()        ; Return the oldest value without removing it; waits while the queue is empty
  Free.l()        ; Release the mutexes, the buffer and the queue object itself
EndInterface

; In-memory layout of a queue object. *Methods is the first field because
; the object pointer returned by CreateQueue() is used directly as an IQueue.
Structure SQueue
  *Methods.l        ; Address of IQueue_VTable
  Size.l            ; Buffer size in bytes (Items * 4)
  *Buffer.l         ; Start of the ring buffer
  *ReadPtr.Long     ; Next slot to read; moved only by Pop
  *WritePtr.Long    ; Next slot to write; moved only by Push
  WriteMutex.l      ; Mutex held by Push while it waits and writes
  ReadMutex.l       ; Mutex held by Pop and Peek while they wait and read
EndStructure

; Creates a mutex-protected queue with a ring buffer of Items long-sized slots.
; One slot is always kept empty to tell "full" from "empty", so at most
; Items-1 values can be queued at the same time.
; Returns the new queue (usable as an IQueue), or 0 if Items < 2 or if the
; memory or one of the mutexes could not be created.
Procedure CreateQueue(Items.l)
  Protected *Q.SQueue

  ; With fewer than two slots no value could ever be stored and Push()
  ; would wait forever.
  If Items < 2
    ProcedureReturn 0
  EndIf

  *Q = AllocateMemory(SizeOf(SQueue))
  If *Q = 0
    ProcedureReturn 0
  EndIf

  *Q\Buffer = AllocateMemory(Items * SizeOf(Long))
  *Q\WriteMutex = CreateMutex()
  *Q\ReadMutex = CreateMutex()

  ; Undo whatever was created if any of the three resources is missing.
  If *Q\Buffer = 0 Or *Q\WriteMutex = 0 Or *Q\ReadMutex = 0
    If *Q\ReadMutex
      FreeMutex(*Q\ReadMutex)
    EndIf
    If *Q\WriteMutex
      FreeMutex(*Q\WriteMutex)
    EndIf
    If *Q\Buffer
      FreeMemory(*Q\Buffer)
    EndIf
    FreeMemory(*Q)
    ProcedureReturn 0
  EndIf

  *Q\Methods = ?IQueue_VTable
  *Q\Size = Items * SizeOf(Long)
  *Q\ReadPtr = *Q\Buffer
  *Q\WritePtr = *Q\Buffer
  ProcedureReturn *Q
EndProcedure

; Appends *Dta to the queue. WriteMutex is held for the whole call, including
; the 1 ms sleeps while the buffer is full, so pushing threads take turns.
Procedure IQueue_Push(*t.SQueue, *Dta.l)
  Protected *Next.Long

  LockMutex(*t\WriteMutex)

  ; Slot the write pointer will move to, wrapped at the end of the buffer.
  *Next = *t\WritePtr + 4
  If *Next = *t\Buffer + *t\Size
    *Next = *t\Buffer
  EndIf

  ; The queue is full when advancing would land on the read pointer.
  While *Next = *t\ReadPtr
    Delay(1)
  Wend

  *t\WritePtr\l = *Dta   ; store the value first...
  *t\WritePtr = *Next    ; ...then make it visible by moving the write pointer

  UnlockMutex(*t\WriteMutex)
EndProcedure

; Returns the oldest queued value without removing it.
; Sleeps in 1 ms steps, holding ReadMutex, until the queue is non-empty.
Procedure IQueue_Peek(*t.SQueue)
  Protected Value.l

  LockMutex(*t\ReadMutex)
  ; Don't read past the write pointer
  While *t\ReadPtr = *t\WritePtr
    Delay(1)
  Wend
  ; Read the slot while the mutex is still held: once it is released another
  ; thread's Pop may advance ReadPtr, and the slot could then be empty or
  ; already overwritten.
  Value = *t\ReadPtr\l
  UnlockMutex(*t\ReadMutex)

  ProcedureReturn Value
EndProcedure

; Removes and returns the oldest queued value. ReadMutex is held for the
; whole call, including the 1 ms sleeps while the queue is empty.
Procedure IQueue_Pop(*t.SQueue)
  Protected Value.l
  Protected *Next.Long

  LockMutex(*t\ReadMutex)

  ; Wait until something has been pushed.
  Repeat
    If *t\ReadPtr <> *t\WritePtr
      Break
    EndIf
    Delay(1)
  ForEver

  Value = *t\ReadPtr\l

  ; Advance the read pointer, wrapping at the end of the buffer.
  *Next = *t\ReadPtr + 4
  If *Next = *t\Buffer + *t\Size
    *Next = *t\Buffer
  EndIf
  *t\ReadPtr = *Next

  UnlockMutex(*t\ReadMutex)
  ProcedureReturn Value
EndProcedure

; Releases both mutexes, the ring buffer and finally the queue object.
; Not thread-safe: no other thread may still be using the queue.
Procedure IQueue_Free(*this.SQueue)
  Protected *Storage = *this\Buffer
  Protected WriteLock = *this\WriteMutex
  Protected ReadLock = *this\ReadMutex

  FreeMutex(WriteLock)
  FreeMutex(ReadLock)
  FreeMemory(*Storage)
  FreeMemory(*this)
EndProcedure

; Method addresses for IQueue, listed in the same order as the Interface declaration.
DataSection
  IQueue_VTable:
    Data.l @IQueue_Push()   ; IQueue\Push
    Data.l @IQueue_Pop()    ; IQueue\Pop
    Data.l @IQueue_Peek()   ; IQueue\Peek
    Data.l @IQueue_Free()   ; IQueue\Free
EndDataSection



;- Example ----------------

; Thread body: pushes the numbers 0 to 20000 onto the queue, in order.
Procedure PushStuff(Queue.IQueue)
  Protected Number = 0
  While Number <= 20000
    Queue\Push(Number)
    Number + 1
  Wend
EndProcedure

; Thread body: pops 20001 values and writes each one as a line to file #0,
; which the main code opens before the threads are started.
; NOTE(review): several threads call Str()/WriteStringN() on the same file
; here without a mutex; per the thread discussion this needs the
; "threadsafe" compiler option.
Procedure PopStuff(Queue.IQueue)
  Protected Count = 0
  Protected Line.s
  While Count <= 20000
    Line = Str(Queue\Pop())
    WriteStringN(0, Line)
    Count + 1
  Wend
EndProcedure

; Run the example: two pushing and two popping threads share one queue.
Global Queue.IQueue = CreateQueue(1024*4)

; The popping threads write to file #0, so only start them if it is open.
If OpenFile(0, "c:\out.txt")

  ; Every thread gets its own handle variable. Reusing T1/T2 for the second
  ; pair would lose the first handles, and the file and queue could then be
  ; released while those threads are still running.
  T1 = CreateThread(@PushStuff(), Queue)
  T2 = CreateThread(@PushStuff(), Queue)
  T3 = CreateThread(@PopStuff(), Queue)
  T4 = CreateThread(@PopStuff(), Queue)
  WaitThread(T1)
  WaitThread(T2)
  WaitThread(T3)
  WaitThread(T4)

  CloseFile(0)
EndIf

Queue\Free()
User avatar
mk-soft
Always Here
Always Here
Posts: 6253
Joined: Fri May 12, 2006 6:51 pm
Location: Germany

Post by mk-soft »

It does not run without the "threadsafe" compiler option. Why?

I have written a Push and Pop object with my OOP-PreCompiler. That one works correctly :wink:

Stack.pb include

Code: Select all

;-TOP
; Comment       : Stack Function (Threadsafe)
; Author        : mk-soft
; Second Author : 
; File          : Stack.pb
; Version       : 1.01
; Created       : 
; Changed       :
; 
; Compilermode  :
;
; ***************************************************************************************


; ***************************************************************************************

EnableExplicit

#maxstack = 100           ; Maximum number of entries the stack can hold

; Fixed-capacity stack of separately allocated data blocks.
; The methods address the entries as slots 1..#maxstack (pstack = 0 means
; "empty"). A static array declared with [n] only has the elements 0..n-1,
; so one extra element is reserved; otherwise the #maxstack-th push would
; write past the end of the array.
Class Stack Extends BaseClass
  mutex.l                 ; Mutex handle
  size.l                  ; Size in bytes of one data block
  pstack.l                ; Stack position: index of the top entry, 0 = empty
  stack.l[#maxstack + 1]  ; Pointers to the data blocks (slot 0 is unused)
EndClass

; Prepares the stack: remembers the size in bytes of one data block and
; creates the mutex that the other methods lock.
Method Stack_Init(*this.Stack, Size.l)
  *this\size  = Size
  *this\mutex = CreateMutex()
EndMethod

; Pushes a private copy of the *this\size bytes at *pdata onto the stack.
; Returns #True on success, #False when the stack is full or the copy
; could not be allocated.
; NOTE(review): slots are 1-based, so the highest index written here is
; #maxstack - the stack array needs at least #maxstack + 1 elements.
Method Stack_Push(*this.Stack, *pdata)
  Protected *block
  Protected ok = #False

  LockMutex(*this\mutex)
  If *this\pstack < #maxstack
    *this\pstack + 1
    *block = AllocateMemory(*this\size)
    *this\stack[*this\pstack] = *block
    If *block
      CopyMemory(*pdata, *block, *this\size)
      ok = #True
    Else
      *this\pstack - 1   ; allocation failed: give the slot back
    EndIf
  EndIf
  UnlockMutex(*this\mutex)

  ProcedureReturn ok
EndMethod

; Copies the top entry (*this\size bytes) to *pdata, frees it and removes it
; from the stack. Returns #True on success, #False when the stack is empty.
Method Stack_Pop(*this.Stack, *pdata)
  Protected *top
  Protected ok = #False

  LockMutex(*this\mutex)
  If *this\pstack > 0
    *top = *this\stack[*this\pstack]
    CopyMemory(*top, *pdata, *this\size)
    FreeMemory(*top)
    *this\pstack - 1
    ok = #True
  EndIf
  UnlockMutex(*this\mutex)

  ProcedureReturn ok
EndMethod

; Copies the top entry (*this\size bytes) to *pdata without removing it.
; Returns #True on success, #False when the stack is empty.
Method Stack_Peek(*this.Stack, *pdata)
  Protected ok = #False

  LockMutex(*this\mutex)
  If *this\pstack > 0
    CopyMemory(*this\stack[*this\pstack], *pdata, *this\size)
    ok = #True
  EndIf
  UnlockMutex(*this\mutex)

  ProcedureReturn ok
EndMethod

; Frees every data block still on the stack and leaves the stack empty.
; Always returns #True.
Method Stack_Free(*this.Stack)
  LockMutex(*this\mutex)
  ; Unwind from the top entry down to slot 1.
  While *this\pstack > 0
    FreeMemory(*this\stack[*this\pstack])
    *this\pstack - 1
  Wend
  UnlockMutex(*this\mutex)
  ProcedureReturn #True
EndMethod

; Drops one reference to the object.
; While more than one reference is left, only the counter is decreased and
; its new value is returned. On the last reference the remaining entries
; and the mutex are freed and the result of DeleteObject() is returned.
Method Overwrite Stack_Release(*this.Stack)
  Protected *self.IStack = *this

  ; Decrease internal refcounter
  If *this\__Ref > 1
    *this\__Ref - 1
    ProcedureReturn *this\__Ref
  EndIf

  ; free() locks the mutex, so it may only be called when the mutex exists.
  ; Without a mutex (Init was never called) no entry can have been pushed,
  ; so there is nothing to free in that case.
  If *this\mutex
    *self\free()
    FreeMutex(*this\mutex)
  EndIf

  ; Release Object
  ProcedureReturn DeleteObject(*this)
EndMethod

DisableExplicit

; ***************************************************************************************
Sample

Code: Select all

; Stack Sample

IncludeFile "Stack.pb"

Global exit             ; Set to 1 by the main code to make ReadThread leave its loop

Define.IStack *Stack    ; The stack object that is handed to every thread

; Thread body: pushes the values 1 to 20 onto the stack, pausing 500 ms
; after each push. The result of Push() is not checked.
Procedure WriteThread(*Stack.IStack)
  Protected value = 1

  While value <= 20
    *Stack\Push(@value)   ; Push() gets the address of the current value
    Delay(500)
    value + 1
  Wend

EndProcedure

; Thread body: every 100 ms tries to pop one value and prints it with Debug.
; Runs until the global Exit flag is set.
Procedure ReadThread(*Stack.IStack)
  Protected Value

  Repeat
    If *Stack\Pop(@Value)
      Debug Value
    EndIf
    Delay(100)
    ; The flag is tested after each attempt, so at least one pop is tried.
    If Exit
      Break
    EndIf
  ForEver
EndProcedure


; Create the stack object; every entry is the size of one long.
*stack.IStack = NewObject(Stack)
*stack\Init(SizeOf(long))

; Two writers start right away, the reader follows two seconds later.
hWriterA = CreateThread(@WriteThread(), *Stack)
hWriterB = CreateThread(@WriteThread(), *Stack)
Delay(2000)
hReader = CreateThread(@ReadThread(), *Stack)

; Once both writers are done, tell the reader to stop and wait for it.
WaitThread(hWriterA)
WaitThread(hWriterB)
Exit = 1
WaitThread(hReader)

GT :wink:
My Projects ThreadToGUI / OOP-BaseClass / EventDesigner V3
PB v3.30 / v5.75 - OS Mac Mini OSX 10.xx - VM Window Pro / Linux Ubuntu
Downloads on my Webspace / OneDrive
Trond
Always Here
Always Here
Posts: 7446
Joined: Mon Sep 22, 2003 6:45 pm
Location: Norway

Post by Trond »

mk-soft wrote:Is not running without compileroptions threadsafe. why?
Because of the file/string manipulations: WriteStringN(0, Str(Queue\Pop())). That must be protected with mutexes if you don't use threadsafe.
Post Reply