The documentation for User-Defined Parallelism (UDP) in Streams 3.2 presents an example of using a FileSource inside a UDP region:

    @parallel(width=3)
    stream<uint64 i> Output = FileSource() {
        param file: "input" + (rstring)getChannel() + ".csv";
    }

If the above FileSource invocation is inside a UDP region, each operator replica will open a different file because of the call to getChannel(). The replica on channel 0 will open the file named "input0.csv", the replica on channel 1 will open the file named "input1.csv", and so on.

In the above example, the resource we want to use happens to have a naming convention that has the parallel channel number encoded in it. This convenient naming scheme will not always hold when we want to access an external resource from inside of a UDP region.

Consider the case of using a TCPSource from inside of a UDP region. It’s unlikely that the IP addresses and ports that we want to connect to will follow the kind of naming scheme shown above. So how can we solve this problem in general?

Our solution uses getChannel() to index into lists held in the operator's local state:

    @parallel(width=3)
    stream<uint64 i> Src = TCPSource() {
        logic state: {
            // One entry per parallel channel: the replica on channel N uses entry N.
            list<rstring> _addresses = ["123.45.6.89", "45.6.445.89", "48.3.495.3"];
            list<uint32> _ports = [240u, 560u, 9001u];
        }

        param role: client;
              address: _addresses[getChannel()];
              port: _ports[getChannel()];
    }

When this TCPSource invocation appears inside a UDP region, each replica of the TCPSource operator reads the element of the _addresses and _ports lists at the index returned by getChannel(). This solution applies when we can hard-code the parameter values in advance and know the maximum parallel width of the UDP region. (If the UDP region had more than 3 channels, the replica on channel 3 would index past the end of the _addresses and _ports lists and cause a runtime error.) If those constraints do not hold, we can instead use submission-time values:

    @parallel(width=(int32)getSubmissionTimeValue("parWidth"))
    stream<uint64 i> Src = TCPSource() {
        logic state: {
            list<rstring> _addresses = (list<rstring>)getSubmissionTimeValue("addrs");
            list<uint32> _ports = (list<uint32>)getSubmissionTimeValue("ports");
        }

        param role: client;
              address: _addresses[getChannel()];
              port: _ports[getChannel()];
    }

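The only constraint in the example above is that the addrs and ports lists passed at submission time must each contain at least as many elements as the value of parWidth, which is also provided at submission time. Replicas index the lists with channel numbers 0 through parWidth - 1, so a shorter list causes an out-of-range access; any extra elements are simply never read.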
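To make the indexing concrete, here is a small plain-Python model of it. It is not SPL and not part of any Streams API, and the function name `params_for_channels` is hypothetical. It assumes, as described above, that the replica on channel c reads element c of each list and that channels are numbered from 0 to parWidth - 1. It uses the values from the hard-coded example.

```python
# Plain-Python model (not SPL) of how each UDP replica picks its TCPSource
# parameters: the replica on channel c reads element c of each list,
# mirroring _addresses[getChannel()] and _ports[getChannel()].

def params_for_channels(addresses, ports, width):
    """Return the (address, port) pair used by each of `width` channels.

    Raises ValueError when a list has fewer than `width` entries, which
    models the out-of-range list access that would fail at runtime.
    Entries beyond `width` are never read.
    """
    if width < 1:
        raise ValueError("parallel width must be at least 1")
    for name, values in (("addresses", addresses), ("ports", ports)):
        if len(values) < width:
            raise ValueError(
                f"{name} has {len(values)} entries but width is {width}; "
                f"channel {len(values)} would index past the end"
            )
    # Channels are numbered 0 .. width - 1, like the values of getChannel().
    return [(addresses[ch], ports[ch]) for ch in range(width)]


addresses = ["123.45.6.89", "45.6.445.89", "48.3.495.3"]
ports = [240, 560, 9001]

print(params_for_channels(addresses, ports, 3))
# [('123.45.6.89', 240), ('45.6.445.89', 560), ('48.3.495.3', 9001)]

# A width of 2 is fine: the third entry is simply unused.
print(params_for_channels(addresses, ports, 2))
# [('123.45.6.89', 240), ('45.6.445.89', 560)]

# A width of 4 fails: channel 3 has no entry.
try:
    params_for_channels(addresses, ports, 4)
except ValueError as err:
    print("error:", err)
```

The model checks list lengths up front instead of failing on the first bad access. This is a design choice for the sketch only. In the SPL program, the failure would surface in whichever replica indexes past the end.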

2 comments on "General Operator Parameters in UDP Regions"

  1. DanDebrunner November 23, 2013

    Can you show what the values of the submission-time parameters addrs and ports would be to match the hard-coded example? I.e., how can I pass a list of rstring or uint32 as a submission-time string?

    • Dan, the following job submission should work, assuming that the SPL file was named Main.spl:

      streamtool submitjob -P addrs='["123.45.6.89", "45.6.445.89", "48.3.495.3"]' -P ports='[240, 560, 9001]' -P parWidth=3 output/Main.adl
