Seite 1 von 1
MySQL-Anfrage in Thread auslagern
Verfasst: 28.02.2016 19:37
von techniker
Hallo!
in einer aktuellen Anwendung muss ich zeitraubende DB-Anfragen zyklisch aufrufen, wobei mir dann leider der Eventhandler bzw. die Anwendung hängt.
Ich wollte/möchte somit die DB-Querys in einzelne Threads auslagern, um zugleich eine Performancesteigerung zu erzielen.
Die Interaktion zwischen der Hauptanwendung und den Threads wollte ich mit zwei Queues lösen (Query und Response).
Für die eindeutige Zuordnung zwischen Anfrage und Antwort hätte ich zufällige UniqueID's angedacht.
Mein Problem:
Die Antworten sind leider komplett dynamisch - Also vom Typ (momentan String, Long, Quad, Float), vom Variablennamen, dem Wert selbst und der Anzahl der rückgegeben Werte..
Ich wollte eine Queue wie etwa diese hier anwenden:
https://freakscorner.de/2014/01/26/pb-w ... rentqueue/
Wie gebe ich jedoch die Antwort an das Hauptprogramm zurück?
Hat hier jemand einen Tipp für mich? Ich grüble nun bereits seit mehreren Tagen und finde keine passable Lösung..
(Meine letzte Idee war es, das Ergebnis in eine Datei im temp. Verzeichnis zu schreiben und diese später im Hauptprogramm auszuwerten. Jedoch ist das nicht gerade effektiv..)
Re: MySQL-Anfrage in Thread auslagern
Verfasst: 28.02.2016 19:45
von Derren
Du musst doch wissen, wie du mit den Daten aus der DB umgehst. Wie hast du es dann ohne Thread gemacht?
Eine beliebige Anzahl an Parameter zurückgeben würde ich über eine LinkedList lösen.
Code: Alles auswählen
Structure dataStructure
type.i
*data
EndStructure
NewList mylist.dataStructure()
AddElement(myList())
myList()\type = #PB_Quad
myList()\data = AllocateMemory(SizeOf(Quad))
PokeQ(myList()\data, 18975867856348)
If myList()\type = #PB_Quad
Debug PeekQ(myList()\data )
EndIf
AddElement(myList())
myList()\type = #PB_String
myList()\data =@"testString"
If myList()\type = #PB_String
Debug PeekS(myList()\data )
EndIf
Re: MySQL-Anfrage in Thread auslagern
Verfasst: 28.02.2016 20:50
von techniker
Wie ich mit den DB-Daten dann weiter verfahre ist ja nicht mein Problem..
Ich dachte bisher immer, dass ein Peek/Poke an eine von einem Thread gesetzten Wert unsicher ist - oder irre ich mich da?
Ist die List immer noch gültig, auch wenn der Thread beendet wurde?
Mein Problem liegt allein an der sicheren Rückmeldung von komplett dynamischen Daten eines Threads ans Hauptprogramm.

