Correct processing of threads with semaphore or user events

Just starting out? Need help? Post your questions and find answers here.
flashbob
User
User
Posts: 92
Joined: Sat May 11, 2024 4:04 pm

Correct processing of threads with semaphore or user events

Post by flashbob »

I would like to start a thread multiple times (e.g. loop). The processing of a thread is ensured by “mutex”. Ok so far.
When the respective thread is finished, the global variables changed in the thread should be processed in the main routine (e.g. window events).

I'm currently testing with "SignalSemaphore" and "PostEvent". Depending on the elapsed time of the thread, the global variable "string" is overwritten by the 2nd thread and the first value is lost. Here's a small example:

Code: Select all

EnableExplicit

Enumeration FormGadget
  #wnd
  #Btn_thread
EndEnumeration

Enumeration #PB_Event_FirstCustomValue
  #EventThreadBegins
  #EventThreadEnds
EndEnumeration

Global Mutex     = CreateMutex()
Global Semaphore = CreateSemaphore()
Global string.s  = ""
Define event, thread1, thread2, thread3


Procedure ThreadProcedure(*value)     
  Protected a,b 
  
  LockMutex(Mutex) 
  ; do something
  For a = 1 To 2      
    For b = 1 To 5
      Debug "Thread "+Str(*value)+" Line "+Str(b)    
      
      Delay(1)   ; ****** test with diff values (1,10,50)
    Next b          
  Next a   
  ; var to be changed and used again in main program
  string = "MyThread "+Str(*value)
  
  UnlockMutex(Mutex)  
  ; signal/event processing finished
  SignalSemaphore(Semaphore)   
  PostEvent(#EventThreadEnds)
EndProcedure

; -------

OpenWindow(#wnd,50,50,200,150, "Thread", #PB_Window_SystemMenu)
ButtonGadget(#Btn_thread,25,50,120,25, "start threads")
Repeat 
  event = WaitWindowEvent(25) 
  Select event  
      
    Case #EventThreadBegins
      Debug "#EventThreadBegins : "+ string      
    Case #EventThreadEnds       
      Debug "#EventThreadEnds : "  + string 
    Case 0
    If TrySemaphore(Semaphore) 
      Debug "Semaphore : " + string 
    EndIf
	    
    Case #PB_Event_Gadget
      Select EventGadget()
        Case #Btn_thread 
          ClearDebugOutput()
          ; ------- Start Threads (later in loop)
          thread1 = CreateThread(@ThreadProcedure(), 1)
          Delay(25) ;Delay To ensure threads running sequentially
          thread2 = CreateThread(@ThreadProcedure(), 2)
          Delay(25)
          thread3 = CreateThread(@ThreadProcedure(), 3)
      EndSelect 
  EndSelect
Until event = #PB_Event_CloseWindow  

FreeMutex(Mutex)
FreeSemaphore(Semaphore)
Try to change the "delay()" in procedure to see the results...

What do I have to do to ensure that the variables of the first thread can be processed first by e.g a "PostEvent" and are not immediately overwritten by the second thread? Mutex alone doesn't seem to be enough here... At this point, thread 2 should wait until it is released. I don't want to use "WaitThread" at this point because I don't want the program to stop.

Thx in advance!
.
Last edited by flashbob on Sun Jul 07, 2024 9:15 pm, edited 1 time in total.
User avatar
NicTheQuick
Addict
Addict
Posts: 1527
Joined: Sun Jun 22, 2003 7:43 pm
Location: Germany, Saarbrücken
Contact:

Re: Correct processing of threads with semaphore or user events

Post by NicTheQuick »

If you want it to do exactly that way, you would need to signal the Semaphore after you've processed the event and also you need to wait inside the thread before you are allowed to change the global string. And to be able to start you also have to signal the semaphore once at the beginning or set the initial counter to 1.
I also had to remove the TrySemaphore because it does not work with it.

Code: Select all

EnableExplicit

Enumeration FormGadget
	#wnd
	#Btn_thread
EndEnumeration

Enumeration #PB_Event_FirstCustomValue
	#EventThreadBegins
	#EventThreadEnds
EndEnumeration

Global Mutex     = CreateMutex()
Global Semaphore = CreateSemaphore(1)
Global string.s  = ""
Define event, thread1, thread2, thread3

Procedure ThreadProcedure(*value)

	Protected a,b

	Shared Mutex

	LockMutex(Mutex)

	; do something
	For a = 1 To 2

		For b = 1 To 5
			Debug "Thread "+Str(*value)+" Line "+Str(b)

			Delay(1)   ; ****** test with diff values (1,10,50)
		Next b

	Next a
	WaitSemaphore(Semaphore)
	; var to be changed and used again in main program
	string = "MyThread "+Str(*value)

	UnlockMutex(Mutex)

	; signal/event processing finished
	PostEvent(#EventThreadEnds)
EndProcedure

; -------

OpenWindow(#wnd,50,50,200,150, "Thread", #PB_Window_SystemMenu)
ButtonGadget(#Btn_thread,25,50,120,25, "start threads")
Repeat

	event = WaitWindowEvent(25)

	Select event

		Case #EventThreadBegins
			Debug "#EventThreadBegins : "+ string

		Case #EventThreadEnds

			Debug "#EventThreadEnds : "  + string
			SignalSemaphore(Semaphore)

		Case #PB_Event_Gadget
			Select EventGadget()
				Case #Btn_thread

					ClearDebugOutput()
					; ------- Start Threads (later in loop)
					thread1 = CreateThread(@ThreadProcedure(), 1)
					Delay(25) ;Delay To ensure threads running sequentially
					thread2 = CreateThread(@ThreadProcedure(), 2)
					Delay(25)
					thread3 = CreateThread(@ThreadProcedure(), 3)
			EndSelect

	EndSelect
Until event = #PB_Event_CloseWindow

FreeMutex(Mutex)
FreeSemaphore(Semaphore)
You can also create an individual string per thread and send it with your event instead of using the semaphore at all:

Code: Select all

EnableExplicit

Enumeration FormGadget
	#wnd
	#Btn_thread
EndEnumeration

Structure ThreadData
	string.s
EndStructure

Enumeration #PB_Event_FirstCustomValue
	#EventThreadBegins
	#EventThreadEnds
EndEnumeration

Global Mutex     = CreateMutex()
Define event, thread1, thread2, thread3

Procedure ThreadProcedure(*value)
	Protected a, b
	Protected *threadData.ThreadData = AllocateStructure(ThreadData)
	Shared Mutex

	LockMutex(Mutex)

	; do something
	For a = 1 To 2
		For b = 1 To 5
			Debug "Thread "+Str(*value)+" Line "+Str(b)
			Delay(1)   ; ****** test with diff values (1,10,50)
		Next b
	Next a

	; var to be changed and used again in main program
	*threadData\string = "MyThread "+Str(*value)

	UnlockMutex(Mutex)

	PostEvent(#EventThreadEnds, #PB_Ignore, #PB_Ignore, #PB_Ignore, *threadData)
EndProcedure

; -------

OpenWindow(#wnd,50,50,200,150, "Thread", #PB_Window_SystemMenu)
ButtonGadget(#Btn_thread,25,50,120,25, "start threads")
Define *threadData.ThreadData
Repeat

	event = WaitWindowEvent(25)

	*threadData = EventData()
	Select event

		Case #EventThreadBegins
			Debug "#EventThreadBegins"
		Case #EventThreadEnds
			Debug "#EventThreadEnds"  + *threadData\string
			FreeStructure(*threadData)

		Case #PB_Event_Gadget
			Select EventGadget()
				Case #Btn_thread
					ClearDebugOutput()
					; ------- Start Threads (later in loop)
					thread1 = CreateThread(@ThreadProcedure(), 1)
					Delay(25) ;Delay To ensure threads running sequentially
					thread2 = CreateThread(@ThreadProcedure(), 2)
					Delay(25)
					thread3 = CreateThread(@ThreadProcedure(), 3)
			EndSelect

	EndSelect
Until event = #PB_Event_CloseWindow

FreeMutex(Mutex)
The english grammar is freeware, you can use it freely - But it's not Open Source, i.e. you can not change it or publish it in altered way.
flashbob
User
User
Posts: 92
Joined: Sat May 11, 2024 4:04 pm

Re: Correct processing of threads with semaphore or user events

Post by flashbob »

OK, now I understand why that didn't work. Example 1 seems to fit my purpose.
I'll test it further... Thank you for the quick response!
User avatar
idle
Always Here
Always Here
Posts: 6026
Joined: Fri Sep 21, 2007 5:52 am
Location: New Zealand

Re: Correct processing of threads with semaphore or user events

Post by idle »

@flashbob, I think it would be better if you described what you want to do.

here's an example of a thread pool with a fifo job queue mutex is only needed to protect the postevent

Code: Select all

EnableExplicit

Enumeration FormGadget
  #wnd
  #Btn_thread
EndEnumeration

Enumeration #PB_Event_FirstCustomValue
  #EventThreadBegins
  #EventThreadEnds
EndEnumeration

Global Mutex     = CreateMutex()
Global Semaphore = CreateSemaphore()
Define event

Structure ThreadData
  index.i ;index of thread pool array
  tid.i   ;thread id 
  win.i   ;host window 
  sem.i   ;semephore to wait on job 
  value.s ;the threads data to work on  
  jobID.i ;job id 
  done.i  ;state of thread 0 means it's busy 
  kill.i  ;kill it off after it's done  
EndStructure   
  
Procedure ThreadProcedure(*t.ThreadData)     
  
  Protected a ,b
  Repeat 
    
    WaitSemaphore(*t\sem)   ;wait here until a job is available
    *t\done = 0             ;indicate we're busy   
    
    If *t\kill = 0 
      
      ; do something
      For a = 1 To 2      
        For b = 1 To 5
          Debug "Thread " + *t\value +" Line "+Str(b)    
          Delay(100)   
        Next b          
      Next a   
      ; var to be changed and used again in main program
      *t\value + Str(*t\jobID) ;set the result  
            
      LockMutex(mutex) 
        PostEvent(#EventThreadEnds,*t\win,0,0,*t)
      UnlockMutex(Mutex)  
      
      *t\done = 1  ;say were ready 
      
    EndIf 
    
  Until *t\kill 
  
EndProcedure

Global Dim ThreadPool.ThreadData(0), *t.ThreadData ,jobct 
Global NewList jobs.i()

Procedure CreateThreadPool(ct,*address,window)
  Protected a 
  ct-1
  ReDim ThreadPool(ct) 
  For a = 0 To ct
    ThreadPool(a)\done = 1 
    ThreadPool(a)\sem = CreateSemaphore() 
    ThreadPool(a)\index = a
    ThreadPool(a)\tid = CreateThread(*address,@ThreadPool(a))  
  Next 
EndProcedure   

OpenWindow(#wnd,50,50,200,150, "Thread", #PB_Window_SystemMenu)
ButtonGadget(#Btn_thread,25,50,120,25, "start threads")

CreateThreadPool(3,@ThreadProcedure(),#wnd) 

Define a 

Repeat 
  event = WaitWindowEvent(25) 
  Select event  
      
    Case #EventThreadBegins
      *t.ThreadData = EventData() 
      Debug "#EventThreadBegins : "+ *t\value  +  " " + *t\index     
    Case #EventThreadEnds       
      *t.ThreadData = EventData() 
      Debug "#EventThreadEnds : "  + *t\value  +  " " + *t\index 
        	    
    Case #PB_Event_Gadget
      Select EventGadget()
        Case #Btn_thread 
          ClearDebugOutput()    
          LastElement(jobs())    ;add a job to the queue   
          AddElement(jobs()) 
          jobs() = jobct             
          jobct+1 
      EndSelect
    Case 0 
      If ListSize(jobs())   ;try to dequeue jobs                    
        FirstElement(jobs())         
        For a = 0 To ArraySize(ThreadPool()) ;find a free thread if any   
          If ThreadPool(a)\done 
            ThreadPool(a)\value = "hello " ;set job data here copy structure  
            ThreadPool(a)\jobID = jobs() 
            DeleteElement(jobs()) 
            SignalSemaphore(ThreadPool(a)\sem) ;process the job 
            Break 
          EndIf 
        Next 
      EndIf   
            
  EndSelect
Until event = #PB_Event_CloseWindow  

For a = 0 To ArraySize(ThreadPool()) 
  ThreadPool(a)\kill = 1 
  SignalSemaphore(ThreadPool(a)\sem) 
  WaitThread(ThreadPool(a)\tid,1000) 
  FreeSemaphore(ThreadPool(a)\sem) 
Next 
Debug "done"  
  
FreeMutex(Mutex)



hdt888
User
User
Posts: 57
Joined: Sun Jul 07, 2024 8:42 am

Re: Correct processing of threads with semaphore or user events

Post by hdt888 »

using Mini Thread Control (mk-soft) ^^
PB 5.x + 6.x + Win10. Feel the ...Pure... Power.
flashbob
User
User
Posts: 92
Joined: Sat May 11, 2024 4:04 pm

Re: Correct processing of threads with semaphore or user events

Post by flashbob »

@Idle

I would like to run a task several times using a button or a procedure (loop). However, the main program should not stand still. And I use only one procedure running as thread. I think that a thread manager would be too oversized here.

My example at the top shows that under certain circumstances it can be the case that the second thread starts before the first one is finished. This means that events from the first thread may not arrive.

The goal was to process a thread cleanly, to process the changed variables in the event loop or a procedure (e.g. updating gadgets or updating a database). This seems to work well with "Semaphore". Sure, structures can also be used, but more memory would be required if larger amounts of data were used in the structures...
User avatar
NicTheQuick
Addict
Addict
Posts: 1527
Joined: Sun Jun 22, 2003 7:43 pm
Location: Germany, Saarbrücken
Contact:

Re: Correct processing of threads with semaphore or user events

Post by NicTheQuick »

Today nobody cares about about a bit more data in an additional structure. :wink:
The english grammar is freeware, you can use it freely - But it's not Open Source, i.e. you can not change it or publish it in altered way.
User avatar
spikey
Enthusiast
Enthusiast
Posts: 778
Joined: Wed Sep 22, 2010 1:17 pm
Location: United Kingdom

Re: Correct processing of threads with semaphore or user events

Post by spikey »

It sounds to me like you want a queue. If you make your queue structure more complex than I have, or implement several different #UpdateAvailable event types you should be able to achieve whatever degree of control you need.

Code: Select all

EnableExplicit

#UpdateAvailable = #PB_Event_FirstCustomValue
; #UpdateAvailableGadget..#UpdateAvailableDatabase...

Global wdwDemo
Global lvwMessages, btnStart, btnStop
Global QuitFlag, StopFlag, MessagesMutex
Define ThreadCounter, Event, Gadget
Threaded MyNumber, MyDelay

Global NewList Messages.s()
NewList Threads.i()

Procedure thrTest(Number)
  
  MyNumber = Number
  
  ; Add a startup message.
  LockMutex(MessagesMutex)
  LastElement(Messages())
  AddElement(Messages())
  Messages() = "Thread number " + Str(MyNumber) + " has started up."
  PostEvent(#UpdateAvailable, wdwDemo, 0)
  UnlockMutex(MessagesMutex)
  
  Repeat 
    
    MyDelay = Random(5, 2) * 1000
    
    Delay(MyDelay)
    
    ; Add in-process message
    LockMutex(MessagesMutex)
    LastElement(Messages())
    AddElement(Messages())
    Messages() = "Hello from thread " + Str(MyNumber) + ". Sleeping for " + Str(MyDelay) + "ms."
    PostEvent(#UpdateAvailable, wdwDemo, 0)
    UnlockMutex(MessagesMutex)
    
  Until StopFlag = #True
  
  ; Add an end message.
  LockMutex(MessagesMutex)
  LastElement(Messages())
  AddElement(Messages())
  Messages() = "Thread number " + Str(MyNumber) + " is ending."
  PostEvent(#UpdateAvailable, wdwDemo, 0)
  UnlockMutex(MessagesMutex)
  
EndProcedure

;- Main
MessagesMutex = CreateMutex()
wdwDemo = OpenWindow(#PB_Any, 50, 50, 600, 400, "Queue Demo", #PB_Window_SystemMenu | #PB_Window_SizeGadget)
lvwMessages = ListViewGadget(#PB_Any, 10, 10, 580, 340)
btnStart = ButtonGadget(#PB_Any, 10, 360, 140, 30, "Start a new thread")
btnStop = ButtonGadget(#PB_Any, 170, 360, 140, 30, "Stop all threads")

Repeat
  
  Event = WaitWindowEvent()
  
  Select Event
      
    Case #PB_Event_CloseWindow
      StopFlag = #True
      QuitFlag = #True
      
    Case #PB_Event_SizeWindow
      ResizeGadget(btnStart, #PB_Ignore, WindowHeight(wdwDemo) - 40, #PB_Ignore, #PB_Ignore)
      ResizeGadget(btnStop, #PB_Ignore, WindowHeight(wdwDemo) - 40, #PB_Ignore, #PB_Ignore)
      ResizeGadget(lvwMessages, #PB_Ignore, #PB_Ignore, WindowWidth(wdwDemo) - 20, WindowHeight(wdwDemo) - 60)
      
    Case #PB_Event_Gadget
      Gadget = EventGadget()
      Select Gadget
          
        Case btnStart
          StopFlag = #False
          ThreadCounter + 1
          AddElement(Threads())
          Threads() = CreateThread(@thrTest(), ThreadCounter)
          
        Case btnStop
          StopFlag = #True
          
      EndSelect
      
    Case #UpdateAvailable
      ; Send all pending messages to the gadget and then clear the list.
      LockMutex(MessagesMutex)
      ForEach(Messages())
        AddGadgetItem(lvwMessages, -1, Messages())
      Next Messages()
      ClearList(Messages())
      UnlockMutex(MessagesMutex)
      
  EndSelect
  
Until QuitFlag = #True

; Wait for threads to end naturally, then kill any left (there shouldn't be any unless one went to sleep for longer than 3000ms at this point.)
AddGadgetItem(lvwMessages, -1, "Waiting for threads to end.")
Delay(3000)
ForEach Threads()
  If IsThread(Threads())
    Debug "Killing " + Str(Threads())
    KillThread(Threads())
  EndIf
Next Threads()
User avatar
Erolcum
User
User
Posts: 51
Joined: Fri Jun 07, 2024 10:45 am
Location: Turkiye
Contact:

Re: Correct processing of threads with semaphore or user events

Post by Erolcum »

@idle, your code, thread pool is so good. It is working and very useful. Thank you so much…
I try to understand when to use mutex, so a postevent should be protected in a multi thread procedure
You may visit my new Purebasic blog here..
:arrow: https://erolcum-github-io.translate.goo ... r_pto=wapp
User avatar
mk-soft
Always Here
Always Here
Posts: 6320
Joined: Fri May 12, 2006 6:51 pm
Location: Germany

Re: Correct processing of threads with semaphore or user events

Post by mk-soft »

The PostEvent is already ThreadSafe.
Access to global variables, lists, maps and arrays must be protected with mutex.
My Projects ThreadToGUI / OOP-BaseClass / EventDesigner V3
PB v3.30 / v5.75 - OS Mac Mini OSX 10.xx - VM Window Pro / Linux Ubuntu
Downloads on my Webspace / OneDrive
User avatar
idle
Always Here
Always Here
Posts: 6026
Joined: Fri Sep 21, 2007 5:52 am
Location: New Zealand

Re: Correct processing of threads with semaphore or user events

Post by idle »

The mutex probably isn't needed if compiled with thread safe. Any time you have multiple writer threads, a mutex is a good idea, I don't know what's under the hood of postevent so better safe than sorry.
User avatar
Erolcum
User
User
Posts: 51
Joined: Fri Jun 07, 2024 10:45 am
Location: Turkiye
Contact:

Re: Correct processing of threads with semaphore or user events

Post by Erolcum »

I extented the thread pool code. But strange things happened.. Can you guide me if possible ? About the usage of mutex.
You can see the codes and a problematic picture, in the link (after 6-7 clicks to button) ..
error : 2 datas has been sent to 1 printer in the picture (info: 900) with 1 click.
You need to wait between clicks (to finish process) Client sends 10 different jobs to 10 different printers. To simulate printers, you can use the server code in the same link. Printed data and printer name should match. Printer-01 should get data 100, Printer-02 should get data 200 etc..
Each printer should only get 1 job with a button click.
I played with the timeout value in OpenNetworkConnection to catch the problems. (It should be 5000 normally I know)
https://erolcum.blogspot.com/2024/07/for-testing.html
You may visit my new Purebasic blog here..
:arrow: https://erolcum-github-io.translate.goo ... r_pto=wapp
User avatar
idle
Always Here
Always Here
Posts: 6026
Joined: Fri Sep 21, 2007 5:52 am
Location: New Zealand

Re: Correct processing of threads with semaphore or user events

Post by idle »

Erolcum wrote: Thu Jul 25, 2024 11:44 am I extented the thread pool code. But strange things happened.. Can you guide me if possible ? About the usage of mutex.
You can see the codes and a problematic picture, in the link (after 6-7 clicks to button) ..
error : 2 datas has been sent to 1 printer in the picture (info: 900) with 1 click.
You need to wait between clicks (to finish process) Client sends 10 different jobs to 10 different printers. To simulate printers, you can use the server code in the same link. Printed data and printer name should match. Printer-01 should get data 100, Printer-02 should get data 200 etc..
Each printer should only get 1 job with a button click.
I played with the timeout value in OpenNetworkConnection to catch the problems. (It should be 5000 normally I know)
https://erolcum.blogspot.com/2024/07/for-testing.html
I'll PM you the code since you didn't post it here
Post Reply