ZeroMQ - Lightweight messaging

Just starting out? Need help? Post your questions and find answers here.
Mike Trader
User
User
Posts: 43
Joined: Tue Jul 10, 2007 8:09 pm

ZeroMQ - Lightweight messaging

Post by Mike Trader »

IF anyone has in interest in lightweight messaging, check out ZeroMQ
http://www.coastrd.com/zeromq-messaging
Last edited by Mike Trader on Wed Jul 14, 2010 2:09 am, edited 1 time in total.
User avatar
idle
Always Here
Always Here
Posts: 5834
Joined: Fri Sep 21, 2007 5:52 am
Location: New Zealand

Re: ZeroMQ - Lightweight messaging

Post by idle »

I used search and replace, so you will need to check it

Code: Select all

#ZMQ_HAUSNUMERO   = 1000 ; 156384712 ; avoid collision With other errno ranges

#ENOTSUP          = #ZMQ_HAUSNUMERO + 1  ; POSIX errors
#EPROTONOSUPPORT  = #ZMQ_HAUSNUMERO + 2  ;
#ENOBUFS          = #ZMQ_HAUSNUMERO + 3  ;
#ENETDOWN         = #ZMQ_HAUSNUMERO + 4  ;
#EADDRINUSE       = #ZMQ_HAUSNUMERO + 5  ;
#EADDRNOTAVAIL    = #ZMQ_HAUSNUMERO + 6  ;
#ECONNREFUSED     = #ZMQ_HAUSNUMERO + 7  ;
#EINPROGRESS      = #ZMQ_HAUSNUMERO + 8  ;

#EMTHREAD         = #ZMQ_HAUSNUMERO + 50 ; Native 0MQ error codes
#EFSM             = #ZMQ_HAUSNUMERO + 51 ;
#ENOCOMPATPROTO   = #ZMQ_HAUSNUMERO + 52 ;

#ZMQ_MAX_VSM_SIZE = 30  ; Max size of Very Small Message. VSMs are passed by value to reduce memory alloc/dealloc

#ZMQ_DELIMITER    = 31  ; Message structure - may be stored in ;content; member of the
#ZMQ_VSM          = 32  ; Message structure - message instead of regular pointer to the data

#ZMQ_MSG_TBC      = 1   ; Message flag
#ZMQ_MSG_SHARED   = 128 ; strictly speaking not a message flag
                     
#ZMQ_POLL         = 1   ; 0MQ infrastructure (a.k.a. context) initialisation & termination

#ZMQ_P2P          = 0   ; 0MQ socket definition
#ZMQ_PUB          = 1   ;
#ZMQ_SUB          = 2   ;
#ZMQ_REQ          = 3   ;
#ZMQ_REP          = 4   ;
#ZMQ_XREQ         = 5   ;
#ZMQ_XREP         = 6   ;
#ZMQ_UPSTREAM     = 7   ;
#ZMQ_DOWNSTREAM   = 8   ;
                       
#ZMQ_HWM          = 1   ;
#ZMQ_LWM          = 2   ;
#ZMQ_SWAP         = 3   ;
#ZMQ_AFFINITY     = 4   ;
#ZMQ_IDENTITY     = 5   ;
#ZMQ_SUBSCRIBE    = 6   ;
#ZMQ_UNSUBSCRIBE  = 7   ;
#ZMQ_RATE         = 8   ;
#ZMQ_RECOVERY_IVL = 9   ;
#ZMQ_MCAST_LOOP   = 10  ;
#ZMQ_SNDBUF       = 11  ;
#ZMQ_RCVBUF       = 12  ;

#ZMQ_NOBLOCK      = 1   ;
#ZMQ_NOFLUSH      = 2   ;           
       
#ZMQ_POLLIN       = 1   ; I/O multiplexing
#ZMQ_POLLOUT      = 2
#ZMQ_POLLERR      = 4
               

Structure zmq_pollitem_t     ; I/O multiplexing
    pSocket.i    ; void *socket;
    fd.i   ; int fd;
    events.w  ; short events;
    revents.w  ; short revents;
EndStructure
         

;  Shared message buffer. Message data are either allocated in one
;  continuous block along with this structure - thus avoiding one
;  malloc/free pair or they are stored in used-supplied memory.
;  In the latter case, ffn member stores pointer to the function to be
;  used to deallocate the data. If the buffer is actually shared
;  (there are at least 2 references to it) refcount member contains number of references.
Structure msg_content_t
    pData.i ; void *DATA;
    DataLen.i ; size_t SIZE;
    pFree.i ; zmq_free_fn *ffn;
    phInt.i ; void *hint;   
    refcnt.i; atomic_counter_t refcnt - Set counter value (not thread-safe).
