In a previous article, we discussed composites and the kinds of genericity they can have. A kind of genericity we did not cover is operator genericity, which is when at least one operator in a composite’s stream graph is passed in as a parameter.

Building on the example code from the article on composite genericity, suppose we have the following application:

type ServiceMessage = rstring date, rstring hostname, 
                      rstring service, rstring message;

type FailureMessage = timestamp time, rstring uid, rstring euid, rstring tty,
                      rstring rhost, rstring user;

type Suspect = rstring rhost, rstring user;

composite FindSuspects {
    graph
        stream<rstring raw> RawMessages = FileSource() {
            param file: "/var/log/messages";    
                  format: line;
        }

        stream<ServiceMessage> ServiceMessages = ParseMessages(RawMessages) {}

        stream<ServiceMessage> RawFailures = Filter(ServiceMessages) {
            param filter: findFirst(service, "sshd", 0) != -1 &&
                          findFirst(message, "authentication failure", 0) != -1;
        }

        stream<FailureMessage> Failures = ParseFailures(RawFailures) {}

        stream<Suspect> Suspects = Aggregate(Failures) {
            window Failures: tumbling, count(10), partitioned;
            param partitionBy: rhost;
            output Suspects: user = Any(user);
        }

        () as Sink = FileSink(Suspects) {
            param file: "suspects.txt";
        }
}

The purpose of this application is to find “suspect” remote hosts in a log file, where we define a suspect as any remote host from which 10 or more failed login attempts have originated. The steps the application takes to do this are:

  1. Read raw log messages from a source.
  2. Parse the raw log messages into service-agnostic messages, in which the service-specific message text remains unparsed.
  3. Find all sshd service messages that also indicate an authentication failure.
  4. Parse the message from the sshd service. (The definition of this operator, ParseFailures, is not shown, but it is similar to the definition for ParseMessages, which was shown in a previous article.)
  5. Aggregate all failed logins that come from the same remote host. When we receive 10 of them, emit a tuple that contains the remote host and one of the users who failed to log in.
  6. Finally, write the suspects to a file.

One problem with our application as it is written is that it assumes the source and the sink will always be on the filesystem. It’s easy to imagine wanting to perform this operation on data sources that come from the network, and wanting to report the results over the network. However, we don’t want to write another version of this composite that just has a different source and sink.

The solution to our problem is to use operator parameters:

composite FindSuspects {
    param operator $Source;
          operator $Sink;
    graph
        stream<rstring raw> RawMessages = $Source() {}

        stream<ServiceMessage> ServiceMessages = ParseMessages(RawMessages) {}

        stream<ServiceMessage> RawFailures = Filter(ServiceMessages) {
            param filter: findFirst(service, "sshd", 0) != -1 &&
                          findFirst(message, "authentication failure", 0) != -1;
        }

        stream<FailureMessage> Failures = ParseFailures(RawFailures) {}

        stream<Suspect> Suspects = Aggregate(Failures) {
            window Failures: tumbling, count(10), partitioned;
            param partitionBy: rhost;
            output Suspects: user = Any(user);
        }

        () as Sink = $Sink(Suspects) {}
}

Now the FindSuspects composite is operator generic in its source and sink. However, we have one problem: most source and sink adapters require parameters that configure where they read data from or write data to. There is no way to make these parameters fully generic. Of course, if we have, say, a file name parameter, we can always parameterize the value of that parameter. But we cannot parameterize the parameter itself.
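To make the distinction concrete, here is a minimal sketch of parameterizing a parameter's value. The composite name LogFile, its $file parameter, the ReadSecureLog composite, and the path /var/log/secure are hypothetical names chosen for illustration; they do not appear elsewhere in this article.

```spl
// Hypothetical wrapper: the *value* of FileSource's file parameter is generic,
// but the parameter itself (file) is fixed by the choice of FileSource.
composite LogFile(output Out) {
    param expression<rstring> $file: "/var/log/messages"; // default value
    graph
        stream<rstring raw> Out = FileSource() {
            param file: $file;
                  format: line;
        }
}

// Hypothetical main composite that overrides the default file name.
composite ReadSecureLog {
    graph
        stream<rstring raw> Lines = LogFile() {
            param file: "/var/log/secure";
        }

        // Print each raw line so the example has an observable effect.
        () as Printer = Custom(Lines) {
            logic onTuple Lines: println(raw);
        }
}
```

The caller can choose any file, but it can only ever supply a file name. A network source would need something like an address and a port instead, and no value of $file can express that.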

Wrapping operator invocations in a composite solves this problem. Given the above definition of FindSuspects, we could invoke it with:

composite VarLogFile(output Out) {
    graph
        stream<rstring raw> Out = FileSource() {
            param file: "/var/log/messages";
                  format: line;
        }
}
composite SuspectsFile(input In) {
    graph
        () as Sink = FileSink(In) {
            param file: "suspects.txt";
        }
}
composite FindSuspectsFromFiles {
    graph
        () as Sink = FindSuspects() {
            param Source: VarLogFile;
                  Sink: SuspectsFile;
        }
}

The composite FindSuspectsFromFiles would be invoked as the main composite for an application. The power of this technique comes from being able to define and provide different kinds of sources and sinks:

composite LogTCP(output Out) {
    graph
        stream<rstring raw> Out = TCPSource() {
            param role: client;
                  address: "logs.company.com";
                  port: 514u;
        }
}
composite SuspectsTCP(input In) {
    graph
        () as Sink = TCPSink(In) {
            param role: client;
                  address: "suspects.company.com";
                  port: 514u;
        }
}
composite FindSuspectsFromTCP {
    graph
        () as Sink = FindSuspects() {
            param Source: LogTCP;
                  Sink: SuspectsTCP;
        }
}

When FindSuspectsFromTCP is used as the main composite for an application, it retrieves log data from logs.company.com on port 514 and sends suspects to suspects.company.com on port 514. Of course, one could read data from a TCP source yet still write to a file. More importantly, it’s easy to define new kinds of sources and sinks for FindSuspects, including operators that are not actually edge adapters. Operator genericity gives SPL programmers the power to abstract out the structure of their applications.
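As a sketch of the mixed case mentioned above, the following main composite reads from the TCP source but writes to the file sink. It reuses the LogTCP and SuspectsFile composites defined earlier; the name FindSuspectsFromTCPToFile is a hypothetical one chosen for illustration.

```spl
// Hypothetical main composite: TCP in, file out.
// LogTCP and SuspectsFile are the wrapper composites defined earlier.
composite FindSuspectsFromTCPToFile {
    graph
        () as Sink = FindSuspects() {
            param Source: LogTCP;       // read raw log lines over TCP
                  Sink: SuspectsFile;   // write suspects to suspects.txt
        }
}
```

No change to FindSuspects is needed. Any composite with a single output port of type stream<rstring raw> can stand in as the source, and any composite with a single input port that accepts Suspect tuples can stand in as the sink.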
