Saturday, December 21, 2013

ZMQ Messaging System - lala-pi

ZeroMQ Messaging Service
The first hurdle I ran into with this over-engineered room security system was the complexity of the monolithic application I was putting together to control the various sensors and devices.  Luckily, a coworker suggested I break the system up into smaller components and use a messaging system, which is a common practice for enterprise-level applications.

Message-oriented middleware (MOM)

For reference: "Message-oriented middleware (MOM) is software or hardware infrastructure supporting sending and receiving messages between distributed systems. MOM allows application modules to be distributed over heterogeneous platforms and reduces the complexity of developing applications that span multiple operating systems and network protocols. The middleware creates a distributed communications layer that insulates the application developer from the details of the various operating systems and network interfaces." (ref)

That is a fancy way of saying that instead of one monolithic application, smaller self-contained applications send messages and requests to each other.  This allows each component to be maintained independently, and also makes it possible to run duplicate copies of a component for load balancing.

Would adding a messaging system make my little security system simpler or more complex?  I decided that the education alone would be worth it, so I jumped on the messaging system bandwagon.

Messaging System Options

I researched several messaging system options, with my primary requirements being:

  • Compatibility with the Raspberry Pi
  • Compatibility with Python
  • Low memory footprint
  • Fast performance

After investigating popular options such as ActiveMQ and RabbitMQ, I settled on a tiny messaging library called ZeroMQ.

ZeroMQ

ZeroMQ (also ØMQ or ZMQ) is a minimal message-oriented middleware (MOM) library.  It is only a small step above using bare TCP sockets, but it provides enough messaging power to do pretty much anything you could want.

The first benefit of ZeroMQ is that it takes the TCP socket stream and breaks it down into individual messages for me.  I don't have to worry about whether the data I have received so far is a complete message.  With ZeroMQ you receive the whole message or nothing at all.  As far as ZeroMQ is concerned a message is just a block of bytes, which Python 2 hands over as an ordinary string, so text messages are easy to work with.  If you need to send something other than a string, you will need to serialize it yourself and decode it on the receiving end, or use pyzmq's helper methods such as send_json and send_pyobj.
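
To see what that framing saves me, here is a minimal sketch of the length-prefix bookkeeping a bare stream socket would need.  The helper names (recv_exact, send_msg, recv_msg) and the 4-byte length header are my own illustration, not ZeroMQ's actual wire format, and a socketpair stands in for a real TCP connection.

```python
import socket
import struct


def recv_exact(sock, size):
    """Read exactly `size` bytes, looping over partial reads."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise EOFError("connection closed mid-message")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def send_msg(sock, payload):
    """Prefix the payload with its length as a 4-byte big-endian integer."""
    sock.sendall(struct.pack("!I", len(payload)) + payload)


def recv_msg(sock):
    """Return one whole message, however the stream happened to be chunked."""
    (size,) = struct.unpack("!I", recv_exact(sock, 4))
    return recv_exact(sock, size)


# A connected pair of stream sockets stands in for a TCP connection.
left, right = socket.socketpair()
send_msg(left, b"KEY: LEFT")
send_msg(left, b"KEY: RIGHT")
first = recv_msg(right)
second = recv_msg(right)
print(first)
print(second)
left.close()
right.close()
```

Both messages come out whole and in order even though they were written back to back onto the same stream.  With ZeroMQ none of this code is needed.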

The second benefit of ZeroMQ is that it handles connections for me.  If a connection is lost, it reconnects on its own as soon as it can.  I also don't have to worry about a server not being up when a client connects, as ZeroMQ will connect automatically once the server comes online.
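
For comparison, this is roughly the retry loop I would otherwise have to write around a bare TCP socket.  It is only a sketch under my own assumptions: connect_with_retry and its attempts and delay parameters are hypothetical names, and ZeroMQ's real reconnect logic runs in a background thread with its own configurable interval.

```python
import socket
import threading
import time


def connect_with_retry(host, port, attempts=20, delay=0.1):
    """Keep trying to connect until the server is up or attempts run out."""
    for _ in range(attempts):
        try:
            return socket.create_connection((host, port), timeout=1.0)
        except socket.error:
            time.sleep(delay)  # server not there yet, try again shortly
    raise RuntimeError("server never came online")


# Reserve a free port, then release it so the "server" can bind to it later.
probe = socket.socket()
probe.bind(("127.0.0.1", 0))
port = probe.getsockname()[1]
probe.close()


def late_server():
    """Start listening only after the client has begun connecting."""
    time.sleep(0.3)
    server = socket.socket()
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(("127.0.0.1", port))
    server.listen(1)
    conn, _ = server.accept()
    conn.sendall(b"hello")
    conn.close()
    server.close()


thread = threading.Thread(target=late_server)
thread.daemon = True
thread.start()

# The client starts first and simply retries until the server shows up.
client = connect_with_retry("127.0.0.1", port)
greeting = b""
while True:
    chunk = client.recv(16)
    if not chunk:  # server closed the connection, nothing more to read
        break
    greeting += chunk
client.close()
thread.join()
print(greeting)
```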

The third benefit of ZeroMQ is that it handles message distribution out of the box.  I can send a single message to my publishing socket, and every client subscribed to it will receive the message.
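
Here is a minimal in-process sketch of that fan-out, assuming pyzmq is installed.  The inproc://demo endpoint and the resend loop are my own choices to keep the demo self-contained: a subscription takes a moment to reach the publisher, and anything published before then is silently dropped, so the demo keeps publishing until both subscribers have a copy.

```python
import zmq

context = zmq.Context()

publisher = context.socket(zmq.PUB)
publisher.bind("inproc://demo")

# Two independent subscribers attached to the same publisher.
subscribers = []
for _ in range(2):
    sub = context.socket(zmq.SUB)
    sub.connect("inproc://demo")
    sub.setsockopt(zmq.SUBSCRIBE, b"")  # subscribe to everything
    subscribers.append(sub)

received = [None, None]
for _ in range(20):
    publisher.send(b"KEY: LEFT")  # one send, delivered to every subscriber
    for i, sub in enumerate(subscribers):
        # wait up to 100 ms for this subscriber's copy to arrive
        if received[i] is None and sub.poll(100):
            received[i] = sub.recv()
    if None not in received:
        break

print(received)

for sub in subscribers:
    sub.close(linger=0)
publisher.close(linger=0)
context.term()
```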

ZMQ Patterns

According to the ZeroMQ documentation, the following messaging patterns are baked into its core:
  • Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
  • Pub-sub, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
  • Pipeline, which connects nodes in a fan-out/fan-in pattern that can have multiple steps and loops. This is a parallel task distribution and collection pattern.
  • Exclusive pair, which connects two sockets exclusively. This is a pattern for connecting two threads in a process, not to be confused with "normal" pairs of sockets.

The pattern I have chosen for my room security system is pub-sub, with a message router in the middle that replays every message to all the other applications in the system.  Applications can then choose which messages to subscribe to (some or all of them) and ignore the rest.
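
Subscriptions in ZeroMQ are plain prefix matches on the message bytes: an empty prefix matches everything, and a subscriber with no subscriptions receives nothing.  The pure-Python model below only illustrates that rule.  The function name is_delivered and the "DOOR: OPEN" message are hypothetical, and the real filtering happens inside the ZeroMQ library.

```python
def is_delivered(message, subscriptions):
    """Model of a SUB socket's filter: deliver if any prefix matches."""
    return any(message.startswith(prefix) for prefix in subscriptions)


music_player = [b"KEY:"]  # only cares about key presses
logger = [b""]            # empty prefix: subscribed to everything
idle = []                 # no subscriptions: receives nothing

print(is_delivered(b"KEY: LEFT", music_player))   # True
print(is_delivered(b"DOOR: OPEN", music_player))  # False
print(is_delivered(b"DOOR: OPEN", logger))        # True
print(is_delivered(b"KEY: LEFT", idle))           # False
```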

For example, my keypad application will be waiting for a key press.  When a key is pressed, such as the "LEFT" key, it will generate a "KEY: LEFT" message and send it to the message router.  The message router will then turn around and replay "KEY: LEFT" to any application that is subscribed to messages prefixed with "KEY:".  The music player application, which is listening for "KEY:" messages, will then pick up the message and take the predetermined action, such as going to the previous song.
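
The receiving side of that exchange can be sketched as a small parse-and-dispatch step.  Everything here is hypothetical: the parse_message and handle helpers, the ACTIONS table and the "MOTION: HALL" message are placeholders, and the "TYPE: VALUE" layout is simply the convention from the example above.

```python
def parse_message(message):
    """Split "KEY: LEFT" into ("KEY", "LEFT"); the value may be empty."""
    kind, _, value = message.partition(":")
    return kind.strip(), value.strip()


# Hypothetical mapping from key names to music player actions.
ACTIONS = {
    "LEFT": "previous song",
    "RIGHT": "next song",
}


def handle(message):
    """Return the action for a key message, or None if it is not for us."""
    kind, value = parse_message(message)
    if kind != "KEY":
        return None
    return ACTIONS.get(value)


print(handle("KEY: LEFT"))     # previous song
print(handle("MOTION: HALL"))  # None
```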

Another fascinating side effect of this setup is that I could generate a "KEY: LEFT" message from any application, such as a web interface or even a cell phone app.  Security issues aside, think of the possibilities.  I know, this blew my mind too!

Installation

There are two parts to get ZeroMQ installed and working with Python.  First, you need the system libraries, and second the Python bindings.

System Libraries:
# For Raspbian (this package also pulls in the libzmq system library):
apt-get install python-zmq

Python Binding:
easy_install pyzmq
# or
pip install pyzmq
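
A quick way to check that both parts are in place is to import the binding and ask it for its versions.  This is just a sanity check I find handy, not part of the install itself, and the variable names are my own.

```python
import zmq

# Versions of the underlying libzmq library and of the pyzmq binding.
libzmq_version = zmq.zmq_version()
binding_version = zmq.pyzmq_version()
print("libzmq " + libzmq_version)
print("pyzmq " + binding_version)
```

If the import fails, the Python binding is missing or was installed for a different Python interpreter.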

Source Code

If you are interested in the full code (still a work in progress), see the lala-pi repo on GitHub.
# git clone https://github.com/oeey/lala-pi.git

Message Router

As ZeroMQ is a minimal messaging system, you are left on your own to build the messaging router services.  Thankfully this is quite simple.

Here is the minimal message router I started with:
#!/usr/bin/env python

import zmq

# ZMQ context, only need one per application
context = zmq.Context()

# for sending messages
z_send = context.socket(zmq.PUB)
z_send.bind("tcp://*:5555")

# for receiving messages
z_recv = context.socket(zmq.SUB)
z_recv.bind("tcp://*:5556")
z_recv.setsockopt(zmq.SUBSCRIBE, '')  # subscribe to everything

print "ZMQ server started."
while True:
    message = None

    # wait for incoming message
    try:
        message = z_recv.recv()
    except zmq.ZMQError as err:
        print "Receive error: " + str(err)

    # replay message to all subscribers
    if message:
        try:
            z_send.send(message)
        except zmq.ZMQError as err:
            print "Send error: " + str(err)

I then added some logging of the messages, and a bit of clean up code for a more robust message router:
#!/usr/bin/env python

import time

import zmq

# ZMQ context, only need one per application
context = zmq.Context()

# for sending messages
z_send = context.socket(zmq.PUB)
z_send.bind("tcp://*:5555")

# for receiving messages
z_recv = context.socket(zmq.SUB)
z_recv.bind("tcp://*:5556")
z_recv.setsockopt(zmq.SUBSCRIBE, '')  # subscribe to everything

# record all message requests to a file
record = open('router-records.txt', 'w')

# counters for messages
last_time = time.time()
count = 0

print "ZMQ server started."
while True:
    message = None

    # wait for incoming message
    try:
        message = z_recv.recv()
    except zmq.ZMQError as err:
        print "Receive error: " + str(err)

    # if message received, and not an error, then
    # replay message to subscribers
    if message:
        count += 1
        record.write(str(count) + ':' + message + '\n')
        
        # occasionally flush the router-records.txt file
        if time.time() > last_time + 2:
            record.flush()
            last_time = time.time()

        try:
            z_send.send(message)
        except zmq.ZMQError as err:
            print "Send error: " + str(err)

        if message.strip() == "DEATH":
            print "Death received, shutting down."
            break


print "Shutting down..."
record.close()
z_send.close()
z_recv.close()
context.term()
print "Shut down."

My final version also includes some "forking" code to daemonize (background) the application. As print statements would be lost to the void, I also converted all of the print statements over to using syslog.  The full source can be found in the Bitbucket repo.

Now, with a working router, I can build any application, connect to the message router and have a fully functional messaging system.

Message Client

For a sample messaging client, here is the "test" client that I use.  Using this as a template, a full application could be constructed.
#!/usr/bin/env python

import sys

import zmq

context = zmq.Context()

# for receiving messages replayed by the router (its PUB socket)
z_recv = context.socket(zmq.SUB)
z_recv.connect("tcp://localhost:5555")
# z_recv.setsockopt(zmq.SUBSCRIBE, 'KEYBOARD:')
z_recv.setsockopt(zmq.SUBSCRIBE, '')  # subscribe to everything

# for sending messages to the router (its SUB socket)
z_send = context.socket(zmq.PUB)
z_send.connect("tcp://localhost:5556")

print "ZMQ Client Started!"

while True:
    sys.stdout.write("Message: ")
    message = raw_input().strip()

    if message:
        try:
            print 'SEND:' + message
            z_send.send(message)
        except zmq.ZMQError as err:
            print 'Send error: ' + str(err)

    try:
        # don't block if no message is waiting
        in_message = z_recv.recv(zmq.DONTWAIT)
        print 'RECV:' + in_message
    except zmq.ZMQError as err:
        # EAGAIN only means nothing was waiting, which is not an error
        if err.errno != zmq.EAGAIN:
            print 'Receive error: ' + str(err)


More to come...   see label: lala-pi