EndStructure              ; A class that represents an integer that can be incremented/decremented in atomic fashion.

   
; Note: ;content; is not a pointer to the raw data, it is pointer to msg_content_t structure
Structure zmq_msg_t                          ; Message structure
    pContent.msg_content_t      ; pointer to msg_content_t structure
    flags.b                   ; unsigned char flags;
    vsm_size.b                   ; unsigned char vsm_size
    vsm_data.b{ZMQ_MAX_VSM_SIZE} ; unsigned char vsm_data [ZMQ_MAX_VSM_SIZE]
EndStructure


Structure ZMQ_free_fn        ;
    pData.i      ; void *DATA
    hInt .i      ; void *hint
EndStructure 


;------------ Sockets
ImportC libzmq.dll       
;ZMQ_EXPORT void *zmq_init        (int app_threads, int io_threads, int flags)

zmq_init.i(app_threads.i,io_threads.i,flags.i)

;ZMQ_EXPORT INT  zmq_connect (void *s, const char *ADDR)
zmq_connect(pSoc.i,psAddr.i)

;ZMQ_EXPORT INT  zmq_bind (void *s, const char *ADDR)

zmq_bind(pSoc.i,psAddr.i ) 

;ZMQ_EXPORT void *zmq_socket (void *context, INT structure)

zmq_socket (pContext.i,Socstructure  ).i ; PTR 

;ZMQ_EXPORT INT  zmq_setsockopt (void *s, INT OPTION, const void *optval, size_t optvallen)
zmq_setsockopt (pSoc.i,Optn.i ,OptVal.i,OptValLen.i ) 

;ZMQ_EXPORT INT  zmq_send (void *s, zmq_msg_t *msg, INT flags)
zmq_send (pSoc.i,*pMsg.ZMQ_msg_t,flags.i) 

;ZMQ_EXPORT INT  zmq_flush (void *s)
zmq_flush (pSoc.i ) 

;ZMQ_EXPORT INT  zmq_recv (void *s, zmq_msg_t *msg, INT flags)
zmq_recv (pSoc.i,*pMsg.ZMQ_msg_t,flags.i) 
           
;ZMQ_EXPORT INT  zmq_poll ( ZMQ_pollitem_t *items, INT nitems, LONG TIMEOUT) 
zmq_poll (pItems.i,nItems.i,Tymeout.i) 
               
;ZMQ_EXPORT INT  zmq_close (void *s)
zmq_close (pSoc.i) 

;ZMQ_EXPORT int  zmq_term          (void *context)
zmq_term (pCtxt.i) 


;------------ Messages

;ZMQ_EXPORT INT  zmq_msg_init      ( ZMQ_msg_t *msg )
zmq_msg_init (*pMsg.ZMQ_msg_t) 

;ZMQ_EXPORT int  zmq_msg_init_size ( ZMQ_msg_t *msg, size_t size)
zmq_msg_init_size (*pMsg.ZMQ_msg_t,tSize.i ) 

;ZMQ_EXPORT INT  zmq_msg_init_data ( ZMQ_msg_t *msg, void *DATA, size_t SIZE, zmq_free_fn *ffn, void *hint)
zmq_msg_init_data (*pMsg.ZMQ_msg_t,pData.i,tSize.i,*ffn.i,hint.i) 

;ZMQ_EXPORT int  zmq_msg_move      ( ZMQ_msg_t *dest, zmq_msg_t *src)
zmq_msg_move (*dest.ZMQ_msg_t,*src.ZMQ_msg_t) 
                                 
;ZMQ_EXPORT int  zmq_msg_copy      ( ZMQ_msg_t *dest, zmq_msg_t *src)
zmq_msg_copy (*dest.ZMQ_msg_t,*src.ZMQ_msg_t) 

;ZMQ_EXPORT void *zmq_msg_data    ( ZMQ_msg_t *msg)
zmq_msg_data(*pMsg.ZMQ_msg_t)

;ZMQ_EXPORT size_t zmq_msg_size   ( ZMQ_msg_t *msg)
zmq_msg_size (*pMsg.ZMQ_msg_t)

;ZMQ_EXPORT int  zmq_msg_close     ( ZMQ_msg_t *msg)
zmq_msg_close (*pMsg.ZMQ_msg_t) 
                       


