Seite 1 von 1

MySQL-Anfrage in Thread auslagern

Verfasst: 28.02.2016 19:37
von techniker
Hallo!

in einer aktuellen Anwendung muss ich zeitraubende DB-Anfragen zyklisch aufrufen, wobei mir dann leider der Eventhandler bzw. die Anwendung hängt.

Ich wollte/möchte somit die DB-Querys in einzelne Threads auslagern, um zugleich eine Performancesteigerung zu erzielen.
Die Interaktion zwischen der Hauptanwendung und den Threads wollte ich mit zwei Queues lösen (Query und Response).
Für die eindeutige Zuordnung zwischen Anfrage und Antwort hätte ich zufällige UniqueID's angedacht.

Mein Problem:
Die Antworten sind leider komplett dynamisch - Also vom Typ (momentan String, Long, Quad, Float), vom Variablennamen, dem Wert selbst und der Anzahl der rückgegeben Werte.. :cry:

Ich wollte eine Queue wie etwa diese hier anwenden: https://freakscorner.de/2014/01/26/pb-w ... rentqueue/

Wie gebe ich jedoch die Antwort an das Hauptprogramm zurück?
Hat hier jemand einen Tipp für mich? Ich grüble nun bereits seit mehreren Tagen und finde keine passable Lösung..

(Meine letzte Idee war es, das Ergebnis in eine Datei im temp. Verzeichnis zu schreiben und diese später im Hauptprogramm auszuwerten. Jedoch ist das nicht gerade effektiv..)

Re: MySQL-Anfrage in Thread auslagern

Verfasst: 28.02.2016 19:45
von Derren
Du musst doch wissen, wie du mit den Daten aus der DB umgehst. Wie hast du es dann ohne Thread gemacht?
Eine beliebige Anzahl an Parameter zurückgeben würde ich über eine LinkedList lösen.

Code: Alles auswählen

Structure dataStructure
  type.i
  *data
EndStructure

NewList mylist.dataStructure()



AddElement(myList())
myList()\type = #PB_Quad
myList()\data = AllocateMemory(SizeOf(Quad))
PokeQ(myList()\data, 18975867856348)

If myList()\type = #PB_Quad
  Debug PeekQ(myList()\data )
EndIf 




AddElement(myList())
myList()\type = #PB_String
myList()\data =@"testString"

If myList()\type = #PB_String
  Debug PeekS(myList()\data )
EndIf 

Re: MySQL-Anfrage in Thread auslagern

Verfasst: 28.02.2016 20:50
von techniker
Wie ich mit den DB-Daten dann weiter verfahre ist ja nicht mein Problem.. :mrgreen:

Ich dachte bisher immer, dass ein Peek/Poke an eine von einem Thread gesetzten Wert unsicher ist - oder irre ich mich da?
Ist die List immer noch gültig, auch wenn der Thread beendet wurde? :?

Mein Problem liegt allein an der sicheren Rückmeldung von komplett dynamischen Daten eines Threads ans Hauptprogramm. :roll:

Re: MySQL-Anfrage in Thread auslagern

Verfasst: 28.02.2016 21:58
von Derren
Weiß ich jetzt nicht, ich mache relativ wenig mit Threads.
Wenn du nicht Peek/Poke verwenden willst, würde ich eine allgemeine Struktur erstellen.

Code: Alles auswählen

Structure myStruct
  Type.i
  Integer.i
  Long.l
  Quad.q
  Byte.b 
  String.s
EndStructure

Define var.myStruct
var\Type = #PB_String
var\String = "Hallo Welt"

Select var\Type
  Case #PB_Integer
    Debug var\Integer
  Case #PB_String
    Debug var\String
EndSelect


Im Ganzen:

Code: Alles auswählen

Structure types
	Type.i
	Quad.q
	Integer.i
	Long.l
	String.s
EndStructure 

Structure myStruct
	List mylist.types()
EndStructure 

Define carrierVar.myStruct


