Seite 1 von 2

mehrere Aufgaben parallel abarbeiten lassen

Verfasst: 19.06.2008 10:38
von Danilo
Hier ein paar Funktionen um mehrere Aufgaben ("Jobs") auf Multiprozessor- und Multicore-
Systemen parallel abarbeiten zu lassen.

Mir ging es hier darum, die Parallelisierung von Aufgaben möglichst
einfach zu machen.
Die Aufgaben werden durch Threads abgearbeitet. Je nachdem wieviele
CPUs zur Laufzeit des Programmes verfügbar sind, werden so die Jobs
parallelisiert.

Man hat beispielsweise 8 unterschiedliche Berechnungen durchzuführen,
bevor man im Programm fortfahren kann.
Dann erstellt man daraus 8 Jobs (Aufgaben) und lässt sie abarbeiten.

Auf einem System mit 1 CPU und 1 Prozessorkern werden die Aufgaben
der Reihe nach abgearbeitet.
Auf einem System mit beispielsweise 2 QuadCore-CPUs, können dagegen
alle 8 Aufgaben parallel abgearbeitet werden.

Das Thema der Parallelisierung von Aufgaben ist heute sehr wichtig,
und wird in Zukunft noch viel wichtiger werden.

Code: Alles auswählen

;*----------------------------------------------------------------------------------
;*
;* JOBS.pbi
;*
;* Author: Danilo Krahn, 2008/06/19
;*
;* functions for running jobs simultaneously
;* on multiprocessor/multicore systems
;*
;* functions:
;*             jobID    = AddJob( @myProcedure(), optional_argument.l = 0 )    ; add a job to the queue
;*
;*             jobsDone = DoJobs()                                             ; run all jobs in the queue
;*
;*             result   = JobResult(jobID)                                     ; get the return value of a job
;*
;*             ClearJobs()                                                     ; clear the job queue
;*
;*
;* compiler options: [X] create threadsafe executable
;*
;*----------------------------------------------------------------------------------

Structure __JOB_INFO
  id.l
  jobProc.l
  jobArg.l
  jobResult.l
EndStructure


Global NewList __Jobs__.__JOB_INFO()
Global __Jobs__mutex.l


Procedure __GetNextJob(*job.__JOB_INFO)
  Shared    __Jobs__mutex
  Protected element.l, retval.l

  If Not __Jobs__mutex
    ProcedureReturn #False
  EndIf

  LockMutex(__Jobs__mutex)
    If *job
      element = NextElement(__Jobs__())
      If element
        *job\id         = __Jobs__()\id
        *job\jobProc    = __Jobs__()\jobProc
        *job\jobArg     = __Jobs__()\jobArg
        *job\jobResult  = element
        retval = #True
      EndIf
    EndIf
  UnlockMutex(__Jobs__mutex)
  ProcedureReturn retval
EndProcedure


Procedure __WorkerThread(id)
  Protected element.l, result.l, job.__JOB_INFO

  While __GetNextJob(@job)
    If job\jobProc
      element = job\jobResult
      result = CallFunctionFast(job\jobProc,job\jobArg)
      If element
        PokeL(element+(2*sizeOf(LONG))+OffsetOf(__JOB_INFO\jobResult),result)
      EndIf
    EndIf
  Wend
EndProcedure


Procedure.l JobResult(jobID.l)
  Protected result.l
  
  If Not __Jobs__mutex
    ProcedureReturn #False
  EndIf

  LockMutex(__Jobs__mutex)
    ForEach __Jobs__()
      If __Jobs__()\id = jobID
        result = __Jobs__()\jobResult
        Break
      EndIf
    Next
  UnlockMutex(__Jobs__mutex)
  ProcedureReturn result
EndProcedure


Procedure ClearJobs()
  Shared __Jobs__mutex
  
  If Not __Jobs__mutex
    __Jobs__mutex = CreateMutex()
    If Not __Jobs__mutex
      ProcedureReturn #False
    EndIf
  EndIf

  LockMutex(__Jobs__mutex)
    ClearList(__Jobs__())
  UnlockMutex(__Jobs__mutex)
EndProcedure


