This example illustrates how to create a non-generic C++ primitive operator which uses a sliding window to filter data received on its input port.

The implementation demonstrates the following techniques:

  • The operator code generator template (*.cgt) uses the pointer to implementation (pimpl) idiom to separate it from the implementation. The cgt files are reduced to a minimal skeleton, while the actual implementation resides entirely in a separate class.¬† The implementation class is syntactically complete (no operator generation prologue and epilogue pragmas), so you can use standard C/C++ tools to develop the entire operator logic.
  • The implementation is a C++ class template, with parameters for the wrapping operator instance, the input and the output tuple types.¬† The C++ compiler instantiates the template using the actual operator and tuple types generated by the Streams compiler.
  • The implementation uses a sliding window for storing the time series values on which the filter is applied.¬† The window uses count-based trigger and tuple eviction policies.
  • Implementation is header-only.¬†You don’t need to build a separate library to hold the operator logic.

The operator provides tuples to a signal smoothing filter which implements a Savitzky-Golay algorithm. The most recent N attribute values, which are stored in the sliding window, are treated with a set of convolution coefficients to calculate the filter output value on each tuple insert. The operator supports the following parameter:

  • length – specifies the number of correlation coefficients used by the filter. The number of coefficients must be an odd number in the range [5, …11].

Pointer to implementation

Pimpl (pointer to implementation) is a widely used C++ language idiom. You use pimpl usually when you define C++ interfaces; by encapsulating implementation details in a separate “impl” class, a stable interface defined by the header file remains unchanged as the implementation may evolve through extensive changes.

So how does this help us? Using pimpl in a Streams operator allows you to separate the code generator templates from the actual operator implementation. In the implementation class there are no operator generation prologue and epilogue pragmas, so you could use the full power of your preferred C/C++ IDE for source code navigation, refactoring, code analysis, etc.

For example, the operator header code generator template (*_h.cgt) may look like the following:

class MY_OPERATOR : public MY_BASE_OPERATOR
{
public:
  MY_OPERATOR();
  virtual ~MY_OPERATOR() {}
  void process(Tuple const & tuple, uint32_t port);
private:
  // Forward definition and pointer to the implementation
  class impl;
  std::unique_ptr<impl> impl_;
};

The operator implementation code generator template (*_cpp.cgt) file contains the definition of MY_OPERATOR::impl class. In our example, this class simply extends the tuple transformation class template. We pass a reference to MY_OPERATOR to its constructor in order to provide access to the operator API.

// Implementation class
class MY_OPERATOR::impl : public my_class<
    MY_OPERATOR, IPort0Type, OPort0Type> {
public:
  typedef my_class<MY_OPERATOR, IPort0Type, OPort0Type> impl_base;
  impl(MY_OPERATOR& op, ... /* other arguments here */) : impl_base(op, ...) {}
};

// The operator constructor instantiates the implementation
MY_OPERATOR::MY_OPERATOR()
{   impl_.reset(new impl(... )); }

// Operator methods forward to the implementation
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port)
{   impl_->process(tuple, port); }

Note that inlining MY_OPERATOR methods and destructor will not work because they need to ‚Äúunderstand‚ÄĚ the ‚Äúimpl‚ÄĚ class declaration. If you really need to inline your process() method, then you need to break the pimpl encapsulation by including the definition of your class in the *_h.cgt file and avoid the use of the intermediate MY_OPERATOR::impl class.

 

C++ class template

In our example, the tuple transformation logic is encapsulated in the SGFilter class. We’d like to avoid using the reflection-based API to inspect tuples, and tuple types are not known until the SPL compiler generates the code, so we define our class as a C++ template:

template <typename OperatorType, typename TupleInType, typename TupleOutType>
class SGFilter : public ::SPL::WindowEvent<TupleInType> { . . . };

The template parameters are:

  • OperatorType – the actual SPL operator wrapping this instance
  • TupleInType – type of tuples received by the operator on input port 0.
  • TupleOutType – type of tuples submitted on the operator output port 0.

By using C++ templates, you can reuse the same implementation code in multiple operator definitions, with various input and output types. In our example, the data on which the filter is applied has a fixed type and is located in tuple attribute 0. You could extend the example by using additional operator parameters to make the data type and position configurable.

 

Sliding window

The SGFilter class creates a sliding window which holds the last N input values. The window size is defined by a count eviction policy configured to coincide with the number of filter coefficients and a trigger policy with a count of 1, so the window gets triggered on every tuple insert. In the constructor, the SGFilter registers itself as a handler for the initialFull and trigger events:

template <typename OperatorType, typename TupleInType, typename TupleOutType>
class SGFilter : public ::SPL::WindowEvent<TupleInType> {
  OperatorType& op_;             // Reference to the wrapping operator instance
  SlidingWindowType window_;     // Window which holds incoming tuples
  bool windowFull_;               // Set to true when the window fills for the first time

public:
  // Type of window used by this filter
  typedef typename ::SPL::SlidingWindow<TupleInType> SlidingWindowType;

  SGFilter(OperatorType& op, size_t filterLength, const char* traceCategories) :
    op_(op),
    window_(op, 0,
      SPL::CountWindowPolicy(filterLength),
      SPL::CountWindowPolicy(1)),
    windowFull_(false)
  {
    window_.registerOnWindowTriggerHandler(this);
    window_.registerOnWindowInitialFullHandler(this);
  }
  . . .
};

On receiving the initialFull event, the handler sets the windowFull_ boolean which indicates that from now on the filter should calculate output values. On every subsequent trigger event, the filter correlates the window contents with the filter coefficients and submits the result on output port 0. A WindowMarker is submitted after each value.  Here is a simplified version of the trigger event handler:

void onWindowTriggerEvent(WindowType & window, PartitionType const & partition)
{
  typename WindowType::DataType & data = window.getWindowData(partition);
  typename WindowType::DataType::const_iterator it;
  size_t i = 0;
  float result = 0.F;

  // Correlate window with filter coefficients
  for (it = data.begin(); it != data.end(); it++, i++) {
    TupleInType const & tuple = *it;
    double val = (double)tuple.getAttributeValue(0);
    result += val * coefficients_[i];
  }

  // Submit the result
  TupleOutType otuple(result);
  op_.submit(otuple, 0);
  op_.submit(::SPL::Punctuation::WindowMarker, 0);
}

Header-only implementation

The SGFilter operator model contains the path to the implementation include files.

<library>
  <cmn:description>SGFilter (header-only)</cmn:description>
  <cmn:managedLibrary>
    <cmn:includePath>../../impl/include</cmn:includePath>
  </cmn:managedLibrary>
</library>

Our simple example was designed as a header-only implementation. Since we use C++ templates, we have to include the class definition in a header, as the compiler needs to know the full definition of the template in order to instantiate. A direct benefit is a simpler build process: no extra steps are needed for building a separate library and package it with the operator. Even though the operator logic is separated from the *.cgt files, the Makefile contains only SPL build commands.

Resources

Join The Discussion