This post demonstrates how to write C++ native functions to add functionality to Streams. As a motivating example, we’ll create Streams functions for AES encryption and decryption via the OpenSSL library.

Operators or Functions?

When we need to wrap a library so that we can use it from SPL, there are two options. One option is to add a new primitive operator. But an alternate choice is to add a native function. A native function is an SPL function where the code is written in C++ or Java. If the library you want to wrap can be naturally invoked by a single function call, then native functions are probably the more flexible approach. This post just covers C++ native functions; Java native functions are done quite differently.

For the case of encryption and decryption, we don’t need code generation, we don’t need to maintain any state, and the only parameter is the key, so we are going to use native functions. Here’s the prototype of these functions in SPL (remember that blob is the SPL type used to represent binary data):

blob aesencrypt(blob key, blob plain);
blob aesdecrypt(blob key, blob cipher);

In the code below, we’ll use the two functions in a Functor, which makes the Functor work like an encryption or decryption operator.

composite Encrypt  {
    param expression<blob> $key;
    graph
        stream<rstring line> Lines = FileSource() {
            param file: "plain_in.csv";
        }

        stream<blob encrypted> Cypher = Functor(Lines) {
            // Take the rstring to a blob, and then send it to encrypt.
            // The output clause must name the output stream, which is Cypher.
            output Cypher: encrypted = aesencrypt($key, convertToBlob(line));
        }

        () as sink = FileSink(Cypher) {
            param file: "cipher.csv";
        }
}

composite Decrypt {
    param expression<blob> $key;
    graph
        stream<blob encryptedLine> Cypher = FileSource() {
            param file: "cipher.csv";
        }

        stream<rstring line> Plain = Functor(Cypher) {
            // The output is a blob, so we need to convert it to a rstring.
            output Plain: line = convertFromBlob(aesdecrypt($key, encryptedLine));
        }

        () as sink = FileSink(Plain) {
            param file: "plain_out.csv";
        }
}

Doing the Encryption in C++

First, here is the C++ code that invokes the OpenSSL functions for encryption or decryption, depending on the value of encrypt. It takes the data in input (a pointer to unsigned char), assumed to be of length len (which does not have to be a multiple of the block size), encrypts or decrypts it, writes the result to output, and stores the length of the result in outputLen. When encrypting, the output can be larger than the input, because the input is padded to reach a multiple of the block size. When decrypting, the output may be smaller than the input, because the padding added during encryption is removed.

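void crypto(const unsigned char * key, const unsigned char * input, int len, unsigned char* output, int * outputLen, bool encrypt) {
    *outputLen = 0;  // report zero bytes on any failure
    EVP_CIPHER_CTX ctx;
    EVP_CIPHER_CTX_init(&ctx);
    // 1 selects encryption, 0 decryption. No IV is supplied here;
    // real deployments should pass a fresh IV per message.
    if (!EVP_CipherInit_ex(&ctx, EVP_aes_128_cbc(), NULL, key, NULL, encrypt ? 1 : 0)) {
        EVP_CIPHER_CTX_cleanup(&ctx);
        return;
    }

    int tmpLen = 0;
    if (!EVP_CipherUpdate(&ctx, output, &tmpLen, input, len)) {
        EVP_CIPHER_CTX_cleanup(&ctx);
        return;
    }
    const int updateLen = tmpLen;

    // The final step writes the padding (encrypt) or checks and strips it (decrypt).
    if (!EVP_CipherFinal_ex(&ctx, output + updateLen, &tmpLen)) {
        EVP_CIPHER_CTX_cleanup(&ctx);
        return;
    }

    *outputLen = updateLen + tmpLen;
    EVP_CIPHER_CTX_cleanup(&ctx);
}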
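
To make the size change concrete, here is a small standalone sketch of the padding scheme that OpenSSL's EVP interface applies by default to block ciphers (PKCS padding). It does not call OpenSSL at all; the names Bytes, kBlockSize, pkcs7Pad and pkcs7Unpad are made up for this illustration and are not part of the toolkit.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

typedef std::vector<unsigned char> Bytes;  // hypothetical stand-in for a byte buffer

// AES block size in bytes (128 bits), independent of the key length.
const std::size_t kBlockSize = 16;

// Append PKCS padding: always adds between 1 and kBlockSize bytes,
// each one holding the number of bytes that were added.
Bytes pkcs7Pad(const Bytes& data) {
    const std::size_t padLen = kBlockSize - (data.size() % kBlockSize);
    assert(padLen >= 1 && padLen <= kBlockSize);
    Bytes padded(data);
    padded.insert(padded.end(), padLen, static_cast<unsigned char>(padLen));
    return padded;
}

// Strip PKCS padding; throws std::invalid_argument if it is malformed.
Bytes pkcs7Unpad(const Bytes& padded) {
    if (padded.empty() || padded.size() % kBlockSize != 0)
        throw std::invalid_argument("length is not a positive multiple of the block size");
    const std::size_t padLen = padded.back();
    if (padLen == 0 || padLen > kBlockSize)
        throw std::invalid_argument("bad padding length byte");
    for (std::size_t i = padded.size() - padLen; i < padded.size(); ++i)
        if (padded[i] != padLen)
            throw std::invalid_argument("inconsistent padding bytes");
    return Bytes(padded.begin(), padded.end() - static_cast<std::ptrdiff_t>(padLen));
}
```

A 5-byte input pads to 16 bytes, and a 16-byte input pads to 32, because a whole extra block is added when the input already ends on a block boundary. That second case is the one to keep in mind when sizing an output buffer.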

Around that basic encryption functionality, we add a layer to convert from the SPL data types to the C++ data types and back. The input arrives to us not as char pointers, but as type blob, and our output is also type blob. For efficiency, we pass by reference, and since they are not mutable, we use const.

SPL::blob aesencrypt(const SPL::blob &key, const SPL::blob& input_data) {
    SPLAPPTRC(L_INFO,"Encrypt input size " << input_data.getSize(), "opensslwrapper");

    int outputLen = 0;
    const int len = input_data.getSize();
    // Padding always adds 1 to BYTES_PER_BLOCK bytes, so round up to the next full block (in bytes).
    const size_t maxOutputLen = (len / BYTES_PER_BLOCK + 1) * BYTES_PER_BLOCK;
    // make a big enough output buffer
    unsigned char output[maxOutputLen];
    crypto(key.getData(), input_data.getData(), input_data.getSize(), &(output[0]), &outputLen, true);
    SPL::blob myblob(output, outputLen);

    SPLAPPTRC(L_INFO,"Encrypt output size " << outputLen, "opensslwrapper");
    return myblob;
}

Decrypt is similar. We put crypto, aesencrypt and aesdecrypt into opensslwrapper.h.
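
Decryption needs an output buffer as well. Below is a minimal sketch of how both buffers could be sized, assuming AES's 16-byte block and OpenSSL's default padding. The helper names encryptedLength and outputCapacity are hypothetical, and BYTES_PER_BLOCK is given the value the wrapper presumably uses. The "input length plus one block" bound follows the guidance in OpenSSL's EVP documentation for the update calls.

```cpp
#include <cassert>
#include <cstddef>

// AES block size in bytes. The wrapper's BYTES_PER_BLOCK is assumed to be 16.
const std::size_t BYTES_PER_BLOCK = 16;

// Exact ciphertext length for a padded block cipher: the padding adds
// 1..BYTES_PER_BLOCK bytes, so the result is the next full block boundary.
std::size_t encryptedLength(std::size_t plainLen) {
    return (plainLen / BYTES_PER_BLOCK + 1) * BYTES_PER_BLOCK;
}

// Conservative capacity for an output buffer in either direction:
// the input length plus one block.
std::size_t outputCapacity(std::size_t inputLen) {
    const std::size_t capacity = inputLen + BYTES_PER_BLOCK;
    // The exact ciphertext length never exceeds this bound.
    assert(encryptedLength(inputLen) <= capacity);
    return capacity;
}
```

For decryption, the plaintext is never longer than the ciphertext, so a buffer of outputCapacity(cipherLen) bytes is more than enough; the actual length comes back through outputLen.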

Creating a toolkit

In this section, I’ll walk you through creating these native functions. I assume you’re working in Streams Studio, though that is not a requirement for creating new native functions. If you plan to use the native functions in several applications, it’s best to put them in a toolkit that each application can then use.

For the SPL compiler to know about the native functions, you need to create a function.xml file in the native.function directory of the namespace where your functions should reside. This file contains the information the compiler needs about your functions: the SPL name, the C++ name (if different from the SPL name), the argument types, the return type, the header file that needs to be included, and where any libraries needed by these functions can be found.

Creating the header file

We put the header file containing the encryption code in the impl/include directory of the toolkit. For this example, I’m putting all the code in the header file. In the last section, I’ll describe what you need to do if you don’t want to put the implementation in your header file.

Creating the function.xml

The easiest way to create a function.xml file is via Streams Studio. From your project, create a new C++ native function:
[Image: CppNativeFunction]

Now, we need to add prototypes for our native functions. After you click on C++ Native Function, you’ll get a dialog that lets you fill in the arguments and description of your function.

[Image: CreateNewCppNativeFunction]

But you can also do this later, by editing your function.xml file and choosing to add a new function prototype. You can edit the XML with your favorite text editor, or you can use Streams Studio:

[Image: NewFunction]

Next, we need to tell Streams where it can find the header file needed for the C++ implementation of these functions. We create a library element in the function.xml.

[Image: NativeFunctionLibrary]

Under the library, add the location of the include file to your include path. Paths in the function.xml are relative to the function.xml itself, so the path is ../../impl/include.

This is also where you’d add to the library path and list any libraries that are needed during linking. In this case, the OpenSSL library is already linked in when the application is compiled, so we don’t need to do anything.

Using the toolkit

The NativeFunctions sample on GitHub contains the com.ibm.streamsx.aes toolkit, which has aesencrypt and aesdecrypt. It also contains CryptoTest, which has the Encrypt and Decrypt composites.

You can run the Encrypt composite to encrypt the plain_in.csv data file to cipher.csv. Then you can run Decrypt to decrypt cipher.csv to plain_out.csv.

Separating implementation from header file

In the example above, I put all the code into the header file. If your wrapper code is simple, this is a good way to get started using the functions. Once you create the header file and make the modifications to function.xml, no additional steps are needed.

However, in most cases you will eventually need to separate the implementation from the header file. You will definitely need to do this if you don’t want to distribute your source code. The other reason to separate them is if you want to use the native functions in two different operators in the same PE (or in a standalone application). Our example uses just one of the functions in one operator, so we didn’t run into this problem. But if we had put both the encrypt and decrypt Functors in the same standalone application, we would have gotten an error.
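
The error is presumably a duplicate-symbol failure at link time: each operator is compiled separately, and every one that includes the header gets its own copy of any function defined there. For small helpers where shipping the source is acceptable, one alternative (not the approach taken by the sample toolkit) is to mark header-defined functions inline. The sketch below illustrates this; the namespace and function names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>

namespace openssl_for_spl_demo {  // hypothetical namespace for this sketch

// Without 'inline', each .cpp file that includes this header would emit its
// own definition, and linking those files together would fail. With 'inline',
// the linker treats the copies as a single function.
inline bool isAes128KeyLength(std::size_t keyLen) {
    return keyLen == 16;  // AES-128 keys are 128 bits long
}

// Debug-build guard a wrapper could call before handing the key to OpenSSL.
inline void checkKeyLength(std::size_t keyLen) {
    assert(isAes128KeyLength(keyLen) && "AES-128 needs a 16-byte key");
}

}  // namespace openssl_for_spl_demo
```

This does nothing for the other motivation: keeping the source private still requires shipping a compiled library.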

To demonstrate this, I’ve written the same two native functions, but with the implementation separated from the header, in com.ibm.streamsx.aesv2.

When you separate the source and the header, you can substantially simplify the header. Here’s the entire header for our running example:

#ifndef OPENSSLWRAPPER_H
#define OPENSSLWRAPPER_H
#include <SPL/Runtime/Type/Blob.h>

namespace openssl_for_spl {
SPL::blob aesencrypt(const SPL::blob &key, const SPL::blob& input_data);
SPL::blob aesdecrypt(const SPL::blob& key, const SPL::blob &input_data);
}
#endif

Pointing Streams to the library

When the header doesn’t contain the implementation, you’ll need to supply a library containing the compiled functions. Usually this library is placed in impl/lib of the toolkit. Your function.xml must give both the library’s name and its location. To do this, you can either edit the function.xml file by hand, or edit the library element in Streams Studio to give the library name (without the lib prefix or .so suffix):
[Image: library]

You also need to add to the lib path element to say where the library can be found:
[Image: libpath]

Building the library

If the library you have is already compiled, you can just place it in impl/lib and be done. But if you are keeping the source code in the toolkit, the usual place for it is impl/src. Streams will not build this into a library for you, so you’ll have to supply a Makefile to do that yourself. Here’s the core part of the Makefile for aesv2:

all: lib/libopensslwrapper.so

# Recipe lines must start with a tab. The lib and bin directories are listed
# as order-only prerequisites (after the |) so their timestamps never force a rebuild.
lib/libopensslwrapper.so: bin/opensslwrapper.o | lib
	$(CXX) -shared -o $@ $<

bin/opensslwrapper.o: src/opensslwrapper.cpp include/opensslwrapper.h | bin
	@$(CXX) -O3 -Wall -fPIC -I include  $(SPL_COMPILE_OPTIONS) -c $< -o $@

You will have to manually make the library before running any application that uses functions in the library.

For more information on native functions in Streams, visit the documentation.
