

ZMQ (an abbreviation for ZeroMQ) is message queue software.

Message queue software commonly follows the pub/sub (publisher/subscriber) architecture, in which agents communicate through asynchronous messaging pipelines. Agents that create events publish to a pipeline, while agents that process events subscribe to it.
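
As a minimal sketch of the pub/sub idea, assuming the pyzmq package is installed, the snippet below runs one publisher and one subscriber inside a single process. The inproc://events endpoint, the topic names, and the demo_pub_sub helper are made up for illustration. A subscription takes a moment to reach the publisher, so the publisher keeps re-sending until the subscriber sees a message.

```python
import zmq


def demo_pub_sub():
    """Publish on two topics; return the first message the subscriber gets."""
    context = zmq.Context()

    # Agent creating events
    pub = context.socket(zmq.PUB)
    pub.bind("inproc://events")

    # Agent processing events: only wants messages starting with "weather"
    sub = context.socket(zmq.SUB)
    sub.connect("inproc://events")
    sub.setsockopt_string(zmq.SUBSCRIBE, "weather")

    received = None
    for attempt in range(100):
        pub.send_string("sports final score")  # not delivered to this subscriber
        pub.send_string("weather sunny")
        # Wait up to 50 ms for a message to arrive
        if sub.poll(timeout=50):
            received = sub.recv_string()
            break

    pub.close(linger=0)
    sub.close(linger=0)
    context.term()
    return received


if __name__ == "__main__":
    print(demo_pub_sub())
```

The subscriber never sees the "sports" message, because its subscription only matches messages that begin with "weather".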


Pair Model

The pair model for ZMQ consists of a pair of instances: one client and one server.

Pair Model: Simple Client and Server

The following two files, a client and a server, illustrate the simplest possible pair model for ZMQ.

"""
ZMQ Pair Client

This generates test messages every half second,
faster than the server will print/process the
messages. This just illustrates that ZMQ will
store messages in a queue as they come in, and
will not throw messages away.
"""
import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)

i = 0
while True:
    i += 1
    socket.send_string("ping %d" % (i))
    # Wait half a second before sending the next message
    time.sleep(0.5)

"""
ZMQ Pair Server

This accepts messages from a ZMQ message queue
and prints them at a rate of two per five seconds.
This is slower than the client generates messages.
This illustrates the queue behavior of ZMQ.
"""
import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:%s" % port)

while True:
    # Receive and print two messages...
    msg = socket.recv_string()
    print(msg)

    msg = socket.recv_string()
    print(msg)

    # ...then pause, for an overall rate of two messages per five seconds
    time.sleep(5)
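
The queue behavior can also be checked without two terminals. The following is a small sketch, assuming pyzmq is installed, that puts both ends of a PAIR connection in one process over the inproc transport. The inproc://pair-demo endpoint and the demo_pair_queue helper are hypothetical names chosen for this example, and the message count is kept far below the socket's high-water mark so that sends never block.

```python
import zmq


def demo_pair_queue(count=5):
    """Send `count` messages before reading any, then read them all back."""
    context = zmq.Context()

    server = context.socket(zmq.PAIR)
    server.bind("inproc://pair-demo")

    client = context.socket(zmq.PAIR)
    client.connect("inproc://pair-demo")

    # The client sends everything up front; nobody is receiving yet
    for i in range(1, count + 1):
        client.send_string("ping %d" % i)

    # The server reads later and still gets every message, in order
    received = [server.recv_string() for _ in range(count)]

    client.close(linger=0)
    server.close(linger=0)
    context.term()
    return received


if __name__ == "__main__":
    print(demo_pair_queue())
```

The messages wait in the queue between the send loop and the receive loop, which is the same buffering the slow server above relies on.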


Pair Model: JSON Client and Server

The following example expands on the prior example, and shows how to build a ZMQ client/server pair that sends and receives JSON data:

import json
import zmq
import random

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)

for i in range(50):
    ## Send as string
    #d = dict(apples=random.randint(1,1000), oranges=random.randint(1,1000), bananas=random.randint(1,1000))
    #print("sending data %s"%(json.dumps(d)))
    #socket.send_string(json.dumps(d))

    # Send as dict
    d = dict(apples=random.randint(1,1000), oranges=random.randint(1,1000), bananas=random.randint(1,1000))
    print("sending data %s"%(d))
    # send_json serializes the dict to JSON before sending
    socket.send_json(d)

import json
import zmq

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:%s" % port)

while True:
    ## Receive as string
    #msg = socket.recv()
    #d = json.loads(msg)

    # Receive as json
    d = socket.recv_json()
    # recv_json has already parsed the message into a dict
    print("received data %s" % (d))
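
The commented-out "string" variant and the "json" variant above should carry the same data. Here is a hedged sketch of that equivalence, assuming pyzmq is installed. It runs both ends in one process over inproc, and the endpoint name and the demo_json_round_trip helper are invented for the example.

```python
import json
import zmq


def demo_json_round_trip(d):
    """Send `d` both ways and return what the receiving end decoded."""
    context = zmq.Context()

    server = context.socket(zmq.PAIR)
    server.bind("inproc://json-demo")

    client = context.socket(zmq.PAIR)
    client.connect("inproc://json-demo")

    # Variant 1: serialize by hand, send text, parse by hand
    client.send_string(json.dumps(d))
    via_string = json.loads(server.recv_string())

    # Variant 2: let pyzmq serialize and parse on each end
    client.send_json(d)
    via_json = server.recv_json()

    client.close(linger=0)
    server.close(linger=0)
    context.term()
    return via_string, via_json


if __name__ == "__main__":
    print(demo_json_round_trip({"apples": 3, "oranges": 5, "bananas": 8}))
```

Both variants hand back an equal dict, so the choice between them is mostly a matter of convenience.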

Using the Server as a Filter

One extremely useful pattern in this pair framework is for the messaging server to act as both a message buffer and a filter.

For example, suppose we instrument a piece of code to send a message from a section that may be called many times per second, but we are only interested in the code's progress at wall-time intervals of 0.1 or 1 second. To accomplish this, we attach a timestamp to each message and discard any message that arrives less than one interval (0.1 or 1 second) after the last message we kept.
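
On the sending side, the instrumented code needs to attach a timestamp to every message. One possible shape for such a message is sketched below. The make_event helper and the "step" field are hypothetical. Only the "timestamp" key, holding an ISO 8601 string that dateutil can parse, is what the filtering server expects.

```python
from datetime import datetime


def make_event(step, now=None):
    """Build a JSON-serializable message carrying an ISO 8601 timestamp."""
    if now is None:
        now = datetime.now()
    return {"timestamp": now.isoformat(), "step": step}


if __name__ == "__main__":
    # Inside the instrumented section one would call, for example:
    #     socket.send_json(make_event(i))
    print(make_event(1))
```

Passing `now` explicitly is optional and mainly makes the helper easy to test.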

Implementing this in code, we do the following:

  • Create a ZMQ context and socket
  • Listen for JSON messages on the ZMQ socket
  • Each time a new JSON message comes in, parse its timestamp and compare it with the timestamp of the last message we kept
  • If the message arrived sooner than the poll interval, ignore it
  • Otherwise, keep it and record its timestamp

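import dateutil.parser
import zmq

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:%s" % port)

poll_interval = 0.1 # in seconds

prior_ts = None
prior_d = None
while True:
    try:
        d = socket.recv_json()
        ts = dateutil.parser.parse(d['timestamp'])

        if prior_d is None and prior_ts is None:
            # First message: always keep it
            prior_ts = ts
            prior_d = d
            print(d)  # placeholder for real processing
        else:
            diff = (ts - prior_ts).total_seconds()
            if diff >= poll_interval:
                # Enough time has passed since the last kept message
                prior_d = d
                prior_ts = ts
                print(d)  # placeholder for real processing
            # Otherwise the message arrived too soon and is ignored

    except zmq.ZMQError as err:
        # Stop listening if the socket reports an error
        print("ZMQ error: %s" % err)
        break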
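
The filtering rule itself does not depend on ZMQ, so it can be pulled out and tried on a plain list. The following sketch uses only the standard library. The filter_by_interval name is made up, and datetime.fromisoformat stands in for dateutil on the assumption that timestamps are ISO 8601 strings.

```python
from datetime import datetime


def filter_by_interval(messages, poll_interval):
    """Keep the first message, then only messages at least
    `poll_interval` seconds after the last one kept."""
    kept = []
    prior_ts = None
    for d in messages:
        ts = datetime.fromisoformat(d["timestamp"])
        if prior_ts is None or (ts - prior_ts).total_seconds() >= poll_interval:
            kept.append(d)
            prior_ts = ts
    return kept


if __name__ == "__main__":
    stamps = ["00:00:00.000000", "00:00:00.050000", "00:00:00.100000",
              "00:00:00.120000", "00:00:00.250000"]
    msgs = [{"timestamp": "2020-01-01T" + s, "step": i}
            for i, s in enumerate(stamps)]
    for d in filter_by_interval(msgs, 0.1):
        print(d)
```

Note that the interval is measured from the last message kept, not from the last message received, so a steady stream of fast messages cannot starve the output.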
