MQTT Framework

Share your advanced PureBasic knowledge/code with the community.
User avatar
TheCube
User
User
Posts: 17
Joined: Sat Jul 31, 2021 2:00 pm

Re: MQTT Framework

Post by TheCube »

Sorry for the late reply, and thanks for the retain addition in 'MQTT_Client.pbi'.
I don't have any other points at the moment for future updates.
Maybe one publish code sample in Client.pbi may help someone else,
or other application examples, I don't know.

I will create an application COM port <-> MQTT to transmit measured values coming in (from COM), but also to send configuration data to the device on COM.
Let's see if I notice any problems later when implementing it.
Regards.
User avatar
TheCube
User
User
Posts: 17
Joined: Sat Jul 31, 2021 2:00 pm

Re: MQTT Framework

Post by TheCube »

Now after some time I have probably found a small bug. (?)
Using two (or more) MQTT-Explorer-0.4.0-beta1.exe - instances running and connected via 127.0.0.1 to your broker.
( MQTT Client ID's: e,g, "mqtt-explorer-1" / "mqtt-explorer-2" )
If I do a ' publish topic' from either client without content the broker crahed immediately. (Marked below)

Code: Select all

[09:28:52] Waiting for executable to start...
[09:28:52] Executable type: Windows - x64  (64bit, Unicode, Thread)
[09:28:52] Executable started.
[10:03:48] [ERROR] MQTT_Broker.pbi (Line: 848)
[10:03:48] [ERROR] The specified length is 0.


					Case #PUBLISH
						If \PayLoad\BufferLengh > 0
							i = \PayLoad\BufferLengh
						EndIf
						Length = StringByteLength(\TopicName, #PB_UTF8) + i + 2
						If \QoS > 0
							Length + 2
						EndIf
						*Buff = AllocateMemory(64 + Length)
						If *Buff
							*Buff\PacketType         = \Type << 4
							*Buff\PacketType         = *Buff\PacketType | (\QoS << 1)
							*Buff\PacketType         = *Buff\PacketType | (\DUP << 3)
							*Buff\PacketType         = *Buff\PacketType | \Retain
							CurrPos                  = _SetStreamLength(*Buff, Length)
							Length                   = StringByteLength(\TopicName, #PB_UTF8)
							*Buff\bytes[CurrPos + 1] = Length & $FF
							*Buff\bytes[CurrPos]     = (Length & $FF00) >> 8
							CurrPos + 2
							PokeS(*Buff + OffsetOf(HEADER\bytes) + CurrPos, \TopicName, -1, #PB_UTF8)
							CurrPos + StringByteLength(\TopicName, #PB_UTF8)
							If \Qos > 0
								*Buff\bytes[CurrPos + 1] = \PacketIdentifier & $FF
								*Buff\bytes[CurrPos]     = (\PacketIdentifier & $FF00) >> 8
								CurrPos + 2
							EndIf
							CompilerIf #USE_BASE64_PAYLOAD
								Base64Decoder(\PayLoad\PayLoadBase64, *Buff + OffsetOf(HEADER\bytes) + CurrPos, i)		; <<<<<< Crashed here !
							CompilerElse
								CopyMemory(\PayLoad\Buffer, *Buff + OffsetOf(HEADER\bytes) + CurrPos, i)
							CompilerEndIf
							CurrPos + i
							LengthIs     = SendNetworkData(SERVER\Sessions()\ClientID, *Buff, CurrPos + 1)
							LengthShould = CurrPos + 1
							; 							FreeMemory(*Buff)
						EndIf
					EndSelect

The same happens if i do a delete Topic out of MQTT-Explorer after "Yes" in the following window:
---------------------------------------------------------------------------------------
Confirm delete
Do you want to clear Home/WKZ/1/RT and 1 child topic ?
This function will send an empty payload (QoS 0 retain) to this an every subtopic,
clearing retained topics in the process. Only use this function if you know what you
are doing.
[ YES ] [ NO ]
--------------------------------------------------------------------------------------

If I disconnect one of the two clients before publishing, or if there were never multiple clients: No problem.

Sure I can fiddle myself for a workaround for this, but I think it would be better improved by the author himself. :)
Regards.
User avatar
HeX0R
Addict
Addict
Posts: 1187
Joined: Mon Sep 20, 2004 7:12 am
Location: Hell

Re: MQTT Framework

Post by HeX0R »

An empty payload... nice, somehow I didn't had that on my radar.
I'm too lazy to try to recreate it (I have no use for the broker these days), but I think it would be enough to add an If/EndIf here:

Code: Select all

If i > 0
	CompilerIf #USE_BASE64_PAYLOAD
		Base64Decoder(\PayLoad\PayLoadBase64, *Buff + OffsetOf(HEADER\bytes) + CurrPos, i)
	CompilerElse
		CopyMemory(\PayLoad\Buffer, *Buff + OffsetOf(HEADER\bytes) + CurrPos, i)
	CompilerEndIf
EndIf
Can you confirm?
User avatar
TheCube
User
User
Posts: 17
Joined: Sat Jul 31, 2021 2:00 pm

Re: MQTT Framework

Post by TheCube »

Yes, the error no longer occurs that way. (I was afraid to disturb something else in the broker doing it that simple way.)
My short solution works also, CopyMemory() has no problem with 0 length

Code: Select all

CompilerIf #USE_BASE64_PAYLOAD
  If (i) : Base64Decoder(\PayLoad\PayLoadBase64, *Buff + OffsetOf(HEADER\bytes) + CurrPos, i) : EndIf
CompilerElse
   CopyMemory(\PayLoad\Buffer, *Buff + OffsetOf(HEADER\bytes) + CurrPos, i) 
CompilerEndIf
User avatar
HeX0R
Addict
Addict
Posts: 1187
Joined: Mon Sep 20, 2004 7:12 am
Location: Hell

Re: MQTT Framework

Post by HeX0R »

Sometimes (not often), the simple solutions rule the world :mrgreen:
User avatar
TheCube
User
User
Posts: 17
Joined: Sat Jul 31, 2021 2:00 pm

Re: MQTT Framework

Post by TheCube »

Very nice... the link in the first post now leads to the updated broker source V1.12.
But apart from the code addition you suggested above, the following line (Just a few lines above) was also changed:

Code: Select all

Case #PUBLISH
	  If \PayLoad\BufferLengh >= 0			; <== In V1.11 it was only > 0
	    i = \PayLoad\BufferLengh
	  EndIf
If I assume the smallest length is 0 (and not -1 or something) ... Doesn't this make the whole if-query unnecessary ?
User avatar
HeX0R
Addict
Addict
Posts: 1187
Joined: Mon Sep 20, 2004 7:12 am
Location: Hell

Re: MQTT Framework

Post by HeX0R »

It's just to avoid unforseen problems, you never know where a bug is hiding, and what if BufferLength reaches _SendCommand(), for whatever reason (maybe some exploit I couldn't even think about), with a negative value?
User avatar
TheCube
User
User
Posts: 17
Joined: Sat Jul 31, 2021 2:00 pm

Re: MQTT Framework

Post by TheCube »

Ah, a good idea, I agree. :mrgreen:
User avatar
TheCube
User
User
Posts: 17
Joined: Sat Jul 31, 2021 2:00 pm

Re: MQTT Framework

Post by TheCube »

Hi again,
I created a simple client (based on your Client.pbi) for logging/testing. Everything is fine, but if I start the client before a broker can be reached,
there is only a log message ("Can't connect to Broker!") and never another connection attempt.
The same if broker is offline for a short time (->"Somehow we have been disconnected!")

What is the best way to check whether the broker connection was (and is) successful?
The return values from InitClient(() and StartClient() didn't get me anywhere.

For example my Eventloop:

Code: Select all

		ClientID = MQTT_CLIENT::InitClient(@ClientConf, #False)   ; #True here if the next line should be omitted
		MQTT_CLIENT::StartClient(ClientID, 1)     ; if no Broker reachable here comes the log "09:56:30 Can't connect to Broker! 
		If ClientID
		  Repeat	; Always running here, connected or not with broker !
		    Event = WaitWindowEvent()
		    Select Event
	      Case #PB_Event_Gadget
	        Select EventGadget()
	          Case #Button_ClearLog : ClearGadgetItems(#Editor) : LogIT("Log cleared")
	        EndSelect
	      EndSelect  
	      Until Event = #PB_Event_CloseWindow
	     MQTT_CLIENT::DeInitClient(ClientID)
User avatar
HeX0R
Addict
Addict
Posts: 1187
Joined: Mon Sep 20, 2004 7:12 am
Location: Hell

Re: MQTT Framework

Post by HeX0R »

This is a message-driven framework, all magic happens in your EventIncoming Procedure.

e.g. (taken from my own Client):

Code: Select all

Select *Values\Type
	;[...]
	Case MQTT_Common::#MQTTEvent_Error
		If Error = MQTT_Common::#Error_BeingDisconnected And GetGadgetState(#check_auto_reconnect)
			ReSendTime = Val(GetGadgetText(#string_reconnect_wait))
			If ReSendTime = -1
				;auto calculate
				;for error "BeingDisconnected", the \Reserved[0] will contain the ms since we last had contact to the broker
				ReSendTime = Val(GetGadgetText(#string_keepAlive)) - (LastActivity / 1000)
				If ReSendTime <= 0
					ReSendTime = Val(GetGadgetText(#string_keepAlive))
				EndIf
			EndIf
			LogIt("auto reconnecting in " + GetTimeFromSecond(ReSendTime))
			SetGadgetAttribute(#button_go, #PB_Button_Image, ImageID(#Image_Connect))
			DisableGadget(#button_go, 0)
			AddWindowTimer(DialogWindow(#WinMain), #Timer_Reconnect, 1000 * ReSendTime)
That will not work of course, but it should show the way to go.

And when the timer fires, you connect (again)

Start Client only starts the Clients Thread btw. of course it can't really tell you if it worked or not.
Just make sure to handle the error messages in your Incoming procedure correctly and react accordingly.
User avatar
TheCube
User
User
Posts: 17
Joined: Sat Jul 31, 2021 2:00 pm

Re: MQTT Framework

Post by TheCube »

That was the point I missed: Add the necessary things in MQTT_EventIncoming() / Case MQTT_Common::#MQTTEvent_Error
I'm sure I'll get it done now, but not until the weekend. thanx for your support :wink:
User avatar
HeX0R
Addict
Addict
Posts: 1187
Joined: Mon Sep 20, 2004 7:12 am
Location: Hell

Re: MQTT Framework

Post by HeX0R »

I've added some comments to the #Error constants (in MQTT_Common.pbi) to clarify their meaning a little better.
User avatar
TheCube
User
User
Posts: 17
Joined: Sat Jul 31, 2021 2:00 pm

Re: MQTT Framework

Post by TheCube »

Here my simple solution to avoid having to restart the client when common problems arise.
That's enough for me, because the client side is only for testing purposes in the moment.
I've picked out a few errors that I think repeating connection attempts making sense and implemented it like this:

Code: Select all

	Procedure MQTT_EventIncoming()
	 . . . . .
	Case MQTT_Common::#MQTTEvent_Error	
	  LogIT(ErrorText)
	  UseModule MQTT_Common
	  If ((error=#Error_CantConnect) Or (error=#Error_BeingDisconnected) Or (error=#Error_BadUsernameOrPassword) Or
	     (error=#Error_WrongAnswerReceived) Or (error=#Error_TimedOut) Or (error=#Error_UnsupportedIdentifier))
	     AddWindowTimer(#Window, #Timer_Reconnect, 5000)    ; If these errors occur, we would like to retry MQTT_CLIENT::StartClient(ClientID, 1) in 5sec.
      EndIf
	  UnuseModule MQTT_Common
	 . . . . .

;Part of Main-Eventloop
	 . . . . .
	    Case #PB_Event_Timer  
	      If (EventTimer() = #Timer_Reconnect)
	        LogIT("Trying to (re-)connect broker ...") : RemoveWindowTimer(#Window, #Timer_Reconnect)
	        MQTT_CLIENT::StartClient(ClientID, 1)
	      EndIf
	 . . . . . .
BTW: In MQTT_Broker.pbi and MQTT_Client.pbi you are using 'Case #PB_NetworkEvent_None Delay(20)'
Is there a deeper reason for this? I shortened this in broker to 5ms without noticing any drawbacks (using localhost),
but if lots of (small) MQTT payloads occur they are simply processed more quickly.
Greets.
User avatar
HeX0R
Addict
Addict
Posts: 1187
Joined: Mon Sep 20, 2004 7:12 am
Location: Hell

Re: MQTT Framework

Post by HeX0R »

I don't think you can see any noticable difference, if really many clients are sending their payloads (at once), this delay will never be reached (o.k., not true, it will be reached once at the beginning).
It's only used, when the server idles.
And if the first packet arrives with a delay of a maximum of 20ms... who cares?
This is MQTT, here it is most important that packets arrive, and not to get them as quick as possible.
But sure, you can decrease it, depending on the usage it would make even sense.

btw.
When you receive #Error_BadUsernameOrPassword Or #Error_UnsupportedIdentifier it doesn't help to simply retry it, because in that case there is something wrong with your access details.
User avatar
TheCube
User
User
Posts: 17
Joined: Sat Jul 31, 2021 2:00 pm

Re: MQTT Framework

Post by TheCube »

Hi again,
I don't get tired of playing around with your work :D
btw. When you receive #Error_BadUsernameOrPassword Or #Error_UnsupportedIdentifier it doesn't help to simply retry it ...
Absolutely, this was only for testing. I have started one or many of your clients in background and change broker-parameters (like Username) .

Now back to Broker.pbi (Only changed Delay(20) to Delay(4) when the server idles)
I've stressed the broker with MQTT_BROKER::_PublishViaServer() : All 10ms I publish a /counter (4Bytes) and some /data (around 30Bytes).
Certainly not a normal use case, but simpler as dozens of connected clients that publish something that must be handled.
On my Ryzen 8-Core that works upto two via localhost connected clients without any noticeable delay, If I connected more clients, publish e.g faster (e.g. all 7ms) or let the Ryzen do some other heavy Tasks, the "Broker-Buffer" fills up.
So when I stop _PublishViaServer() the subscribed clients still receive the buffered publishes for maybe minutes.
Feedback: No broker publish get lost, no broker-hick'ups, even with thousands of buffered publishes. 8)
Which "future updates" do you expect?
Can we find out the broker-bufferstate out of the main program, or does the MQTT_Broker.pbi need to be expanded before?
I've already experimented with an added public procedure for that, but I don't find the correct entry point.

Code: Select all

	Procedure GetPendingPackets()
	  ProcedureReturn ListSize(SERVER\Sessions()\Packets())   ; This don't work .....
	EndProcedure	
It would be nice to know whether the broker doing his tasks easily at the moment or is currently overloaded and must buffer.
Post Reply