Procedure.l AddJob(proc.l,arg.l=0)
  Protected retval.l
  
  If Not __Jobs__mutex
    __Jobs__mutex = CreateMutex()
    If Not __Jobs__mutex
      ProcedureReturn #False
    EndIf
  EndIf

  LockMutex(__Jobs__mutex)
    LastElement(__Jobs__())
    If AddElement(__Jobs__())
       __Jobs__()\jobProc    = proc
       __Jobs__()\jobArg     = arg
       __Jobs__()\id.l       = CountList(__Jobs__())
       retval = __Jobs__()\id
    EndIf
  UnlockMutex(__Jobs__mutex)
  ProcedureReturn retval
EndProcedure

Procedure.l DoJobs()
  CompilerIf #PB_Compiler_OS = #PB_OS_Windows
    Protected si.SYSTEM_INFO
  CompilerEndIf
  
  Protected numCPUs.l, job_count.l, i.l, thread_running.l
  
  If Not __Jobs__mutex
    __Jobs__mutex = CreateMutex()
    If Not __Jobs__mutex
      ProcedureReturn #False
    EndIf
  EndIf

  CompilerIf #PB_Compiler_OS = #PB_OS_Windows
    GetSystemInfo_(@si)
    numCPUs = si\dwNumberOfProcessors
  CompilerElse
    numCPUs = 4 ; run 4 threads simultanously by default
  CompilerEndIf

  ;Debug "number of CPUs: "+Str(numCPUs)
  ;numCPUs = 1

  Dim threads(numCPUs)

  job_count = CountList(__Jobs__())
  If job_count = 0
    ProcedureReturn 0
  ElseIf job_count < numCPUs
    numCPUs = job_count
  EndIf
  ;Debug "starting "+Str(numCPUs)+" worker threads."

  ResetList(__Jobs__())

  For i = 1 To numCPUs
    threads(i) = CreateThread(@__WorkerThread(),i)
  Next i

  ; wait until all worker threads are done
  Repeat
    thread_running = 0
    For i = 1 To numCPUs
      If IsThread(threads(i))
        thread_running = 1
      EndIf
    Next i
    Delay(10)
  Until thread_running = 0
  ProcedureReturn CountList(__Jobs__())
EndProcedure

;*----------------------------------------------------------------------------------
;*--[ END OF INCLUDE 'JOBS.pbi' ]---------------------------------------------------
;*----------------------------------------------------------------------------------




;*-------------------------------------
;----[ Program ]-----------------------
;*-------------------------------------

EnableExplicit

;XIncludeFile "JOBS.pbi"

Procedure.l MyJob(arg.l)
  Protected a.l, b.l
  Debug "running job "+Str(arg)
  ; dummy loop
  For a = 1 To 10000000
    b=a*2
  Next a
  ProcedureReturn arg * arg
EndProcedure



Define.l a, b
Dim jobs.l(10)

For a = 1 To 10
  jobs(a) = AddJob(@MyJob(),a)
Next a

a = DoJobs()

Debug "------------------------"
Debug Str(a)+" jobs done."
Debug "results:"

For a = 1 To 10
  Debug JobResult( jobs(a) )
Next a

ClearJobs()

; IDE Options = PureBasic 4.20 (Windows - x86)
; CursorPosition = 194
; FirstLine = 178

; Folding = --
; EnableThread
; EnableXP
Die Compiler-Option '[X] create threadsafe executable' sollte angeschalten
werden, da sonst die Debug-Ausgabe (und noch mehr) durcheinander kommen kann.


Ich hoffe das jemand etwas damit anfangen kann und es vielleicht
etwas hilft die Parallelisierung von bestimmten Aufgaben etwas leichter
zu machen.

Man sollte damit alles mögliche leicht parallelisieren können:
Man möchte 8 Dateien oder 8 Speicherbereiche packen? OK, 8 Jobs
erstellen und den Rechner parallel arbeiten lassen.
Man muß 3 Arrays mit Werten füllen? OK, warum nicht parallel erledigen lassen, wenn möglich?

Die Möglichkeiten sollten groß sein, die Benutzung (hoffentlich) einfach. :)


