I would like to define the export properties and the import subscription of a stream at run time. This would allow greater flexibility when launching many instances of the same job that use Import and Export.
I have used the 042_dynamic_import_export_api_at_work example from https://www.ibm.com/developerworks/community/files/app?lang=en#/file/414b8379-832d-424e-b0d5-ef63ed3500af as a starter. However, in my application the ImportSubscription/ExportProperties are known at runtime, and I do not necessarily have a Custom operator in the data path before the Export or after the Import from which to call the setOutputPortExportProperties()/setInputPortImportSubscription() functions. Also, it would be preferable to avoid the performance hit of putting a Custom operator in the data path.
With this in mind I have the two main composites below; both try to use a Custom to change the ImportSubscription/ExportProperties at runtime.
If both the import and export jobs are run with the submission time parameter feed : prod, this appears to work: looking at the jobs in the Instance Graph, they connect and tuples flow as expected. However, if I then run a further import_Main job with the submission time parameter feed : ref, this third job does not appear to connect, yet its main.Sink operator receives tuples (it goes green and the Total Tuples metric increments), and the Tuple Received!!! debug message can be seen in the console output for the PE.
What is going on here please? Can the setOutputPortExportProperties()/setInputPortImportSubscription() functions be used in this way? Is there a better way to achieve runtime export properties and the import subscription?
Thanks for your help.
import_Main.spl
composite import_Main {
    param
        expression<rstring> $feed : (rstring)getSubmissionTimeValue("feed", "test");
    graph
        @parallel (width=2)
        () as main = imp_impl(){
            param
                feed: $feed;
        }
}

composite imp_impl(){
    param
        expression<rstring> $feed;
    graph
        stream<SampleTuple> SampleTupleImport = Import(){
            param
                subscription: feed == "overwritten at runtime" && channel == getChannel();
        }

        //dynamically update the import subscription at runtime
        stream<int32 i> UpdateConfig = Beacon(){
            param
                period: 30.0;
            output
                UpdateConfig : i = 0;
        }

        //only process UpdateConfig, but apply subscription changes to the import
        () as ImportRuntimeConfig = Custom(SampleTupleImport; UpdateConfig){
            logic
                state: mutable boolean configSet = false;
                onTuple UpdateConfig : {
                    if (configSet == false){
                        rstring subscription = "feed==\"" + $feed + "\" && channel==\""+ (rstring) getChannel() +"\"";
                        int32 result = setInputPortImportSubscription(subscription,0u);
                        if (result != 0){
                            printStringLn("Could not set Import subscription!");
                        }
                        else{
                            configSet = true;
                            mutable int32 err = 0;
                            printStringLn("Set Import Subscription to: " + getInputPortImportSubscription(0u,err));
                        }
                    }
                }
        }

        //in a real application this would probably be anything but a Custom
        () as Sink = Custom(SampleTupleImport){
            logic
                onTuple SampleTupleImport: printStringLn("Tuple Received!!!");
        }
}
export_Main.spl
type SampleTuple = rstring name, uint32 val;

composite export_Main {
    param
        expression<rstring> $feed : (rstring)getSubmissionTimeValue("feed", "test");
    graph
        @parallel (width=2)
        () as main = exp_impl(){
            param
                feed: $feed;
        }
}

composite exp_impl(){
    param
        expression<rstring> $feed;
    graph
        stream<SampleTuple> SampleData = Beacon(){
            param
                period: 1.0;
            output
                SampleData : name = "sampleT", val = 1u;
        }

        //Dynamically update the export properties at runtime
        stream<SampleTuple> ExportRuntimeConfig = Custom(){
            logic
                state:
                    mutable boolean configSet = false;
                onProcess:{
                    if(configSet == false){
                        int32 result = setOutputPortExportProperties({feed=$feed, channel=(rstring)getChannel()},0u);
                        //int32 result = setOutputPortExportProperties([{name = "feed", value=$feed, typ="rstring"}, {name = "channel", value=(rstring)getChannel(), typ="rstring"}]/* , channel=(rstring)getChannel()}*/,0u);
                        if (result != 0){
                            printStringLn("Could not set Export properties");
                        }
                        else{
                            printStringLn("Set Export properties to: {feed = " + $feed + ", channel = " + (rstring)getChannel() + "}");
                            configSet = true;
                        }
                    }
                }
        }

        //export the SampleData
        () as SampleDataExport = Export(SampleData, ExportRuntimeConfig){
            param
                properties: {feed = "overwritten at runtime", channel = getChannel()};
        }
}
Answer by Dan Debrunner (1175) | Feb 01 at 03:38 PM
Have you looked at using Publish/Subscribe, which are effectively a layer above Export/Import? They support submission time parameters to define the topic/topic pattern and handle UDP naturally.
Supported in 4.0-4.3.
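As a rough illustration of that suggestion, here is a minimal sketch of the two jobs from the question rewritten around Publish/Subscribe. It assumes the operators come from the com.ibm.streamsx.topology.topic namespace of the streamsx.topology toolkit, and that Publish takes a topic parameter while Subscribe takes topic and streamType; please check these names against the SPLDoc for the toolkit version you have installed. The composite names publish_Main and subscribe_Main are made up for this example, and using the feed value directly as the topic is just one possible naming choice.

```spl
// Assumed namespace: streamsx.topology toolkit (verify against your install)
use com.ibm.streamsx.topology.topic::Publish;
use com.ibm.streamsx.topology.topic::Subscribe;

type SampleTuple = rstring name, uint32 val;

// Publishing job (hypothetical name): the topic comes from the
// "feed" submission time parameter, defaulting to "test".
composite publish_Main {
    graph
        stream<SampleTuple> SampleData = Beacon() {
            param
                period: 1.0;
            output
                SampleData : name = "sampleT", val = 1u;
        }

        // Replaces the Export plus the Custom that set the properties
        () as SampleDataPublish = Publish(SampleData) {
            param
                topic: getSubmissionTimeValue("feed", "test");
        }
}

// Subscribing job (hypothetical name): subscribes to the topic named by
// the same "feed" submission time parameter.
composite subscribe_Main {
    graph
        // Replaces the Import plus the Custom that set the subscription
        stream<SampleTuple> SampleTupleImport = Subscribe() {
            param
                topic: getSubmissionTimeValue("feed", "test");
                streamType: SampleTuple;
        }

        () as Sink = Custom(SampleTupleImport) {
            logic
                onTuple SampleTupleImport: printStringLn("Tuple Received!!!");
        }
}
```

With this layout, submitting both jobs with feed : prod should connect them, while a subscriber submitted with feed : ref would be expected to wait for a publisher on the ref topic.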
Answer by guyhoskins (7) | Feb 04 at 08:39 AM
Thanks @Dan Debrunner, Publish/Subscribe look a lot easier to use! Inside the composites, calling setOutputPortExportProperties()/setInputPortImportSubscription() from within a filter is a neat trick. It removes the processing overhead we were worried about from putting a Custom inline and, as you say, they handle parallelisation naturally too.