Zmq
From charlesreid1
Notes
Zmq (short for ZeroMQ) is message queue software.
Message queue software often follows the pub/sub (publisher/subscriber) architecture, which ZeroMQ supports alongside other patterns such as the pair model covered below. Pub/sub involves creating asynchronous messaging pipelines: agents that create events publish to a pipeline, while agents that process events subscribe to it.
Documentation: https://pyzmq.readthedocs.io/
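To make the publisher/subscriber idea concrete, here is a minimal sketch that uses only the Python standard library. It is an illustration of the pattern, not ZeroMQ's API: the Broker class and its subscribe and publish methods are hypothetical names invented for this example.

```python
import queue
from collections import defaultdict


class Broker:
    """Toy in-process pub/sub hub (hypothetical; not part of pyzmq)."""

    def __init__(self):
        # Maps a topic name to the list of subscriber queues for that topic
        self._subscribers = defaultdict(list)

    def subscribe(self, topic):
        """Register a new subscriber and return the queue it reads from."""
        q = queue.Queue()
        self._subscribers[topic].append(q)
        return q

    def publish(self, topic, message):
        """Put the message on every queue subscribed to the topic."""
        for q in self._subscribers.get(topic, []):
            q.put(message)


broker = Broker()
inbox_a = broker.subscribe("events")
inbox_b = broker.subscribe("events")

# The publisher returns immediately; it does not wait for any reader.
broker.publish("events", "ping 1")
# A message on a topic with no subscribers is simply not delivered.
broker.publish("other", "nobody is listening")

print(inbox_a.get_nowait())
print(inbox_b.get_nowait())
```

The point of the sketch is the decoupling: the publisher never knows how many subscribers exist, and each subscriber drains its own queue at its own pace.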
Pair Model
The pair model for ZMQ consists of a pair of instances: one client and one server.
Pair Model: Simple Client and Server
The following two files, pairclient.py and pairserver.py, illustrate the simplest possible pair model for zmq.
pairclient.py:
import zmq
import random
import sys
import time
"""
ZMQ Pair Client
This generates test messages every half second,
faster than the server will print/process the
messages. This just illustrates that ZMQ will
store messages in a queue as they come in, and
will not throw messages away.
"""
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)
i = 0
while True:
    i += 1
    socket.send_string("ping %d"%(i))
    time.sleep(0.5)
pairserver.py:
import zmq
import random
import sys
import time
"""
ZMQ Pair Server
This accepts messages from a ZMQ message queue
and prints them at a rate of two per five seconds.
This is slower than the client generates messages.
This illustrates the queue behavior of ZMQ.
"""
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:%s" % port)
while True:
    msg = socket.recv()
    print(msg)
    msg = socket.recv()
    print(msg)
    time.sleep(5)
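The mismatch between the two rates can be worked out with a little arithmetic. The sketch below is a rough estimate only: it assumes each server loop iteration takes about five seconds (that is, recv returns immediately once messages are waiting) and ignores startup effects. The function name backlog_after and its parameters are hypothetical.

```python
def backlog_after(cycles, send_interval=0.5, reads_per_cycle=2, cycle_seconds=5.0):
    """Estimate how many messages are waiting after some server loop iterations."""
    # Messages the client sends during one server iteration (10 with the defaults)
    sent_per_cycle = round(cycle_seconds / send_interval)
    # Each iteration the server removes reads_per_cycle of them
    return cycles * (sent_per_cycle - reads_per_cycle)


print(backlog_after(1))
print(backlog_after(12))
```

With the defaults taken from the two scripts, the backlog grows by roughly eight messages per server iteration, which is why the server keeps printing old pings long after the client has sent them.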
Pair Model: JSON Client and Server
The following example expands on the prior example, and shows how to build a ZMQ client/server pair that sends and receives JSON data:
jsonclient.py:
import json
import zmq
import random
import sys
import time
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)
for i in range(50):
    ## Send as string
    #d = dict(apples=random.randint(1,1000), oranges=random.randint(1,1000), bananas=random.randint(1,1000))
    #print("sending data %s"%(json.dumps(d)))
    #socket.send_string("%s"%json.dumps(d))
    #time.sleep(0.1)
    # Send as dict
    d = dict(apples=random.randint(1,1000), oranges=random.randint(1,1000), bananas=random.randint(1,1000))
    print("sending data %s"%(d))
    socket.send_json(d)
    time.sleep(0.1)
jsonserver.py:
import json
import zmq
import random
import sys
import time
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:%s" % port)
while True:
    ## Receive as string
    #msg = socket.recv()
    #d = json.loads(msg)
    #print(d)
    #time.sleep(1)
    # Receive as json
    d = socket.recv_json()
    print(type(d))
    print(d)
    time.sleep(1)
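The commented-out "string" variants and the send_json/recv_json variants above should put the same bytes on the wire. As a rough sketch of that round trip, using only the standard library, the helpers below serialize a dict to UTF-8 JSON bytes and parse it back. The names encode_json and decode_json are hypothetical, and the claim that pyzmq does essentially this internally is an assumption worth checking against the pyzmq documentation.

```python
import json


def encode_json(obj):
    """Serialize an object to UTF-8 encoded JSON bytes (what travels over the socket)."""
    return json.dumps(obj).encode("utf-8")


def decode_json(payload):
    """Parse UTF-8 encoded JSON bytes back into Python objects."""
    return json.loads(payload.decode("utf-8"))


d = dict(apples=3, oranges=5, bananas=7)
wire = encode_json(d)

print(type(wire))
print(decode_json(wire) == d)
```

Because the payload is plain JSON text, a receiver written in another language can parse it with any JSON library, which is one reason to prefer JSON over a Python-specific format here.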
Using the Server as a Filter
One extremely useful pattern in this pair framework is to have the messaging server act as both a message buffer and a filter.
For example, suppose we instrument a piece of code to send a message from a section that may be called many times per second, but we are only interested in its progress at wall-time intervals of 0.1 or 1 second. To accomplish this, we attach a timestamp to each message and have the server discard any message that arrives less than one interval (0.1 or 1 second) after the last message it kept.
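On the sending side, each message needs to carry its own timestamp. A minimal sketch of building such a message follows; it assumes ISO 8601 timestamps in UTC, and the function name build_event and the 'step' field are hypothetical (only the 'timestamp' key is used by the filtering server on this page).

```python
from datetime import datetime, timezone


def build_event(step, now=None):
    """Return a JSON-serializable event that carries an ISO 8601 timestamp."""
    if now is None:
        # Default to the current wall-clock time in UTC
        now = datetime.now(timezone.utc)
    return {"timestamp": now.isoformat(), "step": step}


event = build_event(1)
print(event)
```

In the instrumented code, the result would be passed to the PAIR client socket, for example socket.send_json(build_event(i)) inside the loop.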
Implementing this in code, we do the following:
- Create a zmq context
- Listen for JSON messages on the zmq socket
- Each time a new JSON message comes in, check its timestamp and compare it to the desired poll interval
- If the message arrived less than one poll interval after the last message we kept, ignore it
import dateutil.parser
import zmq
import time
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:%s"%(port))
poll_interval = 0.1 # in seconds
prior_ts = None
prior_d = None
while True:
    try:
        d = socket.recv_json()
        ts = dateutil.parser.parse(d['timestamp'])
        if(prior_d is None and prior_ts is None):
            # First message: remember it as the reference point
            prior_ts = ts
            prior_d = d
        else:
            diff = (ts-prior_ts).total_seconds()
            if(diff>poll_interval):
                # More than one poll interval has passed: keep this message
                print(ts)
                prior_d = d
                prior_ts = ts
    except zmq.ZMQError:
        time.sleep(1)
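The filtering rule itself does not depend on the socket, so it can be pulled out and checked on its own. The sketch below is one way to do that, assuming ISO 8601 timestamps and using the standard library's datetime.fromisoformat in place of dateutil; the generator name throttle and the 'step' field are hypothetical.

```python
from datetime import datetime


def throttle(messages, poll_interval=0.1):
    """Yield only messages spaced more than poll_interval seconds apart."""
    prior_ts = None
    for d in messages:
        ts = datetime.fromisoformat(d["timestamp"])
        if prior_ts is None:
            # The first message only sets the reference point, as in the server loop
            prior_ts = ts
            continue
        if (ts - prior_ts).total_seconds() > poll_interval:
            prior_ts = ts
            yield d


stream = [
    {"timestamp": "2020-01-01T00:00:00.000", "step": 0},
    {"timestamp": "2020-01-01T00:00:00.050", "step": 1},
    {"timestamp": "2020-01-01T00:00:00.150", "step": 2},
    {"timestamp": "2020-01-01T00:00:00.200", "step": 3},
    {"timestamp": "2020-01-01T00:00:00.300", "step": 4},
]

kept = [d["step"] for d in throttle(stream)]
print(kept)  # [2, 4]
```

Note that the interval is measured from the last message that was kept, not from the last message that arrived, so a steady stream of closely spaced messages still lets one through per interval.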