ZeroMQ Introduction and Examples
ZeroMQ (ZMQ) is a high-performance, asynchronous messaging library that simplifies building distributed systems. It provides socket-like abstractions for various messaging patterns including request-reply, publish-subscribe, and push-pull. Unlike traditional message brokers, ZeroMQ requires no centralized server, making it lightweight and ideal for embedded systems and microservices.
Table of Contents
- Prerequisites
- Installing ZeroMQ
- ZeroMQ Concepts
- Socket Types and Patterns
- Request-Reply Pattern
- Publish-Subscribe Pattern
- Push-Pull Pattern
- Advanced Patterns
- Performance Optimization
- Troubleshooting
- Conclusion
Prerequisites
Before using ZeroMQ, ensure you have:
- Linux system (Ubuntu 20.04+, CentOS 8+)
- C/C++ development tools
- Python 3.6+ (for Python examples)
- Understanding of TCP/IP networking
- Basic familiarity with asynchronous messaging
Installing ZeroMQ
Install ZeroMQ library on Ubuntu/Debian:
sudo apt-get update
sudo apt-get install -y libzmq3-dev libzmq3
For CentOS/RHEL:
sudo yum install -y zeromq-devel zeromq
Verify installation:
zmq_version
pkg-config --modversion libzmq
Install Python bindings:
pip3 install pyzmq
For C/C++ development, install development headers:
sudo apt-get install -y libzmq3-dev
Test installation with a simple Python example:
#!/usr/bin/env python3
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://127.0.0.1:5555")
socket.send(b"Hello")
socket.close()
context.term()
ZeroMQ Concepts
ZeroMQ operates on several key concepts:
Context: Represents the global state, similar to a thread pool. Create one per process:
import zmq
context = zmq.Context()
Socket: Abstract communication endpoint with built-in buffering:
socket = context.socket(zmq.PUSH) # Different types available
Transport: Communication mechanism (TCP, IPC, UDP, INPROC):
socket.bind("tcp://0.0.0.0:5555") # Server
socket.connect("tcp://localhost:5555") # Client
Message: Atomic unit of data, supports multi-part messages:
socket.send(b"single part")
socket.send_multipart([b"part1", b"part2", b"part3"])
Socket Types and Patterns
ZeroMQ provides specialized socket types for different messaging patterns:
PUSH/PULL: Distributes work across workers (producer-consumer). REQ/REP: Synchronous request-reply pattern. PUB/SUB: Asynchronous publish-subscribe. PAIR: Full-duplex communication between two endpoints. DEALER/ROUTER: Advanced patterns for complex routing.
Socket type combinations must be compatible:
- PUSH connects to PULL
- REQ connects to REP
- PUB connects to SUB
- PAIR connects to PAIR
Request-Reply Pattern
Implement synchronous request-reply communication. The REQ socket sends a request and waits for a reply; REP socket receives and replies.
Create a simple server (responder):
#!/usr/bin/env python3
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://0.0.0.0:5555")
print("Server listening on port 5555...")
request_count = 0
try:
while True:
# Wait for next request
message = socket.recv()
request_count += 1
print(f"[{request_count}] Received request: {message.decode()}")
# Simulate processing
time.sleep(0.1)
# Send reply
reply = f"Echo: {message.decode()}"
socket.send(reply.encode())
except KeyboardInterrupt:
print("\nServer shutting down")
finally:
socket.close()
context.term()
Create a client (requester):
#!/usr/bin/env python3
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
print("Client connecting to server...")
for i in range(5):
request = f"Request #{i+1}"
print(f"Sending: {request}")
socket.send(request.encode())
reply = socket.recv()
print(f"Received reply: {reply.decode()}")
socket.close()
context.term()
Run the server in one terminal:
python3 server.py
Run the client in another:
python3 client.py
The client sends requests and waits for replies. Each request-reply cycle completes before the next request.
Publish-Subscribe Pattern
Implement asynchronous publish-subscribe. PUB sockets broadcast messages; SUB sockets receive matching messages.
Create a publisher:
#!/usr/bin/env python3
import zmq
import time
import random
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://0.0.0.0:5556")
print("Publisher listening on port 5556...")
time.sleep(1) # Wait for subscribers to connect
try:
message_count = 0
while True:
# Generate different topics
topics = ["weather", "sports", "news"]
topic = random.choice(topics)
message = f"{random.randint(10, 30)}C"
# Publish as multi-part message: [topic, message]
socket.send_multipart([topic.encode(), message.encode()])
message_count += 1
print(f"Published [{topic}]: {message}")
time.sleep(1)
except KeyboardInterrupt:
print("\nPublisher shutting down")
finally:
socket.close()
context.term()
Create a subscriber:
#!/usr/bin/env python3
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
# Subscribe to specific topics
socket.setsockopt(zmq.SUBSCRIBE, b"weather")
socket.setsockopt(zmq.SUBSCRIBE, b"news")
# Unsubscribe from sports
# socket.setsockopt(zmq.UNSUBSCRIBE, b"sports")
print("Subscriber listening...")
try:
while True:
topic, message = socket.recv_multipart()
print(f"Received [{topic.decode()}]: {message.decode()}")
except KeyboardInterrupt:
print("\nSubscriber shutting down")
finally:
socket.close()
context.term()
Run multiple subscribers and a publisher:
# Terminal 1
python3 publisher.py
# Terminal 2
python3 subscriber.py
# Terminal 3
python3 subscriber.py
All subscribers receive all published messages matching their subscriptions.
Push-Pull Pattern
Implement work distribution with PUSH/PULL sockets. Multiple workers consume tasks from a single queue.
Create a task source (pusher):
#!/usr/bin/env python3
import zmq
import random
import time
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://0.0.0.0:5557")
print("Task source listening on port 5557...")
time.sleep(1)
try:
for i in range(20):
# Create tasks
task = {
"task_id": i + 1,
"data": f"Process item {random.randint(1, 100)}"
}
import json
socket.send(json.dumps(task).encode())
print(f"Pushed task: {task}")
time.sleep(0.5)
except KeyboardInterrupt:
print("\nSource shutting down")
finally:
socket.close()
context.term()
Create workers (pullers):
#!/usr/bin/env python3
import zmq
import json
import time
import sys
worker_id = sys.argv[1] if len(sys.argv) > 1 else "1"
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://localhost:5557")
print(f"Worker {worker_id} connected, waiting for tasks...")
task_count = 0
try:
while True:
task = json.loads(socket.recv().decode())
task_count += 1
print(f"Worker {worker_id} processing: {task}")
time.sleep(0.5) # Simulate processing
print(f"Worker {worker_id} completed: task_{task['task_id']}")
except KeyboardInterrupt:
print(f"\nWorker {worker_id} processed {task_count} tasks")
finally:
socket.close()
context.term()
Run the task source and multiple workers:
# Terminal 1
python3 pusher.py
# Terminal 2
python3 puller.py 1
# Terminal 3
python3 puller.py 2
# Terminal 4
python3 puller.py 3
Tasks are distributed fairly among workers using ZeroMQ's built-in load balancing.
Advanced Patterns
Implement DEALER/ROUTER for more complex patterns:
# DEALER client with async requests
client_socket = context.socket(zmq.DEALER)
client_socket.setsockopt(zmq.IDENTITY, b"client-1")
client_socket.connect("tcp://localhost:5555")
# ROUTER server for advanced routing
server_socket = context.socket(zmq.ROUTER)
server_socket.bind("tcp://0.0.0.0:5555")
Implement polled/multiplexed sockets for handling multiple connections:
#!/usr/bin/env python3
import zmq
context = zmq.Context()
# Create multiple sockets
socket1 = context.socket(zmq.PULL)
socket1.connect("tcp://localhost:5557")
socket2 = context.socket(zmq.SUB)
socket2.connect("tcp://localhost:5556")
socket2.setsockopt(zmq.SUBSCRIBE, b"")
# Create poller
poller = zmq.Poller()
poller.register(socket1, zmq.POLLIN)
poller.register(socket2, zmq.POLLIN)
try:
while True:
events = dict(poller.poll(1000)) # 1 second timeout
if socket1 in events:
message = socket1.recv()
print(f"From socket1: {message.decode()}")
if socket2 in events:
message = socket2.recv()
print(f"From socket2: {message.decode()}")
except KeyboardInterrupt:
print("\nShutting down")
finally:
socket1.close()
socket2.close()
context.term()
Performance Optimization
Optimize ZeroMQ for high throughput:
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUSH)
# Set socket options for performance
socket.setsockopt(zmq.SNDHWM, 1000) # Send high water mark
socket.setsockopt(zmq.RCVHWM, 1000) # Receive high water mark
socket.setsockopt(zmq.SNDBUF, 65536) # Send buffer size
socket.setsockopt(zmq.RCVBUF, 65536) # Receive buffer size
# Use IPC for local processes (faster than TCP)
socket.bind("ipc:///tmp/zmq-socket")
Use inproc transport for in-process communication:
# Thread 1
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("inproc://internal")
# Thread 2
socket2 = context.socket(zmq.PAIR)
socket2.connect("inproc://internal")
Implement message batching:
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://0.0.0.0:5555")
# Send multiple messages
for i in range(1000):
socket.send(f"Message {i}".encode(), zmq.SNDMORE if i < 999 else 0)
Troubleshooting
Debug ZeroMQ connections:
# Check listening ports
netstat -tlnp | grep zmq
# Monitor ZeroMQ with tcpdump
sudo tcpdump -i lo port 5555
Handle connection issues in Python:
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
# Set timeout for recv
socket.setsockopt(zmq.RCVTIMEO, 5000) # 5 seconds
try:
socket.connect("tcp://localhost:5555")
socket.send(b"Hello")
reply = socket.recv()
except zmq.error.Again:
print("Request timeout - server not responding")
except zmq.error.ZMQError as e:
print(f"ZMQ Error: {e}")
Test connectivity:
# Terminal 1 - Listen on socket
python3 -c "
import zmq
ctx = zmq.Context()
s = ctx.socket(zmq.PULL)
s.bind('tcp://0.0.0.0:5555')
print('Listening...')
s.recv()
"
# Terminal 2 - Send message
python3 -c "
import zmq
ctx = zmq.Context()
s = ctx.socket(zmq.PUSH)
s.connect('tcp://localhost:5555')
s.send(b'test')
"
Conclusion
ZeroMQ provides a lightweight, high-performance messaging library for building distributed systems without centralized brokers. This guide covered installation, fundamental concepts, and key messaging patterns: request-reply for synchronous communication, publish-subscribe for broadcasting, and push-pull for work distribution. ZeroMQ excels in scenarios requiring low-latency, high-throughput messaging within microservices architectures, embedded systems, and scientific computing. For production deployments, implement proper error handling, use appropriate socket options for performance, monitor connections, and establish messaging patterns that match your application requirements.


