Threading problem...

RichAlgeni
Addict
Posts: 935
Joined: Wed Sep 22, 2010 1:50 am
Location: Bradenton, FL

Post by RichAlgeni »

I'm having a problem with threads in a web server program. The program opens a console window and asks whether you want to run it multi-threaded or single-threaded. The program is fairly simple: it just serves files from a single directory, in this case 'd:\webroot'. If the requested item does not exist, the program sends a simple 'not found' page.

In multi-threaded mode, the program is supposed to check for when threads end, then de-allocate the memory used.

In single thread mode, the program works fine. In multi-thread mode, I am getting strange memory errors. There is something I am just not understanding.

Some help would be greatly appreciated!

Rich

Code:

; ------------------------------------------------------------
; Program name: con_srv.pb
; ------------------------------------------------------------

EnableExplicit

#ServerPort    = 12000
#BaseDirectory = "d:\webroot"
#DefaultPage   = "index.html"
#backSlash     = "\"

Define *MemoryID.i
Define *MemLoc.i
Define SEvent.i
Define ClientID.i
Define ClientIP.i
Define Thread.i
Define Result.i
Define llIndex.i
Define ListCt.i
Define ListNdx.i
Define rightNow.i
Define KeyPressed.s
Define mode.s
Define modeNumber
Define lastCheck.i   = ElapsedMilliseconds()

Declare ProcessRequest(*MemoryID)
Declare ErrorHandler()

OpenConsole()

; make sure we can initialize the network

If InitNetwork() = 0
    PrintN("Can't initialize the network!")
    Input()    
    End
EndIf

If CreateNetworkServer(0, #ServerPort) = 0
    PrintN("Can't create the Server on port " + Str(#ServerPort))
    Input()    
    End
EndIf

; define the lists needed to keep track of the threads used

NewList MemoryList.i()
NewList ThreadList.i()
NewList TimedList.i()

OnErrorCall(@ErrorHandler())

PrintN("Press Escape to exit")
PrintN("")
PrintN("Run as multi-threaded or single-threaded")
PrintN("")
PrintN("1. Multi-threaded")
PrintN("2. Single-threaded")
PrintN("Enter 1 or 2:")

mode = Input()
If mode = "1" Or mode = "2"
    modeNumber = Val(mode)
    If modeNumber = 1
        PrintN("Multi-threaded mode selected")
    Else
        PrintN("Single-threaded mode selected")
    EndIf
Else
    End
EndIf

; **********************************************************************************
; main loop here
; **********************************************************************************

Repeat

    SEvent = NetworkServerEvent() ; if we receive data, it will be indicated here

    Select SEvent
    Case #PB_NetworkEvent_Data ;    raw data has been received
        ClientID  = EventClient()
        ClientIP  = GetClientIP(ClientID)
        *MemoryID = AllocateMemory(SizeOf(ClientID))
        PokeI(*MemoryID, ClientID)

        If modeNumber = 1
            Thread = CreateThread(@ProcessRequest(), *MemoryID)
            Gosub AddToLists
        Else    
            ProcessRequest(*MemoryID)
            FreeMemory(*MemoryID)
        EndIf

    Default; nothing of importance to us has happened, go around again
        Delay(50); sleep 1/20th of a second so we don't take all of the processor
    EndSelect

; now see if any threads have completed, so that we can deallocate the memory

    If modeNumber = 1
        Gosub CheckThreads
    EndIf

    KeyPressed = Inkey()

Until KeyPressed = #ESC$

If modeNumber = 1
    PrintN("")
    PrintN("Checking thread for completion...")
    Repeat
        Delay(1000)
        lastCheck = lastCheck - 10000
        Gosub CheckThreads
    Until ListSize(MemoryList()) = 0
EndIf

CloseNetworkServer(0)

; **********************************************************************************
; End of main loop, procedures below process the web requests
; **********************************************************************************

; **********************************************************************************
; procedure below serves the web request
; **********************************************************************************

Procedure ProcessRequest(*MemoryLoc.i)

    Protected *memLocID.i
    Protected fileBuffer.s
    Protected connectNumber.i
    Protected RequestStr.s = Space(1000)
    Protected ContentType.s
    Protected RequestedFile.s
    Protected fileLength.i = 0
    Protected htmlHeader.s
    Protected fileName.s
    Protected Suffix.s
    Protected fileHandle.i
    Protected Result.i = 0
    Protected lenHeader.l
    Protected firstLine.s

    connectNumber = PeekI(*MemoryLoc)
    
    Result    = ReceiveNetworkData(connectNumber, @RequestStr, 1000)
    If Result <= 0
        RequestStr = ""
    EndIf

    If Left(RequestStr, 3) = "GET"
        firstLine = StringField(RequestStr, 1, #CR$)
        RequestedFile = StringField(firstLine, 2, " ")

        RequestedFile = ReplaceString(RequestedFile, "/", #backSlash)
        If Left(RequestedFile, 1) <> #backSlash
            RequestedFile = #backSlash + RequestedFile
        EndIf

; now determine the directory for the file needed

        RequestedFile = LCase(RequestedFile)
        fileName = StringField(RequestedFile, 1, ".")
        Suffix   = StringField(RequestedFile, 2, ".")

        RequestedFile = #BaseDirectory + RequestedFile

        fileHandle = ReadFile(#PB_Any, RequestedFile)
        If fileHandle
            fileLength = Lof(fileHandle)
        EndIf

; now see if the file exists and if so, read it, if not just close the socket

        If fileLength
            fileBuffer = Space(fileLength)
            Result     = ReadData(fileHandle, @fileBuffer, fileLength)
            CloseFile(fileHandle)
        EndIf

        If fileLength  = 0
            Suffix     = "htm"
            fileBuffer = "<html>" + #CRLF$
            fileBuffer = fileBuffer + "<head>" + #CRLF$
            fileBuffer = fileBuffer + "<title>Page Not Found</title>" + #CRLF$
            fileBuffer = fileBuffer + "</head>" + #CRLF$
            fileBuffer = fileBuffer + "<body>" + #CRLF$
            fileBuffer = fileBuffer + "<h2 align=" + #DQUOTE$ + "center" + #DQUOTE$ + ">Page Not Found</h2>" + #CRLF$
            fileBuffer = fileBuffer + "</body>" + #CRLF$
            fileBuffer = fileBuffer + "</html>" + #CRLF$
            fileBuffer = fileBuffer + #CRLF$
            fileLength = Len(fileBuffer)
        EndIf

        PrintN("Requested file: " + RequestedFile)

; check the file type requested

        Select Suffix
        Case "ico"
            ContentType = "image/ico"    + #CRLF$ + "Cache-Control: max-age=172800, public, must-revalidate"
        Case "png"
            ContentType = "image/png"    + #CRLF$ + "Cache-Control: max-age=172800, public, must-revalidate"
        Case "gif"
            ContentType = "image/gif"    + #CRLF$ + "Cache-Control: max-age=172800, public, must-revalidate"
        Case "jpg"
            ContentType = "image/jpeg"   + #CRLF$ + "Cache-Control: max-age=172800, public, must-revalidate"
        Case "txt"
            ContentType = "text/plain"   + #CRLF$ + "Cache-Control: max-age=172800, public, must-revalidate"
        Case "wav"
            ContentType = "audio/x-wav"  + #CRLF$ + "Cache-Control: max-age=172800, public, must-revalidate"
        Case "css"
            ContentType = "text/css"     + #CRLF$ + "Cache-Control: max-age=86400, public, must-revalidate"
        Case "xml"
            ContentType = "text/xml"     + #CRLF$ + "Cache-Control: no-cache, must-revalidate" + #CRLF$ + "Pragma: no-cache"
        Case "js"
            ContentType = "application/javascript" + #CRLF$ + "Cache-Control: max-age=86400, public, must-revalidate"
        Case "zip"
            ContentType = "application/zip" + #CRLF$ + "Cache-Control: no-cache, must-revalidate" + #CRLF$ + "Pragma: no-cache"
        Case "csv", "xls", "xlsm", "xlsx"
            ContentType = "application/vnd.ms-excel" + #CRLF$ + "filename=" + RequestedFile
            ContentType = ContentType    + #CRLF$ + "Cache-Control: private, max-age=15"
        Default
            ContentType = "text/html"    + #CRLF$ + "Cache-Control: no-cache, must-revalidate" + #CRLF$ + "Pragma: no-cache"
        EndSelect

; create the html header

        htmlHeader  = "HTTP/1.1 200 OK" + #CRLF$
        htmlHeader  = htmlHeader + "Date: Wed, 07 May 2011 11:15:43 GMT" + #CRLF$
        htmlHeader  = htmlHeader + "Content-Length: " + Str(fileLength) + #CRLF$
        htmlHeader  = htmlHeader + "Content-Type: " + ContentType + #CRLF$ + #CRLF$
        lenHeader   = Len(htmlHeader)

; now send the data

        SendNetworkString(connectNumber, htmlHeader)
        SendNetworkData(connectNumber, @fileBuffer, fileLength)
    EndIf

    CloseNetworkConnection(connectNumber)

EndProcedure

; **********************************************************************************
; error handler
; **********************************************************************************

Procedure ErrorHandler()
 
    PrintN("A program error was detected:")
    PrintN("")
    PrintN("Error Message:   " + ErrorMessage())
    PrintN("Error Code:      " + Str(ErrorCode()))
    PrintN("Code Address:    " + Str(ErrorAddress()))

    If ErrorCode() = #PB_OnError_InvalidMemory
        PrintN("Target Address:  " + Str(ErrorTargetAddress()))
    EndIf

    PrintN("Sourcecode line: " + Str(ErrorLine()))
    PrintN("Sourcecode file: " + ErrorFile())

    PrintN("Register content:")

    CompilerSelect #PB_Compiler_Processor
    CompilerCase #PB_Processor_x86
        PrintN("EAX = " + Str(ErrorRegister(#PB_OnError_EAX)))
        PrintN("EBX = " + Str(ErrorRegister(#PB_OnError_EBX)))
        PrintN("ECX = " + Str(ErrorRegister(#PB_OnError_ECX)))
        PrintN("EDX = " + Str(ErrorRegister(#PB_OnError_EDX)))
        PrintN("EBP = " + Str(ErrorRegister(#PB_OnError_EBP)))
        PrintN("ESI = " + Str(ErrorRegister(#PB_OnError_ESI)))
        PrintN("EDI = " + Str(ErrorRegister(#PB_OnError_EDI)))
        PrintN("ESP = " + Str(ErrorRegister(#PB_OnError_ESP)))

    CompilerCase #PB_Processor_x64
        PrintN("RAX = " + Str(ErrorRegister(#PB_OnError_RAX)))
        PrintN("RBX = " + Str(ErrorRegister(#PB_OnError_RBX)))
        PrintN("RCX = " + Str(ErrorRegister(#PB_OnError_RCX)))
        PrintN("RDX = " + Str(ErrorRegister(#PB_OnError_RDX)))
        PrintN("RBP = " + Str(ErrorRegister(#PB_OnError_RBP)))
        PrintN("RSI = " + Str(ErrorRegister(#PB_OnError_RSI)))
        PrintN("RDI = " + Str(ErrorRegister(#PB_OnError_RDI)))
        PrintN("RSP = " + Str(ErrorRegister(#PB_OnError_RSP)))
        PrintN("Display of registers R8-R15 skipped.")
    CompilerEndSelect

    Input()
    End

EndProcedure

; **********************************************************************************
; end of procedures
; **********************************************************************************

End

; **********************************************************************************
; add to the lists so we can deallocate memory as needed
; **********************************************************************************

AddToLists:

AddElement(MemoryList())
MemoryList() = *MemoryID
AddElement(ThreadList())
ThreadList() = Thread
AddElement(TimedList())
TimedList()  = ElapsedMilliseconds()

Return

; **********************************************************************************
; check the threads, so we can deallocate memory as needed
; **********************************************************************************

CheckThreads:

rightNow = ElapsedMilliseconds()

If rightNow - lastCheck > 9999; only check once every 10 seconds
    lastCheck = rightNow
    ListCt = ListSize(ThreadList())
    For ListNdx=ListCt To 1 Step -1
        llIndex = ListNdx - 1; offset is 0
        SelectElement(MemoryList(), llIndex)
        SelectElement(ThreadList(), llIndex)
        SelectElement(TimedList(),  llIndex)
        If IsThread(ThreadList()) = 0
            *MemLoc = MemoryList()
            FreeMemory(*MemLoc)
            DeleteElement(MemoryList())
            DeleteElement(ThreadList())
            DeleteElement(TimedList())
        Else
            If rightNow - TimedList() > 59999; kill the thread after 60 seconds
                KillThread(ThreadList())
                *MemLoc = MemoryList()
                FreeMemory(*MemLoc)
                DeleteElement(MemoryList())
                DeleteElement(ThreadList())
                DeleteElement(TimedList())
            EndIf
        EndIf
    Next

; re-initialize the lists if they are empty

    If ListSize(MemoryList()) = 0
        ClearList(MemoryList())
        ClearList(ThreadList())
        ClearList(TimedList())
        PrintN("Lists are cleared, remember to press Escape to exit")
    EndIf
EndIf

Return

; IDE Options = PureBasic 4.51 (Windows - x64)
; CursorPosition = 68
; FirstLine = 54
; Folding = -
; EnableThread
; EnableXP
; EnableOnError
; Executable = con_srv.exe
; HideErrorLog
; CurrentDirectory = D:\dev\PureBasic\temp\
; CompileSourceDirectory
; Compiler = PureBasic 4.51 (Windows - x64)
; EnablePurifier
auser
Enthusiast
Posts: 195
Joined: Wed Sep 06, 2006 6:59 am

Re: Threading problem...

Post by auser »

As far as I know, the built-in network functions are not thread-safe, but I'm no expert, so please excuse me if I'm wrong. Anyway, I've found this library, which wraps a lot of the network functions in mutexes and replaces the built-in ones (and I guess the creator had a reason to write it):

http://forums.purebasic.com/german/view ... 1144c3642c


But even with that lib... what would happen if a second request, and with it a second NetworkServerEvent, arrived before the first thread had started or finished its work (and that thread then closed the connection afterwards)?
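
One possible way around that race, as a rough sketch only: read the pending data in the main thread before starting the worker, so the same bytes cannot trigger a second data event, and hand the worker its own private copy. The names RequestInfo, HandleRequest and #BufferSize are made up for illustration, allocation checks are left out for brevity, and whether SendNetworkData is safe to call from a worker is an assumption that this thread does not settle.

```purebasic
; Sketch only: read in the main thread, then give the worker a private copy.
; RequestInfo, HandleRequest and #BufferSize are hypothetical names.
EnableExplicit

#ServerPort = 12000
#BufferSize = 1000

Structure RequestInfo
  ClientID.i   ; connection the reply goes to
  *Buffer      ; private copy of the received bytes
  Length.i     ; number of valid bytes in *Buffer
EndStructure

Procedure HandleRequest(*Req.RequestInfo)
  ; The worker touches only its own RequestInfo, never the server event queue.
  SendNetworkData(*Req\ClientID, *Req\Buffer, *Req\Length) ; echo the request back
  FreeMemory(*Req\Buffer)
  FreeMemory(*Req)
EndProcedure

Define *Req.RequestInfo

If InitNetwork() = 0 Or CreateNetworkServer(0, #ServerPort) = 0
  End
EndIf

Repeat
  Select NetworkServerEvent()
    Case #PB_NetworkEvent_Data
      *Req = AllocateMemory(SizeOf(RequestInfo))
      *Req\ClientID = EventClient()
      *Req\Buffer   = AllocateMemory(#BufferSize)
      ; Reading here consumes the pending data before the next
      ; NetworkServerEvent() call is made.
      *Req\Length = ReceiveNetworkData(*Req\ClientID, *Req\Buffer, #BufferSize)
      If *Req\Length > 0 And CreateThread(@HandleRequest(), *Req)
        ; the worker now owns *Req and frees it when done
      Else
        FreeMemory(*Req\Buffer)
        FreeMemory(*Req)
      EndIf
    Default
      Delay(10) ; idle briefly so the loop does not use a full core
  EndSelect
ForEver
```

The worker in this sketch deliberately does not close the connection; closing stays with the main thread, which is the part the German thread quoted further down flags as unsafe.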

Greetings...

Re: Threading problem...

Post by RichAlgeni »

Does anyone know if the built-in network functions are thread-safe?

Re: Threading problem...

Post by RichAlgeni »

After testing much of the night, then again this morning, I believe at this point (emphasis on 'at this point!') that the network functions ARE thread-safe. Why do I say this? I wrote a new server process that just receives a client socket, reads in the data, creates a thread, opens another socket, then sends the data on without modifying it. To test this server process, I wrote another process that created 100 threads; each opened a client socket to the server and sent it a small string. There was a 1/10 second delay between each thread being created. I tested these two processes a number of times, and not once did either crash.

Like I said, as of right now, it does not look like it's the network. To paraphrase Mr. Spock:
if you eliminate the impossible, whatever remains, however improbable, must be the truth.
It looks like it's string functions that are the problem. Thinking out loud, what if we created a dimensioned array structure, incremented the array, then passed that structured array data to the thread?
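
A rough sketch of what that could look like, with no networking involved so it can be tried on its own. WorkItem, Worker and #SlotCount are invented names, and the code assumes the thread-safe compiler option is enabled because the worker builds a string. The array must not be resized while threads are running, since each thread holds a pointer into it.

```purebasic
; Sketch only: WorkItem, Worker and #SlotCount are hypothetical names.
; Compile with the thread-safe option enabled, since the worker builds a string.
EnableExplicit

#SlotCount = 4

Structure WorkItem
  Thread.i      ; thread number, kept so the main code can wait on it
  ClientID.i    ; stand-in for a connection number
  Reply.s       ; filled in by the worker
EndStructure

Procedure Worker(*Item.WorkItem)
  ; Each thread reads and writes only its own array element.
  *Item\Reply = "handled client " + Str(*Item\ClientID)
EndProcedure

Dim Slots.WorkItem(#SlotCount - 1)
Define i.i

For i = 0 To #SlotCount - 1
  Slots(i)\ClientID = 100 + i
  Slots(i)\Thread   = CreateThread(@Worker(), @Slots(i))
Next

OpenConsole()
For i = 0 To #SlotCount - 1
  If Slots(i)\Thread
    WaitThread(Slots(i)\Thread) ; the element must outlive its thread
  EndIf
  PrintN(Slots(i)\Reply)
Next
Input()
```

Waiting with WaitThread before reading or reusing a slot avoids having to poll IsThread and free memory separately.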

Any thoughts?

Rich
PMV
Enthusiast
Posts: 727
Joined: Sat Feb 24, 2007 3:15 pm
Location: Germany

Re: Threading problem...

Post by PMV »

German, but you can translate it with google if needed :)
http://forums.purebasic.com/german/view ... 9b8a280080
(Translated from German:) With the network commands, creating and closing a server is not thread-safe, and neither is its event handling: multiple threads must not be able to use these commands at the same time. For multi-server programs, the server functions therefore have to be protected manually.

If you really want to use the network lib in a safe way, use
the linked project from cxAlex and activate the threadsafe
option in compiler.

MFG PMV

Re: Threading problem...

Post by auser »

You might also take a look here. I ran into trouble with this myself in the past: http://www.purebasic.fr/german/viewtopi ... 20&t=23366 (again German)

I think you have to be careful when using CloseNetworkConnection. I use a function like this to check whether the connection is (still) valid, and I use ConnectionID() to get the socket, for example right after OpenNetworkConnection.

Code:

; you can get the socket via PBs built in ConnectionID()

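Procedure IsSocketValid(hSocket.i)
  Protected Length.l ; receives the pending byte count; only the return code is used here
  If ioctlsocket_(hSocket, #FIONREAD, @Length) = 0
    ProcedureReturn 1 ; the call succeeded, so the socket is still valid
  Else
    ProcedureReturn 0 ; socket error, treat the socket as invalid
  EndIf
EndProcedure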
Greetings...

Re: Threading problem...

Post by RichAlgeni »

Thanks A!

Re: Threading problem...

Post by RichAlgeni »

From MSDN:
FIONREAD or #FIONREAD in our case

Use to determine the amount of data pending in the network's input buffer that can be read from socket s. The argp parameter points to an unsigned long value in which ioctlsocket stores the result. FIONREAD returns the amount of data that can be read in a single call to the receive function, which may not be the same as the total amount of data queued on the socket. If s is message oriented (for example, type SOCK_DGRAM), FIONREAD still returns the amount of pending data in the network buffer, however, the amount that can actually be read in a single call to the receive function is limited to the data size written in the send or sendto function call.
If this is the case, and we use "AUser's" code from above, it seems like we are just checking to see if the socket's receive buffer is clear. If so, should we not read in the receive buffer until the buffer is empty? I am going to stress test the following code. Basically, it checks the receive buffer and reads in the data, until the receive buffer is clear. At that point it closes the socket.

Code:

Procedure CloseValidSocket(connectNumber.i)

    Protected Length.l = 0
    Protected Result.l
    Protected Tries.l  = 0

    Repeat
        Result = ioctlsocket_(connectNumber, #FIONREAD, @Length)
        If Result = 0
            CloseNetworkConnection(connectNumber)
            ProcedureReturn 1; receive buffer was clear so the socket was successfully closed
        Else
            Tries = Tries + 1
            If Tries < 5; arbitrary number of tries
                NetworkRead(connectNumber.i); just read in the receive buffer and discard the data
            Else
                ProcedureReturn 0; close was unsuccessful as there still was data in the receive buffer
            Endif
        EndIf
    Forever

EndProcedure

Re: Threading problem...

Post by RichAlgeni »

Modified slightly, but importantly, to add Result < 0

Code:

Procedure CloseValidSocket(connectNumber.i)

    Protected Length.l = 0
    Protected Result.l
    Protected Tries.l  = 0

    Repeat
        Result = ioctlsocket_(connectNumber, #FIONREAD, @Length)
        If Result < 0
            ProcedureReturn Result; socket already closed
        ElseIf Result = 0
            CloseNetworkConnection(connectNumber)
            ProcedureReturn 1; receive buffer was clear so the socket was successfully closed
        Else; > 0
            Tries = Tries + 1
            If Tries < 5; arbitrary number of tries
                NetworkRead(connectNumber.i); just read in the receive buffer and discard the data
            Else
                ProcedureReturn 0; close was unsuccessful as there still was data in the receive buffer
            Endif
        EndIf
    Forever

EndProcedure
cxAlex
User
Posts: 88
Joined: Fri Oct 24, 2008 11:29 pm
Location: Austria

Re: Threading problem...

Post by cxAlex »

Here is a big include to use Network and Threads in an easy and safe way: http://www.purebasic.fr/german/viewtopic.php?t=23777
It also can help you to send and receive data in a safe way and implement your own protocols and much more.

Greets, Alex

Re: Threading problem...

Post by auser »

@RichAlgeni

I'm pretty sure your code would not work as it stands, because "CloseNetworkConnection" needs the PB connection, but ioctlsocket needs the socket descriptor instead (which you can get with ConnectionID(PB-Connection)). You used the same value for both.

I don't know what the return value of OpenNetworkConnection really is. It seems to be a pointer to... I don't know what. If somebody knows exactly, please tell me. If I peek it, I seem to get "1" back if the connection is OK (maybe by luck?) and some changing number if not. But because I don't really know what that pointer points to, or whether I can use and trust it once the connection is already closed, I have to carry that annoying socket descriptor through my code alongside the connection number: I need the descriptor to check the status of the connection first, but I need the connection number returned by OpenNetworkConnection for everything else (like closing).

Btw. I don't really care about the value that #FIONREAD reports. I only use the call to get zero (then it's good) or a socket error (I don't care which one; for me the socket is then no longer valid). You could even use #FIONBIO, but I disliked that because I don't always want to set something on the socket there, so I thought it nicer to use #FIONREAD, since it seems intended to tell me something ;)

Greetings,
auser

Re: Threading problem...

Post by RichAlgeni »

So you'd rather see it this way???

Code:

Procedure CloseValidSocket(connectNumber.i)

    Protected Length.l = 0
    Protected Result.l
    Protected handle.l
    Protected Tries.l  = 0

    handle = ConnectionID(connectNumber); get the handle needed by 'ioctlsocket'

    Repeat
        Result = ioctlsocket_(handle, #FIONREAD, @Length)
        If Result < 0
            CloseNetworkConnection(connectNumber)
            ProcedureReturn Result; socket already closed
        ElseIf Result = 0
            CloseNetworkConnection(connectNumber)
            ProcedureReturn 1; receive buffer was clear so the socket was successfully closed
        Else
            Tries = Tries + 1
            If Tries < 5; arbitrary number of tries
                NetworkReadAscii(connectNumber.i); just read in the receive buffer and discard the data
            Else
                ProcedureReturn 0; close was unsuccessful, as there still was data in the receive buffer
            Endif
        EndIf
    Forever

EndProcedure
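
One detail worth keeping in mind here, going by the MSDN text quoted earlier: ioctlsocket reports only success or failure through its return value (0, or SOCKET_ERROR on failure), and the number of pending bytes is written to the variable passed as the third argument. A minimal Windows-only sketch of a helper that separates the two; PendingBytes is an invented name, not part of PureBasic or of anyone's code in this thread:

```purebasic
; Sketch only (Windows): PendingBytes is a hypothetical helper name.
Procedure.i PendingBytes(hSocket.i)
  Protected Length.l = 0
  ; ioctlsocket_ returns 0 on success and SOCKET_ERROR (-1) on failure;
  ; the pending byte count is written to Length, it is not the return value.
  If ioctlsocket_(hSocket, #FIONREAD, @Length) = 0
    ProcedureReturn Length ; 0 means the receive buffer is currently empty
  EndIf
  ProcedureReturn -1       ; the socket is invalid or already closed
EndProcedure
```

It would be called with the socket descriptor, for example PendingBytes(ConnectionID(connectNumber)), so that a result of 0 means "nothing left to read" and -1 means "do not touch this socket".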

Re: Threading problem...

Post by auser »

I'm not sure it's the best idea to get the socket at such a late moment.
If you call "ConnectionID()" on an invalid connection number, it will crash your app. It does not seem to crash if you call ConnectionID on a connection number that was closed only a short time before, but that doesn't really feel safe to me. I think it's better to call it just once, right after you have successfully opened a network connection and got a connection number, and then carry the socket through the code together with the connection number.

Greetings,
auser

Re: Threading problem...

Post by RichAlgeni »

You're right, I removed that line.

Thanks again!
JustinJack
User
Posts: 89
Joined: Thu Feb 04, 2010 7:34 am
Location: Decatur, TX

Re: Threading problem...

Post by JustinJack »

I wrote client/server software for our company using the native network lib. It had to be multi-threaded because several clients had to be connected at once, with a thread for each one to handle several, sometimes long-running, procedures. What I did was create a mutex-protected set of functions that the threads call whenever they want to access the network functions. My main thread calls LogServerEvent(), which stores the latest ServerEvent() in a mutex-protected linked list, and the clients call NextServerEvent( clientID, @lpSrvEvent.serverEvent ), which returns a structure containing all the relevant data, or returns 0 if there is no event for that client.
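
For anyone wanting to try this approach, here is a rough guess at its shape. Only the names LogServerEvent and NextServerEvent come from the description above; the structure fields, the list layout and the return values are assumptions made for illustration, and it expects a server to have been created already.

```purebasic
; Sketch only: the fields and the queue layout are assumptions;
; only the names LogServerEvent and NextServerEvent come from the post above.
EnableExplicit

Structure serverEvent
  clientID.i    ; connection the event belongs to
  eventType.i   ; value returned by NetworkServerEvent()
EndStructure

Global EventMutex.i = CreateMutex()
Global NewList PendingEvents.serverEvent()

; Called only by the main thread: fetch one event and queue it.
Procedure LogServerEvent()
  Protected eventType.i = NetworkServerEvent()
  If eventType
    LockMutex(EventMutex)
    AddElement(PendingEvents())
    PendingEvents()\clientID  = EventClient()
    PendingEvents()\eventType = eventType
    UnlockMutex(EventMutex)
  EndIf
  ProcedureReturn eventType
EndProcedure

; Called by worker threads: copy out and remove the oldest event for clientID.
; Returns 1 if *lpSrvEvent was filled in, 0 if nothing was queued.
Procedure NextServerEvent(clientID.i, *lpSrvEvent.serverEvent)
  Protected found.i = 0
  LockMutex(EventMutex)
  ForEach PendingEvents()
    If PendingEvents()\clientID = clientID
      *lpSrvEvent\clientID  = PendingEvents()\clientID
      *lpSrvEvent\eventType = PendingEvents()\eventType
      DeleteElement(PendingEvents())
      found = 1
      Break
    EndIf
  Next
  UnlockMutex(EventMutex)
  ProcedureReturn found
EndProcedure
```

A real version would probably also have to read the payload of data events in LogServerEvent() and store it in the structure, because unread data is likely to be reported again on the next call and would flood the list.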