In my last post, Scale a Web App with Microservices and IBM Message Hub, I talked about separating the producer of data from the consumer of that data by putting a broker or queue in between. This simplifies both the production and consumption code, breaking the tasks into microservices: small, simple applications that each do one job.

In this article, we’ll look at the differences between the various queue and broker technologies, listed here in order of increasing complexity, resilience, and capacity:

  • Async – an in-memory queue inside a single Node.js (or browser) process
  • Redis – an in-memory database offering lists (queues) and PubSub channels
  • RabbitMQ – a dedicated message broker supporting work queues and PubSub
  • IBM Message Hub (Apache Kafka) – a distributed, persistent message log that can act as either a queue or a PubSub broker

Before we start, let’s sort out some definitions. Firstly, a queue is a list of data items (payloads) that are stored in the order they were generated. One or more worker processes can pick a payload from the end of the queue and perform their work on it.

Queue

PubSub is similar, except that every consumer subscribed to the channel receives a copy of each item of data.

PubSub

In most cases, the contents of a queue persist until they are consumed by a worker process. In contrast, PubSub channels tend to be ephemeral, with the contents being discarded if there are no consumers present.

Async

Async is a JavaScript library that simplifies working with asynchronous code. One of its tools is a very simple in-memory queue that lets your in-browser or Node.js code sequence waiting tasks.

First you define the function that will deal with each payload item in the queue:

var async = require('async');

var q = async.queue(function(payload, callback) {

  // do the work here to deal with the payload

  // call the callback when the work is complete
  callback(null);
});

Then you can start adding items to the queue (usually on an event, such as an incoming API call):

  q.push({ a:1, b:2});
  q.push({ a:2, b:3});
  q.push({ a:3, b:4});
  .
  .
  .

Our queue function will be called with each payload, in turn. Crucially, we can control the concurrency of the queue by either passing a second parameter to async.queue when we create the queue, or setting q.concurrency after the fact.

  // deal with up to five payloads at any one time
  q.concurrency = 5;
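
Alternatively, the concurrency can be supplied as the second argument when the queue is created. A minimal sketch (the worker body is just a placeholder):

  var q = async.queue(function(payload, callback) {
    // do the work here to deal with the payload
    callback(null);
  }, 5); // deal with up to five payloads at any one time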

This simple concurrency control allows the code generating the work (q.push(...)) to stay the same, but you can add more queue workers by altering the value of concurrency to suit.

  • This queue is on a single node. Adding concurrency is only coordinating work on that node.
  • As JavaScript is single-threaded, your workers are not really running on different CPU cores. They are sharing CPU cycles on a single core.
  • If your Node.js process crashes, your queue items are lost.
  • The queue size is limited by the memory size of your single node.
  • This model doesn’t separate producers and consumers onto separate machines, but it does show how the producer/queue/consumer model can be adopted inside your application as well as between your microservices. Building your application this way makes it easier to move to an external queue at a later date.

Redis

Redis is an in-memory database which has two modes that are of interest to a microservices architect:

  • Queues. You can use Redis’s list data type as a queue of payload data.
  • PubSub. You can use Redis’s PubSub feature to let several clients consume the same stream of data.

Redis Queues

Adding an item to a Redis list is as simple as using the LPUSH command to push an item to the ‘left’ side of a list:


127.0.0.1:6379> LPUSH myqueue '{"a":1,"b":2}'
(integer) 1
127.0.0.1:6379> LPUSH myqueue '{"a":2,"b":3}'
(integer) 2
127.0.0.1:6379> LPUSH myqueue '{"a":3,"b":4}'
(integer) 3

and then the consumer uses RPOP to retrieve the right-most entry from the list:


127.0.0.1:6379> RPOP myqueue
"{\"a\":1,\"b\":2}"
127.0.0.1:6379> RPOP myqueue
"{\"a\":2,\"b\":3}"
127.0.0.1:6379> RPOP myqueue
"{\"a\":3,\"b\":4}"
127.0.0.1:6379> RPOP myqueue
(nil)

Using this mechanism, you can have as many producers (LPUSHing data into the queue) and as many consumers (RPOPping data from the queue) as you like.
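
The same pattern from Node.js might look something like this minimal sketch, using the node redis client and assuming a Redis server on the default local port. BRPOP is the blocking form of RPOP, so the consumer waits for work to arrive instead of polling:

var redis = require('redis');

// in practice the producer and consumer would be separate processes,
// each with its own connection to Redis
var producer = redis.createClient();
var consumer = redis.createClient();

// producer: push a payload onto the 'left' of the list
producer.lpush('myqueue', JSON.stringify({ a: 1, b: 2 }));

// consumer: block until a payload arrives at the 'right' of the list
consumer.brpop('myqueue', 0, function(err, reply) {
  if (err) throw err;
  // reply is a two-element array: [ listName, payload ]
  console.log('!', JSON.parse(reply[1]));
});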

  • Redis is an in-memory database. It is only flushed to disk periodically, so a server crash is likely to lose data.
  • Although Redis can run as a cluster, with data distributed around the machines in the cluster, a single key lives on one node, so the entire queue would reside on a single server.

Redis PubSub

You can use Redis PubSub to let several consumers receive notifications of data arriving on a channel they are listening to. Data is pushed from Redis to the clients.

Firstly, one or more clients subscribe to a channel and wait to be notified of any arriving data.


127.0.0.1:6379> SUBSCRIBE mychannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "mychannel"
3) (integer) 1

Then the producers can start generating data:


127.0.0.1:6379> PUBLISH mychannel '{"a":1,"b":2}'
(integer) 1

and all the consumers see the data arrive immediately:


1) "message"
2) "mychannel"
3) "{"a":1,"b":2}"
  • All subscribers to the channel receive every message.
  • If the subscribers are offline at the time the message is published, they miss it. There is no means of replaying the messages in a channel.

RabbitMQ

RabbitMQ is a robust messaging application used in a wide variety of production systems. You can set it up to speak a number of protocols (AMQP, STOMP, MQTT) and configure it in a number of ways.

RabbitMQ Work Queue

When behaving like a Work Queue, items are distributed to the consumers of the queue in round-robin fashion. A single consumer would receive all the messages in a queue, two consumers would share the workload 50/50, and so on. This makes it very simple to scale out asynchronous workloads by adding more workers to deal with the throughput of data.

Our producer simply pushes payloads of data onto a queue called queueevents using a PUSH socket from the rabbit.js Node.js module:

var context = require('rabbit.js').createContext();
context.on('ready', function() {
  var pub = context.socket('PUSH');
  pub.connect('queueevents', function() {
    var i = 0;
    // write a new payload to the queue every 100ms
    setInterval(function() {
      var obj = {a: i++, b: true};
      pub.write(JSON.stringify(obj), 'utf8');
      console.log("Written", obj);
    }, 100);
  });
});

which generates the following output:

Written { a: 0, b: true }
Written { a: 1, b: true }
Written { a: 2, b: true }
Written { a: 3, b: true }
Written { a: 4, b: true }
Written { a: 5, b: true }
Written { a: 6, b: true }
Written { a: 7, b: true }
Written { a: 8, b: true }
Written { a: 9, b: true }

We can then run two consumer scripts which wait for data arriving on the queueevents channel by connecting in WORKER mode:

var context = require('rabbit.js').createContext();
context.on('ready', function() {
  var sub = context.socket('WORKER', {prefetch: 1});
  sub.on('data', function(payload) {
    console.log("!", JSON.parse(payload.toString()));
    // simulate our work taking 50ms to complete
    setTimeout(function() {
      sub.ack();
    }, 50);
  });
  sub.connect('queueevents', function(e) {
  });
});

and the work is distributed evenly:

-- worker 1
! { a: 0, b: true }
! { a: 2, b: true }
! { a: 4, b: true }
! { a: 6, b: true }
! { a: 8, b: true }

-- worker 2
! { a: 1, b: true }
! { a: 3, b: true }
! { a: 5, b: true }
! { a: 7, b: true }
! { a: 9, b: true }
  • Even if there are no consumers running, the queue simply builds up inside RabbitMQ.
  • By default, RabbitMQ loses data if the broker is restarted. To persist the data to disk, you must configure the queues to be durable. Even then, data loss is possible if RabbitMQ is terminated before the data is fsynced to disk.
  • In this example, each worker only receives one item from the queue at a time ({prefetch: 1}). This number can be increased, but this Node.js library doesn’t have a clean way of acknowledging each message individually.

RabbitMQ PubSub

If you want several consumers to receive the same payload, then use RabbitMQ’s PubSub mode. Using the rabbit.js Node.js module again, we can write data with a PUBLISH socket into a channel called pubsubevents:

var context = require('rabbit.js').createContext();
context.on('ready', function() {
  var pub = context.socket('PUBLISH');
  pub.connect('pubsubevents', function() {
    var i = 0;
    setInterval(function() {
      var obj = {a: i++, b:true};
      pub.write(JSON.stringify(obj), 'utf8');
      console.log("Written", obj);
    },100);
  });
});

which produces the following output:

Written { a: 0, b: true }
Written { a: 1, b: true }
Written { a: 2, b: true }
Written { a: 3, b: true }
Written { a: 4, b: true }

And any number of consumer processes can listen to the stream of events arriving on the pubsubevents channel by listening in SUBSCRIBE mode:

var context = require('rabbit.js').createContext();
context.on('ready', function() {
  var sub = context.socket('SUBSCRIBE');
  sub.on('data', function(payload) {
    console.log("!", JSON.parse(payload.toString()));
  });
  sub.connect('pubsubevents', function(e) {
  });
});

and every consumer receives each message:

-- worker 1
! { a: 0, b: true }
! { a: 1, b: true }
! { a: 2, b: true }
! { a: 3, b: true }
! { a: 4, b: true }

-- worker 2
! { a: 0, b: true }
! { a: 1, b: true }
! { a: 2, b: true }
! { a: 3, b: true }
! { a: 4, b: true }
  • All subscribers to the channel receive every message.
  • If the subscribers are offline at the time the message is published, then they miss it. There is no means of replaying the messages in a channel.

IBM Message Hub (Apache Kafka)

Apache Kafka (run as-a-service as IBM Message Hub) is a scalable high-performance queue, message log, and pubsub broker. It can behave as a queue or a pubsub broker depending on how you configure the consumer clients:

  • Clients connecting with the same consumer group share the messages between them (like a work queue).
  • All clients with different consumer groups get all the messages (like PubSub).

In this case, both the Queue and PubSub examples share the same producer code:

var cfenv = require('cfenv');
var appEnv = cfenv.getAppEnv();
var MessageHub = require('message-hub-rest');
var instance = new MessageHub(appEnv.services);
var topicName = 'mytopic';
var i = 0;

instance.topics.create(topicName)
  .then(function(response) {
    console.log("generating data");
    setInterval(function() {
      var obj = {a: i++, b: true};
      console.log("Written", obj);
      var list = new MessageHub.MessageList([
        JSON.stringify(obj)
      ]);
      instance.produce(topicName, list.messages);
    }, 100);
  })
  .fail(function(error) {
    throw new Error(error);
  });

Apache Kafka – Queue

To behave like a queue, multiple worker processes need to share the same consumer group:

var cfenv = require('cfenv');
var appEnv = cfenv.getAppEnv();
var MessageHub = require('message-hub-rest');
var instance = new MessageHub(appEnv.services);
var consumerInstance;
var topicName = 'mytopic';

var waitforResponses = function() {
  consumerInstance.get(topicName)
    .then(function(data) {
      for(var i in data) {
        console.log("!", JSON.parse(data[i]));
      }
      // poll again for the next batch of messages
      waitforResponses();
    })
    .fail(function(error) {
      throw new Error(error);
    });
};

instance.topics.create(topicName)
  .then(function(response) {
      return instance.consume('my_consumer_group', 'my_consumer_instance', { 'auto.offset.reset': 'largest' });
  })
  .then(function(response) {
    consumerInstance = response[0];
    waitforResponses();
  })
  .fail(function(error) {
    throw new Error(error);
  });

It is important to note that Apache Kafka workers are not pushed data from the server; they pull data from the server, with the intention that clients fetch more than a single payload in each request. Kafka is designed to deliver large quantities of streaming data, in the order it was published, to multiple consumers.

Apache Kafka – PubSub

The code for PubSub is identical, except that each client should pass in its own unique consumer group (like my_consumer_group_1, my_consumer_group_2 and so on) to ensure that they all receive copies of the data.
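
For example, only the consume call from the queue example above needs to change; something like this sketch, where each process derives its own consumer group name (the naming scheme here is just illustrative):

  // a unique consumer group per subscriber means every subscriber
  // receives a copy of each message
  instance.consume('my_consumer_group_' + process.pid, 'my_consumer_instance', { 'auto.offset.reset': 'largest' })
    .then(function(response) {
      consumerInstance = response[0];
      waitforResponses();
    });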

A question of scale

So which queue technology should you choose? The simple answer is to think about the scale of queue data your application is going to handle. This article presented these technologies (simplistically) in “scale order”, from programmatic queuing inside your application up to a multi-node Apache Kafka cluster capable of handling millions of writes per second. This list also happens to be in “resilience order”: a crash in the Async or Redis solutions WILL cause the loss of data, RabbitMQ may lose data in certain circumstances, and Apache Kafka is very unlikely to. But Apache Kafka is a complicated piece of software to install and manage, which is why it’s easier to get started with a hosted solution like IBM Message Hub.

The features that each broker offers are also subtly different. Kafka is the only one of the technologies presented here which can do PubSub AND worker queuing on the same queue, for example. RabbitMQ pushes payloads to connected clients, but Kafka clients poll to fetch data. RabbitMQ is a long-standing product with excellent plugin support, and your choice may be swayed by its support for the MQTT protocol if you are building an Internet of Things application.

Breaking your application into small, independent functional blocks (microservices) that each do one job very well is good programming practice. It simplifies testing and lets your application scale easily: when the need arises, you can add a message queue to your stack to marshal the work. You don’t need a complex infrastructure to get started with microservices. A simple queue lets you separate your producer code from your consumer code.

The good news is that you can try out all of these technologies for free. Redis and RabbitMQ are available as part of IBM’s Compose.io offering. Simply sign up for a free trial and spin up a queue or two. IBM Message Hub is in Beta and free to try inside Bluemix.
