Import operator receives data but doesn't send it?

Question by Kevin_Foster (520) | Mar 29, 2014 at 05:44 PM | Tag: streamsdev

I'm splitting an existing application into multiple applications, and I'm seeing odd behavior with an Import operator.

In one application:

    stream<ParametersSchema> Parameters = FileSource()
    {
        param
            file : "Parameters.txt" ;
            format : txt ;
            parsing : strict ;
    }

    () as Export_6 = Export(Parameters)
    {
        param
            properties : { kind = "parameters" } ;
    }

In the other:

    (stream<ParametersSchema> Parameters) as Import_13 = Import()
    {
        param
            subscription : kind == "parameters" ;
    }

The jobs link successfully through the Export-Import pair, and I see two tuples and a final punctuation flowing into the Import() operator. But when I hover over the instance graph, neither tuples nor punctuation are coming out of it. I also confirmed that no tuples are flowing out of the Import() by checking the downstream results.

Does anyone have any recommendations on how I might debug my code? I'm also happy to rewrite it using another pattern.

-Kevin


4 answers

Accepted answer

Answer by KrisWH (450) | Apr 02, 2014 at 12:25 AM

Static connections (the normal ones you use) are different from dynamic connections. A PE isn't healthy until its static connections are established, and it doesn't start sending tuples until that happens (see the allPortsReady() call). For example, a FileSource doesn't start reading tuples until the operator it is sending its tuples to is ready.

Dynamic connections (those via Import/Export) are handled by a different process. There's basically a matchmaking process in SAM, and it starts after both operators are ready. The matchmaking process doesn't take that long, but if the exporter starts sending tuples as soon as it's ready, there is almost surely going to be some tuple loss between it and the importer.

In your case, you were only sending two tuples, so it had probably sent all its tuples by the time it was connected to the importer.
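To make the timing argument concrete, here is a toy model in plain C++. It is only a sketch and does not use the Streams runtime or its API: the struct ExportRun, the function simulateExport, and all the millisecond values are hypothetical, chosen just for illustration. It assumes every tuple sent before the dynamic connection exists is simply dropped.

```cpp
#include <cassert>
#include <cstddef>

// Toy model only: this is NOT the Streams runtime or its API.
// All names and timings here are hypothetical.
struct ExportRun {
    std::size_t delivered;  // tuples sent after the connection existed
    std::size_t dropped;    // tuples sent before the connection existed
};

// The exporter waits initDelayMs, then sends one tuple every sendIntervalMs.
// The dynamic connection is assumed to appear at connectAtMs; any tuple sent
// earlier than that has no importer to go to and is counted as dropped.
ExportRun simulateExport(std::size_t tupleCount,
                         long initDelayMs,
                         long sendIntervalMs,
                         long connectAtMs) {
    assert(initDelayMs >= 0 && sendIntervalMs >= 0 && connectAtMs >= 0);
    ExportRun run{0, 0};
    for (std::size_t i = 0; i < tupleCount; ++i) {
        const long sendTimeMs = initDelayMs + static_cast<long>(i) * sendIntervalMs;
        if (sendTimeMs >= connectAtMs) {
            ++run.delivered;
        } else {
            ++run.dropped;
        }
    }
    return run;
}
```

With an assumed connection time of 500 ms, simulateExport(2, 0, 1, 500) drops both tuples, while simulateExport(2, 3000, 1, 500) delivers both. A longer stream such as simulateExport(10000, 0, 1, 500) loses only the first 500 tuples and delivers the remaining 9500, which is why the problem is easy to miss with large inputs.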

Import/Export is very useful when you don't know exactly what input your operator is going to use, but it's not a good idea to replace normal connections with Import/Export unless you have to.


Answer by Dan Debrunner (1175) | Mar 29, 2014 at 06:30 PM

Note that the Import operator is a pseudo-operator: it actually modifies the input port of the operator it feeds.

What happens if you start the import job first?

It might be that the two tuples and a final marker are submitted before the dynamic connection is made. Maybe try with many more tuples and see what happens.


Answer by Kevin_Foster (520) | Mar 31, 2014 at 02:10 PM

I was starting the import job first with a healthy delay before starting the export job. Still didn't work.

But in further testing today I found that if I put an initial delay of 3 seconds on the FileSource of the export job, it does work. That seems to give the export job more time to connect to the import job?


Answer by askene (167) | Apr 03, 2014 at 10:36 AM

Kevin, I had the same issue with some of our jobs sending out tuples before downstream jobs were fully connected.

I ended up creating a C++ operator that sits in front of the Export and blocks until a user-defined number of dynamic connections are established. This worked very well to ensure that the scenario you are seeing doesn't happen in our data flow.

The number of connections metric is not available in SPL, but in C++ you can access it like this:

    Metric const& connections = getContext().getPE().getMetrics().getOutputPortMetric(0, PEMetrics::nConnections);

In the process method I block until the connections are set up.

    while (connections.getValue() < requiredConnections) {
        ::SPL::Functions::Utility::block(0.2);
    }

This isn't really an ideal solution but it works pretty well. It also blocks if the downstream imports go away.

I am hoping that the upcoming guaranteed delivery options might make this unnecessary?
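For anyone who wants to try the polling pattern outside of Streams, here is a minimal standalone sketch in plain C++. It is not the operator code from this answer: ConnectionCounter and waitForConnections are hypothetical names, a std::atomic stands in for the nConnections metric, and std::this_thread::sleep_for stands in for block(). The timeout is my own addition (the loop above waits forever), on the assumption that an upper bound on the wait is sometimes useful.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Stand-in for the nConnections metric. In a real operator the value would be
// read from the PE metrics; here another thread updates it. Hypothetical name.
using ConnectionCounter = std::atomic<long>;

// Polls the counter until it reports at least `required` connections or the
// timeout expires. Returns true if the requirement was met in time.
bool waitForConnections(const ConnectionCounter& connections,
                        long required,
                        std::chrono::milliseconds pollInterval,
                        std::chrono::milliseconds timeout) {
    assert(required >= 0);
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (connections.load() < required) {
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;  // gave up: not enough connections appeared in time
        }
        std::this_thread::sleep_for(pollInterval);  // plays the role of block(0.2)
    }
    return true;
}
```

A caller would run this before submitting the first tuple and decide what to do when it returns false, for example log a warning and keep waiting. Like the original loop, this only narrows the window: it says nothing about connections that disappear after the check has passed.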

Comment by RolefH (1) | Apr 07, 2014 at 06:20 AM

I noticed that this solution works only if the primitive operator is NOT fused with other operators. When it is fused, connections.getValue() always returns 1, even when the importer job has not been started.

Comment by RolefH (1) | Apr 07, 2014 at 10:46 AM

Just an add-on to my previous comment:

To make this also work when the operator is fused with other operators into a single PE, the PE port index must not be hard-coded to 0 as in the answer above. It must be calculated with

    getOutputPortAt(0).getPEOutputPortIndex();

With this, we get the PE connections metric with

    Metric const& connections = getContext().getPE().getMetrics().getOutputPortMetric(getOutputPortAt(0).getPEOutputPortIndex(), PEMetrics::nConnections);
