Zmq
From charlesreid1
Notes
Zmq (short for ZeroMQ) is message queue software. The examples on this page use pyzmq, its Python bindings.
Message queue software commonly follows the pub/sub (publisher/subscriber) architecture, which involves creating asynchronous messaging pipelines. Agents that create events publish to a pipeline, while agents that process events subscribe to it.
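To make the publisher/subscriber idea concrete, here is a toy in-process sketch that uses only the Python standard library. It is not ZeroMQ and makes no claim about how ZeroMQ works internally; the `Pipeline` class and its `subscribe`/`publish` methods are hypothetical names chosen for illustration. It only shows the decoupling: the publisher returns immediately, and each subscriber drains its own queue later.

```python
import queue


class Pipeline:
    """Toy in-process pipeline (hypothetical, not a ZeroMQ API)."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self):
        # Each subscriber receives its own independent queue of events.
        q = queue.Queue()
        self._subscribers.append(q)
        return q

    def publish(self, event):
        # The publisher never waits for a subscriber to process the event.
        for q in self._subscribers:
            q.put(event)


pipeline = Pipeline()
inbox_a = pipeline.subscribe()
inbox_b = pipeline.subscribe()

# The event producer publishes three events and moves on.
for i in range(3):
    pipeline.publish("event %d" % i)

# The event processors read later, at their own pace.
received_a = [inbox_a.get() for _ in range(3)]
received_b = [inbox_b.get() for _ in range(3)]
print(received_a)
print(received_b)
```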
Documentation: https://pyzmq.readthedocs.io/
Pair Model
The pair model for ZMQ consists of a pair of instances: one client and one server.
Pair Model: Simple Client and Server
The following two files, pairclient.py and pairserver.py, illustrate the simplest possible pair model for zmq.
pairclient.py:
import zmq
import random
import sys
import time

"""
ZMQ Pair Client

This generates test messages every half second,
faster than the server will print/process the messages.

This just illustrates that ZMQ will store messages in a queue
as they come in, and will not throw messages away.
"""

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)

i = 0
while True:
    i += 1
    socket.send_string("ping %d"%(i))
    time.sleep(0.5)
pairserver.py:
import zmq
import random
import sys
import time

"""
ZMQ Pair Server

This accepts messages from a ZMQ message queue
and prints them at a rate of two per five seconds.
This is slower than the client generates messages.

This illustrates the queue behavior of ZMQ.
"""

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:%s" % port)

while True:
    # recv() returns raw bytes, so each message prints as b'ping N'
    msg = socket.recv()
    print(msg)
    msg = socket.recv()
    print(msg)
    time.sleep(5)
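The mismatch between the two rates can be worked out without running any sockets. The sketch below is a simplified standard-library model, not a measurement of ZeroMQ: it assumes the client sends exactly one message every 0.5 seconds, that the server's two reads take no time, and it uses a plain `deque` as a stand-in for the queue. The function name `simulate` is hypothetical.

```python
from collections import deque

SEND_PERIOD = 0.5    # seconds between client sends (from pairclient.py)
SERVER_SLEEP = 5.0   # seconds the server sleeps per loop (from pairserver.py)
READS_PER_LOOP = 2   # recv() calls per server loop (from pairserver.py)


def simulate(cycles):
    """Return the number of waiting messages after each server loop."""
    backlog = deque()
    sent = 0
    history = []
    sends_per_cycle = int(SERVER_SLEEP / SEND_PERIOD)
    for _ in range(cycles):
        # While the server sleeps, the client keeps sending.
        for _ in range(sends_per_cycle):
            sent += 1
            backlog.append("ping %d" % sent)
        # The server wakes up and reads only two messages.
        for _ in range(READS_PER_LOOP):
            if backlog:
                backlog.popleft()
        history.append(len(backlog))
    return history


history = simulate(3)
print(history)
```

Under these assumptions ten messages arrive per five-second loop and two are consumed, so the backlog grows by eight messages per loop.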
Pair Model: Json Client and Server
The following example expands on the prior example, and shows how to build a ZMQ client/server pair that sends and receives JSON data:
jsonclient.py:
import json
import zmq
import random
import sys
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)

for i in range(50):

    ## Send as string
    #d = dict(apples=random.randint(1,1000), oranges=random.randint(1,1000), bananas=random.randint(1,1000))
    #print("sending data %s"%(json.dumps(d)))
    #socket.send_string("%s"%json.dumps(d))
    #time.sleep(0.1)

    # Send as dict
    d = dict(apples=random.randint(1,1000), oranges=random.randint(1,1000), bananas=random.randint(1,1000))
    print("sending data %s"%(d))
    socket.send_json(d)
    time.sleep(0.1)
jsonserver.py:
import json
import zmq
import random
import sys
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:%s" % port)

while True:

    ## Receive as string
    #msg = socket.recv()
    #d = json.loads(msg)
    #print(d)
    #time.sleep(1)

    # Receive as json
    d = socket.recv_json()
    print(type(d))
    print(d)
    time.sleep(1)
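The two variants in each file (send as string versus send as dict) are meant to produce the same result. The sketch below shows the round trip using only the standard library. It approximates what the JSON helpers do rather than reproducing pyzmq's actual code: the dict is serialized to JSON text, encoded to bytes for the wire, and decoded back into a dict on the receiving side. The sample values are made up.

```python
import json

d = dict(apples=3, oranges=5, bananas=7)

# Sending side (approximately): dict -> JSON text -> bytes on the wire
wire_bytes = json.dumps(d).encode("utf-8")

# Receiving side (approximately): bytes -> JSON text -> dict
received = json.loads(wire_bytes.decode("utf-8"))

print(type(wire_bytes))
print(received)

# JSON has no tuples or integer keys, so some Python values
# do not survive the round trip unchanged.
lossy = json.loads(json.dumps({1: (1, 2)}))
print(lossy)
```

The last lines are a reminder that only JSON-compatible data comes back unchanged: integer keys arrive as strings and tuples arrive as lists.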
Using the Server as a Filter
One extremely useful pattern in this pair framework is to have the messaging server act as both a message buffer and a filter.
To explain with an example, suppose that we instrument a code to send a message in a section that may be called many times per second, but we are only interested in the progression of the code at wall time intervals of 0.1 or 1 second. To accomplish this, we include a timestamp in each message, and the server discards any message whose timestamp is less than one interval (0.1 or 1 second) after the timestamp of the last message it kept.
Implementing this in code, we do the following:
- Create a zmq context
- Listen for JSON messages on the zmq socket
- Each time a new JSON message comes in, parse its timestamp and compare it to the timestamp of the last message kept
- If the difference is not greater than the poll interval, ignore the message
import dateutil.parser
import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:%s"%(port))

poll_interval = 0.1 # in seconds
prior_ts = None
prior_d = None

while True:
    try:
        d = socket.recv_json()
        ts = dateutil.parser.parse(d['timestamp'])

        if(prior_d is None and prior_ts is None):
            # First message: record it as the reference point
            prior_ts = ts
            prior_d = d
        else:
            # Keep the message only if more than poll_interval seconds
            # have passed since the last message kept
            diff = (ts-prior_ts).total_seconds()
            if(diff>poll_interval):
                print(ts)
                prior_d = d
                prior_ts = ts

    except zmq.ZMQError:
        time.sleep(1)
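The filtering decision can also be separated from the socket loop so it can be checked without a running client. The following is a minimal sketch under some assumptions: timestamps are ISO 8601 strings, so the standard library's `datetime.fromisoformat` stands in for `dateutil.parser`; the function names `keep_message` and `filter_messages` and the sample messages are hypothetical. Unlike the loop above, which records the first message without printing it, this sketch also returns the first message as kept.

```python
from datetime import datetime


def keep_message(ts, prior_ts, poll_interval):
    """Return True if a message stamped ts should be kept."""
    if prior_ts is None:
        # Nothing kept yet: the first message is the reference point.
        return True
    return (ts - prior_ts).total_seconds() > poll_interval


def filter_messages(messages, poll_interval):
    """Keep messages more than poll_interval seconds after the last kept one."""
    kept = []
    prior_ts = None
    for d in messages:
        ts = datetime.fromisoformat(d["timestamp"])
        if keep_message(ts, prior_ts, poll_interval):
            kept.append(d)
            prior_ts = ts
    return kept


# Made-up sample data: five messages within 0.3 seconds.
messages = [
    {"timestamp": "2020-01-01T00:00:00.000", "step": 0},
    {"timestamp": "2020-01-01T00:00:00.040", "step": 1},
    {"timestamp": "2020-01-01T00:00:00.120", "step": 2},
    {"timestamp": "2020-01-01T00:00:00.150", "step": 3},
    {"timestamp": "2020-01-01T00:00:00.300", "step": 4},
]

kept = filter_messages(messages, poll_interval=0.1)
print([d["step"] for d in kept])
```

With a 0.1 second poll interval, steps 1 and 3 fall too close to the previously kept message and are dropped.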