In previous posts on microservices, we’ve explored how you can use queues and pubsub channels to broker data between the producers of the information and the consumers of it, and also how to scale a web app using microservices.

I’ve also explained how to convert the Metrics Collector app from a monolithic codebase to a set of composable microservices that can be attached to streams of data.

Metrics Collector schematic

The advantages of microservices are:

  • the producer and consumer services are decoupled from each other
  • you can add producers and consumers at will, even adding multiple consumers to the same data stream for storage, analytics, and visualisation
  • each microservice is a self-contained block of code that has well-defined inputs and outputs
  • you can distribute the workload of writing microservices around a development team. Although they consume the same data, you can use different programming languages for each service, depending on the skill set available

In the last blog post, we set up the Metrics Collector Microservice to write to a Redis, RabbitMQ, or Apache Kafka queue or pubsub channel. We also added multiple instances of the Metrics Collector Storage Microservice to consume the data, writing it to Cloudant, MongoDB, or Elasticsearch, depending on runtime configuration.

In this post, we’ll look at adding the Metrics Collector Aggregation Microservice to the same data stream, and show how you can use it to perform simple streaming analytics on the web metrics data channel it’s connected to.

Metrics Collector Aggregation Microservice

The Metrics Collector Aggregation Microservice, like the other microservices I have written, uses a plugin architecture and can be configured at runtime to read its data from any of the following sources:

  • redis_queue, a Redis list data structure
  • redis_pubsub, a Redis PubSub channel
  • rabbit_queue, a RabbitMQ PUSH/WORKER queue
  • rabbit_pubsub, a RabbitMQ PUBLISH/SUBSCRIBE channel
  • kafka, an Apache Kafka topic

It also has a collection of aggregation plugins that summarise the data according to runtime instructions:

  • count: counts incoming data messages
  • sum: sums a single element of the message
  • stats: calculates statistics on a single element of the message
  • countdistinct: counts the number of distinct values of a single element of the message
  • customstats: calculates statistics on a value returned by a supplied function

By default, the Aggregation Microservice uses the null aggregator, which does nothing: we must use the service’s API to instruct it to aggregate.
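To make the plugin architecture concrete, here is a minimal sketch of the configure/query/reset lifecycle that the aggregators expose over the API. The function and property names are illustrative assumptions, not the repository’s actual code:

```javascript
// Sketch of an aggregator plugin with the same lifecycle the service
// exposes over HTTP: add() for each incoming message, query() for
// GET /query, reset() for GET /reset. (Names are illustrative.)
var createCountAggregator = function () {
  var total = 0;
  return {
    // called once per message arriving from the queue or pubsub channel
    add: function (message) {
      total++;
    },
    // returns the current aggregate value
    query: function () {
      return total;
    },
    // clears the accumulated state
    reset: function () {
      total = 0;
    }
  };
};

var agg = createCountAggregator();
agg.add({ a: 1 });
agg.add({ a: 2 });
console.log(agg.query()); // 2
agg.reset();
console.log(agg.query()); // 0
```

Because each plugin shares this small interface, the service can swap aggregators at runtime without touching the queue-consuming code.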

Installing the aggregator

Assuming you’ve got the Metrics Collector Microservice running and generating data to a Redis pubsub channel, you can install the Metrics Collector Aggregation Microservice with the following commands:

    git clone https://github.com/ibm-cds-labs/metrics-collector-aggregation-microservice
    cd metrics-collector-aggregation-microservice
    npm install

and run it with:

    export QUEUE_TYPE=redis_pubsub
    node app.js

The app tells you that it’s running:

    Aggregator type: null
    Queue mode: redis_pubsub
    Connecting to Redis server on localhost/6379
    App starting on http://localhost:6019
    Subscribed to PubSub channel mcpubsub

Make a note of the host and port that the app is running on, in this case http://localhost:6019. You can substitute redis_pubsub with redis_queue, rabbit_queue, rabbit_pubsub, or kafka to point the microservice at a different data source.

Counting things

We can instruct the app to start aggregating by calling its /configure API endpoint:

    > curl 'http://localhost:6019/configure?mode=count'
    {"ok":true,"mode":"count"}

Each item of data that arrives from the pubsub channel is counted. We can query the current count at any time using this service’s API:

    > curl 'http://localhost:6019/query'
    {"ok":true,"err":null,"data":110}

We can also reset any of the aggregators back to zero by calling the /reset endpoint:

    > curl 'http://localhost:6019/reset'
    {"ok":true}
    > curl 'http://localhost:6019/query'
    {"ok":true,"err":null,"data":0}

Summing things

Imagine our source data looks like this:

    { a: 1, b: 2, c: 14753, d: 'rat', ip: '::1' }

Instead of just counting we can accumulate the sum of an individual value:

    > curl 'http://localhost:6019/configure?mode=sum&selector=c'
    {"ok":true,"mode":"sum","selector":"c"}
    > curl 'http://localhost:6019/query'
    {"ok":true,"err":null,"data":4189963}
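Under the hood, a sum aggregator only needs to pick the selected field out of each message and accumulate it. A sketch, with illustrative names that are assumptions rather than the repository’s actual code:

```javascript
// Sketch of a sum aggregator: the selector names the message field to
// accumulate, and non-numeric or missing values are ignored.
var createSumAggregator = function (selector) {
  var total = 0;
  return {
    add: function (message) {
      var v = message[selector];
      if (typeof v === 'number') {
        total += v;
      }
    },
    query: function () {
      return total;
    }
  };
};

var agg = createSumAggregator('c');
agg.add({ a: 1, b: 2, c: 14753, d: 'rat' });
agg.add({ a: 2, b: 3, c: 100, d: 'cat' });
console.log(agg.query()); // 14853
```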

Averages, totals, and standard deviations

To collect statistics about a single value, we can use the stats aggregator:

    > curl 'http://localhost:6019/configure?mode=stats&selector=c'
    {"ok":true,"mode":"stats","selector":"c"}
    > curl 'http://localhost:6019/query'
    {"ok":true,"err":null,"data":{"sum":8635955,"count":545,"min":26,"max":32689,"sumsqr":685130625}}

We can calculate the average, variance, and standard deviation of c using the values returned by the stats aggregator.
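The derivation from the returned fields uses the standard formulas: mean = sum/count, population variance = sumsqr/count − mean², and standard deviation = √variance. A small helper (the function name is mine, not part of the service):

```javascript
// Derive mean, variance, and standard deviation from the fields the
// stats aggregator returns: sum, count, and sumsqr (sum of squares).
var deriveStats = function (s) {
  var mean = s.sum / s.count;
  // population variance: E[x^2] - (E[x])^2
  var variance = s.sumsqr / s.count - mean * mean;
  return { mean: mean, variance: variance, stddev: Math.sqrt(variance) };
};

// e.g. for the values 1, 2, 3: sum=6, count=3, sumsqr=14
console.log(deriveStats({ sum: 6, count: 3, sumsqr: 14 }));
// mean = 2, variance ≈ 0.667, stddev ≈ 0.816
```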

Counting distinct values

We can count the number of occurrences of each distinct value in the data stream with the countdistinct aggregator:

    > curl 'http://localhost:6019/configure?mode=countdistinct&selector=d'
    {"ok":true,"mode":"countdistinct","selector":"d"}
    > curl 'http://localhost:6019/query'
    {"ok":true,"err":null,"data":{"donkey":44,"robin":38,"cat":49,"dog":39,"rat":44,"cow":37,"crab":45,"chicken":32,"ant":33,"fox":38,"squirrel":36,"wolf":34,"gerbil":43}}
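One way a countdistinct aggregator can work is to keep a map of value → occurrence count for the selected field. A sketch under that assumption (the service’s actual implementation may differ):

```javascript
// Sketch of a countdistinct aggregator: a plain object maps each
// distinct value of the selected field to its occurrence count.
var createCountDistinct = function (selector) {
  var counts = {};
  return {
    add: function (message) {
      var v = message[selector];
      if (typeof v !== 'undefined') {
        counts[v] = (counts[v] || 0) + 1;
      }
    },
    query: function () {
      return counts;
    }
  };
};

var agg = createCountDistinct('d');
agg.add({ d: 'rat' });
agg.add({ d: 'cat' });
agg.add({ d: 'rat' });
console.log(agg.query()); // { rat: 2, cat: 1 }
```

Note that this keeps one map entry per distinct value, so it suits fields with a bounded set of values, like the animal names above.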

Custom code

We can also provide a JavaScript function that receives each message that arrives and whose return value is sent to the stats aggregator. We can write a custom function like this:

    function(doc) {
      return (doc.c > 20000) ? doc.c : null;
    }

which returns c from each message only if its value exceeds 20000.

We can send the custom function to the Aggregator Microservice like this:

    > curl 'http://localhost:6019/configure?mode=customstats&selector=function(doc)%7B%20return%20(doc.c>20000)%3F%20doc.c%20:%20null%7D'
    {"ok":true,"mode":"customstats","selector":"function(doc){ return (doc.c>20000)? doc.c : null}"}
    > curl 'http://localhost:6019/query'
    {"ok":true,"err":null,"data":{"sum":8385261,"count":315,"min":20063,"max":32726,"sumsqr":553378576}}
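Hand-escaping the function for the URL, as in the curl call above, is fiddly and error-prone. JavaScript’s standard encodeURIComponent can build the query string for us; the host and port here match the example service above:

```javascript
// Build the customstats configure URL, letting encodeURIComponent
// handle the percent-escaping of the JavaScript function source.
var fn = 'function(doc){ return (doc.c>20000)? doc.c : null}';
var url = 'http://localhost:6019/configure?mode=customstats&selector=' +
          encodeURIComponent(fn);
console.log(url);
```

The resulting URL can then be passed to curl, or fetched directly from Node.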

Conclusion

Using a PubSub channel connected to our data producer service (the Metrics Collector Microservice), we can attach multiple microservices to the channel to consume the data. The Metrics Collector Storage Microservice writes data to a choice of JSON data stores, and the Metrics Collector Aggregation Microservice performs simple streaming analytics, controlled by an HTTP API.

Combining microservices like this makes it easy for your app to grow and handle a variety of jobs. Multiple producers add scale and resilience. Multiple consumer microservices let you store data in multiple databases and analyse the data in different ways. When you want your app to perform a new trick, it’s easy to introduce an additional microservice into the mix.
