Multi-Threaded Job Management System

Dreglor
Enthusiast
Posts: 759
Joined: Sat Aug 02, 2003 11:22 pm
Location: OR, USA

Post by Dreglor »

The large comment at the top of the code explains most of it, but the basic idea is to define a system that makes better use of Hyper-Threaded/multi-core CPUs. You hand it "jobs" and it spreads them across worker threads, which makes multi-threaded application development faster and easier.

As an example, let's say you want to compress and encode a video file.
You can set up one job per video frame and per audio sample
and let the job system take care of distributing the work.
Whatever the technical details of that example, it is still a handy system.
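
A rough sketch of the per-frame idea, not a real encoder. EncodeFrame, #FrameCount and FrameDone() are made-up names for illustration. It assumes the code is saved as JobManager.pbi (the filename in its header) and that the handler passes each job's stored Parameter on to the job function. The frame index is passed as the parameter so that no pointer has to fit into a .l value.

```purebasic
; Sketch only: EncodeFrame, #FrameCount and FrameDone() are hypothetical names.
; Assumes the handler thread passes the job's stored Parameter to the job function.
XIncludeFile "JobManager.pbi"

#FrameCount = 8

Global Dim FrameDone.b(#FrameCount - 1) ; each job writes only its own slot

Procedure EncodeFrame(Parameter.l, Handler.b)
  ; Parameter carries the frame index; the real encoding work would go here
  FrameDone(Parameter) = #True
EndProcedure

JobManager_Initilize(2) ; two handler threads

For Frame = 0 To #FrameCount - 1
  JobManager_AddJob(@EncodeFrame(), Frame) ; one one-shot job per frame
Next

Delay(2000) ; crude wait for the jobs to finish
JobManager_ShutDown()
```

Because every job touches a different array slot, no extra mutex is needed here. Anything the jobs share (a file, a counter, a list) would need one.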

If you have any improvements or fixes, feel free to edit the code.

Code: Select all

;////////////////////////////////////////////////////////////////
;//
;// Project Title: Job Manager
;// Filename: JobManager.pbi
;// Version: 1.0.0.0
;// Date: 11-01-06
;// Author: Steven (Dreglor) Garcia
;//
;////////////////////////////////////////////////////////////////

;
;This file provides an easy way to make your program take advantage of Hyper-Threaded/Multi-Core CPUs.
;It lets you set up jobs that can be run in a variety of ways:
;one shot, event driven, or persistent. What makes this useful is that the jobs are thread independent,
;which basically means that no thread will sit idle while jobs exist (unless the jobs were assigned to a specific handler).
;This is an efficient way to process data asynchronously. If you need the data to stay in sync
;there are ways to accomplish that. The easiest is to set up a series of jobs that send out completion events: monitor
;for the events and then push an event to initiate the next step in the processing of the data.
;Another way is to set up a few more mutex handles to hold a job until its data is ready.
;Note: when executing jobs, variables, arrays, lists and memory blocks that live
;outside of the job procedure must be accessed by one thread at a time, so you must set up a mutex to protect them.
;If you still get crashes, try enabling thread-safe mode in the compiler options. Also note that the window/gadget libraries
;will not respond while in jobs unless they were created in the same handler. Setting Affinity can be used to make sure that the
;window/gadget functions execute from the same thread every time.
;The PureBasic help file section on threads also explains a few things that you should be aware of.
;

;- Constants

;Job type defines how the handler threads treat a job on the list
Enumeration
  #JobType_Null ;is not processed, empty job, deleted when found
  #JobType_OneShot ;processed once and then removed from the job list
  #JobType_Persistant ;processed whenever a thread is free
  #JobType_OnEventOnce ;processed when its event is found and then removed
  #JobType_OnEventPersistant ;processed whenever the given event is found
  #JobType_Shutdown ;a special job that shuts down the threads, it makes sure that the threads are ended properly
  #JobType_Stop ;a special job that pauses job handling until the start job is found
  #JobType_Start ;another special job that resumes the previously paused job handling
EndEnumeration  

;these are just predefined levels. When a job is added the list is sorted based
;on the Priority number, so users can make up their own system
;note: Priority is a signed byte, so there can only be 256 levels (-128 to 127)
Enumeration -5
  #JobPriority_DoNotCare
  #JobPriority_Idle
  #JobPriority_VeryLow
  #JobPriority_Lower
  #JobPriority_Low
  #JobPriority_Normal
  #JobPriority_High
  #JobPriority_Higher
  #JobPriority_VeryHigh
  #JobPriority_RealTime
  #JobPriority_TimeCritcal
EndEnumeration

#JobHandlerCPUSaver = 1

;- ProtoType

Prototype JobFunction(Parameters.l, Handler.b = #Null) ;if you need more just use a structure and pass a pointer

;- Structures

Structure Job
  Type.b ;job type, defines what kind of job it is and how it is handled
  Priority.b ;job priority, the higher the priority the higher on the list it will be and the closer to being executed first
  Affinity.b ;a job can ask to run on a certain handler thread, otherwise this is usually #PB_Any to use any thread
  OnEvent.l ;for OnEvent jobs, the user-defined event to wait for; the job only executes when that event is found
  CompletionEvent.l ;user-defined event pushed on completion, use #Null to push no event
  Function.JobFunction ;function to call when the job is processed
  Parameter.l ;the parameter to pass to the job function
EndStructure

;- Globals

Global NewList OnEvent.l()
Global NewList CompletionEvent.l()
Global NewList Job.Job()

Global JobMutex
Global OnEventMutex
Global CompletionEventMutex

;- Procedures

Procedure JobManager_HandlerThread(Affinity.b)
  Debug "Job Handler: " + Str(Affinity) + " Initialized"
  
  MyJob.Job
  JobProcessing.b = #True
  
  Repeat
    Delay(#JobHandlerCPUSaver) ;simple way to reduce CPU usage
    ;we don't want to cook the CPU not doing anything
    
    LockMutex(JobMutex)
    
    ForEach Job() ;look for a job that this thread can process
      
      If Job()\Affinity <> #PB_Any And Job()\Affinity <> Affinity
        Continue ;skip to the next element, this job is for another thread
      EndIf
      
      If Job()\Type = #JobType_OnEventOnce Or Job()\Type = #JobType_OnEventPersistant
        Event.l = #False
        LockMutex(OnEventMutex)
        
        ForEach OnEvent()
          If Job()\OnEvent = OnEvent()
            Event = #True
            DeleteElement(OnEvent())
            Break
          EndIf
        Next
        
        UnlockMutex(OnEventMutex)
        
        If Event = #False
          Continue ;next element if this job event has not been pushed
        EndIf
        
      EndIf
      
      If Job()\Type = #JobType_Start
        
        Debug "Job Handler: " + Str(Affinity) + " Resumed Processing"
        JobProcessing = #True
        
      ElseIf Job()\Type = #JobType_Stop
        
        Debug "Job Handler: " + Str(Affinity) + " Paused Processing"
        JobProcessing = #False
        MyJob\Type = #JobType_Null ;Make sure that nothing will be processed
        
      ElseIf Job()\Type = #JobType_Shutdown
        Debug "Job Handler: " + Str(Affinity) + " Shutting Down"
        UnlockMutex(JobMutex)
        ProcedureReturn
        
      EndIf
      
      If JobProcessing = #False
        Continue
      EndIf
      
      ;the current job can be processed
      
      CopyMemory(@Job(), @MyJob, SizeOf(Job)) ;copy the job so it can be processed after the mutex is released
      ;this lets the other handlers keep working through the list in the meantime
      
      If Job()\Type <> #JobType_Persistant And Job()\Type <> #JobType_OnEventPersistant And Job()\Type <> #JobType_Shutdown ;if not Persistant remove job
        DeleteElement(Job())
      EndIf
      
      Break ;no need to process the rest of the jobs
    Next
    
    UnlockMutex(JobMutex)
    
    Select MyJob\Type
      Case #JobType_Null
        ;nothing to be done here
        Continue
      Case #JobType_OneShot, #JobType_Persistant, #JobType_OnEventOnce, #JobType_OnEventPersistant
        If JobProcessing = #True
          MyJob\Function(MyJob\Parameter, Affinity) ;do the job, passing the parameter stored with it
          
          If MyJob\CompletionEvent <> #Null
            LockMutex(CompletionEventMutex)
            
            AddElement(CompletionEvent())
            CompletionEvent() = MyJob\CompletionEvent ;put the user defined completion event into the Completion Event stack
            
            UnlockMutex(CompletionEventMutex)
          EndIf
        EndIf
      Case #JobType_Shutdown
        ;should be taken care of already
      Case #JobType_Stop
        ;should be taken care of already
      Case #JobType_Start
        ;should be taken care of already
      Default
        Debug "Job Handler: " + Str(Affinity) + " Error: Unknown Job Type Passed"
    EndSelect
    
    MyJob\Type = #JobType_Null ;make sure that the job is processed only once
  ForEver
EndProcedure

Procedure JobManager_CreateHandler()
  Static Affinity
  CreateThread(@JobManager_HandlerThread(), Affinity)
  Affinity + 1
EndProcedure

Procedure JobManager_AddJob(Function.JobFunction, Parameter.l = #Null, Type.b = #JobType_OneShot, OnEvent.l = #Null, CompletionEvent.l = #Null,  Priority.b = #JobPriority_Normal, Affinity.b = #PB_Any)
  LockMutex(JobMutex)
  
  AddElement(Job())
  Job()\Type = Type
  Job()\Priority = Priority
  Job()\Affinity = Affinity
  Job()\OnEvent = OnEvent
  Job()\CompletionEvent = CompletionEvent
  Job()\Function = Function
  Job()\Parameter = Parameter
  SortStructuredList(Job(), 1, OffsetOf(Job\Priority), #PB_Sort_Byte)
  
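  Output.l = @Job() ;remember the handle while the list is still locked
  UnlockMutex(JobMutex)
  
  ProcedureReturn Output ;reading @Job() after the unlock could race with the handler threads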
EndProcedure

Procedure JobManager_RemoveJob(JobHandle)
  LockMutex(JobMutex)
  
  ChangeCurrentElement(Job(), JobHandle)
  DeleteElement(Job())
  SortStructuredList(Job(), 1, OffsetOf(Job\Priority), #PB_Sort_Byte)
  
  UnlockMutex(JobMutex)
EndProcedure

Procedure JobManager_EditJob(JobHandle, Function.JobFunction, Parameter.l = #Null, Type.b = #JobType_OneShot, OnEvent.l = #Null, CompletionEvent.l = #Null,  Priority.b = #JobPriority_Normal, Affinity.b = #PB_Any)
  LockMutex(JobMutex)
  
  ChangeCurrentElement(Job(), JobHandle)
  Job()\Type = Type
  Job()\Priority = Priority
  Job()\Affinity = Affinity
  Job()\OnEvent = OnEvent
  Job()\CompletionEvent = CompletionEvent
  Job()\Function = Function
  Job()\Parameter = Parameter
  SortStructuredList(Job(), 1, OffsetOf(Job\Priority), #PB_Sort_Byte)
  
  UnlockMutex(JobMutex)
EndProcedure

Procedure JobManager_PushEvent(Event.l)
  LockMutex(OnEventMutex)
  
  AddElement(OnEvent())
  OnEvent() = Event
  
  UnlockMutex(OnEventMutex)
EndProcedure

Procedure JobManager_CompletionEvent() ;Returns #Null if there is no event
  Output.l = #Null
  LockMutex(CompletionEventMutex)
  
  If FirstElement(CompletionEvent())
    Output = CompletionEvent()
    DeleteElement(CompletionEvent()) ;pop the event so it is reported only once
  EndIf
  
  UnlockMutex(CompletionEventMutex) ;always release the mutex before returning
  ProcedureReturn Output
EndProcedure

Procedure JobManager_PauseProcessing(Handler.b)
  JobManager_AddJob(#Null, #Null, #JobType_Stop, #Null, #Null, $7F, Handler)
EndProcedure

Procedure JobManager_ResumeProcessing(Handler.b)
  JobManager_AddJob(#Null, #Null, #JobType_Start, #Null, #Null, $7F, Handler)
EndProcedure

Procedure JobManager_ShutDown()
  JobManager_AddJob(#Null, #Null, #JobType_Shutdown, #Null, #Null, $7F, #PB_Any)
  Delay(1000) ;wait for threads to clean up
  LockMutex(JobMutex) ;make sure no one is using the mutex handles
  LockMutex(OnEventMutex)
  LockMutex(CompletionEventMutex)
  UnlockMutex(JobMutex)
  UnlockMutex(OnEventMutex)
  UnlockMutex(CompletionEventMutex)
  FreeMutex(JobMutex)
  FreeMutex(OnEventMutex)
  FreeMutex(CompletionEventMutex)
  ProcedureReturn
EndProcedure

Procedure JobManager_Initilize(HandlerCount)
  JobMutex = CreateMutex()
  OnEventMutex = CreateMutex()
  CompletionEventMutex = CreateMutex()
  
  For Handler = 0 To HandlerCount - 1
    JobManager_CreateHandler()
  Next
EndProcedure
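
The note in the header comment about protecting shared data with a mutex can be shown with a small stand-alone sketch. It does not use the job manager at all. Worker, Counter and CounterMutex are names made up for the illustration, and a job procedure would guard shared data in the same way.

```purebasic
; Minimal sketch: four threads add to one shared counter, guarded by a mutex.
; Worker, Counter and CounterMutex are illustrative names, not part of JobManager.pbi.

Global Counter.l
Global CounterMutex

Procedure Worker(*Unused)
  Protected i
  For i = 1 To 1000
    LockMutex(CounterMutex)   ; only one thread may touch Counter at a time
    Counter + 1
    UnlockMutex(CounterMutex)
  Next
EndProcedure

CounterMutex = CreateMutex()

Dim Threads(3)
For t = 0 To 3
  Threads(t) = CreateThread(@Worker(), 0)
Next

For t = 0 To 3
  WaitThread(Threads(t))      ; block until each worker has finished
Next

Debug Counter                 ; 4 threads x 1000 increments = 4000
FreeMutex(CounterMutex)
```

Without the LockMutex/UnlockMutex pair the increments from different threads can overwrite each other and the total may come out lower.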
Here is a small example (not the video example mentioned above):

Code: Select all

Procedure JobOnce(Parameter.l, Handler.b)
  Debug "I am a job that only is executed once, I was called from job handler " + Str(Handler)
EndProcedure

Procedure JobAlways(Parameter.l, Handler.b)
  Debug "I am a job that is executed when a thread is available to execute it, I was called from job handler " + Str(Handler)
EndProcedure

Procedure JobEventOnce(Parameter.l, Handler.b)
  Debug "I am a job that only is executed once by an event, I was called from job handler " + Str(Handler)
EndProcedure

Procedure JobEventAlways(Parameter.l, Handler.b)
  Debug "I am a job that only is executed when an event arises, I was called from job handler " + Str(Handler)
EndProcedure

JobManager_Initilize(2) ; you can change the number of handlers

JobManager_AddJob(@JobOnce())
JobManager_AddJob(@JobAlways(), #Null, #JobType_Persistant, #Null, #Null, #JobPriority_Low) ;careful when using #JobType_Persistant, it can take over the job management
;system completely. It is good to give it the lowest priority on the list so the management system executes it last.
;You can comment out the line above to reduce debugger spam.

JobManager_AddJob(@JobEventOnce(), #Null, #JobType_OnEventOnce, 1)
JobManager_AddJob(@JobEventAlways(), #Null, #JobType_OnEventPersistant, 2)

JobManager_PushEvent(1)

JobManager_PushEvent(2)
JobManager_PushEvent(2)
Delay(100)
JobManager_PushEvent(2)
JobManager_PushEvent(2)

Delay(2000)

JobManager_ShutDown()
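
The "series of jobs that send out completion events" idea from the header comment could look roughly like the sketch below. StepOne, StepTwo and the three event numbers are invented for the illustration. It assumes the include is saved as JobManager.pbi and that JobManager_CompletionEvent() releases its mutex on every path and removes the event it returns, otherwise the polling loop would not work as intended.

```purebasic
; Sketch only: StepOne, StepTwo and the event numbers are made-up names.
; Assumes JobManager_CompletionEvent() unlocks its mutex before returning
; and removes the event it hands back.
XIncludeFile "JobManager.pbi"

#Event_StepOneDone = 100 ; completion event pushed when StepOne finishes
#Event_RunStepTwo  = 101 ; OnEvent that releases StepTwo
#Event_StepTwoDone = 102 ; completion event pushed when StepTwo finishes

Procedure StepOne(Parameter.l, Handler.b)
  Debug "step one ran on handler " + Str(Handler)
EndProcedure

Procedure StepTwo(Parameter.l, Handler.b)
  Debug "step two ran on handler " + Str(Handler)
EndProcedure

JobManager_Initilize(2)

; StepTwo sits on the job list until its event is pushed
JobManager_AddJob(@StepTwo(), #Null, #JobType_OnEventOnce, #Event_RunStepTwo, #Event_StepTwoDone)
; StepOne runs as soon as a handler is free and reports completion
JobManager_AddJob(@StepOne(), #Null, #JobType_OneShot, #Null, #Event_StepOneDone)

Repeat
  Event = JobManager_CompletionEvent()
  Select Event
    Case #Event_StepOneDone
      JobManager_PushEvent(#Event_RunStepTwo) ; start the next step
    Case #Null
      Delay(10) ; nothing finished yet, do not spin
  EndSelect
Until Event = #Event_StepTwoDone

JobManager_ShutDown()
```

The main thread only routes events here, so the two steps stay in order even though either handler thread may pick them up.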
~Dreglor
Shannara
Addict
Posts: 1808
Joined: Thu Oct 30, 2003 11:19 pm
Location: Emerald Cove, Unformed

Post by Shannara »

Nobody commented on this? I'll give it a whirl when I get home :) Thank you for the code!
Dummy
Enthusiast
Posts: 162
Joined: Wed Jun 09, 2004 11:10 am
Location: Germany

Post by Dummy »

This is also useful for all kinds of mass-demand servers, e.g. MMO game servers.

Thx for this snippet!
Bonne_den_kule
Addict
Posts: 841
Joined: Mon Jun 07, 2004 7:10 pm

Post by Bonne_den_kule »

Looks very nice, indeed! :D
Shannara
Addict
Posts: 1808
Joined: Thu Oct 30, 2003 11:19 pm
Location: Emerald Cove, Unformed

Post by Shannara »

Yep, this will prove wonderful on those 8 core systems in March.
Intrigued
Enthusiast
Posts: 501
Joined: Thu Jun 02, 2005 3:55 am
Location: U.S.A.

Post by Intrigued »

I have a long way to go before I will be able to appreciate each facet of this code chunk; nonetheless, thank you for sharing it. It gives some of us something to strive for (too).
Intrigued - Registered PureBasic, lifetime updates user
NoahPhense
Addict
Posts: 1999
Joined: Thu Oct 16, 2003 8:30 pm
Location: North Florida

Post by NoahPhense »

Very nice.. threading is all that matters anymore.. nice work

- np