Does user defined parallelism require contiguous partitioning attributes?

Question by Jim Sharpe (68) | Jan 21, 2016 at 04:51 PM | Tags: streamsdev, parallel, user-defined-parallelism, partitioned

I'm seeing a behavior that would seem to indicate that partitioning based on numerical values for user-defined parallelism (using the annotation) requires a contiguous range of values. For example, if I have 20 channels of data, each with a chNum attribute in the range 0-19, and I set the width of the parallelism to 20 and the partitioning attribute to chNum, it works perfectly as expected, with each parallel region processing a different channel. However, if I filter the input to the parallel region to eliminate a couple of the channels but add a couple at the top end (i.e., the chNum values are now 0-3 and 6-21, which still amounts to a total of 20 channels, but with channels 4 and 5 replaced by 20 and 21), the behavior changes. Two of the parallel regions show as not processing any values (presumably they might still be looking for chNums 4 and 5, which are no longer in the input stream, but that's a guess), and channels 19 and 20 never appear in the output.

As a follow-up test I then increased the width to 22 and still got the two "unused" parallel regions, but the data for the additional two channels did appear in the output. That makes me believe that something in the splitter for numerical partitioning attributes is expecting a contiguous set of values, not just a unique mapping to the number of values specified by the width. Is that supposed to be the case? What else might be impacting this behavior? If it matters, this is on v4.1 QSE.

9 answers

Answer by Michael.K (1380) | Jan 22, 2016 at 02:39 AM

Hello Jim.

Excerpt from the Knowledge Center:

When the parallel region is partitioned, the splitter uses the partition keys, which are the attribute values of each tuple, to determine where it routes the tuple. The splitter achieves the routing process by creating a hash from the set of tuple attributes.

This means that even if you use a contiguous range of integer values, it is not guaranteed that every channel is used; it depends on the hash value that is generated. You can think of the routing as a Split operator with the following index parameter: index : hashCode(attribute)

This ensures only that tuples with the same attribute value are routed to the same channel.
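
To make the analogy concrete, here is a minimal sketch. It assumes the SPL hashCode() built-in stands in for the splitter's hash and uses an explicit modulo to pick one of three output ports; the product's actual hash and its reduction to a channel index may differ, so treat this as an illustration only.

 // Minimal sketch of the Split analogy. Assumption: hashCode() approximates
 // the hash the splitter uses; the explicit "% 3u" stands in for the
 // reduction to a port index.
 composite SplitAnalogy
 {
     graph
         stream<uint32 chNum> Keys = Beacon()
         {
             param iterations : 50;
             output Keys : chNum = (uint32) IterationCount();
         }
         // Tuples with equal chNum always produce the same index, so they
         // always reach the same output port -- that is the only guarantee.
         (stream<uint32 chNum> Ch0; stream<uint32 chNum> Ch1; stream<uint32 chNum> Ch2)
             as ManualSplitter = Split(Keys)
         {
             param index : (int64) (((uint32) hashCode(chNum)) % 3u);
         }
         () as Sink0 = FileSink(Ch0) { param file : "ch0.txt"; format : txt; flush : 1u; }
         () as Sink1 = FileSink(Ch1) { param file : "ch1.txt"; format : txt; flush : 1u; }
         () as Sink2 = FileSink(Ch2) { param file : "ch2.txt"; format : txt; flush : 1u; }
 }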

Regards, Michael.


Answer by Jim Sharpe (68) | Jan 22, 2016 at 10:26 AM

Hi Michael,

Thanks for the quick response. However, I still don't understand why I end up missing values (i.e., not processing tuples) from some channels when I have sufficiently wide parallelism. That is, if I have 20 unique values in the partitioning attribute and a width of 20, I would expect all 20 parallel regions to get used and output to be produced for tuples containing all 20 of those attribute values. Neither is what I am observing. It appears that tuples with attribute values greater than the specified parallel width are not getting routed anywhere, but instead are getting dropped, even though there is sufficient width for the hashing algorithm to map each value to a unique parallel region. Furthermore, the hashing algorithm that decides where to route the tuples is also not mapping values to two of the available parallel regions, as no tuples are processed by them even though I have exactly the same number of attribute values as the width of the parallelism. Because the hashing works fine with contiguous (uint32) attribute values and things do not appear to function correctly when there is a gap, I can't help but wonder if there is a bug.

Comment by Michael.K (1380) | Jan 22, 2016 at 10:32 AM

Hello Jim.

Could you paste both your good and your bad applications here?

My expectation is that every tuple gets processed. It might happen that some channels process more than one tuple, but in the end, all tuples must be processed.

From your explanation I understand that you are seeing tuple loss, so I want to do some investigation and need your applications for that.

Thank you in advance.


Answer by Jim Sharpe (68) | Jan 22, 2016 at 01:43 PM

The issue is occurring within a much larger app, so I can't include the entire example, but below are some slightly tweaked extracts that illustrate what the code is doing. And yes, I'm definitely experiencing unexpected tuple loss; I have verified that by printing the contents of the streams passing into and out of the parallel region. The incoming stream includes equal numbers of tuples with chNum values between 0 and 49 inclusive, with all values represented. If I set parallelChannels to 22, tuples with chNum values of 20 and 21 are output. But when I set it to 20, I get no output tuples with chNum equal to 20 or 21. (Note that in this version of the filter, values 4 and 5 are removed, so the total number of values to be hashed is 20.)

         stream<ChannelDataT> Channels = Filter(AllChannels)
         {
             param
                 filter : has((list<uint32>)[0, 1, 2, 3, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21], chNum) ;
         }            
         @parallel(width =(int32) getSubmissionTimeValue("parallelChannels"),
             partitionBy = [ { port = Channels, attributes = [ chNum ] } ])
         stream<PeriodDataT> PeriodData = ProcessChannels(Channels)
         {
         }

Answer by Michael.K (1380) | Jan 22, 2016 at 02:43 PM

Hello Jim.

Please find below my test application. I put a Beacon in front of your operators, added some FileSinks, and used a Functor inside ProcessChannels. Please let me know whether this application fits your scenario.

The Beacon produces 50 tuples with chNum value between 0 and 49.

If I set parallelChannels to 22, I get the following 20 tuples in the result.txt file:

 {chNum=0,channel=0}
 {chNum=1,channel=20}
 {chNum=2,channel=6}
 {chNum=3,channel=3}
 {chNum=6,channel=18}
 {chNum=7,channel=15}
 {chNum=8,channel=21}
 {chNum=9,channel=9}
 {chNum=10,channel=12}
 {chNum=11,channel=10}
 {chNum=12,channel=16}
 {chNum=13,channel=13}
 {chNum=14,channel=4}
 {chNum=15,channel=1}
 {chNum=16,channel=7}
 {chNum=17,channel=5}
 {chNum=18,channel=11}
 {chNum=19,channel=8}
 {chNum=20,channel=19}
 {chNum=21,channel=2}

If I set parallelChannels to 20, I get the following 20 tuples in the result.txt file:

 {chNum=0,channel=0}
 {chNum=1,channel=6}
 {chNum=2,channel=3}
 {chNum=3,channel=14}
 {chNum=6,channel=15}
 {chNum=7,channel=9}
 {chNum=8,channel=12}
 {chNum=9,channel=10}
 {chNum=10,channel=16}
 {chNum=11,channel=13}
 {chNum=12,channel=4}
 {chNum=13,channel=1}
 {chNum=14,channel=7}
 {chNum=15,channel=5}
 {chNum=16,channel=11}
 {chNum=17,channel=8}
 {chNum=18,channel=19}
 {chNum=19,channel=2}
 {chNum=20,channel=0} <-- as chNum=0
 {chNum=21,channel=6} <-- as chNum=1

As I tried to explain in my first response, it might happen that a unique chNum value is passed to a channel that is also used for another unique chNum value. In this case, chNum 0 and 20 are handled in UDP channel 0, and chNum 1 and 21 are handled in UDP channel 6.

Please let me know whether this application fits your scenario and whether it produces fewer than 20 tuples in your environment. Or whether I am missing something ;) Thank you in advance.

The application:

 namespace application;
 
 type ChannelDataT = tuple<uint32 chNum>;
 type PeriodDataT = tuple<uint32 chNum, uint32 channel>;
 
 composite ProcessChannels(input stream<ChannelDataT> I; output stream<PeriodDataT> O)
 {
     graph
         stream<PeriodDataT> O = Functor(I)
         {
             output O: channel = (uint32)getChannel();
         }
 }
 
 composite Main
 {
     graph
         stream<ChannelDataT> AllChannels as O = Beacon()
         {
             param iterations: 50; period: 1.0;
             output O: chNum = (uint32)IterationCount();
         }
         () as BeaconSink = FileSink(AllChannels)
         {
             param file: "beacon.txt"; format: txt; flush: 1u;
         }
         stream<ChannelDataT> Channels = Filter(AllChannels)
         {
             param
                 filter : has((list<uint32>)[0, 1, 2, 3, /* 4, 5, */ 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21], chNum) ;
         }            
         @parallel(width =(int32) getSubmissionTimeValue("parallelChannels"), partitionBy = [ { port = Channels, attributes = [ chNum ] } ])
         stream<PeriodDataT> PeriodData = ProcessChannels(Channels)
         {
         }
         () as UDPSink = FileSink(PeriodData)
         {
             param file: "result.txt"; format: txt; flush: 1u;
         }
 }




Answer by Jim Sharpe (68) | Jan 22, 2016 at 03:58 PM

Your example does match my scenario but not the behavior I observe. In my case, if I set parallelChannels to 20 it does not seem to "repurpose" the vacated paths to process the tuples with chNum = 20/21. Rather, those tuples just don't appear at all on the output (unless I set the width to 22 or greater).

I'll try your example as well as do some more testing with my application later this evening to see if I can pin down what might be different.


Answer by Jim Sharpe (68) | Jan 22, 2016 at 07:40 PM

Hi Michael,

I just tried running your example, and although it does output all the tuples, it doesn't behave in the way I expected. Specifically, channels 0 and 1 are each used to process tuples with two different chNum values, while channels 4 and 5 do not process any tuples. (See attached.) Your example above appears not to use channels 17 and 18. Is that what you would expect? If so, is there any way to force it to use all the channels before reusing any, and/or to prohibit any channel from processing tuples for more than one chNum? Doubling up on some channels while not using others at all is problematic for at least a couple of reasons.

Attachments: screen-shot-2016-01-22-at-62257-pm.png (98.0 kB), result.txt (422 B)

Answer by Michael.K (1380) | Jan 26, 2016 at 05:01 AM

Hello Jim.

Today, the distribution algorithm calculates, as mentioned in the first answer, a hash code from your specified partitionBy attributes. The hash code algorithm is fixed and cannot be changed.

The distribution algorithm takes the calculated hash code, runs a modulo operation (hash code mod number of channels) to calculate an index, and sends the tuple to the output port with the resulting index. This is identical to the behavior of a Split operator.

This means that if your partitionBy attributes have only a small set of value combinations, for example 4, it does not make sense to configure more than 4 channels, because at most 4 channels would ever be used. If the hash code and modulo calculation produce the same result for different value combinations, the number of used channels can be even smaller than 4.

On the other hand, if you have a huge set of value combinations and an application streaming data for weeks and months, the application will probably use all channels, at least on average.
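
To see how this plays out for the 20 chNum values in your filter, a small standalone composite can compute hashCode(chNum) % width for each key. This is illustrative only: it assumes the hashCode() built-in approximates the hash that the splitter uses, so the channel numbers it prints may not match the runtime exactly, but it shows how two keys can collide while other channel indexes never occur.

 // Illustrative only: print the channel index that "hash mod width" would
 // assign to each chNum (expectedChannel is a made-up attribute name).
 composite ShowRouting
 {
     graph
         stream<uint32 chNum> Keys = Beacon()
         {
             param iterations : 50;
             output Keys : chNum = (uint32) IterationCount();
         }
         stream<uint32 chNum> Wanted = Filter(Keys)
         {
             param filter : has((list<uint32>) [ 0, 1, 2, 3, 6, 7, 8, 9, 10, 11,
                                                 12, 13, 14, 15, 16, 17, 18, 19, 20, 21 ], chNum);
         }
         stream<uint32 chNum, uint32 expectedChannel> Routing = Functor(Wanted)
         {
             output Routing : expectedChannel = ((uint32) hashCode(chNum)) % 20u;
         }
         () as RoutingSink = FileSink(Routing)
         {
             param file : "routing.txt"; format : txt; flush : 1u;
         }
 }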

And here are the answers to your questions.

Q: Is that what you would expect?

The different channel numbers are possible because I used another Streams version, and the hash code or distribution algorithm might have changed between versions. I do not know for sure, but since you got all tuples, the result is not wrong.

Q: If so is there any way to force it to use all the channels before reusing any and/or prohibiting any channel to process tuples for more than one chNum?

No, because the destination channel must be calculated from the partitionBy attributes. You want a fast algorithm, and building a hash code is one of the fastest generic approaches. Equal-load partitioning would require storing the values that have already been seen, which would be much slower.

Q: Doubling up on some channels while not using others at all is problematic for at least a couple of reasons.

As I mentioned before, if there are only a few value combinations, it does not make sense to have more channels. A typical application, however, has a large set of value combinations and therefore gets a nearly equal load distribution from a value perspective. I assume that seeing some channels being reused is only a test issue: too little test data. If you tried with a million different numbers, all channels would be used and each channel would handle a similar number of value combinations.

Regards, Michael.


Answer by Jim Sharpe (68) | Jan 26, 2016 at 07:46 AM

Hi Michael,

Thanks for the response, but it's rather disappointing news. In my current use case I'm doing a sequence of time series processing steps in each of the parallel channels. As currently written, each of the channels expects, and can only process, a single signal. Mixing state from different signals in the same channel would produce an erroneous result. Simply using partitioned windows for some of the operators doesn't really help, because that is not supported by all operators in the parallel region. The reason your example application isn't losing tuples but mine is relates only to the nature of what happens inside the parallel region: since I had [erroneously] assumed that each channel would be processing only a single signal, merging multiple signals together in a single channel results in corrupting both and losing one.

Given the relatively small number of channels I need (fewer than a few hundred), the workaround I've been using to ensure that each channel only processes a single signal is to set the width greater than or equal to the largest numerical partitioning attribute. In the case where all the numerical chNums are contiguous, that works great, with each channel getting used for exactly one chNum. However, as illustrated in your test program, things fall apart if not all the partitioning attribute values are used. Additionally, this hack of a workaround is wasteful of computational resources if the partitioning attribute values are sparse, since many channels would be created but never used.

Prior to the arrival of the @parallel annotation, we would probably have used a combination of mixed-mode and custom operators to implement something like this. It is quite a loss of convenience not to be able to use UDP via the @parallel annotation. I guess I'll have to dust off my Perl and recode it the "old fashioned" way.

Please consider this a feature request for a future version of Streams: allow an alternate split/hashing algorithm that, when the width of the parallel region exactly matches the number of partitioning values, provides a 1:1 mapping.

Comment by Michael.K (1380) | Jan 26, 2016 at 08:15 AM

I raised a feature request. Good luck with Perl.


Answer by MikeSpicer (386) | Jan 26, 2016 at 06:18 PM

As Michael has described, the partitioning uses a hash, so the behavior of data not being evenly spread over the number of available channels is possible, but you should not see lost tuples. If you double-check your test and are still seeing lost tuples, it's a defect and we will fix it. We have a feature request to allow custom partitioning functions.

Another suggestion for a workaround would be to have an upstream operator assign each signal id to a contiguous channel number in the range 0 to (number of channels - 1). I realize that this is additional processing and does not remove the need for the feature request, but it should allow you to get the behavior you need in the interim.
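
As a minimal sketch of that remapping (the composite name RemapDemo, the denseId attribute, and the file names are made up for illustration, and it assumes the SPL findFirst(list, value, startIndex) built-in):

 // Sketch of the workaround above: remap the sparse chNum values to a dense
 // index 0..19 before the parallel region and partition on that index.
 composite RemapDemo
 {
     graph
         // Test feed: 50 tuples with chNum 0..49, as in Michael's example.
         stream<uint32 chNum> AllChannels = Beacon()
         {
             param iterations : 50;
             output AllChannels : chNum = (uint32) IterationCount();
         }
         // Keep only the 20 channels of interest (same list as earlier in the thread).
         stream<uint32 chNum> Channels = Filter(AllChannels)
         {
             param filter : has((list<uint32>) [ 0, 1, 2, 3, 6, 7, 8, 9, 10, 11,
                                                 12, 13, 14, 15, 16, 17, 18, 19, 20, 21 ], chNum);
         }
         // Assign each surviving chNum its position in the list as a dense id.
         // Assumes the findFirst(list, value, startIndex) built-in.
         stream<uint32 chNum, uint32 denseId> Remapped = Functor(Channels)
         {
             logic state : list<uint32> channelList = (list<uint32>) [ 0, 1, 2, 3, 6, 7, 8, 9, 10, 11,
                                                                       12, 13, 14, 15, 16, 17, 18, 19, 20, 21 ];
             output Remapped : denseId = (uint32) findFirst(channelList, chNum, 0);
         }
         // Partition on the dense index. Whether this yields exactly one chNum per
         // channel depends on how the splitter hashes the contiguous 0..19 values;
         // earlier posts observed that contiguous keys spread one per channel, but
         // that is an observation, not a documented guarantee.
         @parallel(width = 20, partitionBy = [ { port = Remapped, attributes = [ denseId ] } ])
         stream<uint32 chNum, uint32 denseId, uint32 channel> Tagged = Functor(Remapped)
         {
             output Tagged : channel = (uint32) getChannel();
         }
         () as TaggedSink = FileSink(Tagged)
         {
             param file : "remapped.txt"; format : txt; flush : 1u;
         }
 }

The extra lookup per tuple is cheap, and it keeps the width at 20 instead of padding it out to the largest chNum value.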


