From charlesreid1

Notes

ZMQ (short for ZeroMQ) is a message queue library.

Message queue software often follows the pub/sub (publisher/subscriber) architecture, which involves creating asynchronous messaging pipelines. Agents that create events publish to a pipeline, while agents that process events subscribe to it. ZMQ also supports other patterns, such as the pair model covered in these notes.
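As a rough illustration of the pub/sub idea, here is a minimal single-process sketch using pyzmq's PUB and SUB sockets. The endpoint name inproc://events and the message contents are made up for this example. A PUB socket silently drops messages until a subscription has reached it, so the sketch simply keeps publishing until the subscriber sees something; a real publisher and subscriber would normally be separate processes.

```python
import zmq

context = zmq.Context()

# Publisher: the agent creating events.
publisher = context.socket(zmq.PUB)
publisher.bind("inproc://events")  # hypothetical endpoint name

# Subscriber: only messages that start with "temperature" are delivered.
subscriber = context.socket(zmq.SUB)
subscriber.connect("inproc://events")
subscriber.setsockopt_string(zmq.SUBSCRIBE, "temperature")

received = None
for attempt in range(100):
    # Messages published before the subscription is registered are dropped,
    # so publish repeatedly until the subscriber receives one.
    publisher.send_string("humidity 40")
    publisher.send_string("temperature 21")
    if subscriber.poll(timeout=50):  # wait up to 50 ms for a message
        received = subscriber.recv_string()
        break

print(received)

subscriber.close(linger=0)
publisher.close(linger=0)
context.term()
```

The "humidity" message never reaches the subscriber, because subscriptions match on the message prefix.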

Documentation: https://pyzmq.readthedocs.io/

Pair Model

The pair model for ZMQ consists of a pair of instances: one client and one server.

Pair Model: Simple Client and Server

The following two files, pairclient.py and pairserver.py, illustrate the simplest possible pair model for ZMQ.

pairclient.py:

"""
ZMQ Pair Client

This generates a test message every half second,
faster than the server will print/process the
messages. This illustrates that ZMQ will store
messages in a queue as they come in, and will
not throw messages away.
"""

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)

i = 0
while True:
    i += 1
    socket.send_string("ping %d"%(i))
    time.sleep(0.5)

pairserver.py:

"""
ZMQ Pair Server

This accepts messages from a ZMQ message queue
and prints them at a rate of two per five seconds.
This is slower than the client generates messages.
This illustrates the queue behavior of ZMQ.
"""

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:%s" % port)

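while True:
    # Read and print two queued messages, then pause for
    # five seconds so the server falls behind the client.
    # recv_string() decodes the message to str, so the
    # output is "ping 1" rather than b'ping 1'.
    msg = socket.recv_string()
    print(msg)

    msg = socket.recv_string()
    print(msg)

    time.sleep(5)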
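The queueing behavior can also be seen in a single process, without running two scripts. The sketch below is only an illustration: it uses the inproc transport and a made-up endpoint name, sends five messages before the server reads anything, and then drains them in order. Note that the queue is not unbounded; it is limited by the socket's high-water mark, and a PAIR socket blocks on send once that limit is reached.

```python
import zmq

context = zmq.Context()

# Both ends of the pair live in one process for this demo.
server = context.socket(zmq.PAIR)
server.bind("inproc://pair-demo")  # hypothetical endpoint name
client = context.socket(zmq.PAIR)
client.connect("inproc://pair-demo")

# Send five messages before the server reads anything.
for i in range(1, 6):
    client.send_string("ping %d" % i)

# The messages were queued, so the server can still read all of them in order.
received = [server.recv_string() for _ in range(5)]
print(received)

client.close(linger=0)
server.close(linger=0)
context.term()
```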

Pair Model: JSON Client and Server

The following example expands on the prior example, and shows how to build a ZMQ client/server pair that sends and receives JSON data:

jsonclient.py:

import json
import zmq
import random
import sys
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)

for i in range(50):
    ## Send as string
    #d = dict(apples=random.randint(1,1000), oranges=random.randint(1,1000), bananas=random.randint(1,1000))
    #print("sending data %s"%(json.dumps(d)))
    #socket.send_string("%s"%json.dumps(d))
    #time.sleep(0.1)

    # Send as dict
    d = dict(apples=random.randint(1,1000), oranges=random.randint(1,1000), bananas=random.randint(1,1000))
    print("sending data %s"%(d))
    socket.send_json(d)
    time.sleep(0.1)

jsonserver.py:

import json
import zmq
import random
import sys
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:%s" % port)

while True:
    ## Receive as string
    #msg = socket.recv()
    #d = json.loads(msg)
    #print(d)
    #time.sleep(1)

    # Receive as json
    d = socket.recv_json()
    print(type(d))
    print(d)
    time.sleep(1)
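The commented-out "string" variants and the send_json/recv_json variants in the two files above are interchangeable, since the JSON helpers serialize the object and send it as an ordinary message. A minimal single-process sketch of that equivalence follows; the inproc endpoint name and the fixed fruit counts are assumptions made for this illustration.

```python
import json
import zmq

context = zmq.Context()
server = context.socket(zmq.PAIR)
server.bind("inproc://json-demo")  # hypothetical endpoint name
client = context.socket(zmq.PAIR)
client.connect("inproc://json-demo")

d = dict(apples=3, oranges=5, bananas=7)

# Case 1: send with send_json, receive the raw bytes and decode by hand.
client.send_json(d)
raw = server.recv()
decoded_by_hand = json.loads(raw.decode("utf-8"))

# Case 2: serialize by hand, let recv_json do the decoding.
client.send_string(json.dumps(d))
decoded_by_zmq = server.recv_json()

print(decoded_by_hand, decoded_by_zmq)

client.close(linger=0)
server.close(linger=0)
context.term()
```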

Using the Server as a Filter

A useful pattern in this pair framework is to have the messaging server act as both a message buffer and a filter.

For example, suppose we instrument a section of code that may be called many times per second so that it sends a message on each call, but we only care about the code's progress at wall-clock intervals of 0.1 or 1 second. To accomplish this, each message carries a timestamp, and the server discards any message whose timestamp is less than one interval (0.1 or 1 second) after the last message it kept.

Implementing this in code, we do the following:

  • Create a ZMQ context
  • Listen for JSON on the ZMQ socket
  • Each time a new JSON message comes in, compare its timestamp to that of the last message kept
  • If the message arrived sooner than the poll interval, ignore it

import dateutil.parser
import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:%s"%(port))

poll_interval = 0.1 # in seconds

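prior_ts = None  # timestamp of the last message kept
prior_d = None   # the last message kept
while True:
    try:
        d = socket.recv_json()
        ts = dateutil.parser.parse(d['timestamp'])

        if prior_d is None and prior_ts is None:
            # First message: use it as the baseline
            prior_ts = ts
            prior_d = d
        else:
            diff = (ts - prior_ts).total_seconds()
            if diff > poll_interval:
                # More than one poll interval has passed: keep this message
                print(ts)
                prior_d = d
                prior_ts = ts

    except zmq.ZMQError:
        time.sleep(1)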
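The filtering rule can be tried out without any sockets. The sketch below pulls the timestamp comparison from the loop above into a generator; the function name throttle, the seq field, and the 40 ms message spacing are made up for this illustration, and it assumes ISO 8601 timestamps so that the standard library's datetime.fromisoformat can stand in for dateutil.parser.

```python
from datetime import datetime, timedelta


def throttle(messages, poll_interval):
    """Yield only messages more than poll_interval seconds after the last one kept.

    As in the socket loop, the first message only sets the baseline
    and is not emitted itself.
    """
    prior_ts = None
    for d in messages:
        ts = datetime.fromisoformat(d["timestamp"])
        if prior_ts is None:
            prior_ts = ts
            continue
        if (ts - prior_ts).total_seconds() > poll_interval:
            prior_ts = ts
            yield d


# Ten fake messages spaced 40 ms apart (hypothetical test data).
start = datetime(2024, 1, 1)
messages = [
    {"seq": i, "timestamp": (start + timedelta(milliseconds=40 * i)).isoformat()}
    for i in range(10)
]

kept = [m["seq"] for m in throttle(messages, poll_interval=0.1)]
print(kept)
```

With a poll interval of 0.1 seconds, only every third message survives, since two consecutive 40 ms gaps are still inside the interval.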

Flags