Spielerei mit Threads und Queues
Verfasst: 04.01.2014 20:42
Ich habe mir mal Gedanken gemacht, wie man eine Performance-Steigerung durch Parallelisierung erreichen kann.
Als Beispiel habe ich ein ganz einfaches Programm erstellt, das rekursiv eine Verzeichnisstruktur durchläuft und dabei aus gefundenen Bildern Thumbnails erstellt.
Als erstes habe ich die Bearbeitung in 4 einfache Aufgaben geteilt:
1. Bilder finden
2. Bilder laden
3. Thumbnail erzeugen
4. Thumbnail speichern
Wie das einfache Programm diese Aufgaben sequentiell abarbeitet kann man sich leicht vorstellen.
Bei der Verarbeitung mehrerer Bilder ist aber auch klar, dass die Aufgaben eigentlich auch parallel (wenn auch nicht alle Aufgaben für ein und dasselbe Bild) ablaufen könnten.
Weil ich schon immer mal was mit Threads und Mutex ausprobieren wollte, habe ich für jede Aufgabe ein Thread angelegt. Blieb noch die Notwendigkeit, dass sich die einzelnen Threads verständigen müssen. Da die einzelnen Aufgaben durchaus mit unterschiedlichen Laufzeiten arbeiten, schien es mir sinnvoll die Kommunikation über Queues (oder wäre Stacks der bessere Ausdruck? egal!) zu realisieren.
Während also Thread 1 fleissig Bilder sammelt, holt sich der zweite Thread nach und nach die Bilder in den Speicher. Thread 3 nimmt die geladen Bilder und erzeugt daraus Thumbnails, während der vierte Thread das Ergebnis dann abspeichert.
Um die Verarbeitung etwas zu visualisieren, habe ich noch ein paar Progressbars dazugebaut.
Unterm Strich habe ich auf meinem System bei gleicher Ressourcen-Auslastung eine Performance-Steigerung von 20% erzielt und das, obwohl der Code zum einfachen sequenziellen Programm deutlich umfangreicher geworden ist und auch noch eine schönere GUI bekommen hat.
Hier also mal der Code. Man kann sicherlich noch etliches verbessern, aber es soll ja nur eine Anregung sein.
Beim kompilieren daran denken, ein "Thread-sicheres Executable" zu erstellen.
Viel Spaß
Paul
Als Beispiel habe ich ein ganz einfaches Programm erstellt, das rekursiv eine Verzeichnisstruktur durchläuft und dabei aus gefundenen Bildern Thumbnails erstellt.
Als erstes habe ich die Bearbeitung in 4 einfache Aufgaben geteilt:
1. Bilder finden
2. Bilder laden
3. Thumbnail erzeugen
4. Thumbnail speichern
Wie das einfache Programm diese Aufgaben sequentiell abarbeitet kann man sich leicht vorstellen.
Bei der Verarbeitung mehrerer Bilder ist aber auch klar, dass die Aufgaben eigentlich auch parallel (wenn auch nicht alle Aufgaben für ein und dasselbe Bild) ablaufen könnten.
Weil ich schon immer mal was mit Threads und Mutex ausprobieren wollte, habe ich für jede Aufgabe ein Thread angelegt. Blieb noch die Notwendigkeit, dass sich die einzelnen Threads verständigen müssen. Da die einzelnen Aufgaben durchaus mit unterschiedlichen Laufzeiten arbeiten, schien es mir sinnvoll die Kommunikation über Queues (oder wäre Stacks der bessere Ausdruck? egal!) zu realisieren.
Während also Thread 1 fleissig Bilder sammelt, holt sich der zweite Thread nach und nach die Bilder in den Speicher. Thread 3 nimmt die geladen Bilder und erzeugt daraus Thumbnails, während der vierte Thread das Ergebnis dann abspeichert.
Um die Verarbeitung etwas zu visualisieren, habe ich noch ein paar Progressbars dazugebaut.
Unterm Strich habe ich auf meinem System bei gleicher Ressourcen-Auslastung eine Performance-Steigerung von 20% erzielt und das, obwohl der Code zum einfachen sequenziellen Programm deutlich umfangreicher geworden ist und auch noch eine schönere GUI bekommen hat.
Hier also mal der Code. Man kann sicherlich noch etliches verbessern, aber es soll ja nur eine Anregung sein.