EDIT: Kompatibel zu PB-Threads gemacht, die Job-Procedure muß immer
1 Parameter haben, den man optional bei AddJob() angeben kann.

Verfasst: 19.06.2008 13:17
von dige
Schöner sauberer Ansatz! Was mir noch fehlt ist eine Kontrollmöglichkeit
ob ein Thread noch läuft oder schon abgestorben ist und demzufolge
müsste ClearJobs() auch noch um ein KillThread() erweitert werden.

Ich habe das so gelöst, das eine Statische Variable Alive.l innerhalb
des Jobthreads immer wieder mit ElapsedMilliseconds() überschrieben
wird und bei CallFunctionFast(job\jobProc, -1) dieser Wert zurückgegeben wird.

Wie würdest Du das lösen?

Verfasst: 19.06.2008 13:48
von AND51
dige hat geschrieben:müsste [...] um ein KillThread() erweitert werden
[...]
Wie würdest Du das lösen?
Meine Lösung wäre, dir auf die Finger zu hauen, wenn du KillThread() benutzt.
Man müsste ganz einfach nur dem Thread ein globales (bzw. geSharedtes Flag zur Verfügung stellen; der Job bzw. die eigentliche Prozedur muss dieses selbst überprüfen und sich ggf. beenden.
Glaube nicht, dass man das "von außen" lösen kann, höchstens per API.

Verfasst: 19.06.2008 18:50
von SoS
Hallo Danilo

Super Arbeit nur bekommst Du ein Problem wenn der letzte Thread der 1. ist ,der fertig ist. ;)
Habe mal eine Procedur angepasst wie ich es prüfen würde ob alle Threads fertig sind.

Code: Alles auswählen

Procedure.l DoJobs() 
  CompilerIf #PB_Compiler_OS = #PB_OS_Windows 
    Protected si.SYSTEM_INFO 
  CompilerEndIf 
  
  Protected numCPUs.l, job_count.l, i.l, thread_running.l 
  
  If Not __Jobs__mutex 
    __Jobs__mutex = CreateMutex() 
    If Not __Jobs__mutex 
      ProcedureReturn #False 
    EndIf 
  EndIf 
  
  CompilerIf #PB_Compiler_OS = #PB_OS_Windows 
    GetSystemInfo_(@si) 
    numCPUs = si\dwNumberOfProcessors 
  CompilerElse 
    numCPUs = 4 ; run 4 threads simultanously by default 
  CompilerEndIf 
  
  ;Debug "number of CPUs: "+Str(numCPUs) 
  ;numCPUs = 1 
  
  Dim threads(numCPUs) 
  
  job_count = CountList(__Jobs__()) 
  If job_count = 0 
    ProcedureReturn 0 
  ElseIf job_count < numCPUs 
    numCPUs = job_count 
  EndIf 
  ;Debug "starting "+Str(numCPUs)+" worker threads." 
  
  ResetList(__Jobs__()) 
  
  For i = 1 To numCPUs 
    threads(i) = CreateThread(@__WorkerThread(),i) 
  Next i 
  
  ; wait until all worker threads are done 
  Repeat 
    thread_running = 0 
    For i = 1 To numCPUs 
      If IsThread(threads(i)) 
        thread_running + 1          ; geändert ,um sicherzustellen das wirklich kein Thread mehr läuft
      EndIf 
    Next i 
    Delay(10) 
  Until thread_running = 0 
  ProcedureReturn CountList(__Jobs__()) 
EndProcedure 

Verfasst: 19.06.2008 20:38
von Danilo
dige hat geschrieben:Was mir noch fehlt ist eine Kontrollmöglichkeit
ob ein Thread noch läuft oder schon abgestorben ist und demzufolge
müsste ClearJobs() auch noch um ein KillThread() erweitert werden.
DoJobs() kehrt doch eh nicht zurück, bevor alle Threads und Jobs fertig
sind. Ein Job sollte also keine Schleife sein, die läuft und läuft, da dann
DoJobs() nie beendet werden würde.

Die 4 Befehle sollte genau in dieser Reihenfolge benutzt werden:

Code: Alles auswählen

AddJob(...)
DoJobs()
x = JobResult(..) wenn das Ergebnis gebraucht wird
ClearJobs() bevor man neue Jobs laufen lässt

