In IBM Streams 4.2 and newer, there are two features designed to make it easier to attain high performance: automatic submission time fusion and the automatic threading model.
Automatic submission time fusion means that you can achieve a reasonable number of PEs for your system without having to manually fuse operators.
The automatic threading model means that you can take advantage of multiple cores per host, without having to manually place threaded ports in your application.
This post explains what these features are, how to use them, and how they relate to each other.
Note: Read the post Manually Optimizing Streams Applications to see how to manually perform fusion and threading optimizations that improve your application’s performance.
Automatic Submission Time Fusion
In Streams, operators execute at runtime inside of Processing Elements, or PEs. Every PE in the application is a separate running process. The act of grouping operators into PEs is called fusion. PEs can execute on separate hosts, allowing us to take advantage of clusters. But, operators that communicate across PEs suffer from greater communication cost. We have a classic tradeoff: we want multiple PEs in the application so we can take advantage of all of the available hosts, but too many PEs causes unnecessary communication overhead. In general, we want to have roughly one PE per host.
Prior to Streams 4.2, PE fusion was performed manually through the partitionColocation, partitionExlocation and partitionIsolation options for the operator placement configs. Operators without a partition config were isolated into their own PE by default. All fusion occurred at compile time.
In contrast, Streams 4.2 automatically fuses operators into PEs for you at submission time. Because this fusion occurs at submission time, the system can tailor the fusion to the available hosts in the instance. For example, if you submit an application to an instance with 20 hosts, you are likely to get a different fusion than if you submit the same application to an instance with 1 host. Note that this difference does not require recompiling the application.
This fusion happens automatically for you, by default. However, you can change its behavior through the new fusionScheme application configuration option. The default value is automatic, which will target one PE per host, as described above. If you specify manual, then you can use the additional fusionTargetPeCount option to control how many PEs you get. For example, the following job submission requests 5 PEs:
streamtool submitjob -C fusionScheme=manual -C fusionTargetPeCount=5 myApp.sab
Note that the actual number of PEs may be more or less than 5. Streams has to obey a variety of application and instance constraints which are independent of the target PE count.
If you wish to use the old fusion behavior, then specify the legacy option:
streamtool submitjob -C fusionScheme=legacy myApp.sab
Under the automatic and manual fusion schemes, you can also influence how Streams fuses parallel regions as specified by the @parallel annotation. By default, parallel regions are treated no differently than the rest of the application. However, through the fusionType option, you can change the fusion behavior for parallel regions. You can specify channelIsolation to ensure that only operators in the same parallel channel are fused together. Or you can specify channelExlocation to allow operators from outside the parallel region to be fused with operators from within it, while disallowing operators from different parallel channels from being fused together.
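Assuming fusionType is passed with -C at job submission like the other fusion options described above, a submission that requests 8 PEs while keeping parallel channels in separate PEs might look like:

```
streamtool submitjob -C fusionScheme=manual -C fusionTargetPeCount=8 -C fusionType=channelIsolation myApp.sab
```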
For more on automatic submission time fusion, see the product documentation for specifying fusion for a single application, and for specifying the default fusion for all applications in an instance or domain.
Automatic Threading
In a PE, when a thread executes an operator, it will also execute all downstream operators until it hits another thread or a PE output port. There are several sources of threads inside of a PE. Source operators always introduce at least one thread. Some operators, such as ThreadedSplit, introduce their own threads (along with a queue per thread; tuples are copied into the queue). PE input ports also introduce a thread which pulls tuples from the network, and then executes downstream operators. Parallel regions, through the @parallel annotation, introduce threads if the beginning of the parallel region is in the same PE as its immediately upstream operators. Finally, threaded ports are a means for programmers to manually introduce threads into an application. A threaded port inserts a new thread and a queue in front of the input port it’s associated with. The upstream thread copies tuples into the threaded port queue, and the downstream thread pulls tuples from that queue and executes the downstream operator.
Prior to Streams 4.2, threaded ports were the primary way for you to introduce threads into a PE that were not already there. In general, you want to place them in front of very expensive operators. In practice, however, it can be tricky to find exactly where to place threaded ports in large applications. And manually adding threaded ports can’t adapt when you submit the same application to different hosts.
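For reference, a manually placed threaded port looks like the following sketch (the operator names are hypothetical): the threadedPort config inserts a queue, here holding up to 1000 tuples, in front of the input port fed by Src, with Sys.Wait telling the upstream thread to block when the queue is full.

```
// hypothetical expensive operator; a threaded port decouples it from its upstream
stream<int32 i> Res = ExpensiveOp(Src) {
    config threadedPort: queue(Src, Sys.Wait, 1000);
}
```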
In contrast, PEs in Streams 4.2 automatically determine how many threads to use based on the host they are currently running on and on which number of threads yields the best performance. You can change this behavior through threading models, which were also introduced in Streams 4.2. You can control threading models both through SPL itself and through submission time options.
Specifying Threading Models in SPL
In SPL, you can apply the @threading annotation to composite operators to control the threading model that applies to the set of operators in that composite. (If you apply @threading to the main composite, it will determine the threading model for the entire application.) The new default is automatic, which actually chooses between dynamic and manual when the PE launches. The following application requests the dynamic threading model:
@threading(model=dynamic)
composite Main {
  graph
    stream<int32 i> Src = Beacon() {}
    stream<int32 i> Res = ManyOperators(Src) {}
    () as Sink = Custom(Res) {}
}
The dynamic threading model maintains a global thread pool, and any thread can execute any operator. The manual threading model is the old behavior, where developers have to manually introduce new threads. Under the automatic threading model, the PE looks at properties of the system (such as the number of available logical cores) and properties of the PE (such as the number of operators and the number of threads the PE must already have) to determine if dynamic or manual is best. In general, the PE will favor dynamic when the number of operators exceeds the number of logical cores, and it will favor manual when the number of existing threads is already a significant fraction of the available logical cores on the system. Finally, the dedicated threading model puts threaded ports in between all operators in the PE. The name comes from the fact that each operator input port is executed by a dedicated thread.
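Following the same pattern as the earlier example, an application could request the dedicated threading model like this (the operator names are placeholders):

```
@threading(model=dedicated)
composite Main {
  graph
    stream<int32 i> Src = Beacon() {}
    stream<int32 i> Res = ManyOperators(Src) {}
    () as Sink = Custom(Res) {}
}
```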
All of the threading models have valid uses. The manual threading model has the lowest latency, as there are no queues and no extra tuple copying into those queues. But, it is single threaded and is unable to take advantage of multicore hosts without your intervention. The dedicated threading model can take advantage of multicore hosts, but it may incur more latency because it must copy tuples into the threaded port queues, and the number of threads increases with the number of operators in the PE. Using the dedicated threading model is only appropriate when you know that each PE will have fewer operators than the number of available logical cores on the system it’s running on. The dynamic threading model can take advantage of multiple cores by default and automatically finds the appropriate number of threads, but it may incur more latency than manual because it also copies tuples into queues, and more synchronization overhead than dedicated. The default, automatic, uses information available at runtime to try to choose the best option between manual and dynamic. However, because you may know your application’s needs better than Streams, the @threading annotation allows you to change the threading model.
You can apply the @threading annotation to both composite operator definitions and invocations. If a composite operator invocation has the annotation, and its definition has the annotation, then the PE uses the threading model requested at the invocation site. The @threading annotation can also nest, so that you can specialize different regions of your application with different threading models. The following application specifies a different threading model for the ingress operators than for the rest of the operators:
@threading(model=dynamic)
composite Main {
  graph
    @threading(model=manual)
    stream<int32 i> Src = Ingress() {}
    stream<int32 i> Res = Compute(Src) {}
    () as Sink = Egress(Res) {}
}
By default, the dynamic threading model is elastic. At runtime, the PE will explore using different numbers of threads, all while recording the total throughput achieved. The PE will settle on the number of threads that yields the highest overall throughput. You can override this behavior by turning elasticity off and providing a static number of threads. For example, the following turns off elasticity for the entire application, and sets the number of threads in each PE to 4:
@threading(model=dynamic, elastic=false, threads=4)
composite Main {
  graph
    stream<int32 i> Src = Beacon() {}
    stream<int32 i> Res = ManyOperators(Src) {}
    () as Sink = Custom(Res) {}
}
All threading models specified through the @threading annotation take effect in both distributed and standalone modes. For more on threading models, see the product documentation on the @threading annotation in SPL.
Specifying Threading Models at Submission Time
At submission time, you can override the threading models specified in the SPL application. The threadingModel application configuration option takes the same values as the SPL annotation: automatic, manual, dynamic and dedicated. For example, the following specifies the threading model for the whole application to be manual:
streamtool submitjob -C threadingModel=manual myApp.sab
Note that the threading model specified at submission time overrides any threading model specified by the @threading annotation in SPL. This allows you to experiment with different threading models without having to recompile your application. Also note that unlike the @threading annotation, you can only change the threading model for the entire application. For the example above, all PEs in the application will use the manual threading model; there is no way to specify a different threading model for different sets of operators at submission time.
As with the @threading annotation, there are also ways to control elasticity and the number of threads at submission time. The following example chooses the dynamic threading model, turns off elasticity and sets the number of threads to 4:
streamtool submitjob -C threadingModel=dynamic -C dynamicThreadingElastic=false -C dynamicThreadingThreadCount=4 myApp.sab
You can also change the threading model for all applications in an instance by setting instance.threadingModel. The following example sets the manual threading model for all applications in the StreamsInst instance:
streamtool setproperty instance.threadingModel=manual -i StreamsInst
As with changing the threading model for a single application, changing the threading model at the instance level will override any @threading annotations in the SPL application. The principle here is that the last person to have influence over the operators determines what threading model applies. Considering the @threading annotation and the submission time properties, the precedence rules which determine the actual threading model at runtime are (lower values supersede higher values):
1. Application-level submission time threadingModel.
2. Instance-level submission time instance.threadingModel.
3. @threading on a composite operator’s invocation.
4. @threading on a composite operator’s definition.
5. Streams default of automatic.
Automatic Submission Time Fusion and Automatic Threading
These two features are independent but complementary. So far, we have discussed them in isolation from each other, as using one does not require knowledge of the other. This was a deliberate design decision on our part: it greatly simplifies their interfaces. We also designed them so that if you changed one but not the other, the other would still do a reasonable thing. For example, if you change your fusion so that you get many more PEs, automatic threading is likely to use fewer threads.
For fusion, the available options are:
- manual: You specify how many PEs you want through the fusionTargetPeCount option.
- automatic: The default, which automatically determines the appropriate number of PEs for your application at submission time.
- legacy: Reverts back to the pre-4.2 fusion behavior where operators are in their own PE by default, and are only fused together when explicitly requested.
For threading models, the available options are:
- manual: You have to manually add threads; best for low latency and when you know exactly where threads should go.
- dynamic: The PE automatically adds threads, and any thread can execute any operator; best for high throughput and when you’re unsure where threads should go and how many are best.
- dedicated: A threaded port in front of every operator input port in the PE; best for high throughput when you know the number of operators in the PE will be less than the number of available logical cores on the system.
- automatic: The default, which chooses between manual and dynamic at PE startup.
In the post Optimizing Streams Applications, we presented a set of slides that walks you through how to perform fusion and threading optimizations to improve your application’s performance. This post explains how Streams 4.2 now performs that fusion and threading automatically, and how to modify the default behavior. If you choose the legacy fusion mode and the manual threading model, then the old lessons still apply for manually optimizing your application. But the point of these new features is that most of the lessons are now applied automatically by Streams (including compiling with optimizations on).
We chose the defaults for both features knowing the default behavior of the other. Submission time fusion defaults to automatically making about one PE per host. We chose this default because if two operators exist on the same host, it’s more efficient for them to communicate from inside of the same PE than across PEs. But, before Streams 4.2, using very few PEs and not adding threaded ports in the right places could greatly limit parallelism and overall throughput. As a consequence, we made automatic threading the default, so that applications which consist of a small number of PEs with a large number of operators will still take advantage of the available parallelism.
We had two goals with these features. First, we wanted to achieve good performance by default by automatically fusing and threading applications based on the systems they were deployed to. Second, we wanted to provide you with controls that allow you to use the default as a baseline, and then make small changes to that baseline to further improve performance. While you can pay attention to just fusion or just threading for incremental performance gains, fully optimizing your application will require taking into account both fusion and threading. Streams 4.2 makes this process easier by providing direct controls that allow you to specify the number of PEs in your application, and how threads will execute operators.
Related Links
See the Streams Knowledge Center for more information.