Spielerei mit Threads und Queues

Hier könnt Ihr gute, von Euch geschriebene Codes posten. Sie müssen auf jeden Fall funktionieren und sollten möglichst effizient, elegant und beispielhaft oder einfach nur cool sein.
fabulouspaul
Beiträge: 120
Registriert: 01.04.2011 21:59

Spielerei mit Threads und Queues

Beitrag von fabulouspaul »

Ich habe mir mal Gedanken gemacht, wie man eine Performance-Steigerung durch Parallelisierung erreichen kann.
Als Beispiel habe ich ein ganz einfaches Programm erstellt, das rekursiv eine Verzeichnisstruktur durchläuft und dabei aus gefundenen Bildern Thumbnails erstellt.

Als erstes habe ich die Bearbeitung in 4 einfache Aufgaben geteilt:
1. Bilder finden
2. Bilder laden
3. Thumbnail erzeugen
4. Thumbnail speichern
Wie das einfache Programm diese Aufgaben sequentiell abarbeitet kann man sich leicht vorstellen.
Bei der Verarbeitung mehrerer Bilder ist aber auch klar, dass die Aufgaben eigentlich auch parallel (wenn auch nicht alle Aufgaben für ein und dasselbe Bild) ablaufen könnten.

Weil ich schon immer mal was mit Threads und Mutex ausprobieren wollte, habe ich für jede Aufgabe ein Thread angelegt. Blieb noch die Notwendigkeit, dass sich die einzelnen Threads verständigen müssen. Da die einzelnen Aufgaben durchaus mit unterschiedlichen Laufzeiten arbeiten, schien es mir sinnvoll die Kommunikation über Queues (oder wäre Stacks der bessere Ausdruck? egal!) zu realisieren.

Während also Thread 1 fleissig Bilder sammelt, holt sich der zweite Thread nach und nach die Bilder in den Speicher. Thread 3 nimmt die geladen Bilder und erzeugt daraus Thumbnails, während der vierte Thread das Ergebnis dann abspeichert.
Um die Verarbeitung etwas zu visualisieren, habe ich noch ein paar Progressbars dazugebaut.

Unterm Strich habe ich auf meinem System bei gleicher Ressourcen-Auslastung eine Performance-Steigerung von 20% erzielt und das, obwohl der Code zum einfachen sequenziellen Programm deutlich umfangreicher geworden ist und auch noch eine schönere GUI bekommen hat.

Hier also mal der Code. Man kann sicherlich noch etliches verbessern, aber es soll ja nur eine Anregung sein. :)

Code: Alles auswählen


EnableExplicit

; Nötige En- und Decoder laden
UseJPEGImageDecoder()
UseJPEGImageEncoder()
UseTIFFImageDecoder()

Global thread_find, thread_load, thread_resize, thread_save
Global picture_path.s              ; Pfad der Bilder
Global thumb_path.s                ; Pfad der Thumbs
Global queuesize                   ; Anzahl der max. Einträge in einer Queue

Global timer                       ; Zähler für die Bearbeitungsdauer
Global f,l,r                       ; Füllstand der Queues für die GUI

; Queues enthalten einen String mit <handle>|<bildname>
Global NewList found.s()          ; Queue mit gefundenen Bildernamen (ganzer Pfad)
Global NewList loaded.s()         ; Queue mit geladenen Bildern
Global NewList resized.s()        ; Queue mit Thumbs

; Mutexe, um den Zugriff auf die einzelnen Queues zu sperren
Global mutex_found, mutex_loaded, mutex_resized

; Prozeduren zum befüllen (push) und entnehmen (pop) von Einträgen in den Queues
Declare.i push_found(entry.s)
Declare.s pop_found()
Declare.i push_loaded(entry.s)
Declare.s pop_loaded()
Declare.i push_resized(entry.s)
Declare.s pop_resized()

; Threads zur Bearbeitung der Queues
Declare find_picture(*wert)        ; Bilddateien in der Verzeichnisstruktur suchen und in die Found-Queue setzen
Declare load_picture(*wert)        ; Einträge aus der Found-Queue holen, Bilder laden und Ergebnis in die Loadad-Queue setzen
Declare resize_picture(*wert)      ; Einträge aus der Loaded-Queue holen, Thumbs generieren und Ergebnis in die Resized-Queue setzen
Declare save_thumb(*wert)          ; Einträge aus der Resized-Queue holen und Thumbs speichern
; rekursives durchsuchen einer Verzeichnisstruktur
Declare dir_runner(Path.s)         ; Kein Thread. Wird vom Thread find_picture() aufgerufen (wg. Rekursion)

