
Defining import/export configuration at runtime

Question by guyhoskins (7) | Feb 01 at 01:29 PM | Tags: streamsdev, import, export

I would like to define the export properties and the import subscription of a stream at runtime. This would allow greater flexibility when starting many instances of the same jobs that use Import and Export.

I have used the 042_dynamic_import_export_api_at_work example from https://www.ibm.com/developerworks/community/files/app?lang=en#/file/414b8379-832d-424e-b0d5-ef63ed3500af as a starter. However, in my application the import subscription and export properties are only known at runtime, and I do not necessarily have a Custom operator in the data path before the Export or after the Import from which to call the setOutputPortExportProperties()/setInputPortImportSubscription() functions. It would also be preferable to avoid the performance hit of putting a Custom operator in the data path.

With this in mind, I have the two main composites below. Both try to use a Custom operator outside the data path to change the import subscription or export properties at runtime.

If both the import and export jobs are run with the submission-time parameter feed : prod, this appears to work: in the Instance Graph the jobs connect and tuples flow as expected. However, if I then run a further import_Main job with the submission-time parameter feed : ref, this third job does not appear to connect, yet its main.Sink operator receives tuples (it goes green and the Total Tuples metric increments), and the Tuple Received!!! debug message can be seen in the console output for the PE.

What is going on here please? Can the setOutputPortExportProperties()/setInputPortImportSubscription() functions be used in this way? Is there a better way to achieve runtime export properties and the import subscription?

Thanks for your help.

import_Main.spl

 composite import_Main {
 
 param
     expression<rstring> $feed : (rstring)getSubmissionTimeValue("feed", "test");
     
 graph
     @parallel (width=2)
     () as main = imp_impl(){
         param 
             feed: $feed;
     }
 }
 
 composite imp_impl(){
 param 
     expression<rstring>$feed;
 
 graph
     stream<SampleTuple> SampleTupleImport = Import(){
         param
             subscription: feed == "overwritten at runtime" && channel == getChannel();
     }
     
     //dynamically update the import subscription at runtime
     stream<int32 i> UpdateConfig = Beacon(){
     param
         period: 30.0;
     output
         UpdateConfig : i = 0;
     }
     
     //only process UpdateConfig, but apply subscription changes to the import
     () as ImportRuntimeConfig = Custom(SampleTupleImport; UpdateConfig){
         logic
             state: mutable boolean configSet = false;
         onTuple UpdateConfig : {
             if (configSet == false){
                 rstring subscription = "feed==\"" + $feed + "\" && channel==\""+ (rstring) getChannel() +"\"";
                 int32 result = setInputPortImportSubscription(subscription,0u);
                 
                 if (result != 0){
                     printStringLn("Could not set Import subscription!");
                 }
                 else{
                     configSet = true;
                     mutable int32 err = 0;
                     printStringLn("Set Import Subscription to: " + getInputPortImportSubscription(0u,err));
                 }
             }
                 
         }
     }
     
     //in a real application this would probably be anything but a Custom
     () as Sink = Custom(SampleTupleImport){
         logic
             onTuple SampleTupleImport: printStringLn("Tuple Received!!!");
     }
 
 }

export_Main.spl

 type SampleTuple = rstring name, uint32 val;
 
 composite export_Main {
 
 param
     expression<rstring> $feed : (rstring)getSubmissionTimeValue("feed", "test");
     
 graph
     @parallel (width=2)
     () as main = exp_impl(){
         param 
             feed: $feed;
     }
 }
 
 composite exp_impl(){
 param 
     expression<rstring> $feed;
 
 graph
     stream<SampleTuple> SampleData = Beacon(){
         param
             period: 1.0;
         output
             SampleData : name = "sampleT", val = 1u;
     }
     //Dynamically update the export properties at runtime
     stream<SampleTuple> ExportRuntimeConfig = Custom(){
     logic
        state:
        mutable boolean configSet = false;
        onProcess:{
            if(configSet == false){
                int32 result = setOutputPortExportProperties({feed=$feed, channel=(rstring)getChannel()},0u);
                //int32 result = setOutputPortExportProperties([{name = "feed", value=$feed, typ="rstring"}, {name = "channel", value=(rstring)getChannel(), typ="rstring"}]/* , channel=(rstring)getChannel()}*/,0u);
            
                if (result != 0){
                    printStringLn("Could not set Export properties");
                }
                else{
                    printStringLn("Set Export properties to: {feed = " + $feed + ", channel = " + (rstring)getChannel() + "}");
                    configSet = true;
                }
            }
        }
     }
     //export the SampleData
     () as SampleDataExport = Export(SampleData, ExportRuntimeConfig){
         param
             properties: {feed = "overwritten at runtime", channel = getChannel()};
     }
 }




2 answers

Accepted answer

Answer by Dan Debrunner (1175) | Feb 01 at 03:38 PM

Have you looked at using Publish/Subscribe? They are effectively a layer above Export/Import. They support submission-time parameters to define the topic or topic pattern, and they handle UDP (user-defined parallelism) naturally.

Supported in 4.0-4.3.

https://www.ibm.com/support/knowledgecenter/SSCRJU_4.3.0/com.ibm.streams.toolkits.doc/spldoc/dita/tk$com.ibm.streamsx.topology/ns$com.ibm.streamsx.topology.topic.html
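
As a rough illustration of this suggestion (a sketch, not code from the answer), the two jobs from the question might be restructured as below. It assumes the com.ibm.streamsx.topology toolkit is on the toolkit path and that the value of the "feed" submission-time parameter is used directly as the topic name. The composite names publish_Main and subscribe_Main are hypothetical. The @parallel regions from the question are left out for brevity.

 use com.ibm.streamsx.topology.topic::Publish;
 use com.ibm.streamsx.topology.topic::Subscribe;
 
 type SampleTuple = rstring name, uint32 val;
 
 // Hypothetical publishing job: the topic comes from the "feed" submission-time parameter
 composite publish_Main {
 graph
     stream<SampleTuple> SampleData = Beacon(){
         param
             period: 1.0;
         output
             SampleData : name = "sampleT", val = 1u;
     }
 
     // Publish replaces the Export operator and the extra Custom
     () as SampleDataPublish = Publish(SampleData){
         param
             topic: getSubmissionTimeValue("feed", "test");
     }
 }
 
 // Hypothetical subscribing job: receives tuples only from a matching topic
 composite subscribe_Main {
 graph
     // Subscribe replaces the Import operator; streamType must match the published type
     stream<SampleTuple> SampleTupleSubscribe = Subscribe(){
         param
             topic: getSubmissionTimeValue("feed", "test");
             streamType: SampleTuple;
     }
 
     () as Sink = Custom(SampleTupleSubscribe){
         logic
             onTuple SampleTupleSubscribe: printStringLn("Tuple Received!!!");
     }
 }

Under these assumptions, a subscribe_Main job submitted with feed : ref would be expected to connect only to publishers submitted with feed : ref, not to those submitted with feed : prod.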


Answer by guyhoskins (7) | Feb 04 at 08:39 AM

Thanks @Dan Debrunner, Publish/Subscribe looks a lot easier to use! Calling setOutputPortExportProperties()/setInputPortImportSubscription() from within a filter inside the composites is a neat trick: it removes the processing overhead we were worried about from putting a Custom inline and, as you say, they handle parallelisation naturally too.