Procedure myThread(*ptr.myStruct)
	AddElement(*ptr\mylist())	
	*ptr\mylist()\Type = #PB_Quad
	*ptr\mylist()\Quad = 757238976979735
	
	AddElement(*ptr\mylist())	
	*ptr\mylist()\Type = #PB_String
	*ptr\mylist()\String = "Hallo Welt"	

EndProcedure 

CreateThread(@myThread(), carrierVar.myStruct)


Delay(1000) ;Dem Thread ein wenig Zeit einräumen

ResetList(carrierVar\mylist())
While NextElement(carrierVar\mylist())
	Select carrierVar\mylist()\Type
		Case #PB_Quad
			Debug carrierVar\mylist()\Quad
		Case #PB_String
			Debug carrierVar\mylist()\String
	EndSelect 
Wend 
Für den gleichzeitigen Zugriff musst du natürlich einen Mutex erstellen, das habe ich jetzt mal außen vor gelassen.

Re: MySQL-Anfrage in Thread auslagern

Verfasst: 29.02.2016 08:54
von mhs
Die einfachste Variante ist die Tabellen bzw. abgefragten Spalten als Strukturen anzulegen und alle Rückgabewerte in einer Liste zu speichern. Diese Liste gibt dann der Thread pro "job" als Ergebnis zurück.

Wenn deine Rückgabewerte natürlich komplett dynamisch sind, also du auch die Query dazu eigentlich nicht kennst, dann musst du den Weg über die Variant Struktur gehen, den dir Derren vorgeschlagen hat. Oder du machst so etwas in der Art:

Code: Alles auswählen

Structure Row
  Map Columns.s()
EndStructure

NewList Rows.Row()

#db = 1

Define.i Count, i

; -- DATABASE QUERY --

