ZeroMQ Messaging Service
Message-oriented middleware (MOM)
For reference: "Message-oriented middleware (MOM) is software or hardware infrastructure supporting sending and receiving messages between distributed systems. MOM allows application modules to be distributed over heterogeneous platforms and reduces the complexity of developing applications that span multiple operating systems and network protocols. The middleware creates a distributed communications layer that insulates the application developer from the details of the various operating systems and network interfaces." (ref)
That is a fancy way of saying that instead of a monolithic application, smaller self-contained applications send messages and requests to each other. This allows components to be maintained individually, and also makes it possible to duplicate components for distributed load balancing.
Would my little security system become more complex or simpler by adding a messaging system? I decided that the education alone would be worth it, so I jumped onto the messaging system bandwagon.
Messaging System Options
I researched several messaging system options, with my primary requirements being:
- Compatibility with the Raspberry Pi
- Compatibility with Python
- Low memory footprint
- Fast performance
After investigating popular options such as ActiveMQ and RabbitMQ, I settled on a tiny messaging system called ZeroMQ.
ZeroMQ
ZeroMQ (also ØMQ or ZMQ) is a minimal message-oriented middleware (MOM). It is only a small step above bare TCP sockets, but provides enough messaging power to do pretty much anything you could want.
The first benefit of ZeroMQ is that it takes the TCP socket stream and breaks it down into individual messages for me. I don't have to worry about whether the partial data stream I received is complete. With ZeroMQ you receive the whole message or nothing at all. Each message arrives as a single byte string, which makes it easy for programming languages like Python to handle. If you need to send anything other than text, you will need to encode it into the message on the sending side and decode it on the receiving side; pyzmq includes helpers for this, such as send_json/recv_json and send_pyobj/recv_pyobj.
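To see what ZeroMQ is saving me from, here is a minimal sketch of the framing I would otherwise have to write by hand on a bare TCP stream. It assumes a simple 4-byte length prefix, which is an illustrative choice and not ZeroMQ's actual wire format; `frame` and `FrameBuffer` are hypothetical names.

```python
import struct


def frame(payload: bytes) -> bytes:
    """Prefix a payload with its length as a 4-byte big-endian integer."""
    return struct.pack(">I", len(payload)) + payload


class FrameBuffer:
    """Collect raw stream chunks and hand back only complete messages."""

    def __init__(self):
        self._buf = b""

    def feed(self, chunk: bytes) -> list:
        self._buf += chunk
        messages = []
        while len(self._buf) >= 4:
            (length,) = struct.unpack(">I", self._buf[:4])
            if len(self._buf) < 4 + length:
                break  # message still incomplete; wait for more data
            messages.append(self._buf[4:4 + length])
            self._buf = self._buf[4 + length:]
        return messages


# Simulate TCP delivering two messages split at an arbitrary point.
stream = frame(b"KEY: LEFT") + frame(b"KEY: RIGHT")
buf = FrameBuffer()
first = buf.feed(stream[:7])    # header plus a fragment: nothing complete yet
second = buf.feed(stream[7:])   # the rest arrives: both messages come out
print(first)
print(second)
```

With ZeroMQ none of this bookkeeping is needed: a single recv() call returns one complete message.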
The second benefit of ZeroMQ is that it handles connections for me. I don't have to worry about whether a connection is lost, as it will reconnect when it is able to. I also don't have to worry if a server doesn't exist at the time a client connects, as ZeroMQ will connect automatically when the server eventually comes online.
The third benefit of ZeroMQ is that it handles distributed messages out of the box. I can send a single message to my publishing pipe, and every client subscribed to that pipe will receive it.
ZMQ Patterns
According to the ZeroMQ documentation, the following messaging patterns are baked into its core:
- Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
- Pub-sub, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
- Pipeline, which connects nodes in a fan-out/fan-in pattern that can have multiple steps and loops. This is a parallel task distribution and collection pattern.
- Exclusive pair, which connects two sockets exclusively. This is a pattern for connecting two threads in a process, not to be confused with "normal" pairs of sockets.
For example, my keypad application will be waiting for a key press. When a key is pressed, such as the "LEFT" key, it will generate a "KEY: LEFT" message and send it to the message router. The message router will then turn around and replay the "KEY: LEFT" to any application that is subscribed to messages prefixed with "KEY:". The music player application, which is listening for "KEY:" will then pick up the message and take the predetermined action, such as going to the previous song.
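The "subscribed to messages prefixed with" part is a plain prefix match on the message bytes. Here is a minimal sketch of that rule in pure Python, with no ZeroMQ involved; the `wanted` function and the "DOOR: OPEN" message are made up for illustration.

```python
def wanted(message: bytes, subscriptions: list) -> bool:
    """Return True if the message starts with any subscribed prefix.

    An empty prefix (b"") matches every message, which is why a
    subscriber can ask for everything by subscribing to an empty string.
    """
    return any(message.startswith(prefix) for prefix in subscriptions)


music_player = [b"KEY:"]   # only cares about key presses
router = [b""]             # subscribes to everything

print(wanted(b"KEY: LEFT", music_player))    # True
print(wanted(b"DOOR: OPEN", music_player))   # False
print(wanted(b"DOOR: OPEN", router))         # True
```

A subscriber with no subscriptions at all receives nothing, which matches the empty-list case here.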
Another fascinating side effect from this setup is I could generate a "KEY: LEFT" message from any application, such as a web interface, or even a cell phone app. Security issues aside, think of the possibilities. I know, this blew my mind too!
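As a rough sketch of what such a sender could look like: the helper names below are hypothetical, and the URL assumes the router is listening for incoming messages on port 5556 of the same machine.

```python
import time


def key_message(key_name: str) -> bytes:
    """Build a message in the "KEY: <name>" form used by the keypad."""
    return ("KEY: " + key_name.strip().upper()).encode("utf-8")


def send_key(socket, key_name: str) -> bytes:
    """Send a key press through any object that has a send(bytes) method."""
    message = key_message(key_name)
    socket.send(message)
    return message


def open_router_socket(url="tcp://localhost:5556"):
    """Connect a PUB socket to the router's receiving port (assumed URL)."""
    import zmq  # imported here so the helpers above work without pyzmq

    socket = zmq.Context.instance().socket(zmq.PUB)
    socket.connect(url)
    # A PUB socket drops messages sent before the connection is up,
    # so give it a moment before the first send.
    time.sleep(0.5)
    return socket
```

With a router running, `send_key(open_router_socket(), "left")` would publish the same "KEY: LEFT" message the keypad sends. The router cannot tell the two senders apart, which is exactly the security issue mentioned above.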
Installation
There are two parts to getting ZeroMQ installed and working with Python: first the system libraries, and second the Python bindings.

    # For Raspbian (installs the system library and the Python bindings):
    apt-get install python3-zmq

    # Or install the bindings from PyPI:
    pip install pyzmq

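A quick way to confirm that both parts are in place is to ask pyzmq for its versions. This is only a small sanity check; `zmq_versions` is a hypothetical helper name.

```python
def zmq_versions():
    """Return (libzmq version, pyzmq version), or None if pyzmq is missing."""
    try:
        import zmq
    except ImportError:
        return None
    return zmq.zmq_version(), zmq.pyzmq_version()


versions = zmq_versions()
if versions is None:
    print("pyzmq is not installed")
else:
    print("libzmq %s, pyzmq %s" % versions)
```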
Source Code
If you are interested in the full code (still a work in progress), see the lala-pi repo on GitHub.

    git clone https://github.com/oeey/lala-pi.git

Message Router
As ZeroMQ is a minimal messaging system, you are left on your own to build the message routing service. Thankfully, this is quite simple.
Here is the minimal message router I started with:

    #!/usr/bin/env python3
    import zmq

    # ZMQ context, only need one per application
    context = zmq.Context()

    # for sending messages (clients subscribe to this port)
    z_send = context.socket(zmq.PUB)
    z_send.bind("tcp://*:5555")

    # for receiving messages (clients publish to this port)
    z_recv = context.socket(zmq.SUB)
    z_recv.bind("tcp://*:5556")
    z_recv.setsockopt(zmq.SUBSCRIBE, b"")  # subscribe to everything

    print("ZMQ server started.")
    while True:
        message = None

        # wait for incoming message
        try:
            message = z_recv.recv()
        except zmq.ZMQError as err:
            print("Receive error: " + str(err))

        # replay message to all subscribers
        if message:
            try:
                z_send.send(message)
            except zmq.ZMQError as err:
                print("Send error: " + str(err))

I then added some logging of the messages, and a bit of clean up code for a more robust message router:

    #!/usr/bin/env python3
    import time
    import zmq

    # ZMQ context, only need one per application
    context = zmq.Context()

    # for sending messages (clients subscribe to this port)
    z_send = context.socket(zmq.PUB)
    z_send.bind("tcp://*:5555")

    # for receiving messages (clients publish to this port)
    z_recv = context.socket(zmq.SUB)
    z_recv.bind("tcp://*:5556")
    z_recv.setsockopt(zmq.SUBSCRIBE, b"")  # subscribe to everything

    # record all message requests to a file
    # (binary mode, because messages arrive as bytes)
    record = open("router-records.txt", "wb")

    # counters for messages
    last_time = time.time()
    count = 0

    print("ZMQ server started.")
    while True:
        message = None

        # wait for incoming message
        try:
            message = z_recv.recv()
        except zmq.ZMQError as err:
            print("Receive error: " + str(err))

        # if message received, and not an error, then
        # replay message to subscribers
        if message:
            count += 1
            record.write(str(count).encode("ascii") + b":" + message + b"\n")

            # occasionally flush the router-records.txt file
            if time.time() > last_time + 2:
                record.flush()
                last_time = time.time()

            try:
                z_send.send(message)
            except zmq.ZMQError as err:
                print("Send error: " + str(err))

            if message.strip() == b"DEATH":
                print("Death received, shutting down.")
                break

    print("Shutting down...")
    record.close()
    z_send.close()
    z_recv.close()
    context.term()
    print("Shut down.")

My final version also includes some "forking" code to daemonize (background) the application. As print statements would be lost to the void, I also converted all of the print statements to syslog calls. The full source can be found in the lala-pi repo.
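For anyone curious what that involves, here is a minimal sketch of the two pieces using only the standard library. It is not the code from the repo: it assumes a Unix system, uses the classic double fork, and the "zmq-router" ident is a made-up name.

```python
import os
import sys
import syslog


def daemonize():
    """Detach from the terminal using the classic double fork (Unix only)."""
    if os.fork() > 0:
        sys.exit(0)       # first parent exits
    os.setsid()           # become session leader, drop the controlling tty
    if os.fork() > 0:
        sys.exit(0)       # second parent exits; the daemon cannot regain a tty
    os.chdir("/")
    # Point stdin/stdout/stderr at /dev/null, since nobody is watching them.
    devnull = os.open(os.devnull, os.O_RDWR)
    for fd in (0, 1, 2):
        os.dup2(devnull, fd)


def log(message, priority=syslog.LOG_INFO):
    """Replacement for print: send the message to the system log."""
    syslog.syslog(priority, message)


syslog.openlog("zmq-router")   # hypothetical ident shown in the log lines
log("ZMQ server started.")
```

In a router like the one above, daemonize() would be called once at startup, before the ZeroMQ context is created, because the context's background threads do not survive a fork.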
Now, with a working router, I can build any application, connect to the message router and have a fully functional messaging system.
Message Client
For a sample messaging client, here is the "test" client that I use. Using this as a template, a full application could be constructed.

    #!/usr/bin/env python3
    import zmq

    context = zmq.Context()

    # receive from the router's publishing port
    z_recv = context.socket(zmq.SUB)
    z_recv.connect("tcp://localhost:5555")

    # send to the router's receiving port
    z_send = context.socket(zmq.PUB)
    z_send.connect("tcp://localhost:5556")

    # z_recv.setsockopt(zmq.SUBSCRIBE, b"KEYBOARD:")
    z_recv.setsockopt(zmq.SUBSCRIBE, b"")  # subscribe to everything

    print("ZMQ Client Started!")
    while True:
        message = input("Message: ").strip()

        if message:
            try:
                print("SEND:" + message)
                z_send.send_string(message)
            except zmq.ZMQError as err:
                print("Send error: " + str(err))

        try:
            # don't block if no message waiting
            in_message = z_recv.recv_string(zmq.DONTWAIT)
            print("RECV:" + in_message)
        except zmq.Again:
            pass  # nothing waiting, which is not an error
        except zmq.ZMQError as err:
            print("Receive error: " + str(err))

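One limitation of this test client is that it picks up at most one waiting message per trip through the loop. A possible refinement, sketched here with a hypothetical `drain` helper, is to keep reading until the socket reports nothing more is ready. It relies only on the poll() and recv() methods that pyzmq sockets provide.

```python
def drain(socket, timeout_ms=0):
    """Collect every message currently waiting on a socket.

    poll() returns 0 when nothing is ready within timeout_ms,
    which ends the loop.
    """
    messages = []
    while socket.poll(timeout_ms):
        messages.append(socket.recv())
    return messages
```

In the client, the single receive could then become something like `for m in drain(z_recv): print("RECV:" + m.decode("utf-8"))`.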
More to come... see label: lala-pi