Page 1 of 1

Sqlite concurrent Write queries in Threads

Posted: Sun Apr 19, 2015 10:26 pm
by Starwolf20
Hope it helps.
NB: Please tell me what performance you get.
With the debugger on, I get about 70000 write queries in 14 seconds.

Code: Select all

; Name: Example Sqlite Write Within Threads
; Version: 1.0 2015-04-19
; PureBasic version: 5.31
; Author: Starwolf20

; SQLite supports threads natively, but managing concurrent write queries (INSERT, DELETE, UPDATE)
; can be tricky!
; Here is my example showing one way to do it. Hope it helps.
; Don't forget to tick the thread-safe checkbox ("Create threadsafe executable") in the PB compiler options.
;
; NB : All write queries go through the 'Thread_SqlitePoolWrite()' procedure, which uses a database connection opened specifically for writing.
;    : This choice makes it easy to manage the write side of the DB in a single place.
;
;
; Many thanks for the invaluable information found in the PB forums,
; and to the PB Team.


EnableExplicit
UseSQLiteDatabase()

; File name of the SQLite database used by every connection in this example.
Global G_DBName$ = "TestTable.sq3"
; Handle of the connection opened by Open_ReadOnlyDB() (-1 until it has run).
Global G_ixDB = -1
; Handle of the dedicated write connection opened by Open_WriteDB() (-1 until it has run).
Global G_ixDBWrite = -1
; Mutex that serialises every call going through Thread_SqlitePoolWrite().
Global SqlMutex
; Total number of write queries issued by the worker threads.
Global G_CountWrite.l

; Number of pooled write queries after which Thread_SqlitePoolWrite() commits
; the current transaction ("seuil" = threshold).
#G_PREF_SeuilAutoCommit = 100
; Number of worker threads started by the main program.
#G_PREF_NBThread = 20
; Upper bound of the random number of rows each worker thread inserts.
#G_PREF_MAXNBLines = 100

; Life-cycle states stored in S_Thread\Statut.
Enumeration
  #Running
  #Ended
EndEnumeration

; Per-thread control block; a pointer to it is passed to Thread_Main().
Structure S_Thread
  ; Current state of the thread: #Running or #Ended.
  Statut.i
  ; Number identifying the thread; it is written into the ThreadID column.
  Param.i
EndStructure

