Sqlite concurrent Write queries in Threads
Posted: Sun Apr 19, 2015 10:26 pm
Hope it helps.
NB: Please tell me what performance you get.
With the debugger on, I get 70000 write queries in 14 seconds.
Code: Select all
; Name: Example Sqlite Write Within Threads
; Version: 1.0 2015-04-19
; PureBasic version: 5.31
; Author: Starwolf20
; SQLite supports threads natively, but managing concurrent write queries (INSERT, DELETE, UPDATE)
; can be tricky!
; Here is my example showing one way to do it. Hope it helps.
; Don't forget to tick the thread-safe checkbox in the PB compiler options.
;
; NB : All write queries are handled in the 'Thread_SqlitePoolWrite()' procedure, using a DB connection opened specifically for writing.
; : This choice makes it easy to manage the DB in only one place.
;
;
; Many thanks for the invaluable information found in the PB forums,
; and to the PB team!
; Compiler setup: every variable must be declared explicitly, and the SQLite
; database plugin is registered so that OpenDatabase() can open SQLite files.
EnableExplicit
UseSQLiteDatabase()
; File name of the SQLite database used by this example.
Global G_DBName$ = "TestTable.sq3"
; Handle of the connection opened by Open_ReadOnlyDB() (-1 until it is opened).
Global G_ixDB = -1
; Handle of the connection opened by Open_WriteDB() and used by
; Thread_SqlitePoolWrite() for every write statement (-1 until it is opened).
Global G_ixDBWrite = -1
; Mutex locked by Thread_SqlitePoolWrite() around each use of the write connection.
Global SqlMutex
; Number of write queries issued by the worker threads (incremented in Thread_Main()).
Global G_CountWrite.l
; Number of statements after which Thread_SqlitePoolWrite() commits the open
; transaction ("seuil" is French for "threshold").
#G_PREF_SeuilAutoCommit = 100
; Number of worker threads started by the main program.
#G_PREF_NBThread = 20
; Upper bound passed to Random() when a thread picks how many rows to insert.
#G_PREF_MAXNBLines = 100
; Values stored in S_Thread\Statut.
Enumeration
#Running
#Ended
EndEnumeration
; Per-thread control block handed to Thread_Main():
;   Statut - #Running / #Ended, polled by the main program
;   Param  - thread number, also written to the ThreadID column
Structure S_Thread
Statut.i
Param.i
EndStructure
; Creates a fresh, empty database file, opens the main connection (G_ixDB)
; and creates the TestTable table in it.
;
; Despite its name, this connection is the one that creates the schema.
; Any failure is reported with Debug and terminates the program with End,
; so on return G_ixDB always holds a valid database handle.
Procedure Open_ReadOnlyDB()
  Protected ixFile, Query$

  G_ixDB = 0

  ; Start from a clean slate. The write connection switches the database to
  ; WAL mode, so an unclean previous run can leave "-wal" / "-shm" sidecar
  ; files behind. They must not survive next to a brand-new database file.
  ; DeleteFile() simply fails when a file does not exist, which is harmless.
  DeleteFile(G_DBName$)
  DeleteFile(G_DBName$ + "-wal")
  DeleteFile(G_DBName$ + "-shm")

  ; Create an empty file that SQLite will then initialise as a database.
  ixFile = CreateFile(#PB_Any, G_DBName$)
  If ixFile = 0
    Debug "ERR : DB creation error " + G_DBName$
    End
  EndIf
  CloseFile(ixFile)

  G_ixDB = OpenDatabase(#PB_Any, G_DBName$, "", "")
  If G_ixDB = 0
    Debug "ERR : DB open error " + G_DBName$ + ". - " + DatabaseError()
    End
  EndIf

  Query$ = "CREATE TABLE TestTable "
  Query$ = Query$ + " (ThreadID TEXT, "
  Query$ = Query$ + " ID TEXT, "
  Query$ = Query$ + " Info INTEGER "
  Query$ = Query$ + " );"
  If DatabaseUpdate(G_ixDB, Query$) = 0
    Debug "ERR : DB Error on " + Query$ + " - " + DatabaseError()
    End
  EndIf
EndProcedure
; Opens the dedicated write connection (G_ixDBWrite) and tunes it with PRAGMAs.
;
; Returns #True when the connection was opened, #False otherwise. Callers
; that ignore the return value keep working as before. A PRAGMA that fails is
; reported with Debug but does not abort the setup (best effort).
Procedure Open_WriteDB()
  Protected Query$, Pragmas$, Resultat, n

  G_ixDBWrite = OpenDatabase(#PB_Any, G_DBName$, "", "")
  If G_ixDBWrite = 0
    Debug "ERR : DB open error " + G_DBName$ + ". - " + DatabaseError()
    ; Do not send PRAGMAs to a connection that does not exist.
    ProcedureReturn #False
  EndIf

  ;--- PRAGMA definition (one statement per line, executed in order)
  Pragmas$ = "PRAGMA synchronous=OFF;" + #LF$
  Pragmas$ = Pragmas$ + "PRAGMA read_uncommitted=ON;" + #LF$
  Pragmas$ = Pragmas$ + "PRAGMA journal_mode=WAL;"

  For n = 1 To CountString(Pragmas$, #LF$) + 1
    Query$ = StringField(Pragmas$, n, #LF$)
    Resultat = DatabaseUpdate(G_ixDBWrite, Query$)
    If Resultat = 0 : Debug "ERR : DB Error on " + Query$ + " - " + DatabaseError() : EndIf
  Next n

  ; Optional: limit the size of the journal by using a small autocheckpoint
  ; and a journal_size_limit about three times that size.
  ;Query$ = "PRAGMA wal_autocheckpoint = 160;" ; /* number of 32KiB pages in a 512KiB journal */
  ;Resultat = DatabaseUpdate(G_ixDBWrite, Query$)
  ;If Resultat = 0 : Debug "ERR : DB Error on " + Query$ + " - " + DatabaseError() : EndIf
  ;Query$ = "PRAGMA journal_size_limit = 15360;" ;/* 512KiB * 3 */
  ;Resultat = DatabaseUpdate(G_ixDBWrite, Query$)
  ;If Resultat = 0 : Debug "ERR : DB Error on " + Query$ + " - " + DatabaseError() : EndIf

  ProcedureReturn #True
EndProcedure
; Closes the main connection and the write connection.
; Each handle is checked with IsDatabase() first, so a connection that was
; never opened is simply skipped.
Procedure Close_DB()
  If IsDatabase(G_ixDB)
    CloseDatabase(G_ixDB)
  EndIf

  If IsDatabase(G_ixDBWrite)
    CloseDatabase(G_ixDBWrite)
  EndIf
EndProcedure
; Single entry point for every write statement, serialised by SqlMutex.
;
; Statements are batched into one transaction on the shared write connection
; (G_ixDBWrite). The transaction is committed automatically once
; #G_PREF_SeuilAutoCommit statements have been queued, or on demand.
;
; Query$ : SQL write statement to execute (ignored for "FORCECOMMIT").
; Type$  : "" (default) to execute Query$, or "FORCECOMMIT" (case-insensitive)
;          to commit the pending transaction, if any.
;
; Returns 0 on failure and non-zero on success. A "FORCECOMMIT" with nothing
; pending returns -1 (nothing to do, not an error).
Procedure Thread_SqlitePoolWrite(Query$, Type$ = "")
  Protected Tx$, TxResult, Result = -1
  Static iSqlCompteur ; statements queued in the currently open transaction

  ; Only one thread at a time may touch the write connection and the counter.
  LockMutex(SqlMutex)

  Select UCase(Type$)

    ; Commit requested explicitly by the program logic.
    Case "FORCECOMMIT"
      If iSqlCompteur > 0
        Tx$ = "COMMIT TRANSACTION"
        Result = DatabaseUpdate(G_ixDBWrite, Tx$)
        If Result = 0 : Debug "ERR : DB Error on " + Tx$ + " - " + DatabaseError() : EndIf
        iSqlCompteur = 0
      EndIf

    ; Regular statement: BEGIN / COMMIT TRANSACTION are handled here.
    Default
      If iSqlCompteur = 0
        Tx$ = "BEGIN TRANSACTION"
        TxResult = DatabaseUpdate(G_ixDBWrite, Tx$)
        If TxResult = 0 : Debug "ERR : DB Error on " + Tx$ + " - " + DatabaseError() : EndIf
      EndIf

      Result = DatabaseUpdate(G_ixDBWrite, Query$)
      If Result = 0 : Debug "ERR : DB Error on " + Query$ + " - " + DatabaseError() : EndIf

      iSqlCompteur + 1
      If iSqlCompteur >= #G_PREF_SeuilAutoCommit
        Tx$ = "COMMIT TRANSACTION"
        ; The COMMIT result is kept separate so that a successful commit
        ; cannot hide the failure of the statement itself.
        TxResult = DatabaseUpdate(G_ixDBWrite, Tx$)
        If TxResult = 0
          Debug "ERR : DB Error on " + Tx$ + " - " + DatabaseError()
          Result = 0 ; the statement was not made durable: report a failure
        EndIf
        iSqlCompteur = 0
      EndIf

  EndSelect

  UnlockMutex(SqlMutex)
  ProcedureReturn Result
EndProcedure
; Worker thread body.
;
; *Parm : pointer to this thread's S_Thread control block. \Param is the
;         thread number (stored in the ThreadID column). \Statut is set to
;         #Running on entry and to #Ended on exit.
;
; Phase 1 inserts a random number of rows through Thread_SqlitePoolWrite().
; Phase 2 reads the rows back on a private connection and updates each of
; them, only to mix many reads and writes. All writes go through
; Thread_SqlitePoolWrite(); the private connection is used for reading only.
Procedure Thread_Main(*Parm.S_Thread)
  Protected Query$, Result, LInfo, L_IxDB, i, IMax, LID$, Written
  Protected Param.l = *Parm\Param

  *Parm\Statut = #Running
  Debug "Thread " + Str(*Parm\Param) + " running."

  ; Open a thread-specific connection for the read part.
  L_IxDB = OpenDatabase(#PB_Any, G_DBName$, "", "")
  If L_IxDB = 0 : Debug "ERR : DB open error " + G_DBName$ + ". - " + DatabaseError() : EndIf

  ; Phase 1: add some rows, then force a commit so the data is written.
  IMax = Random(#G_PREF_MAXNBLines, 10)
  For i = 1 To IMax
    Query$ = "INSERT INTO TestTable "
    Query$ = Query$ + "(ThreadID, ID, Info) "
    Query$ = Query$ + "VALUES ('" + Str(Param) + "', '" + Str(i) + "', 1 ) "
    Result = Thread_SqlitePoolWrite(Query$)
    If Result = 0 : Debug "ERROR : DB Err on " + Query$ + " - " + DatabaseError() : EndIf
    Written + 1
  Next i
  Result = Thread_SqlitePoolWrite("", "FORCECOMMIT")
  If Result = 0 : Debug "ERR : DB Err on FORCECOMMIT " + Query$ + " - " + DatabaseError() : EndIf

  ; Phase 2: read the same rows back and update them. This is skipped when
  ; the read connection could not be opened.
  If L_IxDB
    For i = 1 To IMax
      Query$ = "SELECT ID, Info "
      Query$ = Query$ + "FROM TestTable "
      Query$ = Query$ + "WHERE ThreadID = '" + Str(Param) + "' "
      If DatabaseQuery(L_IxDB, Query$)
        While NextDatabaseRow(L_IxDB)
          LID$ = GetDatabaseString(L_IxDB, 0)
          LInfo = GetDatabaseLong(L_IxDB, 1) + 1
          Query$ = "UPDATE TestTable "
          Query$ = Query$ + "SET Info = " + Str(LInfo) + " "
          Query$ = Query$ + "WHERE ThreadID = '" + Str(Param) + "' "
          Query$ = Query$ + " AND ID = '" + LID$ + "' "
          Result = Thread_SqlitePoolWrite(Query$)
          If Result = 0 : Debug "ERROR : DB Err on " + Query$ + " - " + DatabaseError() : EndIf
          Written + 1
        Wend
        FinishDatabaseQuery(L_IxDB)
      Else
        ; No result set exists, so there is nothing to iterate or to finish.
        Debug "ERR : DB error on " + Query$ + " - " + DatabaseError()
      EndIf
    Next i

    ; Force a commit so the updates are written.
    Result = Thread_SqlitePoolWrite("", "FORCECOMMIT")
    If Result = 0 : Debug "ERR : DB Err on FORCECOMMIT " + Query$ + " - " + DatabaseError() : EndIf

    CloseDatabase(L_IxDB)
  EndIf

  ; G_CountWrite is shared by all threads, so the local count is added once
  ; under the mutex instead of incrementing the global without protection.
  ; This must happen before \Statut is set to #Ended, because the main
  ; program reads the total as soon as every thread reports #Ended.
  LockMutex(SqlMutex)
  G_CountWrite + Written
  UnlockMutex(SqlMutex)

  *Parm\Statut = #Ended
EndProcedure
; ---------------------------------------------------------------------------
; Main program: open the databases, start the worker threads, wait for them
; to finish, then print the total number of write queries.
; ---------------------------------------------------------------------------
; No trailing comma after ThreadID: a line that ends with a comma is
; continued on the next line and would swallow the Dim statement.
Global i, ThreadID
Global Dim TArray.S_Thread(#G_PREF_NBThread)

Open_ReadOnlyDB()
Open_WriteDB()
; Every write goes through the write connection, so the example cannot run
; without it.
If IsDatabase(G_ixDBWrite) = 0
  Debug "ERR : write DB not available, aborting."
  Close_DB()
  End
EndIf

SqlMutex = CreateMutex()

; Launch several threads.
For i = 1 To #G_PREF_NBThread
  TArray(i)\Param = i
  ThreadID = CreateThread(@Thread_Main(), @TArray(i))
  If ThreadID = 0
    ; A thread that never started can never report #Ended itself. It is
    ; marked as ended here so the wait loop below cannot hang forever.
    Debug "ERR : thread " + Str(i) + " could not be created."
    TArray(i)\Statut = #Ended
  EndIf
Next i
Debug "All threads launched......."

; Wait for the end of all threads by polling their status every 100 ms.
For i = 1 To #G_PREF_NBThread
  While TArray(i)\Statut <> #Ended
    Delay(100)
  Wend
  Debug "Thread " + Str(i) + " ended."
Next i

Debug #LF$ + Str(G_CountWrite) + " DB Write within " + Str(#G_PREF_NBThread) + " threads."

Close_DB()
FreeMutex(SqlMutex)