Count = DatabaseColumns(#db)

While NextDatabaseRow(#db)

  AddElement(Rows())

  For i = 0 To Count - 1
    Rows(DatabaseColumnName(#db, i)) = GetDatabaseString(#db, i)
  Next i

Wend

Re: MySQL-Anfrage in Thread auslagern

Verfasst: 29.02.2016 11:00
von techniker
Ich habe nun den Code aus dem Link vom 1. Post für meine Bedürfnisse nach euren Anregungen angepasst.
Auf den ersten Blick scheint er reibungslos zu laufen.. :lol:

Frage an die Gurus unter euch: Seht ihr bei der Datenübergabe o.ä. ein Risiko oder einen Fehler?

Code: Alles auswählen

EnableExplicit

; *************** QueryQueue ***************

DeclareModule QueryQueue
  Structure QType
    query.s
  EndStructure 

  Structure QStruct
    uniqueid.s
    List QList.QType()
  EndStructure   
  
  Interface Type
		destroy.i()
		countElements.i()
		isEmpty.i()
		push.i(*element.QStruct)
		pop.i()
	EndInterface

	Declare.i new()
EndDeclareModule


Module QueryQueue
 Structure Type_S
		*vTable
		List *elements.QStruct()
		lock.i
		semaphore.i
	EndStructure
	
	
	Procedure.i new()
		Protected *this.Type_S = AllocateMemory(SizeOf(Type_S))
		If (Not *this)
			ProcedureReturn #False
		EndIf
		InitializeStructure(*this, Type_S)

		With *this
			\vTable = ?ConcurrentQueue_vTable
			\semaphore = CreateSemaphore(0)
			If (Not \semaphore)
				ClearStructure(*this, Type_S)
				FreeMemory(*this)
				ProcedureReturn #False
			EndIf
			\lock = CreateMutex()
			If (Not \lock)
				FreeSemaphore(\semaphore)
				ClearStructure(*this, Type_S)
				FreeMemory(*this)
				ProcedureReturn #False
			EndIf
		EndWith

		ProcedureReturn *this
	EndProcedure
	
	
	Procedure.i destroy(*this.Type_S)
		With *this
			FreeSemaphore(\semaphore)
			FreeMutex(\lock)
			ClearStructure(*this, Type_S)
		EndWith
		FreeMemory(*this)
	EndProcedure
	
	
	Procedure.i countElements(*this.Type_S)
		Protected size.i
		With *this
			LockMutex(\lock)
			size = ListSize(\elements())
			UnlockMutex(\lock)
		EndWith
		ProcedureReturn size
	EndProcedure
	
	
	Procedure.i isEmpty(*this.Type)
		ProcedureReturn Bool(*this\countElements() = 0)
	EndProcedure
	
	
	Procedure.i push(*this.Type_S, *element.QStruct)
		Protected result.i = #False
		With *this
			LockMutex(\lock)
			LastElement(\elements())
			If AddElement(\elements())
				\elements() = *element
				SignalSemaphore(\semaphore)
				result = #True
			EndIf
			UnlockMutex(\lock)
		EndWith

		ProcedureReturn result
	EndProcedure
	
	
	Procedure.i pop(*this.Type_S)
		Protected *element.QStruct
		With *this
			LockMutex(\lock)
			If (Not TrySemaphore(\semaphore))
				UnlockMutex(\lock)
				WaitSemaphore(\semaphore)
				LockMutex(\lock)
			EndIf
			FirstElement(\elements())
			*element = \elements()
			DeleteElement(\elements())
			UnlockMutex(\lock)
		EndWith
		ProcedureReturn *element
	EndProcedure

	DataSection
		ConcurrentQueue_vTable:
			Data.i @destroy(), @countElements(), @isEmpty(), @push(), @pop()
	EndDataSection
EndModule


; *************** ResponseQueue ***************

DeclareModule ResponseQueue
  Structure RType
    row.l
    vartype.i
    varname.s
    quad.q
    float.f
    long.l
    string.s
  EndStructure 
  
  Structure RStruct
    uniqueid.s
    List RList.RType()
  EndStructure   
  
  Interface Type
		destroy.i()
		countElements.i()
		isEmpty.i()
		push.i(*element.RStruct)
		pop.i()
	EndInterface

	Declare.i new()
EndDeclareModule


Module ResponseQueue
 Structure Type_S
		*vTable
		List *elements.RStruct()
		lock.i
		semaphore.i
	EndStructure
	
	
	Procedure.i new()
		Protected *this.Type_S = AllocateMemory(SizeOf(Type_S))
		If (Not *this)
			ProcedureReturn #False
		EndIf
		InitializeStructure(*this, Type_S)

		With *this
			\vTable = ?ConcurrentQueue_vTable
			\semaphore = CreateSemaphore(0)
			If (Not \semaphore)
				ClearStructure(*this, Type_S)
				FreeMemory(*this)
				ProcedureReturn #False
			EndIf
			\lock = CreateMutex()
			If (Not \lock)
				FreeSemaphore(\semaphore)
				ClearStructure(*this, Type_S)
				FreeMemory(*this)
				ProcedureReturn #False
			EndIf
		EndWith

		ProcedureReturn *this
	EndProcedure
	
	
	Procedure.i destroy(*this.Type_S)
		With *this
			FreeSemaphore(\semaphore)
			FreeMutex(\lock)
			ClearStructure(*this, Type_S)
		EndWith
		FreeMemory(*this)
	EndProcedure
	
	
	Procedure.i countElements(*this.Type_S)
		Protected size.i
		With *this
			LockMutex(\lock)
			size = ListSize(\elements())
			UnlockMutex(\lock)
		EndWith
		ProcedureReturn size
	EndProcedure
	
	
	Procedure.i isEmpty(*this.Type)
		ProcedureReturn Bool(*this\countElements() = 0)
	EndProcedure
	
	
	Procedure.i push(*this.Type_S, *element.RStruct)
		Protected result.i = #False
		With *this
			LockMutex(\lock)
			LastElement(\elements())
			If AddElement(\elements())
				\elements() = *element
				SignalSemaphore(\semaphore)
				result = #True
			EndIf
			UnlockMutex(\lock)
		EndWith

		ProcedureReturn result
	EndProcedure
	
	
	Procedure.i pop(*this.Type_S)
		Protected *element.RStruct
		With *this
			LockMutex(\lock)
			If (Not TrySemaphore(\semaphore))
				UnlockMutex(\lock)
				WaitSemaphore(\semaphore)
				LockMutex(\lock)
			EndIf
			FirstElement(\elements())
			*element = \elements()
			DeleteElement(\elements())
			UnlockMutex(\lock)
		EndWith
		ProcedureReturn *element
	EndProcedure

	DataSection
		ConcurrentQueue_vTable:
			Data.i @destroy(), @countElements(), @isEmpty(), @push(), @pop()
	EndDataSection
EndModule





; *************** Main ***************


Structure QType
  query.s
EndStructure 
  
Structure QStruct
  uniqueid.s
  List QList.QType()
EndStructure   
  
Structure RType
  row.l
  vartype.i
  varname.s
  quad.q
  float.f
  long.l
  string.s
EndStructure 

Structure RStruct
  uniqueid.s
  List RList.RType()
EndStructure   

Structure QHdl
  QQueue.QueryQueue::Type
  RQueue.ResponseQueue::Type
EndStructure


Define Queue.QHdl
Queue\QQueue = QueryQueue::new()
Queue\RQueue = ResponseQueue::new()

Define QDat.QStruct
Define *RDat.RStruct


Procedure Thread(*Queue.QHdl)
  Define *tQDat.QStruct
  Define tRDat.RStruct
    
  Repeat
    *tQDat = *Queue\QQueue\pop()
    Debug "Pop: " + *tQDat\uniqueid
    ForEach *tQDat\QList()
      Debug *tQDat\QList()\query
      DeleteElement(*tQDat\QList())
    Next
    
    If *tQDat\uniqueid = "10"
      tRDat\uniqueid = "0815 :-)"
      
      AddElement(tRDat\RList())
      tRDat\RList()\vartype = #PB_String
      tRDat\RList()\varname = "teststring"
      tRDat\RList()\string = "testvalue"
      *Queue\RQueue\push(@tRDat)
    EndIf
    
	ForEver
EndProcedure


Define thread.i = CreateThread(@Thread(), @Queue.QHdl)


Define i.i
For i = 1 To 20
  Debug "Push: " + i
  QDat\uniqueid = Str(i)
  
  AddElement(QDat\QList())
  QDat\QList()\query = "Query 1"
  AddElement(QDat\QList())
  QDat\QList()\query = "Query 2"
  AddElement(QDat\QList())
  QDat\QList()\query = "Query 3"
  
	Queue\QQueue\push(@QDat)
	Delay(Random(30, 10))
Next

*RDat = Queue\RQueue\pop()
Debug "MasterPop: " + *RDat\uniqueid
ForEach *RDat\RList()
  Debug *RDat\RList()\vartype
  Debug *RDat\RList()\varname
  Debug *RDat\RList()\string
  DeleteElement(*RDat\RList())
Next


Delay(5000)
KillThread(thread)

Re: MySQL-Anfrage in Thread auslagern

Verfasst: 29.02.2016 15:12
von techniker
Leider ist der Code noch fehlerbehaftet und ich finde einfach nicht heraus, wo es genau hakt.. :angry:

Wie es aussieht, wird die Push-Routine noch korrekt aufgerufen und der Wert in die Queue geschrieben.
Die Anzahl der Elemente in der Queue stimmt, aber die Daten sind Müll.. :roll:

Aber warum..? :?

Den Code habe ich nun auf das Minimum kastriert:

Code: Alles auswählen

EnableExplicit


DeclareModule QueryQueue
  Structure QStruct
    uniqueid.s
    query.s
  EndStructure   
  
	Interface Type
		push.i(*element.QStruct)
		pop.i()
	EndInterface

	Declare.i new()
EndDeclareModule


Module QueryQueue
	Structure Type_QS
		*vTable
		List *elements.QStruct()
		lock.i
		semaphore.i
	EndStructure
	
	
	Procedure.i new()
		Protected *this.Type_QS = AllocateMemory(SizeOf(Type_QS))
		If (Not *this)
			ProcedureReturn #False
		EndIf
		InitializeStructure(*this, Type_QS)

		With *this
			\vTable = ?QueryQueue_vTable
			\semaphore = CreateSemaphore(0)
			If (Not \semaphore)
				ClearStructure(*this, Type_QS)
				FreeMemory(*this)
				ProcedureReturn #False
			EndIf
			\lock = CreateMutex()
			If (Not \lock)
				FreeSemaphore(\semaphore)
				ClearStructure(*this, Type_QS)
				FreeMemory(*this)
				ProcedureReturn #False
			EndIf
		EndWith
		ProcedureReturn *this
	EndProcedure
	
	
	Procedure.i push(*this.Type_QS, *element.QStruct)
		Protected result.i = #False
		With *this
			LockMutex(\lock)
			LastElement(\elements())
			If AddElement(\elements())
			  \elements() = *element
Debug "                    WrQ:" + \elements()\uniqueid
				SignalSemaphore(\semaphore)
				result = #True
			EndIf
			UnlockMutex(\lock)
		EndWith
		ProcedureReturn result
	EndProcedure
	
	
	Procedure.i pop(*this.Type_QS)
	  Define *element.QStruct
		With *this
			LockMutex(\lock)
			If (Not TrySemaphore(\semaphore))
				UnlockMutex(\lock)
				WaitSemaphore(\semaphore)
				LockMutex(\lock)
			EndIf
			FirstElement(\elements())
Debug "                    RdQ:" + \elements()\uniqueid
			*element = \elements()
			DeleteElement(\elements())
			UnlockMutex(\lock)
		EndWith
		ProcedureReturn *element
	EndProcedure

	DataSection
		QueryQueue_vTable:
			Data.i @push(), @pop()
	EndDataSection
EndModule


Structure QStruct
  uniqueid.s
  query.s
EndStructure   

Structure QHdl
  QQueue.QueryQueue::Type
  ;RQueue.ResponseQueue::Type
EndStructure

Define Queue.QHdl
Queue\QQueue = QueryQueue::new()
;Queue\RQueue = ResponseQueue::new()

Define QDat.QStruct
;Define *RDat.RStruct


Procedure Thread(*Queue.QHdl)
  Protected *tQDat.QStruct
  
  Repeat
    *tQDat = *Queue\QQueue\pop()
    Debug "Pop: " + *tQDat\uniqueid
    Delay(Random(50, 20))
	ForEver
EndProcedure


Define thread.i = CreateThread(@Thread(), @Queue)


Define i.i
For i = 1 To 10
  Debug "Push: " + i
  QDat\uniqueid = Str(i)
  QDat\query = "Query"
  Queue\QQueue\push(@QDat)
	Delay(Random(30, 10))
Next

Delay(1000)
KillThread(thread)

Re: MySQL-Anfrage in Thread auslagern

Verfasst: 29.02.2016 15:57
von NicTheQuick
Meine push-Funktion pusht nur den Pointer in die Warteschlange, nicht die Daten dahinter. Und da QDat nur einmal in deinem Programm existiert, wird ständig der selbe Pointer in die Warteschlange gepusht.
Hier mal, wie man es richtig macht:

Code: Alles auswählen

EnableExplicit


DeclareModule QueryQueue
	Structure QStruct
		uniqueid.s
		query.s
	EndStructure   
	
	Interface type
		push.i(*element.QStruct)
		pop.i()
	EndInterface
	
	Declare.i new()
EndDeclareModule


Module QueryQueue
	Structure Type_QS
		*vTable
		List *elements.QStruct()
		lock.i
		semaphore.i
	EndStructure
	
	
	Procedure.i new()
		Protected *this.Type_QS = AllocateMemory(SizeOf(Type_QS))
		If (Not *this)
			ProcedureReturn #False
		EndIf
		InitializeStructure(*this, Type_QS)
		
		With *this
			\vTable = ?QueryQueue_vTable
			\semaphore = CreateSemaphore(0)
			If (Not \semaphore)
				ClearStructure(*this, Type_QS)
				FreeMemory(*this)
				ProcedureReturn #False
			EndIf
			\lock = CreateMutex()
			If (Not \lock)
				FreeSemaphore(\semaphore)
				ClearStructure(*this, Type_QS)
				FreeMemory(*this)
				ProcedureReturn #False
			EndIf
		EndWith
		ProcedureReturn *this
	EndProcedure
	
	
	Procedure.i push(*this.Type_QS, *element.QStruct)
		Protected result.i = #False
		With *this
			LockMutex(\lock)
			LastElement(\elements())
			If AddElement(\elements())
				\elements() = *element
				Debug "                    WrQ:" + \elements()\uniqueid
				SignalSemaphore(\semaphore)
				result = #True
			EndIf
			UnlockMutex(\lock)
		EndWith
		ProcedureReturn result
	EndProcedure
	
	
	Procedure.i pop(*this.Type_QS)
		Define *element.QStruct
		With *this
			LockMutex(\lock)
			If (Not TrySemaphore(\semaphore))
				UnlockMutex(\lock)
				WaitSemaphore(\semaphore)
				LockMutex(\lock)
			EndIf
			FirstElement(\elements())
			Debug "                    RdQ:" + \elements()\uniqueid
			*element = \elements()
			DeleteElement(\elements())
			UnlockMutex(\lock)
		EndWith
		ProcedureReturn *element
	EndProcedure
	
	DataSection
		QueryQueue_vTable:
		Data.i @push(), @pop()
	EndDataSection
EndModule


Structure QStruct
	uniqueid.s
	query.s
EndStructure   

Structure QHdl
	QQueue.QueryQueue::type
	;RQueue.ResponseQueue::Type
EndStructure

Define Queue.QHdl
Queue\QQueue = QueryQueue::new()
;Queue\RQueue = ResponseQueue::new()

Define QDat.QStruct
;Define *RDat.RStruct


Procedure Thread(*Queue.QHdl)
	Protected *tQDat.QStruct
	
	Repeat
		*tQDat = *Queue\QQueue\pop()
		Debug "Pop: " + *tQDat\uniqueid
		Delay(Random(50, 20))
		FreeStructure(*tQDat) ; Speicher wieder freigeben
	ForEver
EndProcedure

Procedure.i pushElement(*Queue.QueryQueue::type, uniqueid.s, query.s)
	Protected *element.QStruct = AllocateStructure(QStruct) ; Speicher allozieren
	If Not *element
		ProcedureReturn #False
	EndIf
	With *element
		\uniqueid = uniqueid
		\query = query
	EndWith
	ProcedureReturn *Queue\push(*element)
EndProcedure


Define thread.i = CreateThread(@Thread(), @Queue)


Define i.i
For i = 1 To 10
	Debug "Push: " + i
	pushElement(Queue\QQueue, Str(i), "Query")
	Delay(Random(30, 10))
Next

Delay(1000)
KillThread(thread) 
Beachte auch das 'FreeStructure()' im Thread, das du aufrufen solltest um das Element tatsächlich im Speicher zu löschen.

Re: MySQL-Anfrage in Thread auslagern

Verfasst: 29.02.2016 16:04
von techniker
Perfekt! :o

Vielen herzlichen Dank!
(Werde mir das ganze nun etwas genauer ansehen, da ich das aus der C-Welt (Controllerprogrammierung) etwas anders gewöhnt bin.. :allright: )