Hourly Moving Average - Streams makes it simple

Tue August 18, 2020 06:30 PM

In a recent discussion about using Streams, the following use case came up as problematic for an existing system: given a set of devices that produce metrics, calculate the hourly moving average of the metric per device. In Streams this is very simple, and a sample application took about 15 minutes to construct. It took a bit longer (another 30 minutes) to add data generation and validation and then test the application. In under an hour, with very little programming, what had been considered a major problem was solved.

To solve the problem there are the following considerations:

  • separate devices: each device needs to produce its own hourly moving average
  • calculate using only the previous hour's values: only the last 60 minutes of values should be used

Calculating the average is easy; getting the right set of values for the calculation can be harder.

To do this in Streams, use the Aggregate operator. This operator was built to make these types of problems simple. As the name suggests, it aggregates values and performs analytics on those aggregations.

Calculate using only the previous hour's values: the Streams window concept handles this problem. Using a sliding window with a time-based eviction policy (1 hour = 3600 seconds) creates a window that evicts any data older than the policy value. Setting a count-based trigger policy of 1 causes the window calculations to run on every arriving tuple. The window clause below creates this.

window
       deviceMetrics : sliding, time(3600), count(1), partitioned ;
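
To make the eviction and trigger behaviour concrete, here is a minimal Python sketch. It is not SPL and not how Streams implements windows; it only models the idea. The names slide and size_seconds are made up for illustration, and the exact boundary rule (whether a tuple exactly 3600 seconds old is kept) is an assumption.

```python
from collections import deque

def slide(window, timestamp, value, size_seconds=3600.0):
    """Insert one reading, evict expired ones, and return (average, count)."""
    window.append((timestamp, value))
    # Assumed boundary rule: keep readings strictly newer than timestamp - size.
    while window[0][0] <= timestamp - size_seconds:
        window.popleft()
    total = sum(v for _, v in window)
    # Computing on every insert plays the role of the count(1) trigger.
    return total / len(window), len(window)

window = deque()
for t in range(4000):  # one reading per second, value equal to its timestamp
    avg, count = slide(window, float(t), float(t))
```

After the loop the window holds only the most recent hour of readings, so the average tracks the last 3600 values rather than everything seen so far.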


Separate the devices: windows can be partitioned, which lets you specify how the data is split into separate “sub-windows”. In this case we want to partition by the input stream attribute deviceID. The best part is that any time a new device ID arrives, a new sub-window is created automatically. If new devices come online, you do not need to update or restart the application. Partitioning is set with the partitionBy parameter on the Aggregate operator.

param 
      partitionBy : deviceID ;
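
The effect of partitioning can be sketched in Python as one window per key, created on first use. This is only an illustration of the concept under the same assumed boundary rule for eviction; partitioned_averages and the sample readings are hypothetical and not part of Streams.

```python
from collections import defaultdict, deque

def partitioned_averages(readings, size_seconds=3600.0):
    """Yield (device_id, average, count) for every incoming reading."""
    windows = defaultdict(deque)  # a new sub-window appears on first use
    for timestamp, device_id, value in readings:
        window = windows[device_id]
        window.append((timestamp, value))
        # Evict readings that have aged out of this device's sub-window.
        while window[0][0] <= timestamp - size_seconds:
            window.popleft()
        values = [v for _, v in window]
        yield device_id, sum(values) / len(values), len(values)

readings = [(0.0, "dev0", 10.0), (0.0, "dev1", 100.0),
            (1.0, "dev0", 20.0), (1.0, "dev1", 300.0)]
results = list(partitioned_averages(readings))
```

Each device's average is computed only from its own readings, and a device ID that has never been seen before needs no setup.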

We now have the data ready: each partition contains one hour of values, so all we need to do is calculate the average for the partition. Aggregate supports an Average output function that calculates the average for the window, and since the window contains only the previous hour of data for a single device, the problem is solved.

Here is what the operator looks like:

(stream<rstring deviceID, float64 metric, float64 hourlyMovingAvg,
         int32 count> hourlyAvgByDeviceMetric) as Aggregate_2 =
            Aggregate(deviceMetrics)
        {
            window
                deviceMetrics : sliding, time(3600), count(1), partitioned ;
            param
                partitionBy : deviceID ;
            output
                hourlyAvgByDeviceMetric : hourlyMovingAvg = Average(metric), count = Count() ;
        }

I added count to the output stream and used another output function, Count(), to output the number of tuples in the window. Once a full hour of data has arrived, it should be 3600 (although you may periodically see 3599 depending upon the exact timing of the tuples).

You can then add a downstream operator from Aggregate to determine if the metric deviates too much from the moving average and should trigger an action.
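
As a rough illustration of what such a downstream check might compute, here is a Python sketch. The relative tolerance of 25%, the function name deviates, and the sample tuples are all assumptions made for this example; the right rule depends on the metric.

```python
def deviates(metric, moving_avg, tolerance=0.25):
    """Return True when metric differs from moving_avg by more than tolerance (relative)."""
    if moving_avg == 0.0:
        # Avoid dividing by zero: any non-zero metric counts as a deviation.
        return metric != 0.0
    return abs(metric - moving_avg) / abs(moving_avg) > tolerance

# Hypothetical (deviceID, metric, hourlyMovingAvg) tuples from the Aggregate output.
tuples = [("dev0", 105.0, 100.0), ("dev1", 160.0, 100.0), ("dev2", 70.0, 100.0)]
alerts = [(device, metric) for device, metric, avg in tuples if deviates(metric, avg)]
```

Here alerts contains the dev1 and dev2 readings, which are 60% above and 30% below their moving averages.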

Testing the operator

Now some people think it is good practice to test your code, so we need to do a few things:
1. generate device metric data for more than one device (otherwise how will we know partitioning is working)
2. generate sample metric values that make it easy to validate that the calculation is correct
3. examine/admire the output to make sure it is working

This is another use case where Streams makes hard things very easy. Beacon operators can generate tuples for testing. We need several devices (10), simple checkable metric values, and a new tuple every second per device (so each window should hold 3600 tuples once it contains an hour's worth of data).
Here is a simple Beacon to do this:

(stream<rstring deviceID, float64 metric> deviceMetrics) as Beacon_1 =
            Beacon()
        {
            logic
                state :
                {
                    mutable int32 i = 0 ;
                }

            param
                period : 1.0 ;
            output
                deviceMetrics : deviceID = "dev1",
                    metric =(float64)(i ++) ;
        }

This will produce the following output:
dev1,0
dev1,1
dev1,2
etc...

Now we want 10 devices, and once again Streams makes this simple. We will use an @parallel annotation to create 10 Beacon operators and, with a small code change to the Beacon, append the channel number to each device name, creating a unique ID for each of the 10 devices. If you want a different number of devices, change the width.

@parallel(width=10)
(stream<rstring deviceID, float64 metric> deviceMetrics) as Beacon_1 =
            Beacon()
        {
            logic
                state :
                {
                    mutable int32 i = 0 ;
                }

            param
                period : 1.0 ;
            output
                deviceMetrics : 
                    deviceID = "dev" +(rstring) getChannel(),
                    metric =(float64)(i ++) ;
        }
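
For readers who want to see the shape of the generated data without running Streams, the following Python sketch mimics the parallel Beacons. It assumes channels are numbered from 0 and that tuples arrive in strict round-robin order. The round-robin order is a simplification, since a real parallel region gives no such ordering guarantee. The function names are hypothetical.

```python
from itertools import count, islice

def beacon(channel):
    """Mimic one Beacon channel: an endless stream of (deviceID, metric)."""
    for i in count():
        yield f"dev{channel}", float(i)

def parallel_beacons(width):
    """Interleave `width` channels, one tuple per channel per tick."""
    channels = [beacon(c) for c in range(width)]
    while True:
        for channel in channels:
            yield next(channel)

# Take the first two ticks from ten simulated devices.
sample = list(islice(parallel_beacons(10), 20))
```

Every device produces the same incrementing series, which is what makes the averages easy to check later.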

The last thing is how to admire the data and validate it. Typically I use a Custom operator for this. Initially I do not add any logic to the operator and simply use the Streams Studio Show Data feature to watch the tuples flowing. Once I have corrected any functional errors, I add validation logic. In this case each average is the midpoint of the series. That is why I used a simple incrementing series in the Beacon: I can instantly calculate the expected average from the last metric value n alone (for a full window of 3600 tuples, n - 1800 + 0.5). Here are some Show Data results after I let this run for a while:

Adding more Analytics

Need to calculate standard deviation? It is as simple as adding another output attribute and using the SampleStdDev output function. Refer to the Aggregate operator documentation for the list of available output functions.
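
If you add a standard deviation to the output, the incrementing test series again gives a value that is easy to check. The Python sketch below computes the sample standard deviation (dividing by n - 1) for one full window of consecutive integers. It assumes the window holds exactly 3600 values and uses Python's statistics module as a stand-in for the Aggregate calculation.

```python
import statistics

# One full hour of the test series: 3600 consecutive values.
window_values = [float(v) for v in range(3600)]

mean = statistics.mean(window_values)
sample_std = statistics.stdev(window_values)  # sample form, divides by n - 1

# For n consecutive integers the sample variance is n * (n + 1) / 12.
n = len(window_values)
expected_variance = n * (n + 1) / 12
```

Because the series only shifts by one each second, the standard deviation of a full window stays constant while the average moves, which gives a validation operator a fixed value to compare against.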

If the function you need is not there, Streams allows you to write your own. Streams handles all the hard parts of this problem for you. Next week I will show how to add a custom output function to Aggregate.

Here is the whole SPL program. This took about 45 minutes in total. It was the weekend, so I was a little slower than I might have been on a Wednesday morning.

namespace application ;

composite movingAverage
{
    graph
        @parallel(width = 10)
        (stream<rstring deviceID, float64 metric> deviceMetrics) as Beacon_1 =
            Beacon()
        {
            logic
                state :
                {
                    mutable int32 i = 0 ;
                }

            param
                period : 1.0 ;
            output
                deviceMetrics : deviceID = "dev" +(rstring) getChannel(),
                    metric =(float64)(i ++) ;
        }

        (stream<rstring deviceID, float64 metric, float64 hourlyMovingAvg,
         int32 count> hourlyAvgByDeviceMetric) as Aggregate_2 =
            Aggregate(deviceMetrics)
        {
            window
                deviceMetrics : sliding, time(3600), count(1), partitioned ;
            param
                partitionBy : deviceID ;
            output
                hourlyAvgByDeviceMetric : hourlyMovingAvg = Average(metric), count = Count() ;
        }

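        () as Custom_3 = Custom(hourlyAvgByDeviceMetric as inputStream)
        {
            logic
                onTuple inputStream :
                {
                    // The metrics are consecutive integers, so the expected
                    // average is the midpoint of the values in the window.
                    int32 x = count / 2 ;
                    mutable float64 avg = metric -(float64) x ;
                    // An even count puts the midpoint halfway between two values.
                    if(count % 2 == 0)
                    {
                        avg = avg + 0.5 ;
                    }

                    if(avg != hourlyMovingAvg)
                    {
                        appLog(Log.error, "average " +(rstring) hourlyMovingAvg + " is wrong for " +(rstring) metric + " on device " + deviceID) ;
                    }
                }
        }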

}
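
The check in Custom_3 can be justified with a short derivation. Assume, as in the test data, that a device's window holds c consecutive values ending at the latest metric n. The symbols n and c are introduced here for illustration and correspond to the metric and count attributes.

```latex
\[
\bar{m} \;=\; \frac{1}{c}\sum_{k=0}^{c-1}(n-k) \;=\; n - \frac{c-1}{2}
\;=\;
\begin{cases}
n - \lfloor c/2 \rfloor + \tfrac{1}{2}, & c \text{ even},\\[4pt]
n - \lfloor c/2 \rfloor, & c \text{ odd}.
\end{cases}
\]
```

With a full window of c = 3600 this gives n - 1799.5. Because the formula depends only on c, the same check also holds while the window is still filling and when the count briefly reads 3599.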

