SPL’s Import and Export pseudo-operators are extremely useful, but they aren’t right for every use case. In this post, I will try to help you decide when you should use them and when you shouldn’t. Rather than starting by describing Import and Export, I’m going to jump straight to an example of when they are more like “lightning” than “lightning bug”.

The difference between the right word and the almost right word is the difference between lightning and a lightning bug.

— Mark Twain

The application

Cecilia is working on an application to monitor social media traffic around the World Cup. She has an ingest composite, which pulls data in from Twitter, and an analyze composite, which looks for mentions of her favorite team.

composite Main {
   graph 
   stream<MentionType> mentions = GetTwitterData() {
   // Pull world-cup related traffic
   }

   // Look for mentions of Cecilia's favorite team.  
   () as writeAnalysis = Analyze(mentions) {
       param team: "myteam";
   }
}

Cecilia has this running for a while, and then she decides that she also wants to do some analysis on the opposing team. So she cancels the job and re-submits something that looks like this.

composite Main {
   graph 
   stream<MentionType> mentions = GetTwitterData() {
   // Pull world-cup related traffic
   }

   () as writeAnalysisMyTeam = Analyze(mentions) {
        param team: "myteam";
   }

   // Look for mentions of the other team 
   () as writeAnalysisOpposingTeam = Analyze(mentions) {
       param team: "otherteam";
   }
}

Now Cecilia decides she wants to also look at YouTube comments. So she cancels and resubmits the whole thing.

composite Main {
   graph 
   stream<MentionType> mentions = GetTwitterData() {
   // Pull world-cup related traffic
   }

   stream<MentionType> youTubeMentions = GetYouTubeData() {
   // Pull world-cup related traffic
   }

   () as writeAnalysisMyTeam = Analyze(mentions,youTubeMentions) {
        param team: "myteam";
   }

   () as writeAnalysisOpposingTeam = Analyze(mentions,youTubeMentions) {
   // Look for mentions of the other team.
       param team: "otherteam";
   }
}

While she's watching the World Cup, she sees the Luis Suárez bite. Now she wants to immediately deploy a job to capture that traffic (a job that only makes sense for Twitter data). But again she has to take down her whole application and resubmit it, and meanwhile she's going to miss some of the traffic.

Import and Export to the rescue

Cecilia keeps having to cancel and resubmit her job every time she makes a change or an addition; when she deploys her bite-analysis composite, she's going to miss traffic on the two teams she's monitoring. This is frustrating.

The reality is worse. I've told this story as if Cecilia were the only developer, but imagine a different person writing each of the analysis pieces. In that case, for Cecilia to add the bite-analysis composite, she has to disrupt the running applications of several different people.

In this case, GetTwitterData and GetYouTubeData are independent of each other, and we'd like to be able to manage them independently. Likewise, the analysis apps are all independent of each other, and we'd like to manage them independently as well.

What this application needs is a publish-subscribe system, where jobs with input data can advertise that the data is there, and where jobs that want data can subscribe to it. Streams has that built in, via the Import and Export pseudo-operators. (They are called "pseudo-operators" because they look like operators when you write SPL, but behind the scenes they are treated differently.)

Applications publish with Export, and subscribe with Import. There are two ways to publish a stream; I’m going to use the more flexible properties method, in which the export operator states properties of the stream, and the import operator subscribes to streams with those properties. The other way to do it is to use stream names.
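
For reference, the name-based approach looks roughly like the sketch below. The composite names here are hypothetical, and I'm quoting the streamId and applicationName parameters from memory, so check the Import/Export documentation for your release; the rest of this post uses the properties method.

composite PublishTwitterByName {
   graph
   stream<MentionType> mentions = GetTwitterData() {
   // Pull world-cup related traffic
   }
   () as ExportTwitter = Export(mentions) {
      // Export by stream name rather than by a set of properties
      param streamId: "worldCupTwitter";
   }
}

composite SubscribeByName {
   graph
   stream<MentionType> mentions = Import() {
      // Ask for that specific named stream from that specific application
      param applicationName: "PublishTwitterByName";
            streamId: "worldCupTwitter";
   }
   // ... the analysis operators would go here, as in the examples below
}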

Here's the piece that grabs Twitter data and exports the stream:

composite PublishTwitter {
   graph 
   stream<MentionType> mentions = GetTwitterData() {
   // Pull world-cup related traffic
   }
   () as ExportTwitter = Export(mentions) {
      // the properties topic and source are chosen by the writer and not something
      // in the language--I could have called them alice and bob, and so long as 
      // the importers used those names, it'd make no difference.
      param properties: {topic="world cup", source="twitter"};
   }
}

The YouTube reader has the same topic, but the source property is different:

composite PublishYoutube {
   graph 
   stream<MentionType> youTubeMentions = GetYouTubeData() {
   // Pull world-cup related traffic
   }
   () as ExportYouTube = Export(youTubeMentions) {
      param properties: {topic="world cup", source="youtube"};
   }
}

Applications can then subscribe with an Import. Remember that the team-analysis apps used data from any source, so the subscription doesn't need to specify the source, and it will connect with both the Twitter data and the YouTube data.

composite AnalysizeMyTeam() {
   graph
   stream<MentionType> mentions = Import() {
   // Here, any source is okay.
   param subscription: topic == "world cup";
   }

   () as writeAnalysisMyTeam = Analyze(mentions) {
        param team: "myteam";
   }
}

The AnalyzeOtherTeam composite is similar. But remember that for the bite-analysis app we only want data from Twitter, so in addition to specifying that the topic must be "world cup", the subscription specifies that the source must be "twitter".

composite AnalysizeBite() {
   graph
   stream<MentionType> mentions = Import() {
   // Here, we only want data from the twitter source.
   param subscription: topic == "world cup" && source=="twitter";
   }

   () as writeAnalysisBite = AnalyzeBite(mentions) {
   }
}

Each of these applications is completely independent; they can be stopped or started without reference to any other app. To connect, they must run in the same Streams instance, but they can even be run by different people, provided the permissions are set up correctly.

Why not use Export-Import connections everywhere?

This is the perfect solution for the social media application. But the flexibility of the Export-Import connections means that there’s more room to make mistakes, and fewer ways for the system to help the developer do the right thing.

Consider this example, with a single data-producing operator that feeds two file sinks: one directly, and one via an Export-Import connection.

composite Compare {

graph
   stream<rstring message>Messages = Beacon() {
      output Messages: 
         message = "Tuple number "+(rstring)IterationCount();
   }

   () as sink1 = FileSink(Messages) {
      param file: "regular.txt";
   }

   () as Exporter = Export(Messages) {
       // Here there's a list for properties
       param properties: {favorites=["Watership Down","A Tale of Two Cities", "Pride and Prejudice"]};
   }

   stream<rstring message> MessagesImport = Import() {
      param subscription: "Pride and Prejudice" in favorites;
   }

   () as importSink = FileSink(MessagesImport) {
      param file: "import.txt";
   }
}

When we look at the first line of "regular.txt", we'll see "Tuple number 0". This is not good luck: even if the Beacon starts up before the FileSink is ready, the Streams runtime ensures the Beacon won't send tuples until the connection to the sink is ready. But the system doesn't wait for Export connections, so tuples sent before the Export-Import connection is established are lost. In my run, the first line of "import.txt" says "Tuple number 895", so the second file sink missed hundreds of tuples. Usually, tuples won't be lost after startup, but it is possible for an already established Export-Import connection to be broken and then reconnected, with any tuples sent in the middle being lost.

For the social media application, this isn't a problem: tuples are already lost at the source (e.g., the Twitter feed often doesn't include all the relevant messages), and the classification can make mistakes too, so we know we're not getting 100% of the World Cup-related tweets. A few tuples lost because the connection isn't perfect don't affect the application's correctness.

However, for a typical Streams application, the implicit assumption is that when an operator sends a tuple, the downstream operator will get it. For that sort of application, an Export-Import connection can introduce a hidden source of errors.

Here are some of the other ways in which Import and Export can be more confusing than regular connections:

  • While the IDE shows Export-Import connections at runtime, it cannot show them in the application view. For example, take my simple example above:
    [Image: importconnect]
    You see the regular connection, but the Import and the file sink for the imported stream look disconnected from the rest of the graph. This makes your application harder to understand, particularly for big applications.

  • The health of a PE can't be used to determine whether the connections you expect are established. A PE won't be healthy until all its regular connections are established, and while an Export-Import connection is being established the PE is "partially healthy" rather than "healthy"; but if a connection you expect isn't set up because the subscription expression isn't right, the PE will still be "healthy".
  • There's more potential for typos and coding errors that can't be checked by the compiler. For example, in writing the Compare composite above, my first iteration had messages instead of message in the MessagesImport type, as shown in the sketch after this list. As a result, no tuples flowed (the stream types need to match). I only realized there was a problem at runtime, when I saw there were no tuples, and it took me a while to figure out what the problem was (bad properties? bad subscription?). If I had made a similar error with a regular connection, I would have gotten a compile-time error.
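
To make that last pitfall concrete, here is roughly what my broken first iteration of the importing side looked like (a sketch of the mistake, not code to keep): the attribute is declared as messages, while the exported stream in Compare uses message, so the stream types don't match. It compiles cleanly, but no tuples ever flow and import.txt stays empty.

   // The exported stream in Compare is stream<rstring message>, but this
   // Import declares "messages"; the types don't match, so the Export-Import
   // connection never carries any tuples, even though the subscription matches.
   stream<rstring messages> MessagesImport = Import() {
      param subscription: "Pride and Prejudice" in favorites;
   }

   () as importSink = FileSink(MessagesImport) {
      param file: "import.txt";
   }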

Summary

I started with a quote from Mark Twain about the difference between the "almost right" tool and the "right" tool. When Export-Import connections are right for your application, they can make it a lot easier to manage. But when they aren't right, using them can make your application harder to manage.

How do you determine whether the upsides outweigh the downsides? The question to ask is whether you expect all the pieces to be deployed together. If so, regular connections (and so one big job) are probably better. Use composites and namespaces to organize your code, but connect the pieces using normal stream connections. When you’re not sure what set of applications will be deployed at runtime (and especially when that set is likely to change), then Export-Import may be the right tool for the job.

5 comments on "Using Import and Export"

  1. Kmell Van Hellsing August 15, 2014

    Hello. First of all, thank you for sharing.

    Now, I would like to ask you this: is there any "best practice" when using the Export and Import operators regarding how many seconds an Export operator should wait for the Import operator to be ready?

    I mean, if I don't want to lose any tuples coming from the Export operator, what options do I have to coordinate the execution of the Import operator? Is it enough to launch them in different PEs, with the one containing the Import operator started first?

    Thank you very much.

  2. Import/Export are usually in separate jobs. You can submit the job containing the import operator first and once it is up and running then submit the job with the export.

    One of the use cases for these is long-running data ingest jobs that export their data streams to analytic jobs, which import the data. In these scenarios the export jobs are often started first and run for a long time (effectively forever), while the analytic jobs (importers) come and go as needed.

  3. The best practice is to avoid the use of Export/Import when you cannot tolerate tuple loss.

    Starting the import operator first, in a separate job, won’t ensure that no tuples are lost. In the simple test application, starting the import operator first lost the same number of tuples as starting the two operators together. You can use an initial delay on the source operator; in this case, I found that 1.0 seconds was enough. However, that number is dependent on a lot of factors. If your application relies on that initial delay being “big enough”, it may not work correctly when you move from a development environment to a production environment, or if the other applications running on the same machine change.
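
    For example, here is a sketch of what I mean by an initial delay on the source operator, using Beacon's initDelay parameter in the Compare composite above (1.0 is just the value that happened to work in my environment):

    stream<rstring message> Messages = Beacon() {
       // Wait one second before submitting the first tuple, to give the
       // Export-Import connection time to be established.
       param initDelay: 1.0;
       output Messages:
          message = "Tuple number "+(rstring)IterationCount();
    }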

    • Kris,

      This is a surprising observation and one worth remembering. I had assumed (as had Pete, it looks like) that it would be enough to start the downstream job before the upstream job to ensure that any tuples submitted on the exported stream would be caught by the import. But this was not guaranteed by any explicit documentation, and apparently the runtime is free to start the upstream job’s streams flowing before all possible connections with already running jobs have been made.

      That said, I think that in most cases any startup effects are only of concern during development, when you’re likely to be working from a finite sample data set. It’s important not to let such side-effects of the development setup skew the design considerations for the production system. In principle a stream is infinite; in practice, while not infinite, the data on a feed (tweets and YouTube comments, in your example) will have been flowing for a long time by the time your Streams job starts. Given that you usually cannot make a feed replay its entire past, you are guaranteed to miss many items. It does not matter if you miss them while the jobs in your modular solution are still being started and connected or while no jobs are running at all; the important consideration is the steady state, when all jobs are up and running and all export-import connections have been made. In that sense, I believe that a modular approach using Import and Export on the streaming data is always useful, and in fact a tremendously powerful technique for Streams applications (for the reasons you articulate so well in your scenario).

      Note that this applies to the streaming data. The cases where concern about losing tuples on exported streams does apply involve reading inherently finite data sets, usually from a file or a database. Lookup and initialization information are prime examples, and I completely agree that in such cases the operator that reads the data (say, a FileSource) and the operator that uses it (for example, a Join) should be part of the same job.