Re: MySQL-Anfrage in Thread auslagern
Verfasst: 28.02.2016 21:58
von Derren
Weiß ich jetzt nicht, ich mache relativ wenig mit Threads.
Wenn du nicht Peek/Poke verwenden willst, würde ich eine allgemeine Struktur erstellen.
Code: Alles auswählen
Structure myStruct
Type.i
Integer.i
Long.l
Quad.q
Byte.b
String.s
EndStructure
Define var.myStruct
var\Type = #PB_String
var\String = "Hallo Welt"
Select var\Type
Case #PB_Integer
Debug var\Integer
Case #PB_String
Debug var\String
EndSelect
Im Ganzen:
Code: Alles auswählen
Structure types
Type.i
Quad.q
Integer.i
Long.l
String.s
EndStructure
Structure myStruct
List mylist.types()
EndStructure
Define carrierVar.myStruct
Procedure myThread(*ptr.myStruct)
AddElement(*ptr\mylist())
*ptr\mylist()\Type = #PB_Quad
*ptr\mylist()\Quad = 757238976979735
AddElement(*ptr\mylist())
*ptr\mylist()\Type = #PB_String
*ptr\mylist()\String = "Hallo Welt"
EndProcedure
CreateThread(@myThread(), carrierVar.myStruct)
Delay(1000) ;Dem Thread ein wenig Zeit einräumen
ResetList(carrierVar\mylist())
While NextElement(carrierVar\mylist())
Select carrierVar\mylist()\Type
Case #PB_Quad
Debug carrierVar\mylist()\Quad
Case #PB_String
Debug carrierVar\mylist()\String
EndSelect
Wend
Für den gleichzeitigen Zugriff musst du natürlich einen Mutex erstellen, das habe ich jetzt mal außen vor gelassen.
Re: MySQL-Anfrage in Thread auslagern
Verfasst: 29.02.2016 08:54
von mhs
Die einfachste Variante ist die Tabellen bzw. abgefragten Spalten als Strukturen anzulegen und alle Rückgabewerte in einer Liste zu speichern. Diese Liste gibt dann der Thread pro "job" als Ergebnis zurück.
Wenn deine Rückgabewerte natürlich komplett dynamisch sind, also du auch die Query dazu eigentlich nicht kennst, dann musst du den Weg über die Variant Struktur gehen, den dir Derren vorgeschlagen hat. Oder du machst so etwas in der Art:
Code: Alles auswählen
Structure Row
Map Columns.s()
EndStructure
NewList Rows.Row()
#db = 1
Define.i Count, i
; -- DATABASE QUERY --
Count = DatabaseColumns(#db)
While NextDatabaseRow(#db)
AddElement(Rows())
For i = 0 To Count - 1
Rows(DatabaseColumnName(#db, i)) = GetDatabaseString(#db, i)
Next i
Wend
Re: MySQL-Anfrage in Thread auslagern
Verfasst: 29.02.2016 11:00
von techniker
Ich habe nun den Code aus dem Link vom 1. Post für meine Bedürfnisse nach euren Anregungen angepasst.
Auf den ersten Blick scheint er reibungslos zu laufen..
Frage an die Gurus unter euch: Seht ihr bei der Datenübergabe o.ä. ein Risiko oder einen Fehler?
Code: Alles auswählen
EnableExplicit
; *************** QueryQueue ***************
DeclareModule QueryQueue
Structure QType
query.s
EndStructure
Structure QStruct
uniqueid.s
List QList.QType()
EndStructure
Interface Type
destroy.i()
countElements.i()
isEmpty.i()
push.i(*element.QStruct)
pop.i()
EndInterface
Declare.i new()
EndDeclareModule
Module QueryQueue
Structure Type_S
*vTable
List *elements.QStruct()
lock.i
semaphore.i
EndStructure
Procedure.i new()
Protected *this.Type_S = AllocateMemory(SizeOf(Type_S))
If (Not *this)
ProcedureReturn #False
EndIf
InitializeStructure(*this, Type_S)
With *this
\vTable = ?ConcurrentQueue_vTable
\semaphore = CreateSemaphore(0)
If (Not \semaphore)
ClearStructure(*this, Type_S)
FreeMemory(*this)
ProcedureReturn #False
EndIf
\lock = CreateMutex()
If (Not \lock)
FreeSemaphore(\semaphore)
ClearStructure(*this, Type_S)
FreeMemory(*this)
ProcedureReturn #False
EndIf
EndWith
ProcedureReturn *this
EndProcedure
Procedure.i destroy(*this.Type_S)
With *this
FreeSemaphore(\semaphore)
FreeMutex(\lock)
ClearStructure(*this, Type_S)
EndWith
FreeMemory(*this)
EndProcedure
Procedure.i countElements(*this.Type_S)
Protected size.i
With *this
LockMutex(\lock)
size = ListSize(\elements())
UnlockMutex(\lock)
EndWith
ProcedureReturn size
EndProcedure
Procedure.i isEmpty(*this.Type)
ProcedureReturn Bool(*this\countElements() = 0)
EndProcedure
Procedure.i push(*this.Type_S, *element.QStruct)
Protected result.i = #False
With *this
LockMutex(\lock)
LastElement(\elements())
If AddElement(\elements())
\elements() = *element
SignalSemaphore(\semaphore)
result = #True
EndIf
UnlockMutex(\lock)
EndWith
ProcedureReturn result
EndProcedure
Procedure.i pop(*this.Type_S)
Protected *element.QStruct
With *this
LockMutex(\lock)
If (Not TrySemaphore(\semaphore))
UnlockMutex(\lock)
WaitSemaphore(\semaphore)
LockMutex(\lock)
EndIf
FirstElement(\elements())
*element = \elements()
DeleteElement(\elements())
UnlockMutex(\lock)
EndWith
ProcedureReturn *element
EndProcedure
DataSection
ConcurrentQueue_vTable:
Data.i @destroy(), @countElements(), @isEmpty(), @push(), @pop()
EndDataSection
EndModule
; *************** ResponseQueue ***************
DeclareModule ResponseQueue
Structure RType
row.l
vartype.i
varname.s
quad.q
float.f
long.l
string.s
EndStructure
Structure RStruct
uniqueid.s
List RList.RType()
EndStructure
Interface Type
destroy.i()
countElements.i()
isEmpty.i()
push.i(*element.RStruct)
pop.i()
EndInterface
Declare.i new()
EndDeclareModule
Module ResponseQueue
Structure Type_S
*vTable
List *elements.RStruct()
lock.i
semaphore.i
EndStructure
Procedure.i new()
Protected *this.Type_S = AllocateMemory(SizeOf(Type_S))
If (Not *this)
ProcedureReturn #False
EndIf
InitializeStructure(*this, Type_S)
With *this
\vTable = ?ConcurrentQueue_vTable
\semaphore = CreateSemaphore(0)
If (Not \semaphore)
ClearStructure(*this, Type_S)
FreeMemory(*this)
ProcedureReturn #False
EndIf
\lock = CreateMutex()
If (Not \lock)
FreeSemaphore(\semaphore)
ClearStructure(*this, Type_S)
FreeMemory(*this)
ProcedureReturn #False
EndIf
EndWith
ProcedureReturn *this
EndProcedure
Procedure.i destroy(*this.Type_S)
With *this
FreeSemaphore(\semaphore)
FreeMutex(\lock)
ClearStructure(*this, Type_S)
EndWith
FreeMemory(*this)
EndProcedure
Procedure.i countElements(*this.Type_S)
Protected size.i
With *this
LockMutex(\lock)
size = ListSize(\elements())
UnlockMutex(\lock)
EndWith
ProcedureReturn size
EndProcedure
Procedure.i isEmpty(*this.Type)
ProcedureReturn Bool(*this\countElements() = 0)
EndProcedure
Procedure.i push(*this.Type_S, *element.RStruct)
Protected result.i = #False
With *this
LockMutex(\lock)
LastElement(\elements())
If AddElement(\elements())
\elements() = *element
SignalSemaphore(\semaphore)
result = #True
EndIf
UnlockMutex(\lock)
EndWith
ProcedureReturn result
EndProcedure
Procedure.i pop(*this.Type_S)
Protected *element.RStruct
With *this
LockMutex(\lock)
If (Not TrySemaphore(\semaphore))
UnlockMutex(\lock)
WaitSemaphore(\semaphore)
LockMutex(\lock)
EndIf
FirstElement(\elements())
*element = \elements()
DeleteElement(\elements())
UnlockMutex(\lock)
EndWith
ProcedureReturn *element
EndProcedure
DataSection
ConcurrentQueue_vTable:
Data.i @destroy(), @countElements(), @isEmpty(), @push(), @pop()
EndDataSection
EndModule
; *************** Main ***************
Structure QType
query.s
EndStructure
Structure QStruct
uniqueid.s
List QList.QType()
EndStructure
Structure RType
row.l
vartype.i
varname.s
quad.q
float.f
long.l
string.s
EndStructure
Structure RStruct
uniqueid.s
List RList.RType()
EndStructure
Structure QHdl
QQueue.QueryQueue::Type
RQueue.ResponseQueue::Type
EndStructure
Define Queue.QHdl
Queue\QQueue = QueryQueue::new()
Queue\RQueue = ResponseQueue::new()
Define QDat.QStruct
Define *RDat.RStruct
Procedure Thread(*Queue.QHdl)
Define *tQDat.QStruct
Define tRDat.RStruct
Repeat
*tQDat = *Queue\QQueue\pop()
Debug "Pop: " + *tQDat\uniqueid
ForEach *tQDat\QList()
Debug *tQDat\QList()\query
DeleteElement(*tQDat\QList())
Next
If *tQDat\uniqueid = "10"
tRDat\uniqueid = "0815 :-)"
AddElement(tRDat\RList())
tRDat\RList()\vartype = #PB_String
tRDat\RList()\varname = "teststring"
tRDat\RList()\string = "testvalue"
*Queue\RQueue\push(@tRDat)
EndIf
ForEver
EndProcedure
Define thread.i = CreateThread(@Thread(), @Queue.QHdl)
Define i.i
For i = 1 To 20
Debug "Push: " + i
QDat\uniqueid = Str(i)
AddElement(QDat\QList())
QDat\QList()\query = "Query 1"
AddElement(QDat\QList())
QDat\QList()\query = "Query 2"
AddElement(QDat\QList())
QDat\QList()\query = "Query 3"
Queue\QQueue\push(@QDat)
Delay(Random(30, 10))
Next
*RDat = Queue\RQueue\pop()
Debug "MasterPop: " + *RDat\uniqueid
ForEach *RDat\RList()
Debug *RDat\RList()\vartype
Debug *RDat\RList()\varname
Debug *RDat\RList()\string
DeleteElement(*RDat\RList())
Next
Delay(5000)
KillThread(thread)
Re: MySQL-Anfrage in Thread auslagern
Verfasst: 29.02.2016 15:12
von techniker
Leider ist der Code noch fehlerbehaftet und ich finde einfach nicht heraus, wo es genau hakt..
Wie es aussieht, wird die Push-Routine noch korrekt aufgerufen und der Wert in die Queue geschrieben.
Die Anzahl der Elemente in der Queue stimmt, aber die Daten sind Müll..
Aber warum..?
Den Code habe ich nun auf das Minimum kastriert:
Code: Alles auswählen
EnableExplicit
DeclareModule QueryQueue
Structure QStruct
uniqueid.s
query.s
EndStructure
Interface Type
push.i(*element.QStruct)
pop.i()
EndInterface
Declare.i new()
EndDeclareModule
Module QueryQueue
Structure Type_QS
*vTable
List *elements.QStruct()
lock.i
semaphore.i
EndStructure
Procedure.i new()
Protected *this.Type_QS = AllocateMemory(SizeOf(Type_QS))
If (Not *this)
ProcedureReturn #False
EndIf
InitializeStructure(*this, Type_QS)
With *this
\vTable = ?QueryQueue_vTable
\semaphore = CreateSemaphore(0)
If (Not \semaphore)
ClearStructure(*this, Type_QS)
FreeMemory(*this)
ProcedureReturn #False
EndIf
\lock = CreateMutex()
If (Not \lock)
FreeSemaphore(\semaphore)
ClearStructure(*this, Type_QS)
FreeMemory(*this)
ProcedureReturn #False
EndIf
EndWith
ProcedureReturn *this
EndProcedure
Procedure.i push(*this.Type_QS, *element.QStruct)
Protected result.i = #False
With *this
LockMutex(\lock)
LastElement(\elements())
If AddElement(\elements())
\elements() = *element
Debug " WrQ:" + \elements()\uniqueid
SignalSemaphore(\semaphore)
result = #True
EndIf
UnlockMutex(\lock)
EndWith
ProcedureReturn result
EndProcedure
Procedure.i pop(*this.Type_QS)
Define *element.QStruct
With *this
LockMutex(\lock)
If (Not TrySemaphore(\semaphore))
UnlockMutex(\lock)
WaitSemaphore(\semaphore)
LockMutex(\lock)
EndIf
FirstElement(\elements())
Debug " RdQ:" + \elements()\uniqueid
*element = \elements()
DeleteElement(\elements())
UnlockMutex(\lock)
EndWith
ProcedureReturn *element
EndProcedure
DataSection
QueryQueue_vTable:
Data.i @push(), @pop()
EndDataSection
EndModule
Structure QStruct
uniqueid.s
query.s
EndStructure
Structure QHdl
QQueue.QueryQueue::Type
;RQueue.ResponseQueue::Type
EndStructure
Define Queue.QHdl
Queue\QQueue = QueryQueue::new()
;Queue\RQueue = ResponseQueue::new()
Define QDat.QStruct
;Define *RDat.RStruct
Procedure Thread(*Queue.QHdl)
Protected *tQDat.QStruct
Repeat
*tQDat = *Queue\QQueue\pop()
Debug "Pop: " + *tQDat\uniqueid
Delay(Random(50, 20))
ForEver
EndProcedure
Define thread.i = CreateThread(@Thread(), @Queue)
Define i.i
For i = 1 To 10
Debug "Push: " + i
QDat\uniqueid = Str(i)
QDat\query = "Query"
Queue\QQueue\push(@QDat)
Delay(Random(30, 10))
Next
Delay(1000)
KillThread(thread)
Re: MySQL-Anfrage in Thread auslagern
Verfasst: 29.02.2016 15:57
von NicTheQuick
Meine push-Funktion pusht nur den Pointer in die Warteschlange, nicht die Daten dahinter. Und da QDat nur einmal in deinem Programm existiert, wird ständig der selbe Pointer in die Warteschlange gepusht.
Hier mal, wie man es richtig macht:
Code: Alles auswählen
EnableExplicit
DeclareModule QueryQueue
Structure QStruct
uniqueid.s
query.s
EndStructure
Interface type
push.i(*element.QStruct)
pop.i()
EndInterface
Declare.i new()
EndDeclareModule
Module QueryQueue
Structure Type_QS
*vTable
List *elements.QStruct()
lock.i
semaphore.i
EndStructure
Procedure.i new()
Protected *this.Type_QS = AllocateMemory(SizeOf(Type_QS))
If (Not *this)
ProcedureReturn #False
EndIf
InitializeStructure(*this, Type_QS)
With *this
\vTable = ?QueryQueue_vTable
\semaphore = CreateSemaphore(0)
If (Not \semaphore)
ClearStructure(*this, Type_QS)
FreeMemory(*this)
ProcedureReturn #False
EndIf
\lock = CreateMutex()
If (Not \lock)
FreeSemaphore(\semaphore)
ClearStructure(*this, Type_QS)
FreeMemory(*this)
ProcedureReturn #False
EndIf
EndWith
ProcedureReturn *this
EndProcedure
Procedure.i push(*this.Type_QS, *element.QStruct)
Protected result.i = #False
With *this
LockMutex(\lock)
LastElement(\elements())
If AddElement(\elements())
\elements() = *element
Debug " WrQ:" + \elements()\uniqueid
SignalSemaphore(\semaphore)
result = #True
EndIf
UnlockMutex(\lock)
EndWith
ProcedureReturn result
EndProcedure
Procedure.i pop(*this.Type_QS)
Define *element.QStruct
With *this
LockMutex(\lock)
If (Not TrySemaphore(\semaphore))
UnlockMutex(\lock)
WaitSemaphore(\semaphore)
LockMutex(\lock)
EndIf
FirstElement(\elements())
Debug " RdQ:" + \elements()\uniqueid
*element = \elements()
DeleteElement(\elements())
UnlockMutex(\lock)
EndWith
ProcedureReturn *element
EndProcedure
DataSection
QueryQueue_vTable:
Data.i @push(), @pop()
EndDataSection
EndModule
Structure QStruct
uniqueid.s
query.s
EndStructure
Structure QHdl
QQueue.QueryQueue::type
;RQueue.ResponseQueue::Type
EndStructure
Define Queue.QHdl
Queue\QQueue = QueryQueue::new()
;Queue\RQueue = ResponseQueue::new()
Define QDat.QStruct
;Define *RDat.RStruct
Procedure Thread(*Queue.QHdl)
Protected *tQDat.QStruct
Repeat
*tQDat = *Queue\QQueue\pop()
Debug "Pop: " + *tQDat\uniqueid
Delay(Random(50, 20))
FreeStructure(*tQDat) ; Speicher wieder freigeben
ForEver
EndProcedure
Procedure.i pushElement(*Queue.QueryQueue::type, uniqueid.s, query.s)
Protected *element.QStruct = AllocateStructure(QStruct) ; Speicher allozieren
If Not *element
ProcedureReturn #False
EndIf
With *element
\uniqueid = uniqueid
\query = query
EndWith
ProcedureReturn *Queue\push(*element)
EndProcedure
Define thread.i = CreateThread(@Thread(), @Queue)
Define i.i
For i = 1 To 10
Debug "Push: " + i
pushElement(Queue\QQueue, Str(i), "Query")
Delay(Random(30, 10))
Next
Delay(1000)
KillThread(thread)
Beachte auch das 'FreeStructure()' im Thread, das du aufrufen solltest um das Element tatsächlich im Speicher zu löschen.
Re: MySQL-Anfrage in Thread auslagern
Verfasst: 29.02.2016 16:04
von techniker
Perfekt!
Vielen herzlichen Dank!
(Werde mir das ganze nun etwas genauer ansehen, da ich das aus der C-Welt (Controllerprogrammierung) etwas anders gewöhnt bin..

)