If anyone has an interest in lightweight messaging, check out ZeroMQ:
http://www.coastrd.com/zeromq-messaging
ZeroMQ - Lightweight messaging
-
- User
- Posts: 43
- Joined: Tue Jul 10, 2007 8:09 pm
ZeroMQ - Lightweight messaging
Last edited by Mike Trader on Wed Jul 14, 2010 2:09 am, edited 1 time in total.
Re: ZeroMQ - Lightweight messaging
I used search and replace, so you will need to check it
Code: Select all
; ---------------------------------------------------------------------------
; ZeroMQ 2.0 constants, transcribed from zmq.h.
; ---------------------------------------------------------------------------

; Base for 0MQ-specific errno values. It must be the same number the library
; was compiled with, otherwise none of the error constants below will ever
; compare equal to the value returned by zmq_errno().
#ZMQ_HAUSNUMERO = 156384712 ; chosen to avoid collision with other errno ranges

; POSIX errors that the C header defines itself when the platform's errno.h
; lacks them (relative to #ZMQ_HAUSNUMERO).
; NOTE(review): if the DLL was built with a toolchain whose errno.h already
; defines some of these, the library reports that toolchain's values instead.
#ENOTSUP         = #ZMQ_HAUSNUMERO + 1
#EPROTONOSUPPORT = #ZMQ_HAUSNUMERO + 2
#ENOBUFS         = #ZMQ_HAUSNUMERO + 3
#ENETDOWN        = #ZMQ_HAUSNUMERO + 4
#EADDRINUSE      = #ZMQ_HAUSNUMERO + 5
#EADDRNOTAVAIL   = #ZMQ_HAUSNUMERO + 6
#ECONNREFUSED    = #ZMQ_HAUSNUMERO + 7
#EINPROGRESS     = #ZMQ_HAUSNUMERO + 8

; Native 0MQ error codes
#EMTHREAD        = #ZMQ_HAUSNUMERO + 50
#EFSM            = #ZMQ_HAUSNUMERO + 51
#ENOCOMPATPROTO  = #ZMQ_HAUSNUMERO + 52

; Max size of a Very Small Message. VSMs are passed by value to reduce
; memory alloc/dealloc.
#ZMQ_MAX_VSM_SIZE = 30

; Message types. These integers may be stored in the 'content' member of the
; message instead of a regular pointer to the data.
#ZMQ_DELIMITER = 31
#ZMQ_VSM       = 32

; Message flags
#ZMQ_MSG_TBC    = 1
#ZMQ_MSG_SHARED = 128 ; strictly speaking not a message flag

; 0MQ infrastructure (a.k.a. context) initialisation & termination:
; flag for zmq_init
#ZMQ_POLL = 1

; 0MQ socket types (second argument of zmq_socket)
#ZMQ_P2P        = 0
#ZMQ_PUB        = 1
#ZMQ_SUB        = 2
#ZMQ_REQ        = 3
#ZMQ_REP        = 4
#ZMQ_XREQ       = 5
#ZMQ_XREP       = 6
#ZMQ_UPSTREAM   = 7
#ZMQ_DOWNSTREAM = 8

; Socket options (second argument of zmq_setsockopt)
#ZMQ_HWM          = 1
#ZMQ_LWM          = 2
#ZMQ_SWAP         = 3
#ZMQ_AFFINITY     = 4
#ZMQ_IDENTITY     = 5
#ZMQ_SUBSCRIBE    = 6
#ZMQ_UNSUBSCRIBE  = 7
#ZMQ_RATE         = 8
#ZMQ_RECOVERY_IVL = 9
#ZMQ_MCAST_LOOP   = 10
#ZMQ_SNDBUF       = 11
#ZMQ_RCVBUF       = 12

; Flags for zmq_send / zmq_recv
#ZMQ_NOBLOCK = 1
#ZMQ_NOFLUSH = 2

; I/O multiplexing: event bits for zmq_pollitem_t
#ZMQ_POLLIN  = 1
#ZMQ_POLLOUT = 2
#ZMQ_POLLERR = 4
; Mirror of the C zmq_pollitem_t record (I/O multiplexing); the C field
; declarations are quoted next to each member. Field order and sizes must
; match the C layout, so do not reorder or retype without checking zmq.h.
Structure zmq_pollitem_t ; I/O multiplexing
pSocket.i ; void *socket;
fd.i ; int fd; NOTE(review): .i is pointer-sized, which equals a C int only on 32-bit targets - confirm against the zmq.h of the DLL in use
events.w ; short events; presumably a combination of the #ZMQ_POLLIN / #ZMQ_POLLOUT / #ZMQ_POLLERR bits - confirm
revents.w ; short revents;
EndStructure
; Shared message buffer. Message data are either allocated in one
; continuous block along with this structure - thus avoiding one
; malloc/free pair - or they are stored in user-supplied memory.
; In the latter case, the ffn member (pFree here) stores a pointer to the
; function to be used to deallocate the data. If the buffer is actually shared
; (there are at least 2 references to it) the refcount member contains the
; number of references.
;
; atomic_counter_t (the C type of refcnt) is a class that represents an
; integer that can be incremented/decremented in atomic fashion.
Structure msg_content_t
pData.i ; void *DATA;
DataLen.i ; size_t SIZE;
pFree.i ; zmq_free_fn *ffn;
phInt.i ; void *hint;
refcnt.i; atomic_counter_t refcnt - NOTE(review): declared pointer-sized here; confirm the real width of atomic_counter_t before relying on this field on 64-bit
EndStructure
; Message structure, mirroring the C zmq_msg_t record.
;
; Note: 'content' is not a pointer to the raw data; it is a pointer to a
; msg_content_t structure. It is therefore declared as a pointer member
; (*pContent) so that it occupies one pointer, not a whole embedded
; msg_content_t.
;
; vsm_data is an inline byte array of #ZMQ_MAX_VSM_SIZE elements; structure
; arrays use [ ] in PureBasic ({ } is reserved for fixed-length strings).
Structure zmq_msg_t
  *pContent.msg_content_t        ; void *content
  flags.b                        ; unsigned char flags (signed byte here; test individual bits with &)
  vsm_size.b                     ; unsigned char vsm_size
  vsm_data.b[#ZMQ_MAX_VSM_SIZE]  ; unsigned char vsm_data [ZMQ_MAX_VSM_SIZE]
EndStructure
; Record of the two arguments named in the C comments for zmq_free_fn.
; NOTE(review): elsewhere in this file zmq_free_fn appears as a pointer type
; ("zmq_free_fn *ffn"), which suggests the C original is a callback type
; rather than a data record - confirm against zmq.h before passing an
; instance of this structure to zmq_msg_init_data.
Structure ZMQ_free_fn
  pData.i ; void *DATA
  hInt.i  ; void *hint
EndStructure
;------------ Sockets
; C-calling-convention imports for the ZeroMQ 2.0 API.
;
; ImportC takes the file name as a quoted string and links against the
; import library (.lib) that accompanies libzmq.dll, not the DLL itself.
;
; Return types: functions returning a C "int" / "unsigned long" are declared
; .l (32-bit) so that a -1 error result is read correctly on 64-bit targets;
; functions returning a pointer or size_t are declared .i (pointer-sized).
ImportC "libzmq.lib"
  ; ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads, int flags)
  zmq_init.i(app_threads.i, io_threads.i, flags.i)
  ; ZMQ_EXPORT int zmq_connect (void *s, const char *addr)
  zmq_connect.l(pSoc.i, psAddr.i)
  ; ZMQ_EXPORT int zmq_bind (void *s, const char *addr)
  zmq_bind.l(pSoc.i, psAddr.i)
  ; ZMQ_EXPORT void *zmq_socket (void *context, int type)
  zmq_socket.i(pContext.i, SocType.i)
  ; ZMQ_EXPORT int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen)
  zmq_setsockopt.l(pSoc.i, Optn.i, OptVal.i, OptValLen.i)
  ; ZMQ_EXPORT int zmq_send (void *s, zmq_msg_t *msg, int flags)
  zmq_send.l(pSoc.i, *pMsg.zmq_msg_t, flags.i)
  ; ZMQ_EXPORT int zmq_flush (void *s)
  zmq_flush.l(pSoc.i)
  ; ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags)
  zmq_recv.l(pSoc.i, *pMsg.zmq_msg_t, flags.i)
  ; ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout)
  zmq_poll.l(pItems.i, nItems.i, Tymeout.i)
  ; ZMQ_EXPORT int zmq_close (void *s)
  zmq_close.l(pSoc.i)
  ; ZMQ_EXPORT int zmq_term (void *context)
  zmq_term.l(pCtxt.i)

  ;------------ Messages
  ; ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg)
  zmq_msg_init.l(*pMsg.zmq_msg_t)
  ; ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg, size_t size)
  zmq_msg_init_size.l(*pMsg.zmq_msg_t, tSize.i)
  ; ZMQ_EXPORT int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint)
  zmq_msg_init_data.l(*pMsg.zmq_msg_t, pData.i, tSize.i, *ffn, hint.i)
  ; ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src)
  zmq_msg_move.l(*dest.zmq_msg_t, *src.zmq_msg_t)
  ; ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src)
  zmq_msg_copy.l(*dest.zmq_msg_t, *src.zmq_msg_t)
  ; ZMQ_EXPORT void *zmq_msg_data (zmq_msg_t *msg)
  zmq_msg_data.i(*pMsg.zmq_msg_t)
  ; ZMQ_EXPORT size_t zmq_msg_size (zmq_msg_t *msg)
  zmq_msg_size.i(*pMsg.zmq_msg_t)
  ; ZMQ_EXPORT int zmq_msg_close (zmq_msg_t *msg)
  zmq_msg_close.l(*pMsg.zmq_msg_t)

  ;------------ Misc
  ; ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch)
  zmq_version(*major, *minor, *patch)
  ; ZMQ_EXPORT int zmq_errno ()
  zmq_errno.l()
  ; ZMQ_EXPORT const char *zmq_strerror (int errnum)
  zmq_strerror.i(errnum.i) ; returns a pointer to a C string, not a PureBasic string
  ; ZMQ_EXPORT void *zmq_stopwatch_start ()
  zmq_stopwatch_start.i() ; Returns the handle to the watch.
  ; ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_)
  zmq_stopwatch_stop.l(pWatch.i) ; Returns the number of microseconds elapsed since the watch was started.
  ; ZMQ_EXPORT void zmq_sleep (int seconds_)
  zmq_sleep(Secs) ; Sleeps for specified number of seconds.