;------------ Misc
           
;ZMQ_EXPORT void zmq_version (INT *major, INT *minor, INT *patch)
zmq_version (*major,*minor,*patch)
                 
;ZMQ_EXPORT INT  zmq_errno()
zmq_errno () 

; ZMQ_EXPORT const char *zmq_strerror (INT errnum) 
zmq_strerror(errnum.i)

;ZMQ_EXPORT void *zmq_stopwatch_start () 
zmq_stopwatch_start() ; Returns the handle to the watch.

;ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_)

zmq_stopwatch_stop (pWatch.i ) ; Returns the number of microseconds elapsed since

;ZMQ_EXPORT void zmq_sleep (INT seconds_)
zmq_sleep (Secs  ) ;   Sleeps for specified number of seconds.

EndImport 

Mike Trader
User
User
Posts: 43
Joined: Tue Jul 10, 2007 8:09 pm

Re: ZeroMQ - Lightweight messaging

Post by Mike Trader »

Thank you that looks good. A simple client to test it below. (if someone could translate pls)
Server and dll downloaded at http://www.coastrd.com/download

http://www.coastrd.com/zeromq-messaging ... zmq-client

Code: Select all

FUNCTION SendMessageZmq( sConnection AS STRING ) AS LONG 

  LOCAL i, RetVal, MsgSz AS LONG
  LOCAL pCtx, pSoc AS DWORD
  LOCAL pData AS ASCIIZ PTR
  LOCAL pMsgIn, pMsgOut AS zmq_msg_t
  LOCAL sQuery, sResponse AS STRING
  LOCAL pzErr AS ASCIIZ PTR


    pCtx = zmq_init( 1, 1, 0 ) ' Initialise 0MQ context - app_threads, io_threads, flags
                       
    DO ' device to jump out and still close socket and release ZMQ

        pSoc = zmq_socket( pCtx, %ZMQ_REQ ) ' ZMQ_REQ socket to send requests and receive replies

        RetVal = zmq_connect( pSoc, STRPTR(sConnection) ) ' PRINT #hDbg, "sConnection="+sConnection
                   

        '- Send a message
        sQuery = "SELECT * FROM mytable, END SERVER" '
PRINT #hDbg, "Query=" + sQuery
        RetVal = zmq_msg_init_size( VARPTR(pMsgOut), LEN(sQuery) ) ' Allocate a message

        CopyMem( zmq_msg_data(VARPTR(pMsgOut)), STRPTR(sQuery), LEN(sQuery) ) ' Copy message to ZMQ

        RetVal = zmq_send( pSoc, VARPTR(pMsgOut), 0) ' Send Message

                 
        '- Recover response
        RetVal = zmq_msg_init(VARPTR(pMsgIn)) ' Init must be called before zmq_recv

        RetVal = zmq_recv( pSoc, VARPTR(pMsgIn), 0 ) ' Receive a message, blocks until one is available

        MsgSz = zmq_msg_size(VARPTR(pMsgIn)) ' Check Message for content

        pData = zmq_msg_data(VARPTR(pMsgIn)) ' PRINT #hDbg, "Response=" + @pData 

        sResponse = NUL$(MsgSz) ' Allocate memory
        CopyMem( STRPTR(sResponse), pData, MsgSz ) ' Copy message to local string

        RetVal = zmq_msg_close(VARPTR(pMsgIn))

PRINT #hDbg, "Response=" + sResponse ' @pData 
MSGBOX "Response",64,sResponse

        EXIT DO
    LOOP
               
    IF pSoc THEN RetVal = zmq_close(pSoc)

    RetVal = zmq_term(pCtx)

  FUNCTION = 0

END FUNCTION
User avatar
Rook Zimbabwe
Addict
Addict
Posts: 4322
Joined: Tue Jan 02, 2007 8:16 pm
Location: Cypress TX
Contact:

Re: ZeroMQ - Lightweight messaging

Post by Rook Zimbabwe »

translate it into what? :)
Binarily speaking... it takes 10 to Tango!!!

Image
http://www.bluemesapc.com/
Mike Trader
User
User
Posts: 43
Joined: Tue Jul 10, 2007 8:09 pm

Re: ZeroMQ - Lightweight messaging

Post by Mike Trader »

into Purebasic please
Mike Trader
User
User
Posts: 43
Joined: Tue Jul 10, 2007 8:09 pm

Re: ZeroMQ - Lightweight messaging

Post by Mike Trader »

anyone
Post Reply