danach wieder von vorn

AddJob(...)
DoJobs()
....
Meine Idee war es also nur kleine Aufgaben automatisch, je nach
vorhandenen CPUs, parallelisieren zu können.
Ein Job ist dann einfach ein Stück Code (eine For-Schleife, ein SortArray, etc.),
das mit anderen Jobs parallelisiert werden kann. Der Job sollte also abgearbeitet
werden und dann zurückkehren, nicht (unendlich) loopen.

Die Idee kam mir durch Intel® Threading Building Blocks (TBB) für C++.
Dort hat man z.B. ein Paralleles For, While, Sort uvm., um diese Sachen
automatisch (je nachdem wieviel CPUs zur Laufzeit vorhanden sind) zu
parallelisieren.
Intel TBB ist etwas ganz anderes und komplexeres. Nur das Grundkonzept,
verschiedene Aufgaben leicht parallelisieren zu können, hat mich dann
auf die Jobs gebracht.

Hat man heute einen ganz normalen linearen PB-Code:

Code: Alles auswählen

SortArray(...)

For i = 0 to 100000
  DoSomething
Next i

InvertImage(img)
...und diese Dinge sind nicht voneinander abhängig, dann kann man
alle 3 Sachen auch parallel abarbeiten lassen.
Oder man splittet ein FillArray() in 4 Jobs, wobei jeder Job 25% des
Arrays mit den entsprechenden Werten füllt.

In PB programmieren die meisten Leute ganz einfach so:

Code: Alles auswählen

x = BerechneX()
y = BerechneY()
z = BerechneZ()

MachwasMitXYZ(x,y,z)
Auf Multicore-Systemen läuft das genauso langsam wie auf SingleCores.
Wenn man aber die 3 Berechne.() als Jobs nimmt, könnten die auch
parallel abgearbeitet werden, wodurch diese Berechnungen im Idealfall
insgesamt fast 3mal schneller gehen könnten (wenn 3 Cores vorhanden).

Neu würde es ganz einfach so aussehen:

Code: Alles auswählen

job1 = AddJob( @BerechneX() )
job2 = AddJob( @BerechneY() )
job3 = AddJob( @BerechneZ() )
DoJobs() ; alles parallel ausführen, wenn möglich (andernfalls linear, wie bisher)

MachwasMitXYZ( JobResult(job1), JobResult(job2), JobResult(job3) )

@SoS: Ich weiß nicht was Du meinst, "thread_running + 1" und
"thread_running = 1" macht doch keinen Unterschied.
Bei beiden Versionen ist thread_running danach nicht 0, also wird
die Abbruchbedingung "Until thread_running = 0" nicht erfüllt.

Verfasst: 19.06.2008 20:58
von SoS
Danilo hat geschrieben:@SoS: Ich weiß nicht was Du meinst
Ganz einfach.

Deine Methode

Code: Alles auswählen

    For i = 1 To numCPUs 
      If IsThread(threads(i)) 
        thread_running = 1 
      EndIf 
    Next i 
Es wird ja nicht nur ein Thread geprüft sondern z.b. 4
Die 1. 3 sind noch am werkeln und geben thread_running = 1 zurück der 4. ist aber schon fertig und gibt thread_running = 0 zurück.
Das ist die bedingung die Procedur zu verlassen,in wirklichkeit laufen aber noch 3 threads.

meine Methode

Code: Alles auswählen

    For i = 1 To numCPUs 
      If IsThread(threads(i)) 
        thread_running + 1          ; geändert ,um sicherzustellen das wirklich kein Thread mehr läuft 
      EndIf 
    Next i 
1+1+1+0 <> 0 ;)

Verfasst: 19.06.2008 21:04
von Danilo
SoS hat geschrieben:Deine Methode

Code: Alles auswählen

    For i = 1 To numCPUs 
      If IsThread(threads(i)) 
        thread_running = 1 
      EndIf 
    Next i 