; GUI-Definitionen
Global event

Enumeration FormWindow
  #Fenster
EndEnumeration

Enumeration FormGadget
  #Titel
  #Protokoll
  #Fill_found
  #Fill_loaded
  #Fill_resized
  #Count_found
  #Count_loaded
  #Count_resized
  #Fill_found_titel
  #Fill_loaded_titel
  #Fill_resized_titel
EndEnumeration

Enumeration FormFont
  #Font_0
  #Font_1
  #Font_2  
  #Font_3
EndEnumeration

LoadFont(#Font_0,"Arial", 14, #PB_Font_Bold)
LoadFont(#Font_1,"Arial", 10)
LoadFont(#Font_2,"Arial", 10, #PB_Font_Bold)
LoadFont(#Font_3,"Arial", 8)

Procedure OpenFenster(x = 0, y = 0, width = 600, height = 400)
  OpenWindow(#Fenster, x, y, width, height, "Thumbnailer (Queue-Test)", #PB_Window_SystemMenu)
  TextGadget(#Titel, 10, 10, 580, 40, "Thumbnailer (Queue-Test)", #PB_Text_Center)
  SetGadgetFont(#Titel, FontID(#Font_0))
  
  TextGadget(#Fill_found_titel, 30, 60, 150, 15, "Gefundene Dateien")
  SetGadgetFont(#Fill_found_titel, FontID(#font_3))
  ProgressBarGadget(#Fill_found, 30, 80, 480, 30, 0, 0)
  TextGadget(#Count_found, 530, 80, 60, 30, "", #PB_Text_Right)
  SetGadgetFont(#Count_found, FontID(#Font_2))
  
  TextGadget(#Fill_loaded_titel, 30, 120, 150, 15, "Geladene Dateien")
  SetGadgetFont(#Fill_loaded_titel, FontID(#font_3))
  ProgressBarGadget(#Fill_loaded, 30, 140, 480, 30, 0, 0)
  TextGadget(#Count_loaded, 530, 140, 60, 30, "", #PB_Text_Right)
  SetGadgetFont(#Count_loaded, FontID(#Font_2))
  
  TextGadget(#Fill_resized_titel, 30, 180, 150, 15, "Erstellte Thumbnails")
  SetGadgetFont(#Fill_resized_titel, FontID(#font_3))
  ProgressBarGadget(#Fill_resized, 30, 200, 480, 30, 0, 0)  
  TextGadget(#Count_resized, 530, 200, 60, 30, "", #PB_Text_Right)
  SetGadgetFont(#Count_resized, FontID(#Font_2))
  
  EditorGadget(#Protokoll, 10, 250, 580, 130, #PB_Editor_ReadOnly)
  SetGadgetFont(#Protokoll, FontID(#Font_1))

EndProcedure

Procedure Fenster_Events(event)
  Select event
    Case #PB_Event_CloseWindow
      ProcedureReturn #False
  EndSelect
  ProcedureReturn #True
EndProcedure

; -------- Main

picture_path = "D:\Test\Bilder\"
thumb_path = "D:\Test\Thumbs\"
queuesize = 100

; GUI anzeigen
OpenFenster()

; Progressbars auf die Grösse der Queues einstellen
SetGadgetAttribute(#Fill_found, #PB_ProgressBar_Maximum, queuesize)
SetGadgetAttribute(#Fill_loaded, #PB_ProgressBar_Maximum, queuesize)
SetGadgetAttribute(#Fill_resized, #PB_ProgressBar_Maximum, queuesize)

; Füllstand der Queues anzeigen
f = 0
l = 0
R = 0
SetGadgetText(#Count_found, Str(f))
SetGadgetText(#Count_loaded, Str(l))
SetGadgetText(#Count_resized, Str(r))

; Zeit nehmen
timer = ElapsedMilliseconds()

; Mutexe & Threads erzeugen
mutex_found = CreateMutex()
mutex_loaded = CreateMutex()
mutex_resized = CreateMutex()
thread_load = CreateThread(@load_picture(), 23)
thread_resize = CreateThread(@resize_picture(), 23)
thread_save = CreateThread(@save_thumb(), 23)
thread_find = CreateThread(@find_picture(), 23)

AddGadgetItem(#Protokoll, -1, "Verarbeitung gestartet...")

; Event-Loop. Erst beenden, wenn alle Threads durch sind
Repeat
  ; alle 250ms die Füllstände aktualisieren
  event = WaitWindowEvent(250)
  
  f = ListSize(found())
  l = ListSize(loaded())
  r = ListSize(resized())
  SetGadgetState(#Fill_found, f)
  SetGadgetState(#Fill_loaded, l)
  SetGadgetState(#Fill_resized, r)
  SetGadgetText(#Count_found, Str(f))
  SetGadgetText(#Count_loaded, Str(l))
  SetGadgetText(#Count_resized, Str(r))
    
Until Fenster_Events(event) = #False And (IsThread(thread_find) + IsThread(thread_load) + IsThread(thread_resize) + IsThread(thread_save) = 0)

End

; -------- Procedures
Procedure.i push_found(entry.s)
  ; Eintrag in die Found-Queue schreiben
  ; (zuletzt hinzugefügtes Element immer am Ende der Queue)
  
  ; Queue für Zugriff sperren
  LockMutex(mutex_found)
  
  ; Ist noch Platz in der Queue?
  If ListSize(found()) < queuesize
    ; neues Element am Ender der Queue anfügen
    LastElement(found())
    If AddElement(found()) = 0
      ; Fehler beim Erstellen eines neuen Queue-Eintrages
      UnlockMutex(mutex_found)                    ; Zugriff auf Queue wieder freigeben
      ProcedureReturn 0                         ; Fehler mitteilen
    EndIf
    ; neuer Eintrag wird hinzugefügt
    found() = entry
    UnlockMutex(mutex_found)                      ; Zugriff auf Queue wieder freigeben
    ProcedureReturn 1                           ; Erfolg mitteilen
  Else
    ; kein Eintrag mehr möglich
    UnlockMutex(mutex_found)                      ; Zugriff auf Queue wieder freigeben
    ProcedureReturn 0                           ; Fehler mitteilen
  EndIf  
EndProcedure

Procedure.s pop_found()
  ; Eintrag aus der Found-Queue holen
  ; ältestes Element steht am Anfang und wird beim Entnehmen aus der Queue gelöscht
  
  Protected dummy.s                              ; Eintrag aus der Queue
  
  ; Queue für Zugriff sperren
  LockMutex(mutex_found)
  
  ; ist die Queue leer?
  If ListSize(found()) > 0
    ; Element vom Anfang der Queue entnehmen
    If FirstElement(found()) = 0
      ; Fehler beim Ansteuern des ersten Queue-Eintrages
      UnlockMutex(mutex_found)                    ; Zugriff auf Queue wieder freigeben
      ProcedureReturn ""                        ; Rückgabe eines leeren Strings signalisiert Fehler
    Else
      dummy = found()                             ; Eintrag aus der Queue merken
      DeleteElement(found(), 1)                   ; Eintrag aus der Queue löschen
      UnlockMutex(mutex_found)                    ; Zugriff auf Queue wieder freigeben
      ProcedureReturn dummy                     ; Rückgabe des Queue-Eintrages signalisiert Erfolg
    EndIf  
  Else
    ; kein Element mehr verfügbar
    UnlockMutex(mutex_found)                      ; Zugriff auf Queue wieder freigeben
    ProcedureReturn ""                          ; Rückgabe eines leeren Strings signalisiert Fehler
  EndIf  
EndProcedure

Procedure.i push_loaded(entry.s)
  ; Eintrag in die Loaded-Queue schreiben
  ; (zuletzt hinzugefügtes Element immer am Ende der Queue)
  
  ; Queue für Zugriff sperren
  LockMutex(mutex_loaded)
  
  ; Ist noch Platz in der Queue?
  If ListSize(loaded()) < queuesize
    ; neues Element am Ender der Queue anfügen
    LastElement(loaded())
    If AddElement(loaded()) = 0
      ; Fehler beim Erstellen eines neuen Queue-Eintrages
      UnlockMutex(mutex_loaded)                   ; Zugriff auf Queue wieder freigeben
      ProcedureReturn 0                         ; Fehler mitteilen
    EndIf
    ; neuer Eintrag wird hinzugefügt
    loaded() = entry
    UnlockMutex(mutex_loaded)                     ; Zugriff auf Queue wieder freigeben
    ProcedureReturn 1                           ; Erfolg mitteilen
  Else
    ; kein Eintrag mehr möglich
    UnlockMutex(mutex_loaded)                     ; Zugriff auf Queue wieder freigeben
    ProcedureReturn 0                           ; Fehler mitteilen
  EndIf  
EndProcedure

Procedure.s pop_loaded()
  ; Eintrag aus der Found-Queue holen
  ; ältestes Element steht am Anfang und wird beim Entnehmen aus der Queue gelöscht
  
  Protected dummy.s                              ; Eintrag aus der Queue
  
  ; Queue für Zugriff sperren
  LockMutex(mutex_loaded)
  
  ; ist die Queue leer?
  If ListSize(loaded()) > 0
    ; Element vom Anfang der Queue entnehmen
    If FirstElement(loaded()) = 0
      ; Fehler beim Ansteuern des ersten Queue-Eintrages
      UnlockMutex(mutex_loaded)                   ; Zugriff auf Queue wieder freigeben
      ProcedureReturn ""                        ; Rückgabe eines leeren Strings signalisiert Fehler
    Else
      dummy = loaded()                            ; Eintrag aus der Queue merken
      DeleteElement(loaded(), 1)                  ; Eintrag aus der Queue löschen
      UnlockMutex(mutex_loaded)                   ; Zugriff auf Queue wieder freigeben
      ProcedureReturn dummy                     ; Rückgabe des Queue-Eintrages signalisiert Erfolg
    EndIf  
  Else
    ; kein Element mehr verfügbar
    UnlockMutex(mutex_loaded)                     ; Zugriff auf Queue wieder freigeben
    ProcedureReturn ""                          ; Rückgabe eines leeren Strings signalisiert Fehler
  EndIf  
EndProcedure

Procedure.i push_resized(entry.s)
  ; Eintrag in die Resized-Queue schreiben
  ; (zuletzt hinzugefügtes Element immer am Ende der Queue)
  
  ; Queue für Zugriff sperren
  LockMutex(mutex_resized)

  ; Ist noch Platz in der Queue?
  If ListSize(resized()) < queuesize
    ; neues Element am Ender der Queue anfügen
    LastElement(resized())
    If AddElement(resized()) = 0
      ; Fehler beim Erstellen eines neuen Queue-Eintrages
      UnlockMutex(mutex_resized)                  ; Zugriff auf Queue wieder freigeben
      ProcedureReturn 0                         ; Fehler mitteilen
    EndIf
    ; neuer Eintrag wird hnzugefügt
    resized() = entry
    UnlockMutex(mutex_resized)                    ; Zugriff auf Queue wieder freigeben
    ProcedureReturn 1                           ; Erfolg mitteilen
  Else
    ; kein Eintrag mehr möglich
    UnlockMutex(mutex_resized)                    ; Zugriff auf Queue wieder freigeben
    ProcedureReturn 0                           ; Fehler mitteilen
  EndIf  
EndProcedure

Procedure.s pop_resized()
  ; Eintrag aus der Found-Queue holen
  ; ältestes Element steht am Anfang und wird beim Entnehmen aus der Queue gelöscht
  
  Protected dummy.s                              ; Eintrag aus der Queue
  
  ; Queue für Zugriff sperren
  LockMutex(mutex_resized)
  
  ; ist die Queue leer?
  If ListSize(resized()) > 0
    ; Element vom Anfang der Queue entnehmen
    If FirstElement(resized()) = 0
      ; Fehler beim Ansteuern des letzten Queue-Eintrages
      UnlockMutex(mutex_resized)                  ; Zugriff auf Queue wieder freigeben
      ProcedureReturn ""                        ; Rückgabe eines leeren Strings signalisiert Fehler
    Else
      dummy = resized()                           ; Eintrag aus der Queue merken
      DeleteElement(resized(), 1)                 ; Eintrag aus der Queue löschen
      UnlockMutex(mutex_resized)                  ; Zugriff auf Queue wieder freigeben
      ProcedureReturn dummy                     ; Rückgabe des Queue-Eintrages signalisiert Erfolg
    EndIf  
  Else
    ; kein Element mehr verfügbar
    UnlockMutex(mutex_resized)                    ; Zugriff auf Queue wieder freigeben
    ProcedureReturn ""                          ; Rückgabe eines leeren Strings signalisiert Fehler
  EndIf  
EndProcedure

Procedure find_picture(*wert)
  ; Verzeichnisstruktur nach Bildern durchsuchen und gefundene Bilder zur Verarbeitung in die Found-Queue setzen
  ; Es werden nur die Formate JPG, TIF, und BMP berücksichtigt
  
  ; Rekursion in einer separaten Prozedur (nicht über Thread)
  dir_runner(picture_path)
  
  ; Wenn die Verzeichnisstruktur komplett durchlaufen wurde, Ende-Signal für die Threads absetzen
  While push_found("-1|**ENDE**") = 0            ; Falls die Found-Queue voll ist, 10ms warten...
    Delay(10)
  Wend  
  
  Repeat                                         ; Warten bis die restlichen Threads beendet wurden...
    Delay(100)
  Until IsThread(thread_load) + IsThread(thread_resize) + IsThread(thread_save) = 0
  
  AddGadgetItem(#Protokoll, -1, "Verarbeitung beendet.")
  
  ; Zeit stoppen
  timer = ElapsedMilliseconds() - timer
  
  AddGadgetItem(#Protokoll, -1, "Zeit: " + StrF(timer/1000,2) + " Sekunden")
EndProcedure

Procedure dir_runner(Path.s)
  ; Rekursives Durchlaufen der Verzeichnisstruktur in <Path.s>
  ; Gefundene Bilder weden in die Found-Queue eingetragen
  
  Protected dir_handle                           ; Handle für das eröffnete Verzeichnis
  
  ; Verzeichnis öffnen, alle Einträge berücksichtigen
  dir_handle = ExamineDirectory(#PB_Any, Path.s,"*.*")
  
  If dir_handle <> 0
    ; Nächsten Eintrag bearbeiten
    While NextDirectoryEntry(dir_handle)
      ; ist es eine Datei?
      If DirectoryEntryType(dir_handle) = #PB_DirectoryEntry_File
        ; nur bekannte Bildformate laden
        If FindString("JPG|TIF|BMP", UCase(GetExtensionPart(DirectoryEntryName(dir_handle)))) > 0
          ; wenn die Found-Queue voll ist erstmal 5000ms warten 
          While push_found("0|" + path + DirectoryEntryName(dir_handle)) = 0
            Delay(5000)
          Wend
        EndIf 
        ; wenn ein Verzeichniseintrag gefunden wurde diesen rekursiv durchsuchen (Vorgänger- und Stammverzeichnis-Eintrag unberücksichtigt lassen)
      ElseIf DirectoryEntryType(dir_handle) = #PB_DirectoryEntry_Directory And DirectoryEntryName(dir_handle) <> "." And DirectoryEntryName(dir_handle) <> ".."
        ; hier die Rekursion
        dir_runner(Path + DirectoryEntryName(dir_handle) + "\")
      EndIf
    Wend
    FinishDirectory(dir_handle)                   ; Verzeichnis schliessen
  EndIf
EndProcedure

Procedure load_picture(*wert)
  ; Ein gefundenes Bild laden
  ; Dazu den nächsten Eintrag aus der Found-Queue holen
  ; Nach dem Laden, die weitere Verarbeitung an die Loaded-Queue übergeben
  
  Protected found_entry.s
  Protected pic_handle
  Protected pic_name.s
  
  Repeat
    found_entry = pop_found()                     ; nächsten Eintrag aus der Queue holen
    If found_entry <> ""                          ; ein leerer Rückgabestring signalisiert einen Fehler
      
      pic_name = StringField(found_entry, 2, "|") ; Bildname inkl. Pfad im zweiten Teil des Queue-Eintrages
      
      ; Thread beenden, wenn in Queue "**ENDE**" steht
      If pic_name = "**ENDE**"
        While push_loaded("-1|" + pic_name) = 0   ; Ende-Signal weitergeben. Wenn die Loaded-Queue voll ist, 10ms waren
          Delay(10)
        Wend
        Break
      EndIf
      
      pic_handle = LoadImage(#PB_Any, pic_name)   ; Bild laden und Handle merken
      
      ; geladenes Bild in die Loaded-Queue einreihen, wenn die Loaded-Queue voll ist, 10ms waren
      While push_loaded(Str(pic_handle) + "|" + pic_name) = 0
        Delay(10)
      Wend
    Else
      Delay(10)                                   ; Queue ist leer, also 10ms warten
    EndIf
  ForEver 
EndProcedure

Procedure resize_picture(*wert)
  ; Aus einem geladenen Bild ein Thumbnail erzeugen
  ; Dazu den nächsten Eintrag aus der Loaded-Queue holen
  ; Nach der Thumbnail-Generierung, die weitere Verarbeitung an die Resized-Queue übergeben
  
  Protected loaded_entry.s
  Protected pic_handle
  Protected pic_name.s
  
  Repeat
    loaded_entry = pop_loaded()                   ; nächsten Eintrag aus der Queue holen
    If loaded_entry <> ""                         ; ein leerer Rückgabestring signalisiert einen Fehler
      ; Queue-Eintrag zerlegen in Bild-Handle und Bildname
      pic_handle = Val(StringField(loaded_entry, 1, "|"))  
      pic_name = StringField(loaded_entry, 2, "|")
      
      ; Thread beenden, wenn in Queue "**ENDE**" steht
      If pic_name = "**ENDE**"
        ; wenn die Resized-Queue voll ist, 10ms waren
        While push_resized("-1|" + pic_name) = 0  ; Ende-Signal weitergeben. Wenn die Resized-Queue voll ist, 10ms waren
          Delay(10)
        Wend
        Break
      Else
        ; nur korrekt geladene Bilder bearbeiten (Handle > 0)
        If pic_handle > 0
          ; Bild verkleinern - auf 150x150 Px um es einfach zu halten
          ResizeImage(pic_handle, 150, 150, #PB_Image_Smooth)
          
          ; Thumbnail in die Resized-Queue einreihen, wenn die Resized-Queue voll ist, 10ms warten
          While push_resized(Str(pic_handle) + "|" + pic_name) = 0
            Delay(10)
          Wend  
        Else
          AddGadgetItem(#Protokoll, -1, "Kein gültiges Bild: " + pic_name)
        EndIf
      EndIf  
    Else
      Delay(2000)                                  ; Queue ist leer, also 2000ms warten 
    EndIf
  ForEver   
EndProcedure

Procedure save_thumb(*wert)
  ; Erzeugtes Thumbnail speichern
  ; Dazu den nächsten Eintrag aus der Resized-Queue holen
  ; Thumbnail wird im JPG-Format gespeichert und erhält den Namen des Originals + .JPG
  
  Protected resized_entry.s
  Protected pic_handle
  Protected pic_name.s
  
  Repeat
    resized_entry = pop_resized()                  ; nächsten Eintrag aus der Queue holen
    If resized_entry <> ""                         ; ein leerer Rückgabestring signalisiert einen Fehler
      ; Queue-Eintrag zerlegen in Bild-Handle und Bildname
      pic_handle = Val(StringField(resized_entry, 1, "|"))  
      pic_name = StringField(resized_entry, 2, "|")
      
      ; Thread beenden, wenn in Queue "**ENDE**" steht
      If pic_name = "**ENDE**"
        Break                                      ; Ende-Signal muss nicht weitergereicht werden
      EndIf
      
      ; Thumbnail speichern und durch das Bild belegten Speicher freigeben
      SaveImage(pic_handle, thumb_path + GetFilePart(pic_name) + ".jpg", #PB_ImagePlugin_JPEG)
      FreeImage(pic_handle)
    Else
      Delay(2000)                                  ; Queue ist leer, also 2000ms warten 
    EndIf
  ForEver     
EndProcedure
Beim kompilieren daran denken, ein "Thread-sicheres Executable" zu erstellen.

Viel Spaß
Paul
Derren
Beiträge: 558
Registriert: 23.07.2011 02:08

Re: Spielerei mit Threads und Queues

Beitrag von Derren »

Coole Idee. Hab's zwar noch nicht angeschaut, aber hab mich auch noch nicht wirklich mit Threads beschäftigt (leider).
So ein einfaches Beispiel wird mir bestimmt eine große Hilfe sein. Danke für die Mühe und für's Teilen :)
Signatur und so
Benutzeravatar
NicTheQuick
Ein Admin
Beiträge: 8812
Registriert: 29.08.2004 20:20
Computerausstattung: Ryzen 7 5800X, 64 GB DDR4-3200
Ubuntu 24.04.2 LTS
GeForce RTX 3080 Ti
Wohnort: Saarbrücken

Re: Spielerei mit Threads und Queues

Beitrag von NicTheQuick »

Das mit den Delays ist noch sehr ineffizient. Normalerweise arbeitet man hier mit Semaphoren oder mit Locks in Verbindung mit Bedingungen. So kann der Schreibthread dem Lesethread immer mitteilen, wenn wieder was neues in die Queue gelegt wurde und der Lesethread tut dann tatsächlich solange nichts bis er diese Information bekommt. Mit den Delays kann es ja passieren, dass 1ms nachdem das Delay startet, etwas in die Queue gelegt wird, und dann wartet der Lesethread aber trotzdem noch seine 19 ms, obwohl er es nicht müsste.
Benutzeravatar
hjbremer
Beiträge: 822
Registriert: 27.02.2006 22:30
Computerausstattung: von gestern
Wohnort: Neumünster

Re: Spielerei mit Threads und Queues

Beitrag von hjbremer »

:allright: sehr schön
Purebasic 5.70 x86 5.72 X 64 - Windows 10

Der Computer hat dem menschlichen Gehirn gegenüber nur einen Vorteil: Er wird benutzt
grüße hjbremer
Benutzeravatar
NicTheQuick
Ein Admin
Beiträge: 8812
Registriert: 29.08.2004 20:20
Computerausstattung: Ryzen 7 5800X, 64 GB DDR4-3200
Ubuntu 24.04.2 LTS
GeForce RTX 3080 Ti
Wohnort: Saarbrücken

Re: Spielerei mit Threads und Queues

Beitrag von NicTheQuick »

Ich hab gerade mal eine Concurrent Queue als Modul und Interface gebaut, wenn du interessiert bist. Ich habe sie noch nicht ausgiebig getestet, aber ich denke es ist korrekt. Du kannst ja mal testen, ob es damit nochmal einen Tick schneller wird.

Code: Alles auswählen

EnableExplicit

DeclareModule ConcurrentQueue
	Structure Queue
		notEmptyCondition.i
		notFullCondition.i
		lock.i
		maxSize.i
		elementSize.i
		List *elements()
	EndStructure
	
	#UNLIMITED = -1
	#STORE_POINTER = 0
	
	Declare.i new(elementSize.i = 0, maxSize.i = #UNLIMITED)
	Declare.i create(elementSize.i = #STORE_POINTER, maxSize.i = #UNLIMITED)
	Declare.i push(*queue, *element, block.i = #True)
	Declare.i pop(*queue,  *doNotBlock.Integer = 0)
	Declare.i free(*queue)
	
	Interface IQueue
		push.i(*element, block.i = #True)
		pop.i(*doNotBlock.Integer = 0)
		destroy()
	EndInterface
EndDeclareModule

Module ConcurrentQueue
	Structure IQueue_S
		*vTable
		*q.Queue
	EndStructure
	
	Procedure.i ipush(*this.IQueue_S, *element, block = #True)
		ProcedureReturn push(*this\q, *element, block)
	EndProcedure
	
	Procedure.i ipop(*this.IQueue_S, *doNotBlock.Integer = 0)
		ProcedureReturn pop(*this\q, *doNotBlock)
	EndProcedure
	
	Procedure.i idestroy(*this.IQueue_S)
		free(*this\q)
		FreeMemory(*this)
		ProcedureReturn #True
	EndProcedure
	
	DataSection
		I_vTable:
			Data.i @ipush(), @ipop(), @idestroy()
	EndDataSection
	
	Procedure.i new(elementSize.i = 0, maxSize.i = #UNLIMITED)
		Protected *this.IQueue_S = AllocateMemory(SizeOf(IQueue_S))
		If (Not *this)
			ProcedureReturn #False
		EndIf
		*this\vTable = ?I_vTable
		*this\q = create(elementSize, maxSize)
		If (Not *this\q)
			FreeMemory(*this)
			ProcedureReturn #False
		EndIf
		
		ProcedureReturn *this
	EndProcedure
	
	Procedure create(elementSize.i = 0, maxSize.i = #UNLIMITED)
		Protected *queue.Queue = AllocateMemory(SizeOf(Queue))
		If (Not *queue)
			ProcedureReturn #False
		EndIf
		InitializeStructure(*queue, Queue)
		
		If (maxSize < 0)
			maxSize = #UNLIMITED
		EndIf
		
		With *queue
			\lock = CreateMutex()
			\notEmptyCondition = CreateSemaphore(0)
			If (maxSize <> #UNLIMITED)
				\notFullCondition = CreateSemaphore(maxSize)
			Else
				\notFullCondition = 0
			EndIf
			\maxSize = maxSize
			\elementSize = elementSize
		EndWith
		
		ProcedureReturn *queue
	EndProcedure
	
	Procedure.i push(*queue.Queue, *element, block.i = #True)
		Protected *copy = *element
		With *queue
			If (\elementSize)
				*copy = AllocateMemory(\elementSize)
				If (Not *copy)
					ProcedureReturn #False
				EndIf
				CopyMemory(*element, *copy, \elementSize)
			EndIf
			
			LockMutex(\lock)
			LastElement(\elements())
			If (AddElement(\elements()))
				\elements() = *copy
				If (\notFullCondition)
					If (Not TrySemaphore(\notFullCondition))
						UnlockMutex(\lock)
						If (Not block)
							ProcedureReturn 0
						EndIf
						WaitSemaphore(\notFullCondition)
						LockMutex(\lock)
					EndIf
				EndIf
				SignalSemaphore(\notEmptyCondition)
			Else
				UnlockMutex(\lock)
				ProcedureReturn #False
			EndIf
			UnlockMutex(\lock)
		EndWith
		ProcedureReturn #True
	EndProcedure
	
	Procedure.i pop(*queue.Queue, *doNotBlock.Integer = 0)
		Protected *copy
		With *queue
			LockMutex(\lock)
			If (Not TrySemaphore(\notEmptyCondition))
				UnlockMutex(\lock)
				If (*doNotBlock)
					ProcedureReturn #False
				EndIf
				WaitSemaphore(\notEmptyCondition)
				LockMutex(\lock)
			EndIf
				
			If FirstElement(\elements())
				*copy = \elements()
				If (*doNotBlock)
					*doNotBlock\i = *copy
				EndIf
				
				DeleteElement(\elements())
				If (\notFullCondition)
					SignalSemaphore(\notFullCondition)
				EndIf
				UnlockMutex(\lock)
				If (*doNotBlock)
					ProcedureReturn #True
				Else
					ProcedureReturn *copy
				EndIf
			EndIf
			UnlockMutex(\lock)
		EndWith
		ProcedureReturn #False
	EndProcedure
	
	Procedure.i free(*queue.Queue)
		ClearStructure(*queue, Queue)
		FreeMemory(*queue)
		ProcedureReturn #True
	EndProcedure
EndModule

; Als normales Modul
Define *q = ConcurrentQueue::create(ConcurrentQueue::#STORE_POINTER, 2)

Debug ConcurrentQueue::push(*q, 12)
Debug ConcurrentQueue::push(*q, 23)
Debug ConcurrentQueue::push(*q, 34, #False)
Debug ConcurrentQueue::pop(*q)
Debug ConcurrentQueue::push(*q, 34, #False)
Debug ConcurrentQueue::pop(*q)
Define v.i
Debug ConcurrentQueue::pop(*q, @v)
Debug v
ConcurrentQueue::free(*q)

; Als Interface
Debug ""
Debug "Als Interface"
Define q.ConcurrentQueue::IQueue = ConcurrentQueue::new(ConcurrentQueue::#STORE_POINTER, 2)

Debug q\push(12)
Debug q\push(23)
Debug q\push(34, #False)
Debug q\pop()
Debug q\push(34, #False)
Debug q\pop()
Debug q\pop(@v)
Debug v
q\destroy()
fabulouspaul
Beiträge: 120
Registriert: 01.04.2011 21:59

Re: Spielerei mit Threads und Queues

Beitrag von fabulouspaul »

NicTheQuick hat geschrieben:Das mit den Delays ist noch sehr ineffizient. Normalerweise arbeitet man hier mit Semaphoren oder mit Locks in Verbindung mit Bedingungen. So kann der Schreibthread dem Lesethread immer mitteilen, wenn wieder was neues in die Queue gelegt wurde und der Lesethread tut dann tatsächlich solange nichts bis er diese Information bekommt. Mit den Delays kann es ja passieren, dass 1ms nachdem das Delay startet, etwas in die Queue gelegt wird, und dann wartet der Lesethread aber trotzdem noch seine 19 ms, obwohl er es nicht müsste.
Danke für den Hinweis!
Bis zu den Semaphoren war ich bei meinen Thread-Experimenten noch nicht vorgedrungen :wink:

Ich habe die arbeitenden Threads mit Semaphoren für die Synchronisation ausgestattet und dafür die Pausen bei leeren Queues entfernt.
Dadurch konnte ich nochmal eine kleine Verbesserung in der Performance erzielen (ca. 4%). :allright: Allerdings ist mir auch aufgefallen, dass die Ressourcennutzung sich deutlich steigert (ca. 20%).

Genau das passiert auch, wenn ich ohne Semaphoren arbeite und die Delay()-Funktion auskommentiere, die bei einer leeren Queue wartet, bevor die Queue wieder überprüft wird. Es gilt also abzuwägen, ob man die Performancesteigerung auf Basis der erhöhten Ressourcennutzung erkaufen möchte.

Aber vielleicht habe ich das Konzept der Semaphoren noch nicht völlig verstanden und man muss diese anders anwenden.
fabulouspaul
Beiträge: 120
Registriert: 01.04.2011 21:59

Re: Spielerei mit Threads und Queues

Beitrag von fabulouspaul »

NicTheQuick hat geschrieben:Ich hab gerade mal eine Concurrent Queue als Modul und Interface gebaut, wenn du interessiert bist. Ich habe sie noch nicht ausgiebig getestet, aber ich denke es ist korrekt. Du kannst ja mal testen, ob es damit nochmal einen Tick schneller wird.
Danke! :allright:
Aber den Code muss ich mir mal in Ruhe zu Gemüte führen um ihn zu verstehen.
Antworten