Code: Alles auswählen
EnableExplicit
; Nötige En- und Decoder laden
UseJPEGImageDecoder()
UseJPEGImageEncoder()
UseTIFFImageDecoder()
Global thread_find, thread_load, thread_resize, thread_save
Global picture_path.s ; Pfad der Bilder
Global thumb_path.s ; Pfad der Thumbs
Global queuesize ; Anzahl der max. Einträge in einer Queue
Global timer ; Zähler für die Bearbeitungsdauer
Global f,l,r ; Füllstand der Queues für die GUI
; Queues enthalten einen String mit <handle>|<bildname>
Global NewList found.s() ; Queue mit gefundenen Bildernamen (ganzer Pfad)
Global NewList loaded.s() ; Queue mit geladenen Bildern
Global NewList resized.s() ; Queue mit Thumbs
; Mutexe, um den Zugriff auf die einzelnen Queues zu sperren
Global mutex_found, mutex_loaded, mutex_resized
; Prozeduren zum befüllen (push) und entnehmen (pop) von Einträgen in den Queues
Declare.i push_found(entry.s)
Declare.s pop_found()
Declare.i push_loaded(entry.s)
Declare.s pop_loaded()
Declare.i push_resized(entry.s)
Declare.s pop_resized()
; Threads zur Bearbeitung der Queues
Declare find_picture(*wert) ; Bilddateien in der Verzeichnisstruktur suchen und in die Found-Queue setzen
Declare load_picture(*wert) ; Einträge aus der Found-Queue holen, Bilder laden und Ergebnis in die Loadad-Queue setzen
Declare resize_picture(*wert) ; Einträge aus der Loaded-Queue holen, Thumbs generieren und Ergebnis in die Resized-Queue setzen
Declare save_thumb(*wert) ; Einträge aus der Resized-Queue holen und Thumbs speichern
; rekursives durchsuchen einer Verzeichnisstruktur
Declare dir_runner(Path.s) ; Kein Thread. Wird vom Thread find_picture() aufgerufen (wg. Rekursion)
; GUI-Definitionen
Global event
Enumeration FormWindow
#Fenster
EndEnumeration
Enumeration FormGadget
#Titel
#Protokoll
#Fill_found
#Fill_loaded
#Fill_resized
#Count_found
#Count_loaded
#Count_resized
#Fill_found_titel
#Fill_loaded_titel
#Fill_resized_titel
EndEnumeration
Enumeration FormFont
#Font_0
#Font_1
#Font_2
#Font_3
EndEnumeration
LoadFont(#Font_0,"Arial", 14, #PB_Font_Bold)
LoadFont(#Font_1,"Arial", 10)
LoadFont(#Font_2,"Arial", 10, #PB_Font_Bold)
LoadFont(#Font_3,"Arial", 8)
Procedure OpenFenster(x = 0, y = 0, width = 600, height = 400)
OpenWindow(#Fenster, x, y, width, height, "Thumbnailer (Queue-Test)", #PB_Window_SystemMenu)
TextGadget(#Titel, 10, 10, 580, 40, "Thumbnailer (Queue-Test)", #PB_Text_Center)
SetGadgetFont(#Titel, FontID(#Font_0))
TextGadget(#Fill_found_titel, 30, 60, 150, 15, "Gefundene Dateien")
SetGadgetFont(#Fill_found_titel, FontID(#font_3))
ProgressBarGadget(#Fill_found, 30, 80, 480, 30, 0, 0)
TextGadget(#Count_found, 530, 80, 60, 30, "", #PB_Text_Right)
SetGadgetFont(#Count_found, FontID(#Font_2))
TextGadget(#Fill_loaded_titel, 30, 120, 150, 15, "Geladene Dateien")
SetGadgetFont(#Fill_loaded_titel, FontID(#font_3))
ProgressBarGadget(#Fill_loaded, 30, 140, 480, 30, 0, 0)
TextGadget(#Count_loaded, 530, 140, 60, 30, "", #PB_Text_Right)
SetGadgetFont(#Count_loaded, FontID(#Font_2))
TextGadget(#Fill_resized_titel, 30, 180, 150, 15, "Erstellte Thumbnails")
SetGadgetFont(#Fill_resized_titel, FontID(#font_3))
ProgressBarGadget(#Fill_resized, 30, 200, 480, 30, 0, 0)
TextGadget(#Count_resized, 530, 200, 60, 30, "", #PB_Text_Right)
SetGadgetFont(#Count_resized, FontID(#Font_2))
EditorGadget(#Protokoll, 10, 250, 580, 130, #PB_Editor_ReadOnly)
SetGadgetFont(#Protokoll, FontID(#Font_1))
EndProcedure
Procedure Fenster_Events(event)
Select event
Case #PB_Event_CloseWindow
ProcedureReturn #False
EndSelect
ProcedureReturn #True
EndProcedure
; -------- Main
picture_path = "D:\Test\Bilder\"
thumb_path = "D:\Test\Thumbs\"
queuesize = 100
; GUI anzeigen
OpenFenster()
; Progressbars auf die Grösse der Queues einstellen
SetGadgetAttribute(#Fill_found, #PB_ProgressBar_Maximum, queuesize)
SetGadgetAttribute(#Fill_loaded, #PB_ProgressBar_Maximum, queuesize)
SetGadgetAttribute(#Fill_resized, #PB_ProgressBar_Maximum, queuesize)
; Füllstand der Queues anzeigen
f = 0
l = 0
R = 0
SetGadgetText(#Count_found, Str(f))
SetGadgetText(#Count_loaded, Str(l))
SetGadgetText(#Count_resized, Str(r))
; Zeit nehmen
timer = ElapsedMilliseconds()
; Mutexe & Threads erzeugen
mutex_found = CreateMutex()
mutex_loaded = CreateMutex()
mutex_resized = CreateMutex()
thread_load = CreateThread(@load_picture(), 23)
thread_resize = CreateThread(@resize_picture(), 23)
thread_save = CreateThread(@save_thumb(), 23)
thread_find = CreateThread(@find_picture(), 23)
AddGadgetItem(#Protokoll, -1, "Verarbeitung gestartet...")
; Event-Loop. Erst beenden, wenn alle Threads durch sind
Repeat
; alle 250ms die Füllstände aktualisieren
event = WaitWindowEvent(250)
f = ListSize(found())
l = ListSize(loaded())
r = ListSize(resized())
SetGadgetState(#Fill_found, f)
SetGadgetState(#Fill_loaded, l)
SetGadgetState(#Fill_resized, r)
SetGadgetText(#Count_found, Str(f))
SetGadgetText(#Count_loaded, Str(l))
SetGadgetText(#Count_resized, Str(r))
Until Fenster_Events(event) = #False And (IsThread(thread_find) + IsThread(thread_load) + IsThread(thread_resize) + IsThread(thread_save) = 0)
End
; -------- Procedures
Procedure.i push_found(entry.s)
; Eintrag in die Found-Queue schreiben
; (zuletzt hinzugefügtes Element immer am Ende der Queue)
; Queue für Zugriff sperren
LockMutex(mutex_found)
; Ist noch Platz in der Queue?
If ListSize(found()) < queuesize
; neues Element am Ender der Queue anfügen
LastElement(found())
If AddElement(found()) = 0
; Fehler beim Erstellen eines neuen Queue-Eintrages
UnlockMutex(mutex_found) ; Zugriff auf Queue wieder freigeben
ProcedureReturn 0 ; Fehler mitteilen
EndIf
; neuer Eintrag wird hinzugefügt
found() = entry
UnlockMutex(mutex_found) ; Zugriff auf Queue wieder freigeben
ProcedureReturn 1 ; Erfolg mitteilen
Else
; kein Eintrag mehr möglich
UnlockMutex(mutex_found) ; Zugriff auf Queue wieder freigeben
ProcedureReturn 0 ; Fehler mitteilen
EndIf
EndProcedure
Procedure.s pop_found()
; Eintrag aus der Found-Queue holen
; ältestes Element steht am Anfang und wird beim Entnehmen aus der Queue gelöscht
Protected dummy.s ; Eintrag aus der Queue
; Queue für Zugriff sperren
LockMutex(mutex_found)
; ist die Queue leer?
If ListSize(found()) > 0
; Element vom Anfang der Queue entnehmen
If FirstElement(found()) = 0
; Fehler beim Ansteuern des ersten Queue-Eintrages
UnlockMutex(mutex_found) ; Zugriff auf Queue wieder freigeben
ProcedureReturn "" ; Rückgabe eines leeren Strings signalisiert Fehler
Else
dummy = found() ; Eintrag aus der Queue merken
DeleteElement(found(), 1) ; Eintrag aus der Queue löschen
UnlockMutex(mutex_found) ; Zugriff auf Queue wieder freigeben
ProcedureReturn dummy ; Rückgabe des Queue-Eintrages signalisiert Erfolg
EndIf
Else
; kein Element mehr verfügbar
UnlockMutex(mutex_found) ; Zugriff auf Queue wieder freigeben
ProcedureReturn "" ; Rückgabe eines leeren Strings signalisiert Fehler
EndIf
EndProcedure
Procedure.i push_loaded(entry.s)
; Eintrag in die Loaded-Queue schreiben
; (zuletzt hinzugefügtes Element immer am Ende der Queue)
; Queue für Zugriff sperren
LockMutex(mutex_loaded)
; Ist noch Platz in der Queue?
If ListSize(loaded()) < queuesize
; neues Element am Ender der Queue anfügen
LastElement(loaded())
If AddElement(loaded()) = 0
; Fehler beim Erstellen eines neuen Queue-Eintrages
UnlockMutex(mutex_loaded) ; Zugriff auf Queue wieder freigeben
ProcedureReturn 0 ; Fehler mitteilen
EndIf
; neuer Eintrag wird hinzugefügt
loaded() = entry
UnlockMutex(mutex_loaded) ; Zugriff auf Queue wieder freigeben
ProcedureReturn 1 ; Erfolg mitteilen
Else
; kein Eintrag mehr möglich
UnlockMutex(mutex_loaded) ; Zugriff auf Queue wieder freigeben
ProcedureReturn 0 ; Fehler mitteilen
EndIf
EndProcedure
Procedure.s pop_loaded()
; Eintrag aus der Found-Queue holen
; ältestes Element steht am Anfang und wird beim Entnehmen aus der Queue gelöscht
Protected dummy.s ; Eintrag aus der Queue
; Queue für Zugriff sperren
LockMutex(mutex_loaded)
; ist die Queue leer?
If ListSize(loaded()) > 0
; Element vom Anfang der Queue entnehmen
If FirstElement(loaded()) = 0
; Fehler beim Ansteuern des ersten Queue-Eintrages
UnlockMutex(mutex_loaded) ; Zugriff auf Queue wieder freigeben
ProcedureReturn "" ; Rückgabe eines leeren Strings signalisiert Fehler
Else
dummy = loaded() ; Eintrag aus der Queue merken
DeleteElement(loaded(), 1) ; Eintrag aus der Queue löschen
UnlockMutex(mutex_loaded) ; Zugriff auf Queue wieder freigeben
ProcedureReturn dummy ; Rückgabe des Queue-Eintrages signalisiert Erfolg
EndIf
Else
; kein Element mehr verfügbar
UnlockMutex(mutex_loaded) ; Zugriff auf Queue wieder freigeben
ProcedureReturn "" ; Rückgabe eines leeren Strings signalisiert Fehler
EndIf
EndProcedure
Procedure.i push_resized(entry.s)
; Eintrag in die Resized-Queue schreiben
; (zuletzt hinzugefügtes Element immer am Ende der Queue)
; Queue für Zugriff sperren
LockMutex(mutex_resized)
; Ist noch Platz in der Queue?
If ListSize(resized()) < queuesize
; neues Element am Ender der Queue anfügen
LastElement(resized())
If AddElement(resized()) = 0
; Fehler beim Erstellen eines neuen Queue-Eintrages
UnlockMutex(mutex_resized) ; Zugriff auf Queue wieder freigeben
ProcedureReturn 0 ; Fehler mitteilen
EndIf
; neuer Eintrag wird hnzugefügt
resized() = entry
UnlockMutex(mutex_resized) ; Zugriff auf Queue wieder freigeben
ProcedureReturn 1 ; Erfolg mitteilen
Else
; kein Eintrag mehr möglich
UnlockMutex(mutex_resized) ; Zugriff auf Queue wieder freigeben
ProcedureReturn 0 ; Fehler mitteilen
EndIf
EndProcedure
Procedure.s pop_resized()
; Eintrag aus der Found-Queue holen
; ältestes Element steht am Anfang und wird beim Entnehmen aus der Queue gelöscht
Protected dummy.s ; Eintrag aus der Queue
; Queue für Zugriff sperren
LockMutex(mutex_resized)
; ist die Queue leer?
If ListSize(resized()) > 0
; Element vom Anfang der Queue entnehmen
If FirstElement(resized()) = 0
; Fehler beim Ansteuern des letzten Queue-Eintrages
UnlockMutex(mutex_resized) ; Zugriff auf Queue wieder freigeben
ProcedureReturn "" ; Rückgabe eines leeren Strings signalisiert Fehler
Else
dummy = resized() ; Eintrag aus der Queue merken
DeleteElement(resized(), 1) ; Eintrag aus der Queue löschen
UnlockMutex(mutex_resized) ; Zugriff auf Queue wieder freigeben
ProcedureReturn dummy ; Rückgabe des Queue-Eintrages signalisiert Erfolg
EndIf
Else
; kein Element mehr verfügbar
UnlockMutex(mutex_resized) ; Zugriff auf Queue wieder freigeben
ProcedureReturn "" ; Rückgabe eines leeren Strings signalisiert Fehler
EndIf
EndProcedure
Procedure find_picture(*wert)
; Verzeichnisstruktur nach Bildern durchsuchen und gefundene Bilder zur Verarbeitung in die Found-Queue setzen
; Es werden nur die Formate JPG, TIF, und BMP berücksichtigt
; Rekursion in einer separaten Prozedur (nicht über Thread)
dir_runner(picture_path)
; Wenn die Verzeichnisstruktur komplett durchlaufen wurde, Ende-Signal für die Threads absetzen
While push_found("-1|**ENDE**") = 0 ; Falls die Found-Queue voll ist, 10ms warten...
Delay(10)
Wend
Repeat ; Warten bis die restlichen Threads beendet wurden...
Delay(100)
Until IsThread(thread_load) + IsThread(thread_resize) + IsThread(thread_save) = 0
AddGadgetItem(#Protokoll, -1, "Verarbeitung beendet.")
; Zeit stoppen
timer = ElapsedMilliseconds() - timer
AddGadgetItem(#Protokoll, -1, "Zeit: " + StrF(timer/1000,2) + " Sekunden")
EndProcedure
Procedure dir_runner(Path.s)
; Rekursives Durchlaufen der Verzeichnisstruktur in <Path.s>
; Gefundene Bilder weden in die Found-Queue eingetragen
Protected dir_handle ; Handle für das eröffnete Verzeichnis
; Verzeichnis öffnen, alle Einträge berücksichtigen
dir_handle = ExamineDirectory(#PB_Any, Path.s,"*.*")
If dir_handle <> 0
; Nächsten Eintrag bearbeiten
While NextDirectoryEntry(dir_handle)
; ist es eine Datei?
If DirectoryEntryType(dir_handle) = #PB_DirectoryEntry_File
; nur bekannte Bildformate laden
If FindString("JPG|TIF|BMP", UCase(GetExtensionPart(DirectoryEntryName(dir_handle)))) > 0
; wenn die Found-Queue voll ist erstmal 5000ms warten
While push_found("0|" + path + DirectoryEntryName(dir_handle)) = 0
Delay(5000)
Wend
EndIf
; wenn ein Verzeichniseintrag gefunden wurde diesen rekursiv durchsuchen (Vorgänger- und Stammverzeichnis-Eintrag unberücksichtigt lassen)
ElseIf DirectoryEntryType(dir_handle) = #PB_DirectoryEntry_Directory And DirectoryEntryName(dir_handle) <> "." And DirectoryEntryName(dir_handle) <> ".."
; hier die Rekursion
dir_runner(Path + DirectoryEntryName(dir_handle) + "\")
EndIf
Wend
FinishDirectory(dir_handle) ; Verzeichnis schliessen
EndIf
EndProcedure
Procedure load_picture(*wert)
; Ein gefundenes Bild laden
; Dazu den nächsten Eintrag aus der Found-Queue holen
; Nach dem Laden, die weitere Verarbeitung an die Loaded-Queue übergeben
Protected found_entry.s
Protected pic_handle
Protected pic_name.s
Repeat
found_entry = pop_found() ; nächsten Eintrag aus der Queue holen
If found_entry <> "" ; ein leerer Rückgabestring signalisiert einen Fehler
pic_name = StringField(found_entry, 2, "|") ; Bildname inkl. Pfad im zweiten Teil des Queue-Eintrages
; Thread beenden, wenn in Queue "**ENDE**" steht
If pic_name = "**ENDE**"
While push_loaded("-1|" + pic_name) = 0 ; Ende-Signal weitergeben. Wenn die Loaded-Queue voll ist, 10ms waren
Delay(10)
Wend
Break
EndIf
pic_handle = LoadImage(#PB_Any, pic_name) ; Bild laden und Handle merken
; geladenes Bild in die Loaded-Queue einreihen, wenn die Loaded-Queue voll ist, 10ms waren
While push_loaded(Str(pic_handle) + "|" + pic_name) = 0
Delay(10)
Wend
Else
Delay(10) ; Queue ist leer, also 10ms warten
EndIf
ForEver
EndProcedure
Procedure resize_picture(*wert)
; Aus einem geladenen Bild ein Thumbnail erzeugen
; Dazu den nächsten Eintrag aus der Loaded-Queue holen
; Nach der Thumbnail-Generierung, die weitere Verarbeitung an die Resized-Queue übergeben
Protected loaded_entry.s
Protected pic_handle
Protected pic_name.s
Repeat
loaded_entry = pop_loaded() ; nächsten Eintrag aus der Queue holen
If loaded_entry <> "" ; ein leerer Rückgabestring signalisiert einen Fehler
; Queue-Eintrag zerlegen in Bild-Handle und Bildname
pic_handle = Val(StringField(loaded_entry, 1, "|"))
pic_name = StringField(loaded_entry, 2, "|")
; Thread beenden, wenn in Queue "**ENDE**" steht
If pic_name = "**ENDE**"
; wenn die Resized-Queue voll ist, 10ms waren
While push_resized("-1|" + pic_name) = 0 ; Ende-Signal weitergeben. Wenn die Resized-Queue voll ist, 10ms waren
Delay(10)
Wend
Break
Else
; nur korrekt geladene Bilder bearbeiten (Handle > 0)
If pic_handle > 0
; Bild verkleinern - auf 150x150 Px um es einfach zu halten
ResizeImage(pic_handle, 150, 150, #PB_Image_Smooth)
; Thumbnail in die Resized-Queue einreihen, wenn die Resized-Queue voll ist, 10ms warten
While push_resized(Str(pic_handle) + "|" + pic_name) = 0
Delay(10)
Wend
Else
AddGadgetItem(#Protokoll, -1, "Kein gültiges Bild: " + pic_name)
EndIf
EndIf
Else
Delay(2000) ; Queue ist leer, also 2000ms warten
EndIf
ForEver
EndProcedure
Procedure save_thumb(*wert)
; Erzeugtes Thumbnail speichern
; Dazu den nächsten Eintrag aus der Resized-Queue holen
; Thumbnail wird im JPG-Format gespeichert und erhält den Namen des Originals + .JPG
Protected resized_entry.s
Protected pic_handle
Protected pic_name.s
Repeat
resized_entry = pop_resized() ; nächsten Eintrag aus der Queue holen
If resized_entry <> "" ; ein leerer Rückgabestring signalisiert einen Fehler
; Queue-Eintrag zerlegen in Bild-Handle und Bildname
pic_handle = Val(StringField(resized_entry, 1, "|"))
pic_name = StringField(resized_entry, 2, "|")
; Thread beenden, wenn in Queue "**ENDE**" steht
If pic_name = "**ENDE**"
Break ; Ende-Signal muss nicht weitergereicht werden
EndIf
; Thumbnail speichern und durch das Bild belegten Speicher freigeben
SaveImage(pic_handle, thumb_path + GetFilePart(pic_name) + ".jpg", #PB_ImagePlugin_JPEG)
FreeImage(pic_handle)
Else
Delay(2000) ; Queue ist leer, also 2000ms warten
EndIf
ForEver
EndProcedure
Viel Spaß
Paul