Es wird ja nicht nur ein Thread geprüft sondern z.b. 4
Die 1. 3 sind noch am werkeln und geben thread_running = 1 zurück der 4. ist aber schon fertig und gibt thread_running = 0 zurück.
Das ist die bedingung die Procedur zu verlassen,in wirklichkeit laufen aber noch 3 threads.
Wenn einer der ersten 3 Threads "thread_running = 1" setzt, und dann
der 4. IsThread(threads(4)) 0 zurück gibt, dann wird thread_running
*nicht* wieder auf 0 gesetzt. Dadurch wird gewartet bis *alle* Threads
beendet sind.

Was Du meinst wäre richtig mit:

Code: Alles auswählen

    For i = 1 To numCPUs
      thread_running + IsThread(threads(i))
    Next i

Verfasst: 19.06.2008 21:10
von SoS
Oops , sorry da hatte ich nen Denkfehler.

Meine Änderung tut ja nichts anderes.
Jetzt frage ich mich aber warum meine Änderung nicht mehr dazu führt das mein Program zu einem Speicherfressendem Monster wird.

Verfasst: 20.06.2008 08:49
von Danilo
SoS hat geschrieben:Meine Änderung tut ja nichts anderes.
Jetzt frage ich mich aber warum meine Änderung nicht mehr dazu führt das mein Program zu einem Speicherfressendem Monster wird.
Mit dieser Änderung dürfte das eigentlich nichts zu tun haben.

Jeder Thread der fertig ist, hinterlässt in Windows noch ein Handle.
Damit kann man dann noch den Rückgabewert des Threads ermitteln
und sowas.
Wenn ich bei MyJob den Dummy-Loop entferne, so dass der Thread
schnell abläuft, und dann 1.000.000 mal DoJobs(#PRESERVE_JOBLIST)
aufrufe, dann gibts bei mir Probleme. Asus PC Probe II hat sich jedenfalls
verabschiedet und meinte vorher noch die Systemresourcen seien hin.
Speicher wird bei mir nicht gefressen, aber durch die vielen Threads
die Handles.

Im Task Manager steigen die benutzten Handles rasant an. Ist auch
klar, da das Handle jedes beendeten Threads eben noch weiter im
Speicher verbleibt, bis man es mit CloseHandle_(Thread) freigibt.

Das hab ich probiert, crasht aber bei verschiedenen Tests, so als wenn
der Thread noch lief, obwohl IsThread() 0 zurück gegeben hat.
Vielleicht braucht das System zwischen IsThread() = 0 und CloseHandle_(Thread)
noch bissl Zeit zum verschnaufen? ;)


Anyway... mir hat das gestern nicht gefallen:

Code: Alles auswählen

job1 = AddJob( @BerechneX() )
job2 = AddJob( @BerechneY() )
job3 = AddJob( @BerechneZ() )
DoJobs() ; alles parallel ausführen, wenn möglich (andernfalls linear, wie bisher)

MachwasMitXYZ( JobResult(job1), JobResult(job2), JobResult(job3) )
JobResult() ging immer mit ForEach die LinkedList durch, was hier also
3 mal passieren würde. Das macht mir die Sache zu lahm, weshalb ich
ein paar Dinge geändert habe.

Das Resultat des Threads wird jetzt direkt in eine Variable geschrieben,
deren Adresse man optional als letzten Parameter von AddJobs() angeben kann.

Dann sieht das obige Beispiel so aus:

Code: Alles auswählen

AddJob( @BerechneX(), 0, @x )
AddJob( @BerechneY(), 0, @y )
AddJob( @BerechneZ(), 0, @z )
DoJobs() ; alles parallel ausführen, wenn möglich (andernfalls linear, wie bisher)

MachwasMitXYZ( x, y, z )
JobResult() fällt also weg. Und die JobListe wird default automatisch
gelöscht, wenn die Jobs abgearbeitet wurden. Man braucht also normal
ClearJobs() nicht mehr aufzurufen.
Mit dem Flag #PRESERVE_JOBLIST bei DoJobs() wird die Liste nicht
gelöscht, und man kann die Jobs nochmal laufen lassen.

Code: Alles auswählen

