Problem with threads and network functions

Posted: Tue Nov 20, 2007 3:16 pm
by Kukulkan
Hi,

I wrote a server application that should provide some cryptographic functions to clients. If I use this code without threads, everything works fine (except for the bad performance). If I use threads, everything is fine until I use two clients. Sometimes the message is lost, and sometimes the program even crashes.

Here is the server:

Code: Select all

; Network-Test

; ##############################################
; THREAD ROUTINE
; ##############################################

Procedure ManageConnection(ConnectionID.l)
  ; Threaded Management. The parameter is the used network-connection-id
  Protected length.l, Received.s
  Protected *NetworkBuffer.l
  Protected tStart.l, tDuration.l, tCount.l
  Shared lngMutex.l
  
  ; receive data from network connection until the last char is "X" or for max. 5 seconds
  Received.s = ""
  tCount.l = 0
  tStart.l = ElapsedMilliseconds()
  Repeat
    If tCount.l > 0: Delay(10): EndIf       ; to avoid 100% CPU
    *NetworkBuffer.l = AllocateMemory(8192)  ; 8 KB buffer for the server
    length.l = ReceiveNetworkData(ConnectionID.l, *NetworkBuffer.l, 8192)
    If length.l > 0
      Received.s = Received.s + PeekS(*NetworkBuffer.l, length.l)
      tCount.l = tCount.l + 1
    EndIf
    FreeMemory(*NetworkBuffer.l)
  Until Right(Received.s, 1) = "X" Or ElapsedMilliseconds() > tStart.l + 5000 ; 5 seconds timeout
  
  ; close network connection
  CloseNetworkConnection(ConnectionID.l)
  
  ; do some work for 30ms
  tStart.l = ElapsedMilliseconds() + 30
  While ElapsedMilliseconds() < tStart.l: Wend
  
  LockMutex(lngMutex.l)
  PrintN(FormatDate("%yyyy/%mm/%dd %hh:%ii:%ss", Date()) + " - " + Left(Received.s, 35))
  UnlockMutex(lngMutex.l)
  
EndProcedure

; ##############################################
; MAIN ROUTINE
; ##############################################

InitNetwork()

lngMutex.l = CreateMutex() ; create a mutex to sync output between threads

OpenConsole()

ServerConnectionID.l = CreateNetworkServer(1, 999, #PB_Network_TCP)

PrintN("Server started listening on port 999. Press ESC to stop.")
PrintN(" ")

While Quit.b = #False
  
  ; press ESC to close
  key_pressed.s = Inkey()
  If Left(key_pressed.s, 1) = Chr(27)
    Erg.l = MessageRequester("Shutdown Server?","Do you really want to shutdown the server?",#PB_MessageRequester_YesNo + #MB_SYSTEMMODAL)
    If Erg.l = #PB_MessageRequester_Yes
      Quit.b = #True
    EndIf
  EndIf
  
  ; check network data
  SEvent.l = NetworkServerEvent()
  If SEvent.l <> 0: CID.l = EventClient(): EndIf
  
  Select SEvent.l
    Case 1 ; new Client
      
    Case 2 ; data arrival
      
      ; uncomment to avoid threading
      ; ManageConnection(CID.l)
      
      ; uncomment to use threading
      CreateThread(@ManageConnection(), CID.l)
      
    Case 4 ; client disconnected

  EndSelect
  
  Delay(1) ; avoid 100% CPU
Wend

; cleanup
FreeMutex(lngMutex.l) ; free mutex
CloseNetworkServer(1)
CloseConsole()
End

Here is the test client, which puts some load on the server:

Code: Select all

; Test-Client

InitNetwork()

For x.l = 1 To 500

  ConnectionID.l = OpenNetworkConnection("localhost", 999)
  
  If ConnectionID.l <> 0
  
    SendNetworkString(ConnectionID, "This should get received..." + Space(2000) + "X")
    CloseNetworkConnection(ConnectionID.l)
    
  Else
  
    Debug "no connection!"
    
  EndIf
  
  Delay(20)
Next

Please compile both programs as Windows executables, then start the server.

Now start one client and everything will be OK (it takes about 17 seconds here). But if you start the client twice, there will be some strange behaviour: the server gets very slow and sometimes a message gets lost.

The question is: Is it my application or is it the network stack? How can I count the number of running threads?

Do you have any hints on how to make this run smoothly? Is there a bug in my code? Such situations may occur in practice, and I need to handle them cleanly. It is OK if performance drops in such a situation, but not like this (one per second or less) :-(

Any ideas?

Kukulkan

Posted: Wed Nov 21, 2007 11:03 am
by Kukulkan
No one knows?

I updated the server program to use a maximum of 10 threads. If you start it and then start the client, everything goes fine. But if you try to scroll through the server's output, you will get an invalid memory access in the ReceiveNetworkData() function. This may be a bug in ReceiveNetworkData() when it is used inside threads.

Here is the new server-code:

Code: Select all

; Network-Test

; ##############################################
; THREAD ROUTINE
; ##############################################

Procedure ManageConnection(ConnectionID.l)
  ; Threaded Management. The parameter is the used network-connection-id
  Protected length.l, Received.s
  Protected *NetworkBuffer.l
  Protected tStart.l, tDuration.l, tCount.l
  Shared lngMutex.l
  Shared lngTheadCount.l
  
  ; receive data from network connection until the last char is "X" or for max. 5 seconds
  Received.s = ""
  tCount.l = 0
  tStart.l = ElapsedMilliseconds()
  Repeat
    If tCount.l > 0: Delay(10): EndIf       ; to avoid 100% CPU
    *NetworkBuffer.l = AllocateMemory(8192)  ; 8 KB buffer for the server
    length.l = ReceiveNetworkData(ConnectionID.l, *NetworkBuffer.l, 8192)
    If length.l > 0
      Received.s = Received.s + PeekS(*NetworkBuffer.l, length.l)
      tCount.l = tCount.l + 1
    EndIf
    FreeMemory(*NetworkBuffer.l)
  Until Right(Received.s, 1) = "X" Or ElapsedMilliseconds() > tStart.l + 5000 ; 5 seconds timeout
  
  ; close network connection
  CloseNetworkConnection(ConnectionID.l)
  
  If ElapsedMilliseconds() > tStart.l + 5000
    ; timeout
    LockMutex(lngMutex.l)
    PrintN(FormatDate("%yyyy/%mm/%dd %hh:%ii:%ss", Date()) + " - TIMEOUT")
    UnlockMutex(lngMutex.l)
  Else
    
    ; do some work for 30ms
    tStart.l = ElapsedMilliseconds() + 30
    While ElapsedMilliseconds() < tStart.l: Wend
  
    ; output the received data for test purposes
    LockMutex(lngMutex.l)
    PrintN(FormatDate("%yyyy/%mm/%dd %hh:%ii:%ss", Date()) + " - " + Left(Received.s, 35))
    UnlockMutex(lngMutex.l)
  EndIf
  
  ; remove one thread
  lngTheadCount.l = lngTheadCount.l - 1
EndProcedure

; ##############################################
; MAIN ROUTINE
; ##############################################

InitNetwork()

lngMutex.l = CreateMutex() ; create a mutex to sync output between threads
lngTheadCount.l = 0

OpenConsole()

ServerConnectionID.l = CreateNetworkServer(1, 999, #PB_Network_TCP)

PrintN("Server started listening on port 999. Press ESC to stop.")
PrintN(" ")

While Quit.b = #False
  
  ; press ESC to close
  key_pressed.s = Inkey()
  If Left(key_pressed.s, 1) = Chr(27)
    Erg.l = MessageRequester("Shutdown Server?","Do you really want to shutdown the server?",#PB_MessageRequester_YesNo + #MB_SYSTEMMODAL)
    If Erg.l = #PB_MessageRequester_Yes
      Quit.b = #True
    EndIf
  EndIf
  
  ; check network data
  SEvent.l = NetworkServerEvent()
  If SEvent.l <> 0: CID.l = EventClient(): EndIf
  
  Select SEvent.l
    Case 1 ; new Client
      
    Case 2 ; data arrival
      
      ; uncomment to avoid threading
      ; ManageConnection(CID.l)
      
      ; uncomment to use threading
      While lngTheadCount.l >= 10
        ; maximum number of threads
        Delay(1)
      Wend
      lngTheadCount.l = lngTheadCount.l + 1
      LockMutex(lngMutex.l)
      PrintN("start thread " + Str(lngTheadCount.l))
      UnlockMutex(lngMutex.l)
      CreateThread(@ManageConnection(), CID.l)
      
      
    Case 4 ; client disconnected

  EndSelect
  
  Delay(1) ; avoid 100% CPU
Wend

; cleanup
FreeMutex(lngMutex.l) ; free mutex
CloseNetworkServer(1)
CloseConsole()
End

How to crash (PureBasic V4.02):

1. Start the server (debug mode).
2. Start the compiled client (see first post).
3. Scroll through the output of the server.

Result: a crash in ReceiveNetworkData() with an invalid memory access.

Is there a bug in my code or in the function?

Kukulkan
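
Note on the thread counter: in the server above, lngTheadCount.l is incremented in the main loop and decremented in each thread without a lock, so two updates can overlap and the count can drift. Whether this contributes to the crash is not established in this thread. The following is only a minimal sketch of a counter guarded by its own mutex. CountMutex, ThreadCount and Worker() are hypothetical names, and variables without a type suffix use the compiler's default type.

Code: Select all

; Sketch: count running threads with a mutex-protected counter.
; CountMutex, ThreadCount and Worker() are illustrative names, not from the original code.
; Compile with the threadsafe option enabled.

Global CountMutex
Global ThreadCount

Procedure Worker(Value)
  Delay(50)                        ; placeholder for the real work
  LockMutex(CountMutex)
  ThreadCount = ThreadCount - 1    ; decrement only while holding the lock
  UnlockMutex(CountMutex)
EndProcedure

CountMutex = CreateMutex()

For i = 1 To 5
  LockMutex(CountMutex)
  ThreadCount = ThreadCount + 1    ; count the thread before it starts
  UnlockMutex(CountMutex)
  If CreateThread(@Worker(), i) = 0
    ; thread could not be created: undo the increment
    LockMutex(CountMutex)
    ThreadCount = ThreadCount - 1
    UnlockMutex(CountMutex)
  EndIf
Next

; wait until all workers have finished
Repeat
  Delay(10)
  LockMutex(CountMutex)
  Running = ThreadCount            ; read the counter under the same lock
  UnlockMutex(CountMutex)
Until Running = 0

Reading the counter under the same mutex keeps the main loop's view consistent with the workers' updates. A separate mutex for the counter avoids holding the console-output lock longer than necessary.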

Posted: Wed Nov 21, 2007 4:46 pm
by Trond
Are you compiling with threadsafe?

Posted: Wed Nov 21, 2007 4:58 pm
by Kukulkan
Trond wrote:Are you compiling with threadsafe?
Yes.

Please try testing with the source above. It is strange!

Kukulkan

Posted: Wed Nov 21, 2007 5:22 pm
by Trond
There is neither slowdown nor crash when I test your first code with the latest version of PB with threadsafe enabled.

Posted: Wed Nov 21, 2007 5:22 pm
by Foz
I think your problem is that you are using ReceiveNetworkData in threads. Treat that command more like a FIFO queue.

Try dealing with the data in threads after you have received the data - it makes a lot more sense that way.

I'm creating my own functions to do exactly this - so there is one main loop that receives all the data and when the data is complete, it then threads it off to be dealt with by the appropriate function.

Posted: Wed Nov 21, 2007 5:56 pm
by Kukulkan
Hi Trond,
Trond wrote:There is neither slowdown nor crash when I test your first code with the latest version of PB with threadsafe enabled.
If I compile exactly this code, start the server, start the client, and then try to scroll through the server's output while it is running, it slows down rapidly and then crashes with an invalid memory access. It was compiled with the threadsafe option.

Hi Foz,
Foz wrote:Try dealing with the data in threads after you have received the data - it makes a lot more sense that way.
I need to stay responsive while receiving data. As you can see, I have to wait for the final "X" to be sure that everything has been received (my data is hexadecimal only, so the X works well as an end marker). If one client needs a lot of time to send all of its data, all other users have to wait for it. If I use ReceiveNetworkData() in the thread, everything keeps running smoothly. How would you solve this problem?

Kukulkan

Posted: Wed Nov 21, 2007 6:18 pm
by Foz
I think you are doing a disservice to the speed at which PureBasic can receive things. I have done a threaded send test with requests of 1.5 KB each, and PB can handle 40 concurrent requests without any slowdown.

Note that this is about 10 more requests than .NET handles when processing similar-sized requests in a threaded environment (my current workplace is running this: the same server app, running side by side on two ports, can deal with 30 concurrent requests each, or about 100-200 normal users before breaking).

What you need to handle is the *sending* of data. If the server buffer is full, then not all of your data will be sent, if anything at all, which is why the client side needs checks and timeouts.

*** edit: I am working on such a method at the moment, but it'll be another day or two before I'm happy with it. I'm on my 3rd rewrite as it is :?
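
The point about partial sends can be sketched as a loop that keeps sending until the whole buffer is out or a deadline has passed. This is only an illustration, not Foz's method: it relies on SendNetworkData() returning the number of bytes actually sent (or -1 if nothing could be sent), and SendAll(), its parameters and the 5 ms retry delay are hypothetical choices.

Code: Select all

; Sketch: send a whole memory buffer, retrying after partial sends.
; SendAll() is a hypothetical helper, not a PureBasic function.

Procedure SendAll(ConnectionID, *Buffer, Length, TimeoutMs)
  Protected Sent, Result, Deadline
  Deadline = ElapsedMilliseconds() + TimeoutMs
  While Sent < Length
    Result = SendNetworkData(ConnectionID, *Buffer + Sent, Length - Sent)
    If Result > 0
      Sent = Sent + Result         ; advance past the bytes that went out
    ElseIf ElapsedMilliseconds() > Deadline
      ProcedureReturn #False       ; nothing sent and the deadline has passed
    Else
      Delay(5)                     ; let the receiver drain its buffer, then retry
    EndIf
  Wend
  ProcedureReturn #True            ; every byte was handed to the network layer
EndProcedure

A caller would pass the connection returned by OpenNetworkConnection() and a buffer from AllocateMemory(), and treat a #False result as a failed request.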

Posted: Wed Nov 21, 2007 6:19 pm
by Trond
Kukulkan wrote:Hi Trond,
There is neither slowdown nor crash when I test your first code with the latest version of PB with threadsafe enabled.
If I compile exactly this code, start the server, start the client, while running I try to scroll through the output of the server, it slows down rapidly and then crashes with invalid memory access. Compiled using threadsafety.
But you said you used PB 4.02...

Posted: Wed Nov 21, 2007 6:50 pm
by rsts
I'm using the latest PB and it will crash on my Vista machine.

Not every time, but rather consistently.

cheers

Posted: Fri Nov 23, 2007 8:33 am
by Kukulkan
Hi,

I finally got it stable. There is surely a bug in the threadsafe ReceiveNetworkData() function. Now I handle connections and threads separately. The following code runs much more stably. Maybe someone can use it:

Code: Select all

; Network-Test Server

Structure strConnection
  ClientID.l
  ReceivedContent.s
  Status.l            ; 0=free  1=active  2=fully received  3=working
EndStructure

Global lngMaxConnections.l
Global lngMaxThreads.l
Global lngTheadCount.l
Global Dim Connection.strConnection(200)

lngMaxConnections.l = 200     ; set maximum number of handled connections
lngMaxThreads.l = 10          ; set maximum number of used threads
lngTheadCount.l = 0            ; init

; ##############################################
; THREAD ROUTINE
; ##############################################

Procedure ManageConnection(ListID.l)
  ; threaded management. The parameter is the used ID inside the connection-array
  Protected tStart.l, Received.s
  Shared lngMutex.l
  ;
  Received.s = Connection(ListID)\ReceivedContent
  ; 
  ; do some work for 30ms
  tStart.l = ElapsedMilliseconds() + 30
  While ElapsedMilliseconds() < tStart.l: Wend

  ; output the received data for test purposes
  LockMutex(lngMutex.l)
  PrintN("working connection " + Str(ListID.l))
  
  ; free array entry
  Connection(ListID)\ReceivedContent = ""
  Connection(ListID.l)\ClientID = 0
  Connection(ListID.l)\Status = 0 ; set ready
  
  UnlockMutex(lngMutex.l)
  
  ; remove one thread
  lngTheadCount.l = lngTheadCount.l - 1
EndProcedure

; ##############################################
; MAIN ROUTINE
; ##############################################

InitNetwork()

lngMutex.l = CreateMutex() ; create a mutex to sync output between threads
lngTheadCount.l = 0

OpenConsole()

ServerConnectionID.l = CreateNetworkServer(1, 999, #PB_Network_TCP)

PrintN("Server started listening on port 999. Press ESC to stop.")
PrintN(" ")

While Quit.b = #False
  
  ; press ESC to close
  key_pressed.s = Inkey()
  If Left(key_pressed.s, 1) = Chr(27)
    Erg.l = MessageRequester("Shutdown Server?","Do you really want to shutdown the server?",#PB_MessageRequester_YesNo + #MB_SYSTEMMODAL)
    If Erg.l = #PB_MessageRequester_Yes
      Quit.b = #True
    EndIf
  EndIf
  
  ; check network data
  SEvent.l = NetworkServerEvent()
  If SEvent.l <> 0: CID.l = EventClient(): EndIf
  
  Select SEvent.l
  
    Case 1 ; new Client
      ; try to find a free connection
      NewID.l = -1
      For x.l = 0 To lngMaxConnections.l
        If Connection(x.l)\Status = 0: NewID.l = x.l: Break: EndIf
      Next ; x
      
      If NewID.l > -1
        ; reserve this connection
        Connection(x.l)\Status = 1 ; set active
        Connection(x.l)\ClientID = CID.l
        Connection(x.l)\ReceivedContent = ""
        Locked.b = #False
      Else
        ; locked!
        If Locked.b = #False
          PrintN("TOO MANY CONNECTIONS TO HANDLE! NEED MORE CPU-POWER!")
          Locked.b = #True
        EndIf
      EndIf
      
    Case 2 ; data arrival
    
      ; find the array entry that belongs to this client
      NewID.l = -1
      For x.l = 0 To lngMaxConnections.l
        If Connection(x.l)\ClientID = CID.l: NewID.l = x.l: Break: EndIf
      Next ; x
      
      If NewID.l > -1
        ; receive data until it ends with "X"
        *NetworkBuffer.l = AllocateMemory(8192)  ; 8 KB buffer for server
        length.l = ReceiveNetworkData(CID.l, *NetworkBuffer.l, 8192)
        If length.l > 0
          Connection(NewID.l)\ReceivedContent = Connection(NewID.l)\ReceivedContent + PeekS(*NetworkBuffer.l, length.l)
        EndIf
        FreeMemory(*NetworkBuffer.l)
        If Right(Connection(NewID.l)\ReceivedContent, 1) = "X"
          ; end-char received. Change status to 2
          Connection(NewID.l)\Status = 2
        EndIf
      Else
        ; connection not found!
        *NetworkBuffer.l = AllocateMemory(8192)  ; 8 KB buffer for server
        length.l = ReceiveNetworkData(CID.l, *NetworkBuffer.l, 8192)
        PrintN("BAD: dump unassigned data :-(  too many connections. I need more CPU to handle!")
        FreeMemory(*NetworkBuffer.l)
      EndIf
      
    Case 4 ; client disconnected

      CloseNetworkConnection(CID.l)

  EndSelect
  
  ; activate threads for received data
  For x.l = lngMaxConnections.l To 0 Step -1
    If Connection(x.l)\Status = 2 And lngTheadCount.l < lngMaxThreads.l
      Connection(x.l)\Status = 3 ; set working
      lngTheadCount.l = lngTheadCount.l + 1
      LockMutex(lngMutex.l)
      PrintN("start connection " + Str(x.l) + " as thread " + Str(lngTheadCount.l))
      UnlockMutex(lngMutex.l)
      CreateThread(@ManageConnection(), x.l)
    EndIf
  Next
  
  Delay(1) ; avoid 100% CPU
Wend

; cleanup
FreeMutex(lngMutex.l) ; free mutex
CloseNetworkServer(1)
CloseConsole()
End

The loop accepts every connection until lngMaxConnections.l is reached. It only starts threads to do the work if lngMaxThreads.l has not been reached. ReceiveNetworkData() is used only in the main thread; if it is used in threads, it fails.

I am missing a function to refuse new connections when I am handling too many. Or better, there should be a way to make new connections wait until the others have been handled. Does anyone have a good idea? What happens if I don't call NetworkServerEvent() once lngMaxConnections.l is reached? Does PB accept new connections only while NetworkServerEvent() is being called? But in that case I would not be able to receive data from existing connections either...

Kukulkan
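
On refusing connections: one possible approach, sketched here under assumptions, is to close a surplus client as soon as its connect event arrives by calling CloseNetworkConnection() from the server side. The limit #MaxClients, the list Accepted() and the variable names are hypothetical. The sketch keeps its own list of accepted clients, so the count stays correct whether or not a disconnect event is reported for a client the server closed itself. The .l list type follows the PB 4.x code in this thread; on a 64-bit compiler .i would be the safer assumption.

Code: Select all

; Sketch: refuse clients above a fixed limit by closing them on connect.
; #MaxClients, Accepted() and ActiveClients are illustrative names.

#MaxClients = 200

NewList Accepted.l()               ; clients that were accepted and are still connected

; InitNetwork() is required by the PB 4.x used in this thread
If InitNetwork() = 0 Or CreateNetworkServer(1, 999) = 0
  End
EndIf

*Buffer = AllocateMemory(8192)

Repeat
  Event = NetworkServerEvent()
  If Event = 0
    Delay(1)                       ; idle: avoid 100% CPU
  Else
    Client = EventClient()
    Select Event
      Case #PB_NetworkEvent_Connect
        If ActiveClients >= #MaxClients
          CloseNetworkConnection(Client)   ; refuse: drop the surplus client
        Else
          AddElement(Accepted())
          Accepted() = Client
          ActiveClients = ActiveClients + 1
        EndIf

      Case #PB_NetworkEvent_Data
        ; real code would append the received bytes to the client's message
        ReceiveNetworkData(Client, *Buffer, 8192)

      Case #PB_NetworkEvent_Disconnect
        ; only clients that were accepted reduce the count
        ForEach Accepted()
          If Accepted() = Client
            DeleteElement(Accepted())
            ActiveClients = ActiveClients - 1
            Break
          EndIf
        Next
    EndSelect
  EndIf
ForEver

A refused client simply sees its connection closed and can retry later. This does not queue waiting clients; it only keeps the server from accepting more work than it has slots for.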