SPL supports windows: the ability to process a bounded subset of a stream’s data. A stream is an infinite sequence of data (as tuples), so certain operations can only be performed on a subset of that data. For example, calculating the maximum value of an infinite stream, or sorting it, would mean waiting an infinite amount of time to see all the data. SPL provides the ability to process subsets of streams through windows.

An operator processes the set of tuples in its window, for example by sorting all the tuples in the window and then emitting them in sort order.

SPL has powerful constructs for defining a window’s characteristics. A single window definition is applied to an input port and thus defines a window of tuples on that input port. There are two types of windows, tumbling and sliding. In this post I will provide an introduction to tumbling windows; sliding windows will be covered in a subsequent post, with follow-on posts describing more window functionality.

Tumbling Windows

Tuples are inserted into an input port’s tumbling window until the window is full, with its eviction policy defining “full”. Once the tumbling window is full, all of its tuples are evicted, that is, it “tumbles”. Subsequent tuples that arrive at the input port are inserted into a new window and the cycle starts again. Note that for a tumbling window, a tuple is a member of only a single window eviction, the one for the window it was inserted into; once it is evicted it is not seen again in any future evictions.

Typically the eviction processing logically performs some operation or aggregation against the set of evicted tuples, the tumbled window, and submits tuples that represent the required output. For example, the Aggregate operator submits tuples that represent aggregations of the window, such as the sum of an attribute’s value for all tuples in the tumbled window. As later articles will show, primitive operators interact with windows through a windowing library (in Java and C++) and can utilize windowing in a variety of ways, and thus are not limited to just submitting tuples upon eviction.

Tumbling windows support four types of eviction policies, but first let’s see a simple SPL example with a windowed input port.

SPL Example

Here’s an example of an invocation of the Aggregate operator with a tumbling window and an eviction policy of count(4):

 // Sum the cost attribute over a tumbling
 // window of four tuples
 stream<V> VSUM = Aggregate(V)
 {
     window V : tumbling, count(4) ;
     output VSUM: cost = Sum(cost);
 }

Count Eviction Policy

The eviction policy can be specified in several ways; let’s start with the count-based eviction policy. An eviction policy count(N) states that the window is full when it contains N tuples, thus logically when full it contains the last N tuples to arrive at the input port.

For example, with count(4), at every fourth input tuple the window becomes full and then all the tuples are evicted, leaving the window empty, ready for the next four tuples. Let’s walk through the sequence of events:

Starting with the window being empty (light yellow rectangle, on the right), tuples are moving from left to right, so A will be the first to arrive at the input port:

[Figure tw01]

After two tuples have arrived, the window will contain A and B:

[Figure tw02]

Then after two additional tuples, for a total of four, the window contains four tuples A, B, C & D and is full, according to its eviction policy of count(4). Note that the tuples within the window are ordered by insertion (arrival):

[Figure tw03]

Now, as part of the insertion processing for tuple D, the window has become full, and so as a tumbling window it evicts all tuples to become empty again:

[Figure tw04]

As a result of the eviction, the Aggregate operator will perform its aggregation of {A,B,C,D} and submit output tuples. In our simple case, the operator VSUM would emit a single tuple with the cost attribute set to the sum of the cost attributes for A,B,C & D.
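The walk-through above can be modelled in a few lines of Python. This is only a sketch of the described semantics, not SPL and not how the Aggregate operator is implemented; the function name `tumbling_count_sum` and the use of plain numbers in place of tuples with a cost attribute are assumptions made for illustration.

```python
def tumbling_count_sum(costs, n):
    """Model a tumbling window with count(n) eviction feeding a sum.

    costs stands in for the cost attribute of each arriving tuple
    (an illustrative simplification, not an SPL API).
    """
    window = []   # tuples currently in the window, in arrival order
    outputs = []  # one aggregate per tumble
    for cost in costs:
        window.append(cost)              # insert the arriving tuple
        if len(window) == n:             # window is full per count(n)
            outputs.append(sum(window))  # aggregate the evicted set
            window = []                  # tumble: the window is empty again
    return outputs, window


sums, leftover = tumbling_count_sum([1, 2, 3, 4, 5, 6, 7, 8, 9], 4)
print(sums)      # [10, 26]
print(leftover)  # [9]
```

The ninth value stays in the window because only one tuple has arrived since the last tumble, so no output is produced for it yet.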

Note that while processing is performed against the logical set of tuples in the window, many algorithms can be implemented incrementally, being updated as each tuple is inserted into the window. In this case, Sum(cost) is updated for each input tuple. Such incremental algorithms are ideal for a streaming application because they reduce latency and memory cost: there is no need to keep all the tuples in the window, just the partial calculation.
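To make the incremental idea concrete, here is a small Python sketch that keeps only a running total and a count rather than the tuples themselves. The class name `IncrementalSum` and its `insert` method are hypothetical and are not part of any Streams library; the sketch only illustrates the principle described above.

```python
class IncrementalSum:
    """Running sum over a count(n) tumbling window, storing no tuples."""

    def __init__(self, n):
        self.n = n
        self.count = 0   # tuples logically in the window
        self.total = 0   # partial calculation for the current window

    def insert(self, cost):
        """Add one value; return the window sum on a tumble, else None."""
        self.total += cost
        self.count += 1
        if self.count == self.n:
            result = self.total
            self.count = 0   # tumble: reset the partial state
            self.total = 0
            return result
        return None


agg = IncrementalSum(4)
print([agg.insert(c) for c in [1, 2, 3, 4, 5]])  # [None, None, None, 10, None]
```

Memory use here is constant regardless of the window size, which is the property the paragraph above refers to.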

Time Eviction Policy

An eviction policy time(T) states that the tumbling window is full when T seconds have elapsed since the window last tumbled. So with an eviction policy of time(5), every five seconds the window is defined to become full and all tuples within it are evicted. Thus at eviction the window may contain any number of tuples, namely all those that have arrived at the input port in the last five seconds, including the possibility that the window is empty.
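The following Python sketch models the time(T) behaviour using explicit arrival times instead of a real clock, so that it is deterministic. It is an illustration under stated assumptions, not the Streams implementation: the function name `tumbling_time_windows` is hypothetical, the first window is assumed to start at time 0, and a tuple arriving exactly at a tumble instant is assumed to fall into the next window.

```python
def tumbling_time_windows(arrivals, period, end_time):
    """Model a tumbling window with time(period) eviction.

    arrivals: list of (arrival_seconds, value), sorted by arrival time.
    Returns the evicted contents at each tumble up to end_time.
    """
    evictions = []
    window = []
    next_tumble = period   # assumption: the first window opens at t = 0
    i = 0
    while next_tumble <= end_time:
        # Insert every tuple that arrived before this tumble instant.
        while i < len(arrivals) and arrivals[i][0] < next_tumble:
            window.append(arrivals[i][1])
            i += 1
        evictions.append(window)   # may be empty if nothing arrived
        window = []
        next_tumble += period
    return evictions


arrivals = [(0.5, "A"), (1.0, "B"), (4.9, "C"), (11.0, "D")]
print(tumbling_time_windows(arrivals, 5, 15))
# [['A', 'B', 'C'], [], ['D']]
```

The second eviction is empty because no tuple arrived between seconds 5 and 10, matching the case called out above.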

Window Punctuation Eviction Policy

An eviction policy punct() states that the tumbling window is full when a window punctuation mark arrives at the input port. On arrival of the window punctuation mark the window becomes full and any tuples within it are evicted. This eviction policy allows window processing over a set of tuples defined by an operator upstream of the windowed input port.

For example, the FileSource operator submits a window punctuation mark at the end of each file, so that a windowed operator using punct() can execute some function over the set of tuples that make up the contents of the file.
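As a rough Python model of this behaviour, the sketch below treats the stream as a list in which a sentinel object stands in for the window punctuation mark. The names `PUNCT` and `tumbling_punct_windows` are hypothetical, and recording an eviction even when the window is empty is an assumption of the sketch, not something the text above specifies.

```python
PUNCT = object()  # stand-in for a window punctuation mark


def tumbling_punct_windows(stream):
    """Model a tumbling window with punct() eviction.

    Returns (evictions, window), where window holds any tuples that
    arrived after the last punctuation mark.
    """
    window = []
    evictions = []
    for item in stream:
        if item is PUNCT:
            evictions.append(window)  # the mark makes the window full
            window = []
        else:
            window.append(item)       # ordinary tuple: just insert it
    return evictions, window


# Two "files" of lines, each terminated by a punctuation mark,
# followed by the start of a third.
stream = ["a1", "a2", PUNCT, "b1", PUNCT, "c1"]
print(tumbling_punct_windows(stream))
# ([['a1', 'a2'], ['b1']], ['c1'])
```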

Operators that support windowing usually submit a window punctuation mark after they have tumbled, to mark the end of the set of tuples that resulted from processing the windowed input tuples. This mark is submitted regardless of the eviction policy, and allows downstream operators to perform punct()-based window processing on the output set. For example, an operator may process a time-based window and a downstream operator can process the time-based output using punct(), thus not requiring any time synchronization between the operators.

Delta Eviction Policy

The eviction policy delta(attribute, delta) defines the condition for a tumbling window becoming full in terms of the value of attribute in an input tuple and the value of delta. With the eviction policy delta(temperature, 1.5), when a new tuple T arrives at the input port, the value of its temperature attribute is compared with the value of the temperature attribute of the oldest tuple in the window, Toldest. If the difference in the values is greater than 1.5 (the delta), the tumbling window becomes full and its tuples are evicted. More formally, the window is full if this condition is true:

 attribute(T) - attribute(Toldest) > delta

In this example, if the window contained tuples with temperature values of [17.1, 16.4, 16.0] (newest to oldest), then the arrival of a tuple with temperature value 17.6 would tumble the window because (17.6 – 16.0) is greater than 1.5. Remember, though, that the oldest tuple is used for the delta calculation, not the tuple with the lowest value. Thus in the same situation, if the window contained [17.1, 16.0, 16.4], it would not tumble, since (17.6 – 16.4) is not greater than 1.5.

Note that the tuple T that initiated the eviction is not part of the eviction; it is inserted into the (empty) window after the eviction.
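The two temperature cases above can be checked with a short Python model. This is a sketch of the rule as described, not the Streams implementation; the function name `tumbling_delta_windows` is hypothetical, plain floats stand in for tuples with a temperature attribute, and the comparison uses the signed difference (new minus oldest), which is an assumption based on the wording above.

```python
def tumbling_delta_windows(values, delta):
    """Model a tumbling window with delta(attribute, delta) eviction.

    values: attribute values in arrival order (oldest first).
    Returns (evictions, window) after all values have arrived.
    """
    window = []      # oldest tuple is window[0]
    evictions = []
    for v in values:
        # Compare the arriving value with the OLDEST tuple in the window.
        if window and v - window[0] > delta:
            evictions.append(window)  # tumble before inserting v
            window = []
        window.append(v)              # v joins the (possibly new) window
    return evictions, window


# Window [17.1, 16.4, 16.0] newest to oldest, then 17.6 arrives.
print(tumbling_delta_windows([16.0, 16.4, 17.1, 17.6], 1.5))
# ([[16.0, 16.4, 17.1]], [17.6])

# Window [17.1, 16.0, 16.4] newest to oldest, then 17.6 arrives.
print(tumbling_delta_windows([16.4, 16.0, 17.1, 17.6], 1.5))
# ([], [16.4, 16.0, 17.1, 17.6])
```

In the first case the triggering value 17.6 ends up alone in the new window, which is the behaviour described in the note above.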

If attribute has SPL type timestamp, then delta is a float64 representing seconds, and the difference in seconds between the timestamp values for attribute of T and Toldest is compared to delta. This is useful for calculating values over time windows defined by an event’s timestamp, and not real time (wall clock time).

Summary

SPL’s tumbling windows provide the ability to perform some function over a well-defined set of tuples on an input port. You may experiment with tumbling windows for aggregation using the Aggregate operator, or ordering using the Sort operator, both from the SPL Standard toolkit.

This has been a basic introduction to tumbling windows; subsequent posts will cover additional topics, such as window partitions, grouping and how to develop operators that support windowed input ports.

8 comments on "SPL Tumbling Windows Explained"

  1. Smarterkey May 07, 2014

    Great article about Streams window concept.
    Which eviction policy would you advise for streams with irregular peaks of tuples ?
    “Count” may leave a window “untumbled” waiting for enough tuples to fill the window.
    “Time” may lead to very large windows using expensive memory.
    So suppose “punct” would be best if data permits.
    Delta looks great for log files and for example tumbling by date.

    Assuming we have an Aggregator operator would you know how to make the value of delta dynamic such that its value is created by the previous tumbled windows output ?

    Been working on a project for sensor data and it is true that not all the data arrives in sequential “wall clock” time order, so this is cool for processing data based on the true event timestamp.

    Its kind of ironic that we use stream processing to “hold” data in windows for batch processing 🙂

    • DanDebrunner May 09, 2014

      > Which eviction policy would you advise for streams with irregular peaks of tuples ?

      A useful trick is to have two stage aggregation, the output of one Aggregate operator invocation feeding another one, with different eviction policies. Exact details depend on what the functional requirements are, but a count based Aggregate feeding a time based Aggregate can handle irregular peaks, without incurring unlimited memory usage.

      • DanDebrunner May 09, 2014

        Also in some cases the windowing algorithm is incremental, so the window size is fixed and independent of the number of tuples logically in the window, e.g. count, min, max, sum etc.

  2. Been working on a project for sensor data and it is true that not all the data arrives in sequential “wall clock” time order, so this is cool for processing data based on the true event timestamp.

    Its kind of ironic that we use stream processing to “hold” data in windows for batch processing 🙂

  3. Juan Camilo June 06, 2014

    Thank you very much for this tip. Greetings from Colombia 🙂

  4. What does “sliding, count(0)” mean?

  5. DanDebrunner August 27, 2014

    Have you seen the next article in the series, which covers sliding windows?

    https://developer.ibm.com/streamsdev/2014/08/22/spl-sliding-windows-explained/

  6. AaqibMukhtar August 07, 2018

    Is there a way we can evict the data once a day. Lets suppose 11:59 PM every day.
