Streams provides a dynamic connection mechanism that lets you create connections across applications. This mechanism allows jobs to export, or publish, a stream of data, and other jobs to consume that data by subscribing to the exported stream. The operators for exporting and importing streams of data are called Export and Import, respectively. This article shows you how to add logic to your application to change the connections between importers and exporters without having to restart either application.

Import/Export Review

The Export and Import operators were introduced in a previous article. To review, a connection between an importer and an exporter can be based on a matching stream name, or on a match between the properties of the exported stream and the subscription of the importing stream.
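For comparison with the property-based pattern below, a name-based connection might look like the following minimal sketch. The main composite name `sample::TweetExporter` and the stream ID `"mentions"` are hypothetical; `streamId` on Export and `applicationName` plus `streamId` on Import are the operators' name-based parameters.

```spl
// Exporting application (hypothetical main composite sample::TweetExporter):
// publish the stream under a fixed name instead of a set of properties.
() as exporter = Export(mentions) {
    param
        streamId: "mentions";
}

// Importing application: connect by naming the exporting application and stream.
stream<MentionsType> Imported = Import() {
    param
        applicationName: "sample::TweetExporter";
        streamId: "mentions";
}
```

A name-based connection is fixed by those two strings, which is why the rest of this article focuses on property-based connections, whose properties and subscriptions can be changed while the jobs run.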

Property-based Import and Export typically follows this pattern:

Export:
stream<MentionsType> mentions = Custom(Tweets) {
    logic
        onTuple Tweets: {
            // search for tweets with #iPhone and submit matches to mentions
        }
}
() as exporter = Export(mentions) {
    param
        properties: {topic = "#iPhone"};
}

Import:
stream<MentionsType> Imported = Import() {
    param
        subscription: topic == "#iPhone";
}

The exporting application publishes a stream of tweets that mention #iPhone. The Import operator connects to the stream published by the Export operator because the subscription parameter of Import (topic == "#iPhone") matches the properties parameter of Export (topic = "#iPhone").

Changing connections at runtime

The pattern shown above works well for establishing connections based on a static property. However, consider this extension of the scenario: instead of only producing a stream of tweets that match #iPhone, we want to filter for whatever is currently trending on Twitter, and update the exported stream’s properties to indicate that the data being produced has changed. This way, downstream importers interested in that topic can connect to the new stream. So, we need a way to change the properties of the Export operator at runtime.

Streams provides functions to do just that.  You can change the properties of an exported stream or the subscription of the Import operator using the setOutputPortExportProperties and setInputPortImportSubscription functions, respectively.
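Both functions take the new value and the index of the port to update. The following minimal sketch, a fragment of a composite's graph clause, also checks the result. It assumes, as we read the SPL function reference, that these functions return an `int32` status that is 0 on success; check the documentation for your Streams version before relying on specific nonzero codes. The `NewTopic` stream and its `pattern` attribute are the same hypothetical names used in the exporter example later in this article.

```spl
stream<MentionsType> mentions = Custom(Tweets; NewTopic) {
    logic
        onTuple NewTopic: {
            // Update the properties of output port 0, the port feeding the Export operator.
            // Assumption: an int32 status is returned, 0 meaning success.
            int32 rc = setOutputPortExportProperties({topic = NewTopic.pattern}, 0u);
            if (rc != 0) {
                appTrc(Trace.error, "export properties not updated, rc=" + (rstring) rc);
            }
        }
        onTuple Tweets: {
            // search for tweets on the current topic and submit them to mentions
        }
}
```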

Using the functions

It would be reasonable to expect these functions to be called from the Export or Import operator itself, but that is not correct, for a subtle reason: although Export and Import look like operators in Streams Studio and in SPL, from the Streams runtime’s viewpoint they are not actually operators. So, if you use setOutputPortExportProperties to change the properties of an exported stream, the call must come from the operator that produces the stream being exported. Similarly, setInputPortImportSubscription must be called from the operator that consumes the imported stream.

Whenever either of these functions is called, any new or existing jobs whose subscriptions match the new properties automatically connect to the exported stream, with no manual intervention required. Likewise, existing importers whose subscriptions matched only the old properties are disconnected from the stream. However, because resetting the connections takes time, an importer may briefly receive tuples produced under the new properties even though its subscription only matched the old ones.
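If downstream logic cannot tolerate those stray tuples, one option is to check each tuple again after the Import. This is a minimal sketch under an assumption the original example does not make: that the exporter stamps every tuple with the topic it was produced for, in a hypothetical `rstring tweetTopic` attribute of `MentionsType`.

```spl
// Assumes MentionsType has a hypothetical attribute: rstring tweetTopic.
stream<MentionsType> Imported = Import() {
    param
        // "topic" here refers to the Export properties, not a tuple attribute.
        subscription: topic == "#iPhone";
}

// Drop any tuple produced for a different topic, for example during the
// brief window while the old connection is being torn down.
stream<MentionsType> CurrentOnly = Filter(Imported) {
    param
        filter: tweetTopic == "#iPhone";
}
```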

Updating the Export properties

Going back to the Twitter example, whenever the trending topic changes, we need to call setOutputPortExportProperties to advertise the new topic. Here is our updated exporter:

1       stream<MentionsType> mentions = Custom(Tweets;NewTopic) {
2            logic state: {
3                mutable rstring current_pattern = "#iPhone";
4            }
5            onTuple NewTopic: {
6               current_pattern = NewTopic.pattern;
7               setOutputPortExportProperties({topic = NewTopic.pattern}, 0u);
8            }
9            onTuple Tweets: {
10                // search for tweets matching current_pattern and submit them to mentions
11            }
12
13         }
14            
15
16      () as exporter = Export(mentions) {
17           param 
18              properties: {topic = "#iPhone"};
19       }

First, note that we added a second input port to the Custom so that it can receive new topics of interest based on what is trending (lines 1 and 5-8). When a new hot topic arrives, we call setOutputPortExportProperties from the Custom to update the topic of the exported stream (line 7). This call overrides the value set by the properties parameter of the Export operator at line 18.
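The example leaves open where the NewTopic stream comes from. One possible sketch, assuming a hypothetical file `topics.txt` with one hashtag per line: FileSource with `format: line` reads each line into the single `rstring` attribute, and Throttle releases topics slowly so that the exporter's properties change at a pace you can observe.

```spl
// Read candidate trending topics, one per line (hypothetical file).
stream<rstring pattern> TopicLines = FileSource() {
    param
        file: "topics.txt";
        format: line;
}

// Release 0.05 tuples per second, that is, one new topic every 20 seconds.
stream<rstring pattern> NewTopic = Throttle(TopicLines) {
    param
        rate: 0.05;
}
```

The attribute name `pattern` matches what the exporter's `onTuple NewTopic` clause reads, so this stream can be wired directly into the Custom's second input port.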

In the next section, we look at changing the subscription of an Import operator.

Updating Import subscription at runtime

Let’s look at an example of how to adjust the subscriptions at runtime using setInputPortImportSubscription. Consider the static subscription case, where we just import and write to a file:

stream<MentionsType> ImportedTweets = Import() {
    param
        subscription: topic == "#iPhone";
}
() as sink = FileSink(ImportedTweets) {
    param
        file: "importiPhone.txt";
}

Now let’s say we want to broaden our subscription at runtime to include any hashtags that occur together with #iPhone. For example, if a tweet reads "Love Siri! #iPhone #siri", you would want to change the subscription to topic == "#iPhone" || topic == "#siri". You can use setInputPortImportSubscription to update the subscription as needed. As mentioned before, the function is not called from the Import operator itself, but from the operator that receives the imported stream, in this case FileSink.

The updated sink looks like this:

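() as sink = FileSink(ImportedTweets) {
    logic
        state: {
            // current subscription expression, starting from the static one
            mutable rstring subscriptionString = "topic == \"#iPhone\"";
        }
        onTuple ImportedTweets: {
            // build a list of additional tags that occur alongside #iPhone
            // and update the import subscription to include those topics too
            //..
            //..
            // newTag holds one such tag, found by the elided code above
            subscriptionString += " || topic == \"" + newTag + "\"";
            // port 0 of this operator is the one fed by the Import operator
            setInputPortImportSubscription(subscriptionString, 0u);
        }
    param
        file: "updateSubscription.txt";
}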
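The elided part of that example, collecting new tags, could look like the following sketch. It assumes a hypothetical `list<rstring> tags` attribute in `MentionsType` holding a tweet's hashtags, and it keeps the tags already subscribed to in a set, so the subscription is only rebuilt, and setInputPortImportSubscription only called, when a genuinely new tag appears.

```spl
() as sink = FileSink(ImportedTweets) {
    logic
        state: {
            // tags already part of the subscription
            mutable set<rstring> knownTags = {"#iPhone"};
            mutable rstring subscriptionString = "topic == \"#iPhone\"";
        }
        onTuple ImportedTweets: {
            mutable boolean changed = false;
            // hypothetical attribute: list<rstring> tags
            for (rstring tag in ImportedTweets.tags) {
                if (!(tag in knownTags)) {
                    insertM(knownTags, tag);
                    subscriptionString += " || topic == \"" + tag + "\"";
                    changed = true;
                }
            }
            // only touch the subscription when it actually changed
            if (changed) {
                setInputPortImportSubscription(subscriptionString, 0u);
            }
        }
    param
        file: "updateSubscription.txt";
}
```

Calling the function once per new tag, rather than once per tuple, avoids asking the runtime to re-evaluate connections for every tweet that arrives.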

Import filter

The subscription given by an Import is matched against the properties of an Export; it does not consider the contents of the tuples themselves. Sometimes, though, you only want to receive some of the tuples on a stream. Let’s say that your app only wants to examine retweets, so you only want tuples containing the token RT. Instead of importing everything and dropping the unwanted tuples afterward, use the filter parameter of the Import operator:

stream<list<rstring> tokens> TokenizedTweets = Import() {
   param 
         subscription: topic == "#iPhone" || topic == "#siri";
         filter: "RT" in tokens;
}

This filter can also be updated at runtime, using the setInputPortFilterExpression function in the operator downstream from the Import.
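A minimal sketch of such an update: a hypothetical consumer narrows the filter once, after the first tuple arrives, to retweets that also contain the token "#siri". The new expression is passed as an rstring in the same syntax as the filter parameter; the specific expression, the use of `&&` in it, and the one-time trigger are illustrative assumptions.

```spl
() as RetweetConsumer = Custom(TokenizedTweets) {
    logic
        state: {
            mutable boolean narrowed = false;
        }
        onTuple TokenizedTweets: {
            if (!narrowed) {
                // Replace the filter on input port 0, which is fed by the Import operator.
                setInputPortFilterExpression("\"RT\" in tokens && \"#siri\" in tokens", 0u);
                narrowed = true;
            }
            // ... process the retweet ...
        }
}
```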

Sample application

There’s a sample application on GitHub demonstrating some of these features.

The application has two parts, an importer and an exporter. The exporter checks sample tweets for mentions of the current topic of interest and exports all matching tweets. Initially the topic is “#iPhone”, but whenever the topic of interest changes, the exporter updates its properties accordingly.

The import part of the application has three importers. One subscribes to #android and another to #iPhone. The third, iPhoneAndRelated, starts with a subscription of #iPhone and broadens it to include tags related to #iPhone as they arrive: once the first message containing both #siri and #iPhone arrives, it updates its subscription to match both "#iPhone" and "#siri".

Each importer is connected to the exported stream only while the exporter’s current trending topic matches its subscription. The graphs below show how the connections between importers and exporters change as the application runs:

Initially, the #iPhone importer and the iPhoneAndRelated importer are both connected to the Export:

When the exporter changes the trending topic to #android, those two connections are dropped, and the exported stream is connected to the #android importer:

Finally, the topic changes to #siri. Only the iPhoneAndRelated importer is now connected:

Primitive operator support

This post has shown how to change subscriptions, properties, and filters from SPL. Similar changes can be made from primitive operators: the C++ primitive operator API has equivalents to setInputPortImportSubscription, setInputPortFilterExpression, and setOutputPortExportProperties. Java operator support is not as complete, but it does have an equivalent to setOutputPortExportProperties.

Happy Streaming!
