I'm splitting an existing application into multiple applications, and I'm seeing odd behavior with an Import operator.
In one application:
stream<ParametersSchema> Parameters = FileSource()
{
    param
        file : "Parameters.txt";
        format : txt;
        parsing : strict;
}
() as Export_6 = Export(Parameters)
{
    param
        properties : { kind = "parameters" };
}
In the other:
(stream<ParametersSchema> Parameters) as Import_13 = Import()
{
    param
        subscription : kind == "parameters";
}
The jobs link successfully through the Export/Import pair, and I see two tuples and a final punctuation flowing into the Import() operator. But when I hover over the instance graph, neither tuples nor punctuation are coming out. I also confirmed, by looking at other results, that no tuples are flowing out of the Import().
Does anyone have recommendations on how I might debug this? I'm also happy to rewrite it using another pattern.
-Kevin
Answer by KrisWH (450) | Apr 02, 2014 at 12:25 AM
Static connections (the normal ones you use) are different from dynamic connections. A PE isn't healthy until its static connections are established, and its operators don't start sending tuples until that happens (see the allPortsReady() call). For example, a FileSource doesn't start reading tuples until the operator it sends its tuples to is ready.
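As a rough illustration of the static case, here is a minimal standalone C++ model. It does not use the Streams operator API: PortReadiness, runSource() and demo() are hypothetical names. A latch counts down as static ports connect, and a toy source only starts submitting once every port is ready, which is loosely analogous to a source that waits for allPortsReady() before producing tuples.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical model of a PE's static-connection readiness (not the Streams API).
class PortReadiness {
public:
    explicit PortReadiness(int staticPorts) : pending_(staticPorts) {
        assert(staticPorts >= 0);
    }

    // Called once per static port as its connection is established.
    void portConnected() {
        std::lock_guard<std::mutex> lock(m_);
        if (pending_ > 0 && --pending_ == 0) {
            cv_.notify_all();  // last static port is up: release waiting sources
        }
    }

    // Blocks until every static port is connected.
    void waitAllPortsReady() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return pending_ == 0; });
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    int pending_;
};

// Toy "FileSource": nothing is submitted until all static ports are ready.
std::vector<std::string> runSource(PortReadiness& ready,
                                   const std::vector<std::string>& lines) {
    ready.waitAllPortsReady();
    std::vector<std::string> submitted;
    for (const auto& line : lines) {
        submitted.push_back(line);  // stands in for submitting a tuple
    }
    return submitted;
}

// Demo: start the source first, then bring its two static ports up.
std::vector<std::string> demo() {
    PortReadiness ready(2);
    std::vector<std::string> out;
    std::thread source([&] { out = runSource(ready, {"tuple1", "tuple2"}); });
    ready.portConnected();
    ready.portConnected();
    source.join();
    return out;
}
```

Because the source waits on the latch, no tuple is produced into an unconnected static port. Nothing comparable protects the dynamic Export/Import connection described next.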
Dynamic connections (those made via Import/Export) are handled by a different process. There's basically a matchmaking process in SAM, and it starts after both operators are ready. Matchmaking doesn't take long, but if the exporter starts sending tuples as soon as it's ready, there is almost certainly going to be some tuple loss between it and the importer.
In your case, you were only sending two tuples, so it had probably sent all its tuples by the time it was connected to the importer.
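A tiny single-threaded simulation makes this timing argument concrete. It is a sketch, not Streams code: simulateExport() and DeliveryResult are hypothetical names, time is counted in abstract ticks, and the model assumes that any tuple submitted before the dynamic connection exists is simply dropped.

```cpp
#include <cassert>

// Hypothetical model of an Export -> Import dynamic connection.
struct DeliveryResult {
    int delivered = 0;
    int dropped = 0;
};

// startTick:   tick at which the exporter submits its first tuple
// connectTick: tick at which matchmaking finishes and the connection exists
// tupleCount:  number of tuples, submitted one per tick
DeliveryResult simulateExport(int startTick, int connectTick, int tupleCount) {
    assert(tupleCount >= 0);
    DeliveryResult r;
    for (int i = 0; i < tupleCount; ++i) {
        int submitTick = startTick + i;
        if (submitTick >= connectTick) {
            ++r.delivered;  // importer is connected: tuple arrives
        } else {
            ++r.dropped;    // no importer connected yet: tuple is lost
        }
    }
    return r;
}
```

With the connection made at tick 5, simulateExport(0, 5, 2) drops both tuples, as in the original question; simulateExport(0, 5, 1000) still drops the first 5 but delivers the remaining 995; and delaying the start, as in simulateExport(30, 5, 2), delivers both. A start delay only helps while it exceeds the real connection time, so it is a timing workaround rather than a guarantee.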
Import/Export is very useful when you don't know exactly what input your operator is going to use, but it's not good to replace normal connections with Import/Export unless you have to.
Answer by Dan Debrunner (1175) | Mar 29, 2014 at 06:30 PM
Note that the Import operator is a pseudo-operator; it actually modifies the input port of the operator it feeds.
What happens if you start the import job first?
It might be that the two tuples and a final marker are submitted before the dynamic connection is made. Maybe try with many more tuples and see what happens.
Answer by Kevin_Foster (520) | Mar 31, 2014 at 02:10 PM
I was starting the import job first, with a healthy delay before starting the export job. It still didn't work.
But in further testing today, I found that if I put an initial delay of 3 seconds on the FileSource in the export job, it does work. Presumably that gives the export job more time to connect to the import job?
Answer by askene (167) | Apr 03, 2014 at 10:36 AM
Kevin, I had the same issue with some of our jobs sending out tuples before downstream jobs were fully connected.
I ended up creating a C++ operator that sits in front of the Export and blocks until a user-defined number of dynamic connections are established. This worked very well to ensure that the scenario you are seeing doesn't happen in our data flow.
The number of connections metric is not available in SPL, but in C++ you can access it like this:
Metric const& connections = getContext().getPE().getMetrics().getOutputPortMetric(0, PEMetrics::nConnections);
In the process method, I block until the connections are set up:
while (connections.getValue() < requiredConnections) {
    ::SPL::Functions::Utility::block(0.2);
}
This isn't really an ideal solution but it works pretty well. It also blocks if the downstream imports go away.
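The same gating idea can be tried outside Streams. In this sketch the nConnections metric is modeled by a std::atomic<int> (an assumption so the code runs standalone), std::this_thread::sleep_for stands in for the SPL block() call, and waitForConnections() with its timeout parameter is a hypothetical addition that the loop above does not have.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Block until `connections` reaches `required`, polling every `pollInterval`.
// Returns true once enough connections exist, or false if `timeout` expires
// first. The atomic counter stands in for the PE nConnections metric.
bool waitForConnections(const std::atomic<int>& connections,
                        int required,
                        std::chrono::milliseconds pollInterval,
                        std::chrono::milliseconds timeout) {
    assert(required >= 0);
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (connections.load() < required) {
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;  // give up instead of blocking forever
        }
        std::this_thread::sleep_for(pollInterval);  // plays the role of block(0.2)
    }
    return true;
}
```

Returning false on timeout lets the caller decide whether to keep waiting, log a warning, or start submitting anyway, instead of blocking the operator indefinitely when the importers never show up.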
I'm hoping the upcoming guaranteed delivery options will make this unnecessary. Does anyone know?
I noticed that this solution works only if the primitive operator is NOT fused with other operators. When it is fused, connections.getValue() always returns 1, even when the importer job has not been started.
Just an add-on to my previous comment:
To make this work when the operator is fused with other operators into a single PE, the PE output port index must not be hard-coded to 0 as in the answer above. It must be calculated with:
getOutputPortAt(0).getPEOutputPortIndex();
The PE connections metric is then obtained with:
Metric const& connections = getContext().getPE().getMetrics().getOutputPortMetric(getOutputPortAt(0).getPEOutputPortIndex(), PEMetrics::nConnections);
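Why the hard-coded 0 can go wrong is easiest to see with a small model of a fused PE. Everything here is hypothetical (FusedPE, peIndexOf and connectionsFor() are illustrative names, not Streams API), and it assumes PE output ports are numbered across all fused operators, so the gate operator's local port 0 can map to a different PE port.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of a fused PE (illustrative names, not the Streams API).
struct FusedPE {
    // PE output port index -> modeled value of the nConnections metric
    std::vector<int> nConnectionsByPEPort;
    // (operator name, operator-local output port) -> PE output port index
    std::map<std::pair<std::string, int>, int> peIndexOf;

    // Look up the metric through the port mapping; in the real operator,
    // getOutputPortAt(i).getPEOutputPortIndex() plays the role of this lookup.
    int connectionsFor(const std::string& op, int localPort) const {
        assert(localPort >= 0);
        auto it = peIndexOf.find({op, localPort});
        if (it == peIndexOf.end()) {
            throw std::out_of_range("operator port is not a PE output port");
        }
        return nConnectionsByPEPort.at(it->second);
    }
};
```

For example, if another fused operator owns PE port 0 with one connection and the gate's Export sits on PE port 1 with no importer yet, reading index 0 directly reports 1 and lets tuples through, while connectionsFor("Gate", 0) correctly reports 0. That is one plausible explanation for the constant 1 described above, not a confirmed one.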