MQTT Framework
Re: MQTT Framework
Sorry for the late reply, and thanks for the retain addition in 'MQTT_Client.pbi'.
I don't have any other points at the moment for future updates.
Maybe a publish code sample in Client.pbi, or other application examples, would help someone else; I don't know.
I will create a COM port <-> MQTT application that transmits measured values coming in from the COM port and also sends configuration data to the device on the COM port.
Let's see if I notice any problems later when implementing it.
Regards.
Re: MQTT Framework
After some time I have probably found a small bug.
I have two (or more) instances of MQTT-Explorer-0.4.0-beta1.exe running, connected via 127.0.0.1 to your broker.
(MQTT client IDs: e.g. "mqtt-explorer-1" / "mqtt-explorer-2")
If I publish a topic without content from either client, the broker crashes immediately (marked in the code below).
The same happens if I delete a topic in MQTT Explorer and answer "Yes" in the confirmation dialog quoted after the code.
Code: Select all
[09:28:52] Waiting for executable to start...
[09:28:52] Executable type: Windows - x64 (64bit, Unicode, Thread)
[09:28:52] Executable started.
[10:03:48] [ERROR] MQTT_Broker.pbi (Line: 848)
[10:03:48] [ERROR] The specified length is 0.

  Case #PUBLISH
    If \PayLoad\BufferLengh > 0
      i = \PayLoad\BufferLengh
    EndIf
    Length = StringByteLength(\TopicName, #PB_UTF8) + i + 2
    If \QoS > 0
      Length + 2
    EndIf
    *Buff = AllocateMemory(64 + Length)
    If *Buff
      *Buff\PacketType = \Type << 4
      *Buff\PacketType = *Buff\PacketType | (\QoS << 1)
      *Buff\PacketType = *Buff\PacketType | (\DUP << 3)
      *Buff\PacketType = *Buff\PacketType | \Retain
      CurrPos = _SetStreamLength(*Buff, Length)
      Length = StringByteLength(\TopicName, #PB_UTF8)
      *Buff\bytes[CurrPos + 1] = Length & $FF
      *Buff\bytes[CurrPos] = (Length & $FF00) >> 8
      CurrPos + 2
      PokeS(*Buff + OffsetOf(HEADER\bytes) + CurrPos, \TopicName, -1, #PB_UTF8)
      CurrPos + StringByteLength(\TopicName, #PB_UTF8)
      If \Qos > 0
        *Buff\bytes[CurrPos + 1] = \PacketIdentifier & $FF
        *Buff\bytes[CurrPos] = (\PacketIdentifier & $FF00) >> 8
        CurrPos + 2
      EndIf
      CompilerIf #USE_BASE64_PAYLOAD
        Base64Decoder(\PayLoad\PayLoadBase64, *Buff + OffsetOf(HEADER\bytes) + CurrPos, i) ; <<<<<< Crashed here !
      CompilerElse
        CopyMemory(\PayLoad\Buffer, *Buff + OffsetOf(HEADER\bytes) + CurrPos, i)
      CompilerEndIf
      CurrPos + i
      LengthIs = SendNetworkData(SERVER\Sessions()\ClientID, *Buff, CurrPos + 1)
      LengthShould = CurrPos + 1
      ; FreeMemory(*Buff)
    EndIf
EndSelect
---------------------------------------------------------------------------------------
Confirm delete
Do you want to clear Home/WKZ/1/RT and 1 child topic ?
This function will send an empty payload (QoS 0 retain) to this an every subtopic,
clearing retained topics in the process. Only use this function if you know what you
are doing.
[ YES ] [ NO ]
--------------------------------------------------------------------------------------
If I disconnect one of the two clients before publishing, or if there were never multiple clients, there is no problem.
Sure, I could work out a workaround myself, but I think it would be better fixed by the author himself.

Regards.
Re: MQTT Framework
An empty payload... nice, somehow I didn't have that on my radar.
I'm too lazy to try to recreate it (I have no use for the broker these days), but I think it would be enough to add an If/EndIf as in the code below.
Can you confirm?
Code: Select all
If i > 0
  CompilerIf #USE_BASE64_PAYLOAD
    Base64Decoder(\PayLoad\PayLoadBase64, *Buff + OffsetOf(HEADER\bytes) + CurrPos, i)
  CompilerElse
    CopyMemory(\PayLoad\Buffer, *Buff + OffsetOf(HEADER\bytes) + CurrPos, i)
  CompilerEndIf
EndIf
{Home}.:|:.{Dialog Design0R}.:|:.{Codes}.:|:.{History Viewer Online}.:|:.{Send a Beer}
Re: MQTT Framework
Yes, the error no longer occurs that way. (I was afraid of disturbing something else in the broker by doing it that simple way.)
My short solution also works; CopyMemory() has no problem with a length of 0:
Code: Select all
CompilerIf #USE_BASE64_PAYLOAD
  If (i) : Base64Decoder(\PayLoad\PayLoadBase64, *Buff + OffsetOf(HEADER\bytes) + CurrPos, i) : EndIf
CompilerElse
  CopyMemory(\PayLoad\Buffer, *Buff + OffsetOf(HEADER\bytes) + CurrPos, i)
CompilerEndIf
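
The guard discussed in the last few posts can also be tried in isolation. The following is only a minimal sketch, assuming the three-parameter Base64Decoder(Input$, *Output, OutputSize) form that the broker source uses; DecodePayload() is a hypothetical helper name and not part of the framework.

```purebasic
; Hypothetical helper (not part of the MQTT framework):
; decodes a Base64 payload only if there is something to decode.
Procedure.i DecodePayload(PayloadBase64.s, *Target, TargetSize.i)
  ; An empty payload is legal in MQTT (it is used to clear retained topics),
  ; so it is treated as "nothing to copy" instead of being passed on.
  If TargetSize <= 0 Or *Target = 0 Or PayloadBase64 = ""
    ProcedureReturn 0
  EndIf
  ProcedureReturn Base64Decoder(PayloadBase64, *Target, TargetSize)
EndProcedure

Define *Buf = AllocateMemory(64)
If *Buf
  Debug DecodePayload("", *Buf, 0)          ; empty payload: the decoder is never called
  Debug DecodePayload("SGVsbG8=", *Buf, 64) ; number of bytes written by the decoder
  Debug PeekS(*Buf, 5, #PB_Ascii)           ; the decoded text
  FreeMemory(*Buf)
EndIf
```

Wrapping the check in one place means every caller gets the same behaviour for empty payloads, which is the case that brought the broker down.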
Re: MQTT Framework
Sometimes (not often), the simple solutions rule the world 

Re: MQTT Framework
Very nice... the link in the first post now leads to the updated broker source V1.12.
But apart from the code addition you suggested above, another line (just a few lines above it) was also changed; it is shown in the code below.
If I assume the smallest possible length is 0 (and not -1 or something), doesn't this make the whole If check unnecessary?
Code: Select all
Case #PUBLISH
  If \PayLoad\BufferLengh >= 0 ; <== In V1.11 it was only > 0
    i = \PayLoad\BufferLengh
  EndIf
Re: MQTT Framework
It's just to avoid unforeseen problems. You never know where a bug is hiding, and what if BufferLength reaches _SendCommand(), for whatever reason (maybe some exploit I couldn't even think of), with a negative value?
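
To make that concern concrete, here is a minimal sketch of rejecting a negative payload length before any memory is allocated. PublishBodyLength() is a hypothetical helper, not part of the framework; the length arithmetic simply mirrors the #PUBLISH branch quoted earlier in the thread (2 length bytes, the topic, an optional 2-byte packet identifier, and the payload).

```purebasic
; Hypothetical helper (not part of the MQTT framework):
; returns the size of topic + optional packet identifier + payload,
; or -1 if the payload length is negative.
Procedure.i PublishBodyLength(TopicName.s, QoS.i, PayloadLength.i)
  Protected Length.i
  If PayloadLength < 0
    ProcedureReturn -1 ; refuse instead of computing a too-small buffer size
  EndIf
  Length = 2 + StringByteLength(TopicName, #PB_UTF8) + PayloadLength
  If QoS > 0
    Length + 2 ; the packet identifier is only present for QoS 1 and 2
  EndIf
  ProcedureReturn Length
EndProcedure

Debug PublishBodyLength("Home/WKZ/1/RT", 0, 0)  ; empty payload is valid
Debug PublishBodyLength("Home/WKZ/1/RT", 1, -5) ; negative length is rejected
```

A caller would skip AllocateMemory() and the send entirely when the result is -1.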
Re: MQTT Framework
Ah, a good idea, I agree. 

Re: MQTT Framework
Hi again,
I created a simple client (based on your Client.pbi) for logging/testing. Everything is fine, but if I start the client before the broker can be reached, there is only a log message ("Can't connect to Broker!") and never another connection attempt.
The same happens if the broker is offline for a short time (-> "Somehow we have been disconnected!").
What is the best way to check whether the broker connection was (and still is) successful?
The return values from InitClient() and StartClient() didn't get me anywhere.
For example, my event loop:
Code: Select all
ClientID = MQTT_CLIENT::InitClient(@ClientConf, #False) ; #True here if the next line should be omitted
MQTT_CLIENT::StartClient(ClientID, 1) ; if no broker is reachable, the log shows "09:56:30 Can't connect to Broker!"
If ClientID
  Repeat ; always running here, whether connected to the broker or not!
    Event = WaitWindowEvent()
    Select Event
      Case #PB_Event_Gadget
        Select EventGadget()
          Case #Button_ClearLog : ClearGadgetItems(#Editor) : LogIT("Log cleared")
        EndSelect
    EndSelect
  Until Event = #PB_Event_CloseWindow
  MQTT_CLIENT::DeInitClient(ClientID)
EndIf
Re: MQTT Framework
This is a message-driven framework; all the magic happens in your EventIncoming procedure.
The following excerpt (taken from my own client) will not work as-is, of course, but it should show the way to go:
Code: Select all
Select *Values\Type
  ;[...]
  Case MQTT_Common::#MQTTEvent_Error
    If Error = MQTT_Common::#Error_BeingDisconnected And GetGadgetState(#check_auto_reconnect)
      ReSendTime = Val(GetGadgetText(#string_reconnect_wait))
      If ReSendTime = -1
        ;auto calculate
        ;for error "BeingDisconnected", the \Reserved[0] will contain the ms since we last had contact to the broker
        ReSendTime = Val(GetGadgetText(#string_keepAlive)) - (LastActivity / 1000)
        If ReSendTime <= 0
          ReSendTime = Val(GetGadgetText(#string_keepAlive))
        EndIf
      EndIf
      LogIt("auto reconnecting in " + GetTimeFromSecond(ReSendTime))
      SetGadgetAttribute(#button_go, #PB_Button_Image, ImageID(#Image_Connect))
      DisableGadget(#button_go, 0)
      AddWindowTimer(DialogWindow(#WinMain), #Timer_Reconnect, 1000 * ReSendTime)
And when the timer fires, you connect (again).
By the way, StartClient() only starts the client's thread, so of course it can't really tell you whether the connection worked or not.
Just make sure to handle the error messages in your incoming procedure correctly and react accordingly.
Re: MQTT Framework
That was the point I missed: add the necessary things in MQTT_EventIncoming() / Case MQTT_Common::#MQTTEvent_Error.
I'm sure I'll get it done now, but not until the weekend. Thanks for your support.

Re: MQTT Framework
I've added some comments to the #Error constants (in MQTT_Common.pbi) to clarify their meaning a little better.
Re: MQTT Framework
Here is my simple solution to avoid having to restart the client when common problems arise.
That's enough for me, because the client side is only for testing purposes at the moment.
I've picked out a few errors for which I think repeated connection attempts make sense, and implemented it as shown in the code below.
BTW: In MQTT_Broker.pbi and MQTT_Client.pbi you are using 'Case #PB_NetworkEvent_None Delay(20)'.
Is there a deeper reason for this? I shortened it to 5 ms in the broker without noticing any drawbacks (using localhost); when lots of (small) MQTT payloads occur, they are simply processed more quickly.
Code: Select all
Procedure MQTT_EventIncoming()
  . . . . .
    Case MQTT_Common::#MQTTEvent_Error
      LogIT(ErrorText)
      UseModule MQTT_Common
      If ((error=#Error_CantConnect) Or (error=#Error_BeingDisconnected) Or (error=#Error_BadUsernameOrPassword) Or
          (error=#Error_WrongAnswerReceived) Or (error=#Error_TimedOut) Or (error=#Error_UnsupportedIdentifier))
        AddWindowTimer(#Window, #Timer_Reconnect, 5000) ; If these errors occur, we would like to retry MQTT_CLIENT::StartClient(ClientID, 1) in 5sec.
      EndIf
      UnuseModule MQTT_Common
  . . . . .
;Part of Main-Eventloop
  . . . . .
    Case #PB_Event_Timer
      If (EventTimer() = #Timer_Reconnect)
        LogIT("Trying to (re-)connect broker ...") : RemoveWindowTimer(#Window, #Timer_Reconnect)
        MQTT_CLIENT::StartClient(ClientID, 1)
      EndIf
  . . . . . .
Greets.
Re: MQTT Framework
I don't think you will see any noticeable difference. If really many clients are sending their payloads (at once), this delay will never be reached (o.k., not true, it will be reached once at the beginning).
It's only used when the server idles.
And if the first packet arrives with a delay of at most 20 ms... who cares?
This is MQTT; what matters most here is that packets arrive, not that they arrive as quickly as possible.
But sure, you can decrease it; depending on the usage it might even make sense.
btw.
When you receive #Error_BadUsernameOrPassword or #Error_UnsupportedIdentifier, it doesn't help to simply retry, because in that case there is something wrong with your access details.
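
One way to act on that distinction is to decide in a single place which errors are worth a reconnect timer. This is only a sketch: IsRetryableError() is a hypothetical helper, the include path is an assumption, and the constants are the ones named in this thread from MQTT_Common.pbi. Whether #Error_WrongAnswerReceived should count as retryable is a judgment call.

```purebasic
; Assumption: MQTT_Common.pbi from the framework is reachable under this path.
XIncludeFile "MQTT_Common.pbi"

; Hypothetical helper (not part of the framework):
; #True  -> a later connection attempt may succeed
; #False -> retrying with the same settings cannot help
Procedure.i IsRetryableError(Error.i)
  Select Error
    Case MQTT_Common::#Error_CantConnect, MQTT_Common::#Error_BeingDisconnected, MQTT_Common::#Error_TimedOut, MQTT_Common::#Error_WrongAnswerReceived
      ProcedureReturn #True
    Case MQTT_Common::#Error_BadUsernameOrPassword, MQTT_Common::#Error_UnsupportedIdentifier
      ; the access details are wrong: fix the configuration instead of retrying
      ProcedureReturn #False
  EndSelect
  ProcedureReturn #False ; unknown errors: do not retry blindly
EndProcedure
```

In the error branch of the incoming procedure, the long If condition could then shrink to `If IsRetryableError(error)` before the AddWindowTimer() call.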
Re: MQTT Framework
Hi again,
I don't get tired of playing around with your work.

Regarding "When you receive #Error_BadUsernameOrPassword Or #Error_UnsupportedIdentifier it doesn't help to simply retry it ...": Absolutely, this was only for testing. I had started one or more of your clients in the background and then changed broker parameters (like the username).

Now back to Broker.pbi (I only changed Delay(20) to Delay(4) for when the server idles).
I've stressed the broker with MQTT_BROKER::_PublishViaServer(): every 10 ms I publish a /counter (4 bytes) and some /data (around 30 bytes).
Certainly not a normal use case, but simpler than dozens of connected clients publishing something that must be handled.
On my 8-core Ryzen this works with up to two clients connected via localhost without any noticeable delay. If I connect more clients, publish faster (e.g. every 7 ms), or let the Ryzen do some other heavy tasks, the "broker buffer" fills up.
So when I stop _PublishViaServer(), the subscribed clients still receive the buffered publishes for maybe minutes.
Feedback: no broker publish gets lost and there are no broker hiccups, even with thousands of buffered publishes.

Regarding which "future updates" I would expect: can we find out the broker's buffer state from the main program, or does MQTT_Broker.pbi need to be extended first? It would be nice to know whether the broker is handling its tasks easily at the moment or is currently overloaded and has to buffer.
I've already experimented with an added public procedure for that, but I can't find the correct entry point.
Code: Select all
Procedure GetPendingPackets()
  ProcedureReturn ListSize(SERVER\Sessions()\Packets()) ; This doesn't work .....
EndProcedure
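
One possible reason the attempt above fails (an assumption, not verified against the broker source) is that SERVER\Sessions() has no current element when the procedure is called from the main program, and that the list is used by the broker thread at the same time. The sketch below shows the general pattern with stand-in structures: sum the per-session list sizes inside a ForEach loop, protected by a mutex. DemoSession, DemoServer, DemoSrv and DemoMutex are hypothetical names; the real broker's structures and locking may look different.

```purebasic
; Stand-in structures for illustration only, NOT the framework's real ones.
Structure DemoSession
  ClientID.i
  List Packets.i()            ; pending packets of this session
EndStructure

Structure DemoServer
  List Sessions.DemoSession()
EndStructure

Global DemoSrv.DemoServer
Global DemoMutex.i = CreateMutex()

Procedure.i CountPendingPackets()
  Protected Total.i
  ; ForEach moves the list's current element, so the list must not be
  ; used by another thread while we iterate.
  LockMutex(DemoMutex)
  ForEach DemoSrv\Sessions()
    Total + ListSize(DemoSrv\Sessions()\Packets())
  Next
  UnlockMutex(DemoMutex)
  ProcedureReturn Total
EndProcedure

; Two sessions with a few queued packets each
AddElement(DemoSrv\Sessions())
AddElement(DemoSrv\Sessions()\Packets())
AddElement(DemoSrv\Sessions()\Packets())
AddElement(DemoSrv\Sessions())
AddElement(DemoSrv\Sessions()\Packets())

Debug CountPendingPackets() ; total number of pending packets over all sessions
```

For the real broker, the same mutex would have to be taken by the broker thread wherever it touches the session list; otherwise the count can disturb the thread's current list position.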