The Java Application API allows streaming applications for IBM Streams to be written in Java. Tuples on a stream can be any serializable Java object. A stream, represented by the interface TStream, is processed in a functional programming style. A function transforms the stream by being called on each tuple, and the values it returns form the contents of the new stream.
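The following plain-Java sketch shows the idea concretely, outside IBM Streams. It models a transform over a finite batch of tuples. The `TransformSketch` class and its `transform` helper are hypothetical illustrations, not part of the Java Application API, and they use `java.util.function.Function` rather than the API's own `Function` type. The sketch treats a `null` return as "no output tuple", which mirrors how the Fizz Buzz function later in this article skips 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, single-process model of a stream transform: NOT the IBM Streams API.
class TransformSketch {

    // Apply fn to each input tuple in order. Each non-null result becomes a tuple
    // on the output "stream"; a null result produces no output tuple.
    static <T, R> List<R> transform(List<T> input, Function<T, R> fn) {
        List<R> output = new ArrayList<>();
        for (T tuple : input) {
            R result = fn.apply(tuple);
            if (result != null) {
                output.add(result);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Upper-case each word and drop empty strings.
        List<String> words = Arrays.asList("streams", "", "java");
        List<String> shouted = transform(words, w -> w.isEmpty() ? null : w.toUpperCase());
        System.out.println(shouted); // prints [STREAMS, JAVA]
    }
}
```

In a real topology the input is unbounded and the runtime calls the function as tuples arrive. The list here only makes the per-tuple behaviour easy to see.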

Let’s dive into an example to demonstrate the concept. We will write an application that plays Fizz Buzz.

An application is represented by an instance of Topology, which will contain all the streams declared by the Java code.

/**
 * Entry point for a streaming Fizz Buzz!
 */
public static void main(String[] args) throws Exception {
    Topology topology = new Topology();

A streaming application starts with a source stream (typically several source streams) that brings external data into the application for processing. For example, a source stream could carry live transit data from nextbus.com. Topology provides a number of methods to declare source streams using an application-provided function. Here, however, we use a utility method that provides a source stream of Java Long values, a TStream<Long>.

// Declare an infinite stream of Long values
TStream<Long> counting = BeaconStreams.longBeacon(topology);
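The beacon's values count up from zero, as the Fizz Buzz output later in this article reflects: 0 is skipped, then 1, 2, 3 and so on follow. A lazily generated `LongStream` is a rough plain-Java analogy. It is conceptually infinite, and nothing is produced until something consumes it. The `BeaconAnalogy` class is hypothetical. The real beacon runs inside the Streams runtime and is not a `java.util.stream` stream.

```java
import java.util.stream.LongStream;

// Plain-Java analogy (not the IBM Streams API) for an endless source of Long values.
class BeaconAnalogy {

    // A lazily generated, conceptually infinite sequence 0, 1, 2, ...
    static LongStream counting() {
        return LongStream.iterate(0, n -> n + 1);
    }

    public static void main(String[] args) {
        // Only five values are ever generated, because limit() bounds the consumer.
        counting().limit(5).forEach(System.out::println); // prints 0 to 4, one per line
    }
}
```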

Streams are typically assumed to be infinite sequences of tuples, so streaming applications run forever. For example, an application analyzing traffic in a city needs to run 24/7, because its external data sources never stop producing data (bus and taxi positions, speed sensors, traffic cameras, etc.).

To play Fizz Buzz, we transform the stream of long values into a stream of strings, using a function that implements the rules of Fizz Buzz. When the topology is executed, the apply() method is called once for each tuple on stream ‘counting’, that is, once for each number. The String returned by apply() is submitted as a tuple on the output stream ‘shouts’. If apply() returns null, as it does for 0, no tuple is submitted.

        /*
         * Transform an input stream of longs, TStream<Long>, to a stream of strings, TStream<String>,
         * that follow the Fizz Buzz rules based upon each value in the input stream.
         */
        TStream<String> shouts = counting.transform(new Function<Long,String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String apply(Long v) {
                // Skip 0, humans count from 1!
                if (v == 0)
                    return null;
                
                StringBuilder sb = new StringBuilder();
                if (v % 3 == 0)
                    sb.append("Fizz");
                if (v % 5 == 0)
                    sb.append("Buzz");
                
                if (sb.length() == 0)
                    sb.append(Long.toString(v));
                else
                    sb.append("!");
                
                return sb.toString();
            }}, String.class);

The function is provided by an implementation of Function, in this case an anonymous class. The TStream.transform() method declares that the Function’s apply() method will be called for each tuple on the input stream counting, and that each returned value will be a tuple on the output stream shouts. For clarity or reuse, Function may also be implemented by a regular named class instead of an anonymous class.
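As an example, the Fizz Buzz rules could live in a named, reusable class. So that the sketch compiles without the Streams libraries, it declares a hypothetical stand-in interface, `SerializableFunction`. This interface has the same shape as the anonymous class above: a single `apply` method, and `Serializable`, as the `serialVersionUID` field suggests. In a real application the class would implement the API's `Function<Long,String>` instead, and an instance would be passed to `transform` in place of the anonymous class.

```java
import java.io.Serializable;

// Hypothetical stand-in for the API's Function interface, so this sketch compiles on its own.
interface SerializableFunction<T, R> extends Serializable {
    R apply(T v);
}

// The Fizz Buzz rules from the anonymous class, moved into a named, reusable class.
class FizzBuzzRules implements SerializableFunction<Long, String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String apply(Long v) {
        // Skip 0, humans count from 1! (null means no output tuple)
        if (v == 0)
            return null;

        StringBuilder sb = new StringBuilder();
        if (v % 3 == 0)
            sb.append("Fizz");
        if (v % 5 == 0)
            sb.append("Buzz");

        // Plain numbers are shouted as-is; Fizz/Buzz words get an exclamation mark.
        if (sb.length() == 0)
            sb.append(Long.toString(v));
        else
            sb.append("!");

        return sb.toString();
    }

    public static void main(String[] args) {
        FizzBuzzRules rules = new FizzBuzzRules();
        // Feed the first 17 counting values (0..16) through the rules, dropping nulls.
        for (long v = 0; v <= 16; v++) {
            String shout = rules.apply(v);
            if (shout != null)
                System.out.println(shout);
        }
    }
}
```

Running `main` prints the same sixteen lines as the initial output shown later in this article. A named class like this can also be unit-tested without starting a topology.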

Finally, we print the tuples to standard output to see the “shouts” following the rules of Fizz Buzz (the playFizzBuzz() method contains the code just described and returns the TStream shouts).

// Print the tuples to standard output
playFizzBuzz(counting).print();

Streaming applications can produce a variety of outputs, depending on the purpose of their analytics. In the city traffic example, outputs might be control signals to traffic lights, or dashboard instructions telling bus drivers to slow down so that buses do not bunch up. A telco application might detect fraud and send messages to disable a phone.

At this point the streaming application is not running; we have only declared the topology. The application is started by submitting it to a StreamsContext.

Here we submit it to the EMBEDDED context, which runs the topology within the current Java virtual machine. This is useful for testing that the application logic is correct.

// Since this is a streaming graph with an endless
// data source, it will run forever
Future<?> runningTopology = StreamsContextFactory.getEmbedded().submit(topology);
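`submit` returns a `java.util.concurrent.Future`, so the standard `Future` methods are one plausible way to bound an otherwise endless run, for instance when testing. The sketch below uses a plain `ExecutorService` task as a stand-in for the running topology. The `BoundedRunSketch` class and its `runFor` method are hypothetical. Whether the embedded context's `Future` honours `cancel` in exactly this way is an assumption to check against the API documentation. The pattern lets the endless loop run for a fixed time and then cancels it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Plain-Java stand-in for an endless running topology, bounded through its Future.
class BoundedRunSketch {

    // Runs an endless counting task for about runMillis, then cancels it.
    // Returns how far the task had counted when it was stopped.
    static long runFor(long runMillis) throws Exception {
        AtomicLong counter = new AtomicLong();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> running = executor.submit(() -> {
            // Endless loop, like an endless data source; exits only when interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                counter.incrementAndGet();
            }
        });
        try {
            // The task never completes on its own, so this wait ends in a timeout.
            running.get(runMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException expected) {
            // cancel(true) interrupts the worker thread, which ends the loop.
            running.cancel(true);
        } finally {
            executor.shutdownNow();
        }
        return counter.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Counted to " + runFor(200) + " before cancelling");
    }
}
```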

When we execute the Java application, the streaming topology is declared and then executed, producing this initial output (the complete application runs for about a minute, counting to around 600):

1
2
Fizz!
4
Buzz!
Fizz!
7
8
Fizz!
Buzz!
11
Fizz!
13
14
FizzBuzz!
16

The DISTRIBUTED context runs the Java streaming application (the Topology instance) in distributed mode on an IBM Streams instance. The application then runs as an IBM Streams job and can take advantage of the functionality IBM Streams supports, including distribution across multiple hosts and automatic restart of failed elements.

The complete code for the Fizz Buzz streaming application is available here.

Summary

This has been a quick introduction to the Java Application API. For more, visit http://ibmstreams.github.io/streamsx.topology/, which includes a getting started guide, documentation and sample applications. Much more functionality exists than is demonstrated here, such as filtering, parallelism, and integration with SPL and JSON. The API is currently in beta as we look for feedback and add support for more of the functionality available in IBM Streams.

More articles are coming soon on Streamsdev, including one on how Java 8 (supported in IBM Streams 4.0.1) simplifies functional programming.
