SoS hat geschrieben:Mit den Handles hab ich nun wieder überhaupt kein Problem, die Anzahl schwankt zwar, wird aber nicht ständig mehr.
Die CPU-Last schwankt ebenfalls zwischen 66 und 84%.
Bei mir ist die CPU-Last auf allen Cores bei 100%. Nur bei der ersten
Version war ein Warte-Delay drin, was aber keinen Sinn macht, wenn
man es schnell haben möchte.
Der Vollständigkeit halber hier noch Version 3 mit X Worker-Threads
im Hintergrund. So werden also nicht dauernd neue Threads erstellt,
und das Ganze ist somit viel ressourcenschonender.
Das Ganze mit plain PB, bis auf GetSystemInfo_(), um die Anzahl der
CPUs/Cores auf Windows herauszubekommen.
Für mehr muß man dann wohl doch mit mehr APIs/Libs ran, aber das
haben freak und andere ja schon gemacht:
Anyway... vielleicht möchte jemand noch bissl damit rumspielen:
Code: Alles auswählen
;*----------------------------------------------------------------------------------
;*
;* JOBS.pbi
;*
;* Author: Danilo Krahn, 2008/06/19
;*
;* functions for running jobs simultaneously
;* on multiprocessor/multicore systems
;*
;* Version 2, 2008/06/20: - removed JobResult() and added optional
;* pointer to result to AddJob()
;*
;* - The JobQueue is now cleared by default
;* after all jobs are done.
;*
;* - added optional flag #PRESERVE_JOBLIST
;* for DoJobs(#PRESERVE_JOBLIST) if you
;* want to run the same jobs again
;*
;* Version 3, 2008/06/22 - start X worker threads in background (X = available CPUs/Cores)
;* and pause these threads after all jobs are done
;*
;* - added EndJobThreads() for stopping
;* worker threads at program end
;*
;* functions:
;*
;* AddJob( @myProcedure(), optional_argument.l = 0, optional_pointerToResult = 0 ) ; add a job to the queue
;*
;* jobsDone = DoJobs() ; run all jobs in the queue and clear job queue (no ClearJobs() needed)
;*
;* jobsDone = DoJobs(#PRESERVE_JOBLIST) ; run all jobs in the queue without clearing the job list
;*
;* ClearJobs() ; clear the job queue
;*
;* EndJobThreads() ; end worker threads before program ends
;*
;*
;* compiler options: [X] create threadsafe executable
;*
;*----------------------------------------------------------------------------------
; Compile-time guard: abort the build with an error message unless the
; "create threadsafe executable" compiler option is enabled.
CompilerIf #PB_Compiler_Thread=0
;******************************************************
;* remove this only if you know what you are doing ;) *
;******************************************************
CompilerError "please use compiler option: [X] create threadsafe executable" ; required for using PB-Strings (also with Debug)
CompilerEndIf
;EnableExplicit
; Optional flag for DoJobs(): keep the job list after the run instead of clearing it.
#PRESERVE_JOBLIST = 1
; Values a worker thread stores in __JOB_THREAD\thread_paused around its PauseThread() call.
#__JOBS__THREAD__RUNNING = 0
#__JOBS__THREAD__PAUSED = 1
; Value EndJobThreads() writes to __JOB_THREAD\thread_signal to make a worker leave its loop.
#__JOBS__SIGNAL__QUIT = -1
; One queued job: which procedure to call, with which argument,
; and where to store the value it returns.
; NOTE(review): jobProc and jobArg are 32-bit longs; storing a procedure address
; in them presumably only works on 32-bit targets - confirm before building for x64.
Structure __JOB_INFO
jobProc.l ; address of the job procedure, passed to CallFunctionFast()
jobArg.l ; single argument handed to the job procedure
*pjobResult.LONG ; optional: receives the job's return value, 0 = result is discarded
EndStructure
; Per-worker bookkeeping; one element per worker thread, and a pointer to the
; element is the argument passed to __WorkerThread().
Structure __JOB_THREAD
thread_id.l ; value returned by CreateThread(); used for PauseThread/ResumeThread/WaitThread
thread_paused.l ; #__JOBS__THREAD__RUNNING or #__JOBS__THREAD__PAUSED, written by the worker
thread_signal.l ; set to #__JOBS__SIGNAL__QUIT by EndJobThreads() to stop the worker
EndStructure
; Shared state of the job system.
Global NewList __Jobs__.__JOB_INFO() ; the job queue; accessed under __Jobs__mutex
Global Dim __Jobs__threads.__JOB_THREAD(0) ; re-dimensioned in DoJobs(); elements 1..__Jobs__CPUs are used
Global __Jobs__mutex.l = 0 ; mutex guarding the job list, created on first use (0 = not created yet)
Global __Jobs__CPUs.l = 0 ; number of worker threads; 0 = workers not started
;
; Fetch the next job from the shared job list.
;
; *job   : receives a copy of the next list element (procedure, argument, result pointer)
; return : #True when a job was copied into *job,
;          #False when the mutex does not exist, *job is 0, or the list has no further element
;
Procedure __GetNextJob(*job.__JOB_INFO)
  Protected gotJob.l

  ; without a mutex the list cannot be accessed safely
  If __Jobs__mutex = 0
    ProcedureReturn #False
  EndIf

  ; advancing the list and copying the element happen as one step under the mutex
  LockMutex(__Jobs__mutex)
  If *job <> 0
    If NextElement(__Jobs__()) <> 0
      *job\pjobResult = __Jobs__()\pjobResult
      *job\jobArg     = __Jobs__()\jobArg
      *job\jobProc    = __Jobs__()\jobProc
      gotJob = #True
    EndIf
  EndIf
  UnlockMutex(__Jobs__mutex)

  ProcedureReturn gotJob
EndProcedure
;
; Progress counters of the batch started by DoJobs(), both guarded by __Jobs__mutex:
;   __Jobs__unclaimed : jobs no worker has fetched from the list yet
;   __Jobs__pending   : jobs that have not finished yet
;
; DoJobs() waits on these counters instead of on the workers' thread_paused
; flags. A flag still reads PAUSED for a short time after ResumeThread(), so
; polling it directly could end the wait before any job had run.
;
Global __Jobs__unclaimed.l = 0
Global __Jobs__pending.l   = 0

;
; Create the mutex that guards the job list on first use.
;
; return : #True when the mutex exists, #False when it could not be created
;
Procedure.l __EnsureJobsMutex()
  If Not __Jobs__mutex
    __Jobs__mutex = CreateMutex()
  EndIf
  If __Jobs__mutex
    ProcedureReturn #True
  EndIf
  ProcedureReturn #False
EndProcedure

;
; Body of one worker thread.
;
; *me : this worker's element of __Jobs__threads()
;
; The worker runs jobs until the list is exhausted, marks itself PAUSED and
; suspends itself. It leaves its loop once thread_signal is #__JOBS__SIGNAL__QUIT.
;
Procedure __WorkerThread(*me.__JOB_THREAD)
  Protected result.l, job.__JOB_INFO

  ;
  ; thread_id is written by the creator after CreateThread() returns,
  ; and it is needed below for PauseThread(); yield until it is there
  ;
  While *me\thread_id = 0
    Delay(0)
  Wend

  Debug "worker thread START"

  Repeat
    ;
    ; do all jobs
    ;
    While __GetNextJob(@job)
      ; the job is fetched: DoJobs() may stop waking up paused workers for it
      LockMutex(__Jobs__mutex)
      __Jobs__unclaimed - 1
      UnlockMutex(__Jobs__mutex)

      If job\jobProc
        result = CallFunctionFast(job\jobProc,job\jobArg)
        If job\pjobResult
          job\pjobResult\l = result
        EndIf
      EndIf

      ; the job is finished (a job without procedure counts as finished, too)
      LockMutex(__Jobs__mutex)
      __Jobs__pending - 1
      UnlockMutex(__Jobs__mutex)
    Wend
    ;
    ; dont pause after quit signal! (possible deadlock)
    ;
    If *me\thread_signal <> #__JOBS__SIGNAL__QUIT
      ;
      ; all jobs done, so take a break.
      ; If a ResumeThread() arrives between the next two lines it is lost;
      ; DoJobs() copes with that by resuming PAUSED workers repeatedly
      ; while unclaimed jobs remain.
      ;
      *me\thread_paused = #__JOBS__THREAD__PAUSED
      PauseThread(*me\thread_id)
      *me\thread_paused = #__JOBS__THREAD__RUNNING
    EndIf
    ;
    ; end worker threads with signal quit from EndJobThreads()
    ;
  Until *me\thread_signal = #__JOBS__SIGNAL__QUIT

  Debug "worker thread STOP"
EndProcedure

;
; Remove all jobs from the job queue.
;
; return : #True on success, #False when the mutex could not be created
;
Procedure ClearJobs()
  If Not __EnsureJobsMutex()
    ProcedureReturn #False
  EndIf
  ;
  ; clear linked list with jobs
  ;
  LockMutex(__Jobs__mutex)
  ClearList(__Jobs__())
  UnlockMutex(__Jobs__mutex)
  ProcedureReturn #True
EndProcedure

;
; Append a job to the end of the job queue.
;
; JobProcedure    : address of the procedure to run, e.g. @MyProcedure()
; Arg             : optional argument passed to the procedure
; PointerToResult : optional address of a long that receives the procedure's return value
; return          : #True when the job was added, otherwise #False
;
Procedure.l AddJob( JobProcedure.l, Arg.l=0, PointerToResult.l=0 )
  Protected retval.l

  If Not __EnsureJobsMutex()
    ProcedureReturn #False
  EndIf
  ;
  ; add job to linked list
  ;
  LockMutex(__Jobs__mutex)
  LastElement(__Jobs__())
  If AddElement(__Jobs__())
    __Jobs__()\jobProc    = JobProcedure
    __Jobs__()\jobArg     = Arg
    __Jobs__()\pjobResult = PointerToResult
    retval = #True
  EndIf
  UnlockMutex(__Jobs__mutex)

  ProcedureReturn retval
EndProcedure

;
; Run all queued jobs on the worker threads and return when every job has finished.
;
; preserve_joblist : pass #PRESERVE_JOBLIST to keep the queue for another run;
;                    by default the queue is cleared afterwards
; return           : number of jobs that were run, or #False when there were no jobs,
;                    the mutex could not be created, or no worker thread could be started
;
; The worker threads are created the first time this procedure is called.
;
Procedure.l DoJobs(preserve_joblist.l=0)
  Protected job_count.l, i.l, started.l, jobs_unclaimed.l, jobs_pending.l

  If Not __EnsureJobsMutex()
    ProcedureReturn #False
  EndIf
  ;
  ; start worker threads 1st time this procedure is called
  ;
  If Not __Jobs__CPUs
    CompilerIf #PB_Compiler_OS = #PB_OS_Windows
      Protected si.SYSTEM_INFO
      GetSystemInfo_(@si)
      __Jobs__CPUs = si\dwNumberOfProcessors
    CompilerElse
      __Jobs__CPUs = 4 ; run 4 threads simultaneously by default
    CompilerEndIf
    If __Jobs__CPUs < 1
      __Jobs__CPUs = 1
    EndIf
    ;
    ; Dim array for CPU count (elements 1..__Jobs__CPUs are used)
    ;
    Dim __Jobs__threads.__JOB_THREAD(__Jobs__CPUs)
    ;
    ; start X worker threads
    ;
    For i = 1 To __Jobs__CPUs
      __Jobs__threads(i)\thread_signal = 0
      __Jobs__threads(i)\thread_id = CreateThread(@__WorkerThread(),@__Jobs__threads(i))
      If __Jobs__threads(i)\thread_id
        started + 1
      EndIf
    Next i
    ;
    ; without a single worker nobody would ever run a job: give up instead of waiting forever
    ;
    If started = 0
      __Jobs__CPUs = 0
      ProcedureReturn #False
    EndIf
  EndIf
  ;
  ; rewind the list and publish the batch size in one step, so a worker
  ; can only fetch a job after the counters are valid
  ;
  LockMutex(__Jobs__mutex)
  job_count = CountList(__Jobs__())
  ResetList(__Jobs__())
  __Jobs__unclaimed = job_count
  __Jobs__pending   = job_count
  UnlockMutex(__Jobs__mutex)
  ;
  ; no jobs, return
  ;
  If Not job_count
    ProcedureReturn #False
  EndIf
  ;
  ; wake the workers and wait until every job has finished
  ;
  Repeat
    LockMutex(__Jobs__mutex)
    jobs_unclaimed = __Jobs__unclaimed
    jobs_pending   = __Jobs__pending
    UnlockMutex(__Jobs__mutex)

    If jobs_unclaimed > 0
      ;
      ; there is still work nobody has picked up: resume every worker that
      ; reports PAUSED. This is repeated because a worker may set the flag
      ; shortly before it really suspends itself; a single ResumeThread()
      ; could then come too early and be lost.
      ;
      For i = 1 To __Jobs__CPUs
        If __Jobs__threads(i)\thread_id
          If __Jobs__threads(i)\thread_paused = #__JOBS__THREAD__PAUSED
            ResumeThread(__Jobs__threads(i)\thread_id)
          EndIf
        EndIf
      Next i
    EndIf

    If jobs_pending > 0
      Delay(0) ; give up the rest of the time slice to the workers, no fixed waiting time
    EndIf
  Until jobs_pending <= 0
  ;
  ; clear list without optional flag #PRESERVE_JOBLIST
  ;
  If Not (preserve_joblist = #PRESERVE_JOBLIST)
    LockMutex(__Jobs__mutex)
    ClearList(__Jobs__())
    UnlockMutex(__Jobs__mutex)
  EndIf

  ProcedureReturn job_count
EndProcedure
;
; Stop all worker threads; call this before the program ends.
;
; Clears the job queue, signals every worker to quit and waits up to
; 500 ms per worker for it to end. Afterwards __Jobs__CPUs is 0, so the
; next DoJobs() call starts fresh workers.
;
Procedure EndJobThreads()
  Protected i.l, attempt.l, ended.l
  ;
  ClearJobs()
  ;
  If __Jobs__CPUs
    ;
    ; set signal quit
    ; and resume threads
    ;
    For i = 1 To __Jobs__CPUs
      __Jobs__threads(i)\thread_signal = #__JOBS__SIGNAL__QUIT
      If __Jobs__threads(i)\thread_id
        ResumeThread(__Jobs__threads(i)\thread_id)
      EndIf
    Next i
    ;
    ; wait for all worker threads to end
    ;
    For i = 1 To __Jobs__CPUs
      If __Jobs__threads(i)\thread_id
        ;
        ; A worker checks the quit signal before it pauses itself. If the
        ; ResumeThread() above arrived between that check and the actual
        ; pause, the worker is suspended now and would never end. Therefore
        ; wait in slices of 10 ms (50 * 10 ms = the former 500 ms) and
        ; resume the worker again after every slice that timed out.
        ;
        attempt = 0
        Repeat
          ended = WaitThread(__Jobs__threads(i)\thread_id,10)
          If Not ended
            If IsThread(__Jobs__threads(i)\thread_id)
              ResumeThread(__Jobs__threads(i)\thread_id)
            EndIf
          EndIf
          attempt + 1
        Until ended Or attempt >= 50
        __Jobs__threads(i)\thread_id = 0
      EndIf
    Next i
    __Jobs__CPUs = 0
  EndIf
EndProcedure
;*----------------------------------------------------------------------------------
;*--[ END OF INCLUDE 'JOBS.pbi' ]---------------------------------------------------
;*----------------------------------------------------------------------------------
;*-------------------------------------
;----[ Program ]-----------------------
;*-------------------------------------
;
; PB 4.20 doesnt include StringManager Library
; when compiling threadsafe and not using strings
;
; workaround by ts-soft, thanks :)
;
CompilerIf #PB_Compiler_Thread
CompilerIf #PB_Compiler_Unicode
Import "StringManagerThreadUnicode.lib" : EndImport
CompilerElse
Import "StringManagerThread.lib" : EndImport
CompilerEndIf
CompilerEndIf
EnableExplicit
;XIncludeFile "JOBS.pbi"
;
; Demo job: keeps the CPU busy for a while, then returns the square of its argument.
;
; arg    : any long value
; return : arg * arg
;
Procedure.l MyJob(arg.l)
  Protected counter.l, scratch.l
  ;Debug "running job "+Str(arg)

  ; dummy workload: 5 million multiplications whose result is thrown away
  counter = 1
  While counter <= 5000000
    scratch = counter * 2
    counter + 1
  Wend

  ProcedureReturn arg * arg
EndProcedure
;
; Demo program: queue ten jobs, run the whole queue repeatedly as a load test,
; run it one last time, print the results and shut the workers down.
;

; number of load-test passes; with the debugger every pass is much slower
CompilerIf #PB_Compiler_Debugger
  #DEMO_PASSES = 2
CompilerElse
  #DEMO_PASSES = 300
CompilerEndIf

Define.l a, b
Dim results.l(10)

; job n gets n as argument and stores its return value in results(n)
For a = 1 To 10
  AddJob(@MyJob(), a, @results(a))
Next a

MessageRequester("INFO","START")

; keep the queue between the passes so the same ten jobs run again
For a = 1 To #DEMO_PASSES
  DoJobs(#PRESERVE_JOBLIST)
Next a

; last pass; this one clears the queue, a receives the number of jobs run
a = DoJobs()

;Debug "------------------------"
;Debug Str(a)+" jobs done."
;Debug "results:"
For a = 1 To 10
  Debug results(a)
Next a

EndJobThreads()

MessageRequester("INFO","THE END")
; IDE Options = PureBasic 4.20 (Windows - x86)
; CursorPosition = 337
; FirstLine = 309
; Folding = --
; EnableThread
; EnableXP
Sollte eigentlich leicht verständlich sein, hab noch paar Kommentare hinzugefügt.