Procedure   Open_ReadOnlyDB()
  ; Recreates the database file from scratch, opens a connection on it
  ; (stored in G_ixDB) and creates the TestTable table.
  ; Every failure is reported with Debug and terminates the program.

  Protected FileHandle, DbHandle
  Protected Sql$

  G_ixDB = 0
  DeleteFile(G_DBName$)

  ; Create an empty file first and close it again so OpenDatabase() can open it.
  FileHandle = CreateFile(#PB_Any, G_DBName$)
  If FileHandle = 0
    Debug "ERR : DB creation error " + G_DBName$
    End
  EndIf
  CloseFile(FileHandle)

  DbHandle = OpenDatabase(#PB_Any, G_DBName$, "", "")
  If DbHandle = 0
    Debug "ERR : DB open error " + G_DBName$ + ". - " + DatabaseError()
    End
  EndIf
  G_ixDB = DbHandle

  ; Build the CREATE TABLE statement piece by piece.
  Sql$ = "CREATE TABLE TestTable "
  Sql$ + " (ThreadID                 TEXT, "
  Sql$ + "  ID                       TEXT, "
  Sql$ + "  Info                     INTEGER "
  Sql$ + " );"

  If DatabaseUpdate(G_ixDB, Sql$) = 0
    Debug "ERR : DB Error on " + Sql$ + " - " + DatabaseError()
    End
  EndIf

EndProcedure
Procedure   Open_WriteDB()
  ; Opens the connection dedicated to write queries (stored in G_ixDBWrite)
  ; and configures it with a few PRAGMAs.
  ;
  ; If the database cannot be opened the program is terminated, exactly as
  ; Open_ReadOnlyDB() does: continuing would run DatabaseUpdate() on an
  ; invalid handle here and later in every worker thread.
  ; A failing PRAGMA is only reported with Debug; the connection stays usable.

  Protected Query$, Resultat

  G_ixDBWrite = OpenDatabase(#PB_Any, G_DBName$, "", "")
  If G_ixDBWrite = 0
    Debug "ERR : DB open error " + G_DBName$ + ". - " + DatabaseError()
    End
  EndIf

  ;--- PRAGMA  definition
  ; synchronous=OFF : do not wait for the data to be flushed to disk (faster, less safe).
  Query$ = "PRAGMA synchronous=OFF;"
  Resultat = DatabaseUpdate(G_ixDBWrite, Query$)
  If Resultat = 0 : Debug "ERR : DB Error on " + Query$ + " - " + DatabaseError() : EndIf

  Query$ = "PRAGMA read_uncommitted=ON;"
  Resultat = DatabaseUpdate(G_ixDBWrite, Query$)
  If Resultat = 0 : Debug "ERR : DB Error on " + Query$ + " - " + DatabaseError() : EndIf

  ; journal_mode=WAL : write-ahead logging, so readers on other connections are
  ; not blocked while this connection writes.
  Query$ = "PRAGMA journal_mode=WAL;"
  Resultat = DatabaseUpdate(G_ixDBWrite, Query$)
  If Resultat = 0 : Debug "ERR : DB Error on " + Query$ + " - " + DatabaseError() : EndIf

  ; Optional (disabled): limit the size of the journal by using a small autocheckpoint
  ; and a journal_size_limit about three times that size.
  ; NOTE(review): the numbers below do not match their comments - 512KiB / 32KiB is 16 pages,
  ; not 160, and the SQLite documentation expresses journal_size_limit in bytes, so
  ; 3 * 512KiB would be 1572864. Verify the values before enabling these lines.
  ;Query$ = "PRAGMA wal_autocheckpoint = 160;" ; /* number of 32KiB pages in a 512KiB journal */
  ;Resultat = DatabaseUpdate(G_ixDBWrite, Query$)
  ;If Resultat = 0 : Debug "ERR : DB Error on " + Query$ + " - " + DatabaseError() : EndIf
  ;   Query$ = "PRAGMA journal_size_limit = 15360;" ;/* 512KiB * 3 */
  ;Resultat = DatabaseUpdate(G_ixDBWrite, Query$)
  ;If Resultat = 0 : Debug "ERR : DB Error on " + Query$ + " - " + DatabaseError() : EndIf

EndProcedure
Procedure   Close_DB()
  ; Closes the two global connections (G_ixDB and G_ixDBWrite).
  ; A handle that does not refer to an open database is skipped.

  If IsDatabase(G_ixDB)
    CloseDatabase(G_ixDB)
  EndIf

  If IsDatabase(G_ixDBWrite)
    CloseDatabase(G_ixDBWrite)
  EndIf

EndProcedure

Procedure   Thread_SqlitePoolWrite(Query$, Type$ = "")
  ; Single entry point for every write query (INSERT, DELETE, UPDATE).
  ; Calls are serialised with SqlMutex and executed on the dedicated write
  ; connection G_ixDBWrite. Queries are grouped into transactions that are
  ; committed automatically every #G_PREF_SeuilAutoCommit queries.
  ;
  ; Query$ : SQL statement to execute (ignored when Type$ is "FORCECOMMIT").
  ; Type$  : "" (default) to pool Query$, or "FORCECOMMIT" (case-insensitive)
  ;          to commit the pending transaction, if any.
  ;
  ; Returns 0 when any statement executed by this call failed (BEGIN, the
  ; query itself or COMMIT), a non-zero value otherwise. A FORCECOMMIT with
  ; nothing pending returns -1.

  Protected Query1$, Result = -1
  Protected Failed = #False
  Static iSqlCompteur        ; number of queries in the currently open transaction

  ; MUTEX activation.
  LockMutex(SqlMutex)

  ;Debug "SqlitePoolWrite     " + Query$
  Select UCase(Type$)

    ; Commit requested explicitly by the program logic.
    Case "FORCECOMMIT"
      If iSqlCompteur > 0
        Query1$ = "COMMIT TRANSACTION"
        Result = DatabaseUpdate(G_ixDBWrite, Query1$)
        If Result = 0 : Debug "ERR : DB Error on " + Query1$ + " - " + DatabaseError() : EndIf
        iSqlCompteur = 0
      EndIf

    ; BEGIN/COMMIT TRANSACTION managing.
    Default
      ; The first query of a batch opens the transaction.
      If iSqlCompteur = 0
        Query1$ = "BEGIN TRANSACTION"
        If DatabaseUpdate(G_ixDBWrite, Query1$) = 0
          Debug "ERR : DB Error on " + Query1$ + " - " + DatabaseError()
          Failed = #True
        EndIf
      EndIf

      Result = DatabaseUpdate(G_ixDBWrite, Query$)
      If Result = 0
        Debug "ERR : DB Error on " + Query$ + " - " + DatabaseError()
        Failed = #True
      EndIf
      iSqlCompteur + 1

      ; Auto-commit once the batch is full.
      If iSqlCompteur >= #G_PREF_SeuilAutoCommit
        Query1$ = "COMMIT TRANSACTION"
        If DatabaseUpdate(G_ixDBWrite, Query1$) = 0
          Debug "ERR : DB Error on " + Query1$ + " - " + DatabaseError()
          Failed = #True
        EndIf
        iSqlCompteur = 0
      EndIf

      ; A successful COMMIT must not hide a failed query (or a failed BEGIN):
      ; report the failure of any step to the caller.
      If Failed : Result = 0 : EndIf

  EndSelect

  ; MUTEX release.
  UnlockMutex(SqlMutex)
  ProcedureReturn Result
EndProcedure
Procedure   Thread_Main(*Parm.S_Thread)
  ; Worker thread body.
  ;   1. Inserts a random number of rows (10 to #G_PREF_MAXNBLines) tagged with
  ;      this thread's number, then forces a commit.
  ;   2. Repeatedly reads its own rows back through a thread-private connection
  ;      and increments their Info column, then forces a commit again.
  ; Every write goes through Thread_SqlitePoolWrite().
  ;
  ; *Parm : control block of the thread. \Param is the thread number,
  ;         \Statut is set to #Running on entry and to #Ended on every exit path.

  Protected Query$, Update$, Result, LInfo, L_IxDB, i, IMax, LID$, Param.l = *Parm\Param
  Protected LocalWrites.l    ; writes issued by this thread, added to G_CountWrite at the end

  *Parm\Statut = #Running
  Debug "Thread " + Str(*Parm\Param) + " running."

  ; Open a thread specific DB connection for the read part.
  L_IxDB = OpenDatabase(#PB_Any, G_DBName$, "", "")
  If L_IxDB = 0
    Debug "ERR : DB open error " + G_DBName$ + ". - " + DatabaseError()
    ; Without a read connection the thread cannot do its job. Flag it as
    ; ended so that nobody waits forever on its status.
    *Parm\Statut = #Ended
    ProcedureReturn
  EndIf

  ; Add some data in DB with a FORCED COMMIT at the end of the loop to "write" data.
  IMax = Random(#G_PREF_MAXNBLines, 10)
  For i = 1 To IMax
    Query$ =          "INSERT INTO TestTable "
    Query$ = Query$ + "(ThreadID, ID, Info) "
    Query$ = Query$ + "VALUES ('" + Str(Param) + "', '" + Str(i) + "', 1 ) "

    Result = Thread_SqlitePoolWrite(Query$)
    If Result = 0 : Debug "ERROR : DB Err on " + Query$ + " - " + DatabaseError() : EndIf
    LocalWrites + 1
  Next i

  Result = Thread_SqlitePoolWrite("", "FORCECOMMIT")
  If Result = 0 : Debug "ERR : DB Err on FORCECOMMIT " + Query$ + " - " + DatabaseError() : EndIf

  ; Then read the same data to update them (not a useful process here, just to
  ; mix a lot of DB read/write).
  For i = 1 To IMax
    Query$ =          "SELECT ID, Info "
    Query$ = Query$ + "FROM TestTable "
    Query$ = Query$ + "WHERE ThreadID = '" + Str(Param) + "' "

    If DatabaseQuery(L_IxDB, Query$)
      While NextDatabaseRow(L_IxDB)
        LID$  = GetDatabaseString(L_IxDB, 0)
        LInfo = GetDatabaseLong(L_IxDB, 1) + 1

        Update$ =           "UPDATE TestTable "
        Update$ = Update$ + "SET Info = " + Str(LInfo) + " "
        Update$ = Update$ + "WHERE ThreadID = '" + Str(Param) + "' "
        Update$ = Update$ + "  AND ID = '" + LID$ + "' "

        Result = Thread_SqlitePoolWrite(Update$)
        If Result = 0 : Debug "ERROR : DB Err on " + Update$ + " - " + DatabaseError() : EndIf
        LocalWrites + 1
      Wend
      FinishDatabaseQuery(L_IxDB)
    Else
      ; The query failed: there is no result set to walk through or to release.
      Debug "ERR : DB error on " + Query$ + " - " + DatabaseError()
    EndIf
  Next i

  ; Do a FORCED COMMIT to "write" data.
  Result = Thread_SqlitePoolWrite("", "FORCECOMMIT")
  If Result = 0 : Debug "ERR : DB Err on FORCECOMMIT " + Query$ + " - " + DatabaseError() : EndIf

  ; G_CountWrite is shared by all threads: update it once, under the mutex,
  ; instead of doing an unprotected increment for every query.
  LockMutex(SqlMutex)
  G_CountWrite + LocalWrites
  UnlockMutex(SqlMutex)

  CloseDatabase(L_IxDB)
  *Parm\Statut = #Ended
EndProcedure



; ---------------------------------------------------------------------------
; Main program: creates the database, starts #G_PREF_NBThread worker threads,
; waits for all of them and prints the total number of write queries.
; ---------------------------------------------------------------------------
Global i, ThreadID
Dim TArray.S_Thread(#G_PREF_NBThread)     ; one control block per thread (index 0 is unused)
Dim ThreadHandles(#G_PREF_NBThread)       ; CreateThread() results, 0 = thread not started

  Open_ReadOnlyDB()
  Open_WriteDB()

  ; The mutex must exist before the first thread starts.
  SqlMutex = CreateMutex()

  ; Launch several threads.
  For i = 1 To #G_PREF_NBThread
    TArray(i)\Param = i
    ThreadID = CreateThread(@Thread_Main(), @TArray(i))
    If ThreadID = 0 : Debug "ERR : Thread " + Str(i) + " could not be created." : EndIf
    ThreadHandles(i) = ThreadID
  Next i
  Debug "All threads launched......."

  ; Wait for the end of all threads. WaitThread() blocks until the thread has
  ; really finished, so no polling of a shared status flag is needed and a
  ; thread that could not be started is simply skipped.
  For i = 1 To #G_PREF_NBThread
    If ThreadHandles(i)
      WaitThread(ThreadHandles(i))
      Debug "Thread " + Str(i) + " ended."
    EndIf
  Next i

  Debug #LF$ + Str(G_CountWrite) + " DB Write within " + Str(#G_PREF_NBThread) + " threads."
  Close_DB()
  FreeMutex(SqlMutex)
 

Re: Sqlite concurrent Write queries in Threads

Posted: Mon Apr 20, 2015 1:48 pm
by blueb
My results...
Win7 Pro - PureBasic 5.31(x86)

82990 DB Write within 20 threads.
Time taken: 12028msecs

Re: Sqlite concurrent Write queries in Threads

Posted: Thu Apr 23, 2015 9:49 pm
by Ocean
This is extremely helpful, thank you for posting this snippet!

Ocean