;*----------------------------------------------------------------------------------
;*
;* JOBS.pbi
;*
;* Author: Danilo Krahn, 2008/06/19
;*
;* functions for running jobs simultaneously
;* on multiprocessor/multicore systems
;*
;* Version 2, 2008/06/20:   - removed JobResult() and added optional
;*                            pointer to result to AddJob()
;*
;*                          - The JobQueue is now cleared by default
;*                            after all jobs are done.
;*
;*                          - added optional flag #PRESERVE_JOBLIST
;*                            for DoJobs(#PRESERVE_JOBLIST) if you
;*                            want to run the same jobs again
;*
;* functions:
;*
;*   AddJob( @myProcedure(), optional_argument.l = 0, optional_pointerToResult = 0 )  ; add a job to the queue
;*
;*   jobsDone = DoJobs()                   ; run all jobs in the queue and clear job queue (no ClearJobs() needed)
;*
;*   jobsDone = DoJobs(#PRESERVE_JOBLIST)  ; run all jobs in the queue without clearing the job list
;*
;*   ClearJobs()                           ; clear the job queue
;*
;*
;* compiler options: [X] create threadsafe executable
;*
;*----------------------------------------------------------------------------------
CompilerIf #PB_Compiler_Thread=0
  ;******************************************************
  ;* remove this only if you know what you are doing ;) *
  ;******************************************************
  CompilerError "please use compiler option: [X] create threadsafe executable"
CompilerEndIf


#PRESERVE_JOBLIST = 1

Structure __JOB_INFO
   jobProc.l
   jobArg.l
   p_jobResult.l
EndStructure


Global NewList __Jobs__.__JOB_INFO()
Global __Jobs__mutex.l


Procedure __GetNextJob(*job.__JOB_INFO)
  Protected retval.l

  If Not __Jobs__mutex
    ProcedureReturn #False
  EndIf

  LockMutex(__Jobs__mutex)
    If *job
      If NextElement(__Jobs__())
        *job\jobProc     = __Jobs__()\jobProc
        *job\jobArg      = __Jobs__()\jobArg
        *job\p_jobResult = __Jobs__()\p_jobResult
        retval = #True
      EndIf
    EndIf
  UnlockMutex(__Jobs__mutex)
  ProcedureReturn retval
EndProcedure


Procedure __WorkerThread(id)
  Protected result.l, job.__JOB_INFO

  While __GetNextJob(@job)
    If job\jobProc
      result = CallFunctionFast(job\jobProc,job\jobArg)
      If job\p_jobResult
        PokeL(job\p_jobResult,result)
      EndIf
    EndIf
  Wend
  
  ;ExitThread_(0)
EndProcedure


Procedure ClearJobs()
  If Not __Jobs__mutex
    __Jobs__mutex = CreateMutex()
    If Not __Jobs__mutex
      ProcedureReturn #False
    EndIf
  EndIf

  LockMutex(__Jobs__mutex)
    ClearList(__Jobs__())
  UnlockMutex(__Jobs__mutex)
EndProcedure


Procedure.l AddJob( JobProcedure.l, Arg.l=0, PointerToResult.l=0 )
  Protected retval.l
  
  If Not __Jobs__mutex
    __Jobs__mutex = CreateMutex()
    If Not __Jobs__mutex
      ProcedureReturn #False
    EndIf
  EndIf

  LockMutex(__Jobs__mutex)
    LastElement(__Jobs__())
    If AddElement(__Jobs__())
       __Jobs__()\jobProc     = JobProcedure
       __Jobs__()\jobArg      = Arg
       __Jobs__()\p_jobResult = PointerToResult
       retval = #True
    EndIf
  UnlockMutex(__Jobs__mutex)
  ProcedureReturn retval
EndProcedure

Procedure.l DoJobs(preserve_joblist.l=0)
  Protected numCPUs.l, job_count.l, i.l, thread_running.l, retval.l
  
  If Not __Jobs__mutex
    __Jobs__mutex = CreateMutex()
    If Not __Jobs__mutex
      ProcedureReturn #False
    EndIf
  EndIf

  CompilerIf #PB_Compiler_OS = #PB_OS_Windows
    Protected si.SYSTEM_INFO
    GetSystemInfo_(@si)
    numCPUs = si\dwNumberOfProcessors
  CompilerElse
    numCPUs = 4 ; run 4 threads simultanously by default
  CompilerEndIf

  ;Debug "number of CPUs: "+Str(numCPUs)
  ;numCPUs = 1

  Dim threads(numCPUs)

  job_count = CountList(__Jobs__())
  If job_count = 0
    ProcedureReturn 0
  ElseIf job_count < numCPUs
    numCPUs = job_count
  EndIf
  ;Debug "starting "+Str(numCPUs)+" worker threads."

  LockMutex(__Jobs__mutex)
    ResetList(__Jobs__())
  UnlockMutex(__Jobs__mutex)

  For i = 1 To numCPUs
    threads(i) = CreateThread(@__WorkerThread(),i)
  Next i

  ; wait until all worker threads are done
  Repeat
    thread_running = 0
    For i = 1 To numCPUs
      If IsThread(threads(i))
        thread_running = 1
      EndIf
    Next i
    ;Delay(5) ; slows down too much
  Until thread_running = 0

  ;CompilerIf #PB_Compiler_OS = #PB_OS_Windows
  ;  For i = 1 To numCPUs
  ;    If threads(i)
  ;      CloseHandle_(threads(i)) ; CRASH!
  ;    EndIf
  ;  Next i
  ;CompilerEndIf

  LockMutex(__Jobs__mutex)
    retval = CountList(__Jobs__())
    If Not preserve_joblist
        ClearList(__Jobs__())
    EndIf
  UnlockMutex(__Jobs__mutex)

  ProcedureReturn retval
EndProcedure

;*----------------------------------------------------------------------------------
;*--[ END OF INCLUDE 'JOBS.pbi' ]---------------------------------------------------
;*----------------------------------------------------------------------------------




;*-------------------------------------
;----[ Program ]-----------------------
;*-------------------------------------

EnableExplicit

;XIncludeFile "JOBS.pbi"

Procedure.l MyJob(arg.l)
  Protected a.l, b.l
  Debug "running job "+Str(arg)
  ; dummy loop
  For a = 1 To 5000000
    b=a*2
  Next a
  ProcedureReturn arg * arg
EndProcedure



Define.l a, b
Dim results.l(10)

For a = 1 To 10
  AddJob(@MyJob(),a,@results(a))
Next a

DoJobs(#PRESERVE_JOBLIST)

a = DoJobs()


Debug "------------------------"
Debug Str(a)+" jobs done."
Debug "results:"

For a = 1 To 10
  Debug results(a)
Next a

ClearJobs()

; IDE Options = PureBasic 4.20 (Windows - x86)
; CursorPosition = 207
; FirstLine = 192
; Folding = --
; EnableThread
; EnableXP

Verfasst: 20.06.2008 10:11
von dige
@AND51: vielen Dank für Deinen Beitrag, dem ich im Grunde genommen
leider gar nichts entnehmen konnte..

Ich habe hier die Problematik das etwas im Hintergrund bearbeitet werden
soll, aber ggf. vorzeitig abgebrochen werden muss, da eine andere
Nutzeraktion eingetroffen ist und das Ergebnis des Threads nicht mehr
benötigt wird.

Zum Bsp. wird schon mal das nächste RAW-Bild decodiert usw.
Da diese Aktionen teilweise mehrere Sekunden benötigen oder weil das
externe Tool zum decodieren sich aufgehängt hat, ist es erforderlich
zu wissen ob der Thread noch wirklich arbeitet.. und muss eben gekillt
werden.

Deine Idee mit einem globales Flag ist Blödsinn und widerspricht dem
dynamischen Design beliebig viele Threads flexibel zu handeln.

Ich verwende hierfür eine Statische Variabe innerhalb des Threads die
über den erneuten Aufruf per Argument gesteuert wird. D.h. ist wird
zunächst ein BeendenSignal an den Thread gesendet und wenn der sich
nich innerhalb eines gewissen Zeitfensters selbst terminiert oder der
TimeOut Check schon fehl schlägt .. dann muss er eben per KillThread()
abgeschossen werden..

Vielleicht kannst Du ja nochmal etwas detailierter erklären, bevor Du
hier jemand "auf die Finger haust"! :twisted: