As discussed in this post, Processing tuples at-least-once in Infosphere Streams V4 with consistent regions, the consistent region feature allows us to guarantee that tuples are processed at least once by a Streams application.  You can mark a subgraph in a Streams application as consistent using the @consistent annotation.  If an application failure is detected within a consistent region, the Streams runtime resets the application to the last consistent state (a.k.a. checkpoint) and directs the application's data sources to replay tuples from that point on.

It is important to note that for a subgraph to be marked as consistent, each primitive operator in that subgraph needs to implement additional code to support consistent regions.  This has already been done for the operators in the Standard SPL Toolkit and the Specialized Toolkits from Streams.  If you have developed your own toolkits, you will need to do the same before you can take advantage of the consistent region feature.  This document is a development guide that shows how to enable consistent region support in your own operators.

Refer to this documentation on how to add consistent region support to primitive operators:  Writing Operators that Support Consistent Regions

Requirements to Support Consistent Region

To support consistent regions, each operator needs to be able to do the following:

Drain->Checkpoint->Reset Sequence

To participate in a consistent region, each operator needs to support the basic drain->checkpoint->reset sequence.  These are callbacks from the Streams runtime:

  • On Drain: The operator needs to submit any pending tuples.  If your operator has a buffer for submitting tuples or for writing to an external system, the buffer must be drained so that no pending tuples are left in the operator.
  • On Checkpoint:  If the operator has any internal state, it needs to serialize that state to the checkpointing backend.
  • On Reset:  This is called after an application failure is detected in a consistent region.  The operator reads the persisted internal state from the checkpoint and restores itself to the state it had when the checkpoint was taken.  Tuples are replayed after all of the operators have successfully reset to their checkpointed states.
  • On ResetToInitialState:  This is a special callback that is called when an application failure is detected before the first checkpoint is established.  In this case, all operators in the consistent region are told to reset to their initial states, as if they had just started up.  Tuples are replayed from the beginning after all of the operators have successfully reset to their initial states.
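The four callbacks above can be illustrated with a small, self-contained sketch. It deliberately does not use the Streams API: the class CountingOperatorSketch, its methods, and the byte array that stands in for the checkpointing backend are all hypothetical, chosen only to show what each callback is responsible for.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an operator; it does NOT use the Streams API.
class CountingOperatorSketch {
    private final List<String> pending = new ArrayList<>();   // buffered, not yet submitted
    private final List<String> submitted = new ArrayList<>(); // stands in for the output port
    private long tupleCount = 0;                              // internal state that changes at run time

    void process(String tuple) {
        tupleCount++;
        pending.add(tuple + "#" + tupleCount);
    }

    // On Drain: flush the buffer so that no tuple is left pending.
    void drain() {
        submitted.addAll(pending);
        pending.clear();
    }

    // On Checkpoint: serialize internal state (a byte array stands in for the backend).
    byte[] checkpoint() {
        return ByteBuffer.allocate(Long.BYTES).putLong(tupleCount).array();
    }

    // On Reset: discard in-flight work and restore the checkpointed state.
    void reset(byte[] saved) {
        pending.clear();
        tupleCount = ByteBuffer.wrap(saved).getLong();
    }

    // On ResetToInitialState: behave as if the operator had just started up.
    void resetToInitialState() {
        pending.clear();
        tupleCount = 0;
    }

    long getTupleCount() { return tupleCount; }
    int getPendingCount() { return pending.size(); }
    List<String> getSubmitted() { return new ArrayList<>(submitted); }

    public static void main(String[] args) {
        CountingOperatorSketch op = new CountingOperatorSketch();
        op.process("a");
        op.process("b");
        op.drain();                      // nothing pending before the checkpoint
        byte[] saved = op.checkpoint();
        op.process("c");                 // work done after the checkpoint
        op.reset(saved);                 // simulated failure: roll back
        System.out.println("count after reset = " + op.getTupleCount());
        System.out.println("pending after reset = " + op.getPendingCount());
    }
}
```

Note that reset also clears the pending buffer: tuples processed after the checkpoint will be replayed, so keeping them would produce duplicates inside the operator.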

Tuple Replay

If the operator is a source operator, the operator needs to be able to replay tuples after a successful reset.
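As a minimal sketch of what replay means for a source, the following self-contained example keeps a read position, saves it on checkpoint, and rewinds to it on reset. It does not use the Streams API; the class ReplayableSourceSketch is hypothetical, and an in-memory list stands in for an external system that can be reread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical replayable source; it does NOT use the Streams API.
class ReplayableSourceSketch {
    private final List<String> data;  // stands in for a re-readable external system
    private int nextIndex = 0;        // position of the next tuple to emit

    ReplayableSourceSketch(List<String> data) {
        this.data = new ArrayList<>(data);
    }

    boolean hasNext() { return nextIndex < data.size(); }

    String next() { return data.get(nextIndex++); }

    // On checkpoint: the read position is the only state that needs saving.
    int checkpoint() { return nextIndex; }

    // On reset: rewind so that tuples emitted after the checkpoint are replayed.
    void reset(int savedIndex) { nextIndex = savedIndex; }

    // On reset to initial state: replay from the beginning.
    void resetToInitialState() { nextIndex = 0; }

    public static void main(String[] args) {
        ReplayableSourceSketch source =
            new ReplayableSourceSketch(Arrays.asList("t1", "t2", "t3", "t4"));
        source.next();                    // emits t1
        source.next();                    // emits t2
        int saved = source.checkpoint();
        source.next();                    // emits t3, then a failure is simulated
        source.reset(saved);
        while (source.hasNext()) {
            System.out.println("replayed: " + source.next());
        }
    }
}
```

A source that reads from a system where consumed data is gone (no position to rewind to) has nothing equivalent to nextIndex to restore, which is why such a source cannot start a consistent region.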

Step 1:  Understanding Your Operator

As a first step toward adding consistent region support, you need to review your operators in detail and look for the following:

Operator States

Make sure you understand what internal states your operator maintains.  In general, these are the things that affect operators’ state:

  1. internal states: What internal states does the operator maintain (e.g. member variables), and do they change during the lifetime of the operator?  It is important to find states that can change after initialization, while the operator is running.  States that do not change after initialization can be ignored.
  2. external states: If your operator interfaces with external systems, it is important to understand how the operator manipulates external states and whether the external states can be reset when the reset method is called.  Some key questions to ask:
    1. Does the operator affect external state?
    2. Can the external state change during the lifetime of the operator after initialization?
    3. Can we persist and restore / recreate external state?
    4. Can changes to external state be rolled back?
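To make the rollback question concrete, here is a hedged, self-contained sketch of one way external state can be made resettable: writes are staged, made durable only at a consistent point, and discarded on reset. This is an illustrative pattern, not the approach of any particular Streams operator; the class TransactionalSinkSketch is hypothetical and a list stands in for the external system.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sink with rollback-capable external state; it does NOT use the Streams API.
class TransactionalSinkSketch {
    private final List<String> externalStore = new ArrayList<>(); // stands in for the external system
    private final List<String> uncommitted = new ArrayList<>();   // writes since the last consistent point

    void write(String record) {
        uncommitted.add(record);
    }

    // At a consistent point: make the staged writes durable.
    void commit() {
        externalStore.addAll(uncommitted);
        uncommitted.clear();
    }

    // On reset: drop staged writes; the replayed tuples will produce them again.
    void rollback() {
        uncommitted.clear();
    }

    List<String> getCommitted() {
        return new ArrayList<>(externalStore);
    }

    public static void main(String[] args) {
        TransactionalSinkSketch sink = new TransactionalSinkSketch();
        sink.write("a");
        sink.write("b");
        sink.commit();       // consistent point
        sink.write("c");     // written after the consistent point
        sink.rollback();     // simulated failure and reset
        sink.write("c");     // replayed tuple
        sink.write("d");
        sink.commit();
        System.out.println(sink.getCommitted());
    }
}
```

If the external system offers nothing like rollback, the record "c" above would reach it twice, which is the duplicate-data case to watch for when answering the questions in the list.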

Control Ports

Control ports are typically used to manipulate operator states without having to restart your Streams Application.  Therefore, it is important to review your control port implementation and see how internal / external states can be altered by control ports.

Window

If your operator supports windows, you need to persist and restore the windows on checkpoint and reset.  There are special APIs to help you do this.
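The following self-contained sketch shows the idea of persisting and restoring window contents. It does not use the window APIs mentioned above; the class SlidingWindowSketch is hypothetical, and a copied list stands in for the checkpointing backend.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical count-based sliding window; it does NOT use the Streams window APIs.
class SlidingWindowSketch {
    private final int size;
    private final Deque<Integer> window = new ArrayDeque<>();

    SlidingWindowSketch(int size) {
        this.size = size;
    }

    void insert(int value) {
        window.addLast(value);
        if (window.size() > size) {
            window.removeFirst();   // evict the oldest tuple
        }
    }

    int sum() {
        int total = 0;
        for (int v : window) {
            total += v;
        }
        return total;
    }

    // On checkpoint: persist a copy of the window contents, in order.
    List<Integer> checkpoint() {
        return new ArrayList<>(window);
    }

    // On reset: replace the window contents with the persisted copy.
    void reset(List<Integer> saved) {
        window.clear();
        window.addAll(saved);
    }

    // On reset to initial state: start with an empty window.
    void resetToInitialState() {
        window.clear();
    }

    public static void main(String[] args) {
        SlidingWindowSketch w = new SlidingWindowSketch(3);
        for (int v : new int[] {1, 2, 3, 4}) {
            w.insert(v);
        }
        List<Integer> saved = w.checkpoint();
        w.insert(10);                // arrives after the checkpoint
        w.reset(saved);              // simulated failure and reset
        System.out.println("sum after reset = " + w.sum());
    }
}
```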

Threading

If your operator spawns background threads and tuples are submitted from those threads, you may need to use permits (special locks provided by our API) in your operator.  Permits are required to suspend the tuple flow when needed and to make sure that operator state is modified atomically.
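The role of a permit can be illustrated with an analogy built on java.util.concurrent.Semaphore. This sketch does not use the Streams permit API: the class PermitSketch and its methods are hypothetical. A background thread updates state and "submits" only while holding the permit, and the thread playing the runtime takes the same permit before inspecting state, so it never observes a half-applied update.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

// Analogy only: a Semaphore stands in for a permit; this does NOT use the Streams API.
class PermitSketch {
    private final Semaphore permit = new Semaphore(1);
    private final List<Integer> submitted = new ArrayList<>(); // stands in for the output port
    private int count = 0;                                     // must always equal submitted.size()

    // Background thread: update state and submit as one atomic step.
    void submitFromBackground(int value) {
        permit.acquireUninterruptibly();
        try {
            count++;
            submitted.add(value);
        } finally {
            permit.release();
        }
    }

    // "Runtime" thread: holding the permit suspends the tuple flow while state is inspected.
    boolean isConsistentSnapshot() {
        permit.acquireUninterruptibly();
        try {
            return count == submitted.size();
        } finally {
            permit.release();
        }
    }

    // Runs a background submitter while taking snapshots.
    // Returns { final count, number of inconsistent snapshots observed }.
    static int[] runDemo(int tuples) {
        PermitSketch op = new PermitSketch();
        Thread background = new Thread(() -> {
            for (int i = 0; i < tuples; i++) {
                op.submitFromBackground(i);
            }
        });
        background.start();
        int inconsistent = 0;
        for (int i = 0; i < 100; i++) {
            if (!op.isConsistentSnapshot()) {
                inconsistent++;
            }
        }
        try {
            background.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { op.count, inconsistent };
    }

    public static void main(String[] args) {
        int[] result = runDemo(1000);
        System.out.println("final count = " + result[0]
            + ", inconsistent snapshots = " + result[1]);
    }
}
```

Without the permit, a snapshot taken between count++ and submitted.add(value) could capture a count that does not match the tuples actually submitted.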

Step 2:  Can my operator participate in a consistent region?

Next, you need to determine if the operator can participate in a consistent region, and at what capacity:

  1. Can the operator participate in a consistent region?
  2. Can the operator be the start of a consistent region?  An operator can only be at the start of a consistent region if it is able to replay tuples.

Most operators should be able to participate in a consistent region, except when:

  1. The operator interfaces with external systems or models that it cannot control.  In some cases, the operator may rely on external libraries or models that it has no way of persisting and recreating.  Because the operator cannot roll back to the checkpointed state, either (i) the operator cannot participate in a consistent region, or (ii) the operator participates in the consistent region but the external system may see duplicate data.
  2. The source operator cannot replay tuples.  If your source operator reads data from an external system in a way that does not allow the data to be reread after a reset (e.g. a JMSSource operator reading off a queue on a messaging server), the operator cannot participate in a consistent region.

Follow these best practices if your operator cannot participate in a consistent region:

  1. If your operator cannot participate in a consistent region, add a compile time check to detect that the operator is in a consistent region, and generate a compile error.
  2. If your operator cannot be at the start of a consistent region, add a compile time check to detect that it is at the start of a consistent region, and generate a compile error.

Please note that it is common for operators to “partially” support consistent region.  One example is an operator that can participate in a consistent region most of the time, except when a specific parameter is used.  You can also add compile time checks to detect such scenarios.

To add compile checks for C++ operators use the ConsistentRegionContext available from the Perl code generation API:

<%
     ### Consistent region ERROR message ###
     my $crContext = $model->getContext()->getOptionalContext("ConsistentRegion");
     if($crContext && $crContext->isStartOfRegion()) {
         SPL::CodeGen::exitln("The following operator cannot be the start of a consistent region: XMSSink");
     }
%>

To add compile checks for Java operators use the ConsistentRegionContext available from the Java runtime API:

@ContextCheck(compile = true)
public static void checkInConsistentRegion(OperatorContextChecker checker) {
    ConsistentRegionContext consistentRegionContext = checker.getOperatorContext().getOptionalContext(ConsistentRegionContext.class); 

    if(consistentRegionContext != null) {
        checker.setInvalidContext("The following operator cannot be in a consistent region: JMSSource", new String[] {});
    }
}

Step 3:  Set up an instance with checkpoint backend

To submit a job with consistent region enabled, you need a Streams instance with checkpointing enabled.  Streams supports two checkpoint backends:

  1. For file checkpointing backend, follow this guide to create the Streams instance:  https://developer.ibm.com/streamsdev/docs/using-streams-studio-develop-applications-consistent-regions/
  2. For Redis checkpointing backend, follow this guide on how to set up a Streams instance:  https://developer.ibm.com/streamsdev/docs/submit-consisten-region-applicaton-using-redis-checkpoint-store/

It is recommended to use the file system backend when testing. The Redis backend is preferred for application deployment.

Step 4:  Implement StateHandler APIs in operator

For a C++ operator, follow this guide:  Consistent Region – Toolkit Development Guide for C++ Operator

For a Java operator, follow this guide: Consistent Region – Toolkit Development Guide for Java Operator