EndImport
-
- User
- Posts: 43
- Joined: Tue Jul 10, 2007 8:09 pm
Re: ZeroMQ - Lightweight messaging
Thank you, that looks good. A simple client to test it is below (if someone could translate it, please).
Server and dll downloaded at http://www.coastrd.com/download
http://www.coastrd.com/zeromq-messaging ... zmq-client
Server and dll downloaded at http://www.coastrd.com/download
http://www.coastrd.com/zeromq-messaging ... zmq-client
Code: Select all
'------------------------------------------------------------------------------
' SendMessageZmq - send one test query over a ZMQ_REQ socket and show the reply.
'
' sConnection : endpoint string handed to zmq_connect.
' Returns     : 0 when the request was sent and a reply was received,
'               -1 when any step failed (empty endpoint, context/socket
'               creation, connect, message allocation, send or receive).
'
' Every 0MQ call is checked; on the first failure the DO block is left with
' EXIT DO so that the clean-up below (message close, socket close, context
' termination) always runs.
'------------------------------------------------------------------------------
FUNCTION SendMessageZmq( sConnection AS STRING ) AS LONG
    LOCAL RetVal, MsgSz, Status AS LONG
    LOCAL OutOpen, InOpen AS LONG               ' non-zero while the message needs zmq_msg_close
    LOCAL pCtx, pSoc AS DWORD
    LOCAL pData AS ASCIIZ PTR
    LOCAL MsgIn, MsgOut AS zmq_msg_t
    LOCAL sQuery, sResponse AS STRING

    Status = -1                                 ' assume failure until the reply has been read

    IF LEN(sConnection) = 0 THEN                ' STRPTR of an empty string is a null pointer
        FUNCTION = Status
        EXIT FUNCTION
    END IF

    pCtx = zmq_init( 1, 1, 0 )                  ' Initialise 0MQ context - app_threads, io_threads, flags
    IF pCtx = 0 THEN
        FUNCTION = Status
        EXIT FUNCTION
    END IF

    DO                                          ' device to jump out and still close socket and release ZMQ
        pSoc = zmq_socket( pCtx, %ZMQ_REQ )     ' ZMQ_REQ socket to send requests and receive replies
        IF pSoc = 0 THEN EXIT DO

        RetVal = zmq_connect( pSoc, STRPTR(sConnection) )
        IF RetVal <> 0 THEN EXIT DO

        '- Send a message
        sQuery = "SELECT * FROM mytable, END SERVER"
        PRINT #hDbg, "Query=" + sQuery

        RetVal = zmq_msg_init_size( VARPTR(MsgOut), LEN(sQuery) )               ' Allocate a message
        IF RetVal <> 0 THEN EXIT DO
        OutOpen = 1
        CopyMem( zmq_msg_data(VARPTR(MsgOut)), STRPTR(sQuery), LEN(sQuery) )    ' Copy message to ZMQ

        RetVal = zmq_send( pSoc, VARPTR(MsgOut), 0 )                            ' Send message
        IF RetVal <> 0 THEN EXIT DO

        '- Recover response
        RetVal = zmq_msg_init( VARPTR(MsgIn) )  ' Init must be called before zmq_recv
        IF RetVal <> 0 THEN EXIT DO
        InOpen = 1

        RetVal = zmq_recv( pSoc, VARPTR(MsgIn), 0 )     ' Receive a message, blocks until one is available
        IF RetVal <> 0 THEN EXIT DO

        MsgSz = zmq_msg_size( VARPTR(MsgIn) )   ' Check message for content
        pData = zmq_msg_data( VARPTR(MsgIn) )
        sResponse = NUL$(MsgSz)                 ' Allocate memory
        IF MsgSz > 0 AND pData <> 0 THEN        ' nothing to copy for an empty reply
            CopyMem( STRPTR(sResponse), pData, MsgSz )  ' Copy message to local string
        END IF

        PRINT #hDbg, "Response=" + sResponse
        MSGBOX sResponse, 64, "Response"        ' text first, then style, then caption

        Status = 0
        EXIT DO
    LOOP

    ' Clean-up: release whatever was successfully created above.
    IF InOpen THEN RetVal = zmq_msg_close( VARPTR(MsgIn) )
    IF OutOpen THEN RetVal = zmq_msg_close( VARPTR(MsgOut) )
    IF pSoc THEN RetVal = zmq_close( pSoc )
    RetVal = zmq_term( pCtx )

    FUNCTION = Status
END FUNCTION
- Rook Zimbabwe
- Addict
- Posts: 4322
- Joined: Tue Jan 02, 2007 8:16 pm
- Location: Cypress TX
- Contact:
Re: ZeroMQ - Lightweight messaging
translate it into what? 

-
- User
- Posts: 43
- Joined: Tue Jul 10, 2007 8:09 pm
Re: ZeroMQ - Lightweight messaging
Into PureBasic, please.
-
- User
- Posts: 43
- Joined: Tue Jul 10, 2007 8:09 pm