Up to Streams 4.0.1, a processing element (PE) gracefully shuts down when any of its operators throws an exception. Often this behavior is desired, as the SPL runtime cannot guarantee that an operator is in a correct state after it throws an exception. In other cases, it causes unnecessary shutdowns and tuple loss because of a single problematic tuple out of millions.

In applications with consistent regions, such a tuple causes the region to reset repeatedly: after each reset the tuple is replayed and fails again, until the region reaches the maximum number of reset attempts.

Primitive operators can always be written to do fine-grained exception handling and to expose that handling as operator-level configuration. One example is the FileSource operator and its parsing parameter: setting it to permissive lets users tolerate malformed input instead of failing on it.

Custom operators, and primitive operators with custom logic or output clauses, had no such option. Once an exception was thrown, the PE would shut down and restart. To avoid such shutdowns, developers had to write expensive error checks or write a new primitive operator that performs the exception handling. Another alternative was the ExceptionCatcher operator available in the TEDA toolkit. This operator is very handy in many use cases, but it forces developers to be aware of the threading model of SPL applications. For example, it must be placed at the input port of a PE, or as the first operator in a threaded port chain.

In Streams 4.1.0, SPL has a new annotation named @catch, which automatically adds try and catch statements around tuple processing based on the operator instance configuration. The annotation must be configured to indicate which types of exceptions are caught at runtime. The exceptions can be thrown by a primitive operator or by native functions. With @catch, one can also configure whether the stack trace and the tuple that caused the exception are traced to the PE logs. By default, stack traces are printed. For privacy reasons, tuples are printed only if explicitly requested. Use the streamtool ‘getapplicationlog’ command to obtain PE logs.

The @catch annotation applies only to the annotated operator and is independent of the threading model of the application. For example, assume an application with a pipeline of operators A, B and C, all fused in the same PE. If B is annotated with @catch and its process() method or onTuple clause throws an exception, the exception is caught. If B submits a tuple to C and C throws an exception, B does not catch that exception, and the PE terminates. If exceptions thrown by C must also be caught, annotate operator C directly.

When using @catch, one must still understand whether catching the specified exception is safe. Developers of primitive and Custom operators are strongly encouraged to call functions that may throw exceptions before making any state change. This is because the next tuple is processed with the current operator state. If the state was corrupted by the tuple that caused the exception, the next tuple is computed using that corrupted state.

Configuring the @catch annotation

@catch has the following parameters:

  1. exception={none, streams, std, all} – indicates the types of exceptions to catch. This parameter is mandatory and takes one of the following values:
    • none – no exception is caught.
    • streams – Streams exceptions: exceptions thrown by the standard toolkit native functions, C++ exceptions derived from SPL::RuntimeException, and Java exceptions derived from com.ibm.streams.operator.DataException.
    • std – standard exceptions: all Streams exceptions, plus C++ exceptions derived from std::exception and all Java checked exceptions deriving from java.lang.Exception.
    • all – all exceptions: all standard exceptions, plus any other thrown C++ exception and all Java checked and unchecked exceptions deriving from java.lang.Exception.
  2. tupleTrace – enables tracing of the offending tuple into the Streams trace facility. It is disabled by default.
  3. stackTrace – enables printing of the stack trace into the Streams trace facility. It is enabled by default.

The code below shows an example of @catch. In this example, the application injects a tuple with an invalid time string (id=5) to force the timeStringToTimestamp() native function to throw an exception. When @catch is not used, the application throws an exception when it processes the tuple with id=5 and terminates. In distributed mode, the exception causes a PE restart.

When @catch is used, the application processes all tuples and finishes normally. The tuple with id=5 is dropped, because the exception is thrown before its submit call. In distributed mode, the PE keeps running after processing the invalid tuple.

composite Main {
  graph
    stream<uint32 id> Beat = Beacon() {
      param
        iterations: 20;
      output Beat: id = (uint32) IterationCount();
    }
 
    stream<uint32 id, rstring date, rstring time> ValidStringTimestamps = Custom(Beat) {
      logic
        state: { rstring d = "02-NOV-2015"; }
        onTuple Beat: {
          uint32 second = id % 60u;
          mutable rstring t = "13:18:" + formatNumber((int64) second, 2u)
            + ".000";
          submit({id = id, date = d, time = t}, ValidStringTimestamps);
        }
    }

    stream<ValidStringTimestamps> ValidAndInvalidTimestamps = Custom(ValidStringTimestamps) {
      logic
        onTuple ValidStringTimestamps: {
          if (id == 5u) {
            submit({id = id, date = date, time = "AA:123:BB"}, ValidAndInvalidTimestamps);
          } else {
            submit(ValidStringTimestamps, ValidAndInvalidTimestamps);
          }
        }
    }

    @catch(exception=streams, tupleTrace=true, stackTrace=false)
    stream<uint32 id, timestamp time> ValidTimestamps = Custom(ValidAndInvalidTimestamps as T) {
      logic
        onTuple T: {
          timestamp t = timeStringToTimestamp(T.date, T.time, false);
          submit({id = id, time = t}, ValidTimestamps);
        }
    }

    () as Nil = FileSink(ValidTimestamps) {
      param
        file: "data.out";
    }
}

Another example of @catch is available in the Streams install sample directory (CatchException.spl).
