The basic idea of this code is to define a system that makes better use of Hyper-Threaded/multi-core CPUs: by handing "jobs" to a set of handler threads, it allows for faster and easier multi-threaded application development.
As an example, let's say you want to compress and encode a video file.
You can set up a job per frame of video and per audio sample
and let the job system take care of how to distribute the work.
Whatever the technical details of that example, it is still a handy system.
If you have any improvements or fixes, you are free to edit the code.
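To make the "one job per frame" idea concrete, here is a minimal standalone sketch. It does not use the include below; the names `Frame()`, `FrameMutex`, `EncodeWorker` and `#FrameCount` are made up for illustration, and the "encoding" is only a placeholder that records which worker took which frame.

```purebasic
; Standalone sketch (hypothetical names, not part of JobManager.pbi):
; two worker threads pull frame numbers from a shared list until it is empty.
#FrameCount = 8                      ; hypothetical number of frames

Global NewList Frame.l()             ; pending frame numbers (the "jobs")
Global Dim HandledBy.b(#FrameCount)  ; which worker took which frame
Global FrameMutex = CreateMutex()    ; protects Frame() and DoneCount
Global DoneCount.l

Procedure EncodeWorker(Worker)
  Protected FrameNumber.l, Found.b
  Repeat
    Found = #False
    LockMutex(FrameMutex)            ; the list is shared, so lock before touching it
    If FirstElement(Frame())
      FrameNumber = Frame()
      DeleteElement(Frame())         ; take the job off the list
      DoneCount + 1
      Found = #True
    EndIf
    UnlockMutex(FrameMutex)
    If Found
      HandledBy(FrameNumber) = Worker ; placeholder for the real encoding work
    EndIf
  Until Found = #False               ; list empty: nothing left to do
EndProcedure

For i = 1 To #FrameCount             ; queue one job per frame
  AddElement(Frame())
  Frame() = i
Next

Thread0 = CreateThread(@EncodeWorker(), 0)
Thread1 = CreateThread(@EncodeWorker(), 1)
WaitThread(Thread0)
WaitThread(Thread1)

For i = 1 To #FrameCount
  Debug "frame " + Str(i) + " handled by worker " + Str(HandledBy(i))
Next
Debug "frames handled: " + Str(DoneCount)
```

Which worker ends up with which frame is not predictable. That is the point: the caller only queues the work and the threads sort out the distribution among themselves.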
Code: Select all
;////////////////////////////////////////////////////////////////
;//
;// Project Title: Job Manager
;// Filename: JobManager.pbi
;// Version: 1.0.0.0
;// Date: 11-01-06
;// Author: Steven (Dreglor) Garcia
;//
;////////////////////////////////////////////////////////////////
;
;This file provides an easy way to make your program take advantage of Hyper-Threaded/multi-core CPUs.
;It lets you set up jobs that can be run in a variety of ways:
;one shot, event driven, or persistent. What makes this useful is that the jobs are thread independent,
;which basically means that none of the threads will be idle while jobs exist (unless jobs were assigned to a specific handler).
;This is an efficient way to process data asynchronously. Even if you need the data to stay in sync
;there are ways to accomplish that. The easiest way is to set up a series of jobs that send out completion events: monitor
;for the events and then push an event to initiate the next step in the processing of the data.
;Another way is to set up a few more mutex handles to hold the job until it is ready for the data.
;Note: when executing jobs, be aware that variables, arrays, lists and memory blocks
;outside of the job procedure must be accessed by one thread at a time, so you must set up a mutex to protect them.
;If you still have crashes you can try enabling thread-safe in the compiler options. Also note that window/gadget libraries
;will not respond while in jobs unless created in the same handler; setting Affinity can be used to make sure that the
;window/gadget functions execute from the same thread every time.
;The PureBasic help file section on threads also explains a few things that you should be aware of.
;
;- Constants
;Job type is how the job threads handle jobs on the list
Enumeration
#JobType_Null ;is not processed, empty job, deleted when found
#JobType_OneShot ;processed once and then removed from the job list
#JobType_Persistant ;processed whenever a thread is free
#JobType_OnEventOnce ;processed when its event is found, then removed
#JobType_OnEventPersistant ;processed whenever the given event is found
#JobType_Shutdown ;special job that shuts down the handler threads and makes sure they exit properly
#JobType_Stop ;special job that pauses job handling until a start job is found
#JobType_Start ;special job that resumes previously paused job handling
EndEnumeration
;these are just predefined levels. When a job is added the list is sorted based
;on the Priority number, so users can make up their own system
;note: Priority is a signed byte, so there can only be 256 levels (-128 to 127)
Enumeration -5
#JobPriority_DoNotCare
#JobPriority_Idle
#JobPriority_VeryLow
#JobPriority_Lower
#JobPriority_Low
#JobPriority_Normal
#JobPriority_High
#JobPriority_Higher
#JobPriority_VeryHigh
#JobPriority_RealTime
#JobPriority_TimeCritcal
EndEnumeration
#JobHandlerCPUSaver = 1
;- ProtoType
Prototype JobFunction(Parameters.l, Handler.b = #Null) ;if you need more just use a structure and pass a pointer
;- Structures
Structure Job
Type.b ;Job type, defines what type of job it is and how it is handled
Priority.b ;job priority, the higher the Priority the higher on the list it will be and the closer to being the first one executed
Affinity.b ;if a given job wants to run on a certain handler thread it can, otherwise it is usually #PB_Any to use any thread
OnEvent.l ;if the job is an OnEvent type, this is the user-defined event; the job only executes when that event is found
CompletionEvent.l ;user-defined event pushed on completion, use #Null to push no event
Function.JobFunction ;function to call when job is processed
Parameter.l ;the parameter to pass to the job function
EndStructure
;- Globals
Global NewList OnEvent.l()
Global NewList CompletionEvent.l()
Global NewList Job.Job()
Global JobMutex
Global OnEventMutex
Global CompletionEventMutex
;- Procedures
Procedure JobManager_HandlerThread(Affinity.b)
Debug "Job Handler: " + Str(Affinity) + " Initialized"
MyJob.Job
JobProcessing.b = #True
Repeat
Delay(#JobHandlerCPUSaver) ;simple way to reduce CPU usage
;we don't want to cook the CPU not doing anything
LockMutex(JobMutex)
ForEach Job() ;look for a job that this thread can process
If Job()\Affinity <> #PB_Any And Job()\Affinity <> Affinity
Continue ;next element, this job is for another handler
EndIf
If Job()\Type = #JobType_OnEventOnce Or Job()\Type = #JobType_OnEventPersistant
Event.l = #False
LockMutex(OnEventMutex)
ForEach OnEvent()
If Job()\OnEvent = OnEvent()
Event = #True
DeleteElement(OnEvent())
Break
EndIf
Next
UnlockMutex(OnEventMutex)
If Event = #False
Continue ;next element if this job event has not been pushed
EndIf
EndIf
If Job()\Type = #JobType_Start
Debug "Job Handler: " + Str(Affinity) + " Resumed Processing"
JobProcessing = #True
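ElseIf Job()\Type = #JobType_Stop
Debug "Job Handler: " + Str(Affinity) + " Paused Processing"
JobProcessing = #False
MyJob\Type = #JobType_Null ;Make sure that nothing will be processed
DeleteElement(Job()) ;consume the stop job, otherwise it is found again on every pass and a later start job can never resume processing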
ElseIf Job()\Type = #JobType_Shutdown
Debug "Job Handler: " + Str(Affinity) + " Shutting Down"
UnlockMutex(JobMutex)
ProcedureReturn
EndIf
If JobProcessing = #False
Continue
EndIf
;the current job can be processed
CopyMemory(@Job(), @MyJob, SizeOf(Job)) ;copy the job so it can be processed after the mutex is released
;this keeps the lock short and lets other handlers pick up jobs sooner
If Job()\Type <> #JobType_Persistant And Job()\Type <> #JobType_OnEventPersistant And Job()\Type <> #JobType_Shutdown ;if not Persistant remove job
DeleteElement(Job())
EndIf
Break ;no need to process the rest of the jobs
Next
UnlockMutex(JobMutex)
Select MyJob\Type
Case #JobType_Null
;nothing to be done here
Continue
Case #JobType_OneShot, #JobType_Persistant, #JobType_OnEventOnce, #JobType_OnEventPersistant
If JobProcessing = #True
MyJob\Function(MyJob\Parameter, Affinity) ;do the job, passing the parameter stored with the job
If MyJob\CompletionEvent <> #Null
LockMutex(CompletionEventMutex)
AddElement(CompletionEvent())
CompletionEvent() = MyJob\CompletionEvent ;put the user defined completion event into the Completion Event stack
UnlockMutex(CompletionEventMutex)
EndIf
EndIf
Case #JobType_Shutdown
;should be taken care of already
Case #JobType_Stop
;should be taken care of already
Case #JobType_Start
;should be taken care of already
Default
Debug "Job Handler: " + Str(Affinity) + " Error: Unknown Job Type Passed"
EndSelect
MyJob\Type = #JobType_Null ;make sure that the job is processed once
ForEver
EndProcedure
Procedure JobManager_CreateHandler()
Static Affinity
CreateThread(@JobManager_HandlerThread(), Affinity)
Affinity + 1
EndProcedure
Procedure JobManager_AddJob(Function.JobFunction, Parameter.l = #Null, Type.b = #JobType_OneShot, OnEvent.l = #Null, CompletionEvent.l = #Null, Priority.b = #JobPriority_Normal, Affinity.b = #PB_Any)
LockMutex(JobMutex)
AddElement(Job())
Job()\Type = Type
Job()\Priority = Priority
Job()\Affinity = Affinity
Job()\OnEvent = OnEvent
Job()\CompletionEvent = CompletionEvent
Job()\Function = Function
Job()\Parameter = Parameter
SortStructuredList(Job(), 1, OffsetOf(Job\Priority), #PB_Sort_Byte)
Output.l = @Job()
UnlockMutex(JobMutex)
ProcedureReturn Output ;return the handle captured while the mutex was held
EndProcedure
Procedure JobManager_RemoveJob(JobHandle)
LockMutex(JobMutex)
ChangeCurrentElement(Job(), JobHandle)
DeleteElement(Job())
SortStructuredList(Job(), 1, OffsetOf(Job\Priority), #PB_Sort_Byte)
UnlockMutex(JobMutex)
EndProcedure
Procedure JobManager_EditJob(JobHandle, Function.JobFunction, Parameter.l = #Null, Type.b = #JobType_OneShot, OnEvent.l = #Null, CompletionEvent.l = #Null, Priority.b = #JobPriority_Normal, Affinity.b = #PB_Any)
LockMutex(JobMutex)
ChangeCurrentElement(Job(), JobHandle)
Job()\Type = Type
Job()\Priority = Priority
Job()\Affinity = Affinity
Job()\OnEvent = OnEvent
Job()\CompletionEvent = CompletionEvent
Job()\Function = Function
Job()\Parameter = Parameter
SortStructuredList(Job(), 1, OffsetOf(Job\Priority), #PB_Sort_Byte)
UnlockMutex(JobMutex)
EndProcedure
Procedure JobManager_PushEvent(Event.l)
LockMutex(OnEventMutex)
AddElement(OnEvent())
OnEvent() = Event
UnlockMutex(OnEventMutex)
EndProcedure
Procedure JobManager_CompletionEvent() ;Returns #Null if there is no event
Output.l = #Null
LockMutex(CompletionEventMutex)
If FirstElement(CompletionEvent())
Output = CompletionEvent()
DeleteElement(CompletionEvent()) ;remove the event so it is only reported once
EndIf
UnlockMutex(CompletionEventMutex) ;always unlock before returning, otherwise the next caller blocks forever
ProcedureReturn Output
EndProcedure
Procedure JobManager_PauseProcessing(Handler.b)
JobManager_AddJob(#Null, #Null, #JobType_Stop, #Null, #Null, $7F, Handler)
EndProcedure
Procedure JobManager_ResumeProcessing(Handler.b)
JobManager_AddJob(#Null, #Null, #JobType_Start, #Null, #Null, $7F, Handler)
EndProcedure
Procedure JobManager_ShutDown()
JobManager_AddJob(#Null, #Null, #JobType_Shutdown, #Null, #Null, $7F, #PB_Any)
Delay(1000) ;wait for threads to clean up
LockMutex(JobMutex) ;make sure no one is using the mutex handles
LockMutex(OnEventMutex)
LockMutex(CompletionEventMutex)
UnlockMutex(JobMutex)
UnlockMutex(OnEventMutex)
UnlockMutex(CompletionEventMutex)
FreeMutex(JobMutex)
FreeMutex(OnEventMutex)
FreeMutex(CompletionEventMutex)
ProcedureReturn
EndProcedure
Procedure JobManager_Initilize(HandlerCount)
JobMutex = CreateMutex()
OnEventMutex = CreateMutex()
CompletionEventMutex = CreateMutex()
For Handler = 0 To HandlerCount - 1
JobManager_CreateHandler()
Next
EndProcedure
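The header comment says that anything shared outside the job procedure must be accessed by one thread at a time. As a rough standalone illustration of that rule (again not using the include; `Total`, `TotalMutex` and `AddToTotal` are made-up names), two threads update the same global under a mutex:

```purebasic
; Standalone sketch (hypothetical names): two threads add to one shared total.
Global Total.l
Global TotalMutex = CreateMutex()

Procedure AddToTotal(Amount)
  Protected i
  For i = 1 To 1000
    LockMutex(TotalMutex)    ; only one thread may change Total at a time
    Total + Amount
    UnlockMutex(TotalMutex)  ; release quickly so the other thread can continue
  Next
EndProcedure

ThreadA = CreateThread(@AddToTotal(), 1)
ThreadB = CreateThread(@AddToTotal(), 2)
WaitThread(ThreadA)          ; wait until both threads have finished
WaitThread(ThreadB)
FreeMutex(TotalMutex)

Debug Total                  ; 1000 * 1 + 1000 * 2 = 3000
```

Without the lock the two read-modify-write sequences can interleave and some additions may get lost. The same pattern applies inside a job function that touches globals, lists or memory blocks.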
Code: Select all
Procedure JobOnce(Parameter.l, Handler.b)
Debug "I am a job that only is executed once, I was called from job handler " + Str(Handler)
EndProcedure
Procedure JobAlways(Parameter.l, Handler.b)
Debug "I am a job that is executed when a thread is available to execute it, I was called from job handler " + Str(Handler)
EndProcedure
Procedure JobEventOnce(Parameter.l, Handler.b)
Debug "I am a job that only is executed once by an event, I was called from job handler " + Str(Handler)
EndProcedure
Procedure JobEventAlways(Parameter.l, Handler.b)
Debug "I am a job that only is executed when an event arises, I was called from job handler " + Str(Handler)
EndProcedure
JobManager_Initilize(2) ; you can change the number of handlers
JobManager_AddJob(@JobOnce())
JobManager_AddJob(@JobAlways(), #Null, #JobType_Persistant, #Null, #Null, #JobPriority_Low) ;careful when using #JobType_Persistant, it can take over the job management
;system completely. It is a good idea to give it the lowest Priority on the list so that the management system executes it last.
;You can comment out the line above to reduce debugger spam.
JobManager_AddJob(@JobEventOnce(), #Null, #JobType_OnEventOnce, 1)
JobManager_AddJob(@JobEventAlways(), #Null, #JobType_OnEventPersistant, 2)
JobManager_PushEvent(1)
JobManager_PushEvent(2)
JobManager_PushEvent(2)
Delay(100)
JobManager_PushEvent(2)
JobManager_PushEvent(2)
Delay(2000)
JobManager_ShutDown()
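One possible improvement, offered only as a sketch: the shutdown above waits a fixed time before freeing the mutexes. If the thread IDs returned by CreateThread() were kept (the include does not store them), WaitThread() could wait exactly until each handler has exited. The standalone example below shows the pattern with made-up names (`StopRequested`, `Worker`, `Thread()`); it is not a drop-in replacement for JobManager_ShutDown().

```purebasic
; Standalone sketch (hypothetical names): stop worker threads and wait for them
; with WaitThread() instead of a fixed Delay().
#WorkerCount = 2

Global StopRequested.b       ; set by the main thread, only read by the workers
Global Dim Thread(#WorkerCount - 1)
Global StoppedCount

Procedure Worker(Index)
  Repeat
    Delay(1)                 ; placeholder for looking for and running jobs
  Until StopRequested
EndProcedure

For i = 0 To #WorkerCount - 1
  Thread(i) = CreateThread(@Worker(), i) ; keep the ID so the thread can be waited on
Next

Delay(50)                    ; let the workers run for a moment
StopRequested = #True        ; ask every worker to leave its loop

For i = 0 To #WorkerCount - 1
  WaitThread(Thread(i))      ; returns once that worker has really ended
  StoppedCount + 1
Next
Debug "workers stopped: " + Str(StoppedCount)
```

After the last WaitThread() returns, no worker can still be holding a mutex, so freeing shared resources at that point is safe without guessing how long the cleanup takes.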