Some operators can be implemented directly in SPL using Custom operators, rather than defining them as native operators in C++ or Java. Implementing logic using Custom operators is typically suitable for operations that do not need to call out to pre-existing libraries, and whose logic operates on, and can be fully expressed with, SPL data types. Writing Custom operators requires less code and simplifies development as there is no need to switch to another language and write an operator model.

For example, consider parsing system messages from /var/log/messages on a Linux system. A typical example looks like the following:

Feb 13 04:02:07 monza syslogd 1.4.1: restart.
Feb 13 04:03:08 monza sshd(pam_unix)[16637]: authentication failure; tty=ssh ruser= rhost=123.176.40.122
Feb 13 04:03:12 monza sshd(pam_unix)[16639]: check pass; user unknown
Feb 13 04:03:12 monza sshd(pam_unix)[16639]: authentication failure; tty=ssh ruser= rhost=123.176.40.122
Feb 13 04:03:17 monza sshd(pam_unix)[16641]: check pass; user unknown
Feb 13 04:03:17 monza sshd(pam_unix)[16641]: authentication failure; tty=ssh ruser= rhost=123.176.40.122
Feb 13 04:03:22 monza sshd(pam_unix)[16643]: authentication failure; tty=ssh ruser= rhost=123.176.40.122  user=root

While there is no formal grammar, the structure of a message is:

<date> <hostname> <service>: <message>

We would like to write an SPL operator that parses such messages. The input tuples will contain a single string, which contains a single message. The output will be a tuple where each entry in our informal grammar above is its own string attribute:

rstring flatten(list<rstring> lst)
{
    mutable rstring str = "";
    for (rstring e in lst) {
        str += e + " ";
    }
    return str;
}

composite Example {
    type ServiceMessage = rstring date, rstring hostname, rstring service, rstring message;

    graph
        stream<rstring raw> RawMessages = FileSource() {
            param file: '/var/log/messages';
                  format: line;
        }
        stream<ServiceMessage> ServiceMessages = Custom(RawMessages) {
            logic onTuple RawMessages: {
                    list<rstring> tokens = tokenize(raw, " ", false);
                    submit({date = flatten(tokens[0:2]),
                            hostname = tokens[3],
                            service = tokens[4],
                            message = flatten(tokens[5:])},
                           ServiceMessages);
                }
        }
}

Our example application reads messages directly from /var/log/messages line by line. To parse these messages, we separate the raw line into tokens separated by spaces. From there, we can associate indices with attributes. For the date attribute, we know that entries 0, 1 and 2 are part of the date, so we combine them (effectively un-tokenizing them). The message itself can have any number of tokens separate by spaces, but we know that it must start at index 5.

Implementing this logic directly in SPL with a Custom operator is easier than dropping down to C++ or Java. However, as written, there is a problem: we cannot reuse this operator. If we wanted to parse messages in the same way in this or in another application, we would need to copy this code. It exists only inside of this Custom operator; there is no way to “name” this custom logic.

Wrapping Custom operators in composites solves this problem. By making a Custom operator invocation the only part of a composite’s stream graph, we can use the composite operator as a way to “name” that logic. For example:


type ServiceMessage = rstring date, rstring hostname,
    rstring service, rstring message;

composite ParseMessages(input In; output Out) {
    graph
        stream<ServiceMessage> Out = Custom(In) {
            logic onTuple In: {
               list<rstring> tokens = tokenize(raw, " ", false);
               submit({date = flatten(tokens[0:2]),
                   hostname = tokens[3],
                   service = tokens[4],
                   message = flatten(tokens[5:])},
                   Out);
            }
       }
}

Note that the ParseMessages composite knows about the ServiceMessage type. Because of this fact, our composite is not type generic.

We can now use ParseMessages in our application:


composite Example {
    graph
        stream<rstring raw> RawMessages = FileSource() {
            param file: '/var/log/messages';
            format: line;
        }
        stream<ServiceMessage> ServiceMessages = ParseMessages(RawMessages) {}
}

The operator ParseMessages can now be reused elsewhere in the application, and we can place it in toolkits so that other applications can make use of it.

Join The Discussion