In a previous article, we looked at the practice of wrapping Custom operators in composites. We observed that the result was not generic:

type ServiceMessage = rstring date, rstring hostname,
                      rstring service, rstring message;

composite ParseMessages(input In; output Out) {
    graph
        stream<ServiceMessage> Out = Custom(In) {
            logic onTuple In: {
                list<rstring> tokens = tokenize(raw, " ", false);
                submit({date = flatten(tokens[0:2]),
                        hostname = tokens[3],
                        service = tokens[4],
                        message = flatten(tokens[5:])}, Out);
            }
        }
}

Technically, however, the above code does have some type genericity, just not much. We will explore what that genericity is in a moment, but first let’s go over the kinds of genericity there are in SPL, and which ones can apply to a composite operator.

Operators that can handle any number of input and output ports are port number generic. Composites cannot be port number generic; composites must define the exact number of input and output ports they provide. Primitive operators, however, can be port number generic. In our example, ParseMessages defines one input and one output port.
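To make the distinction concrete, here is a minimal sketch. The composite name MergeRaw and its port names are hypothetical, and it assumes both input streams carry an rstring attribute named raw. The composite itself is fixed at two input ports and one output port, while the primitive Custom operator inside it is invoked with two input ports rather than the single port used in ParseMessages:

```spl
// Hypothetical composite: its header fixes the port count at
// exactly two input ports and one output port.
composite MergeRaw(input A, B; output Merged) {
    graph
        // Custom is a primitive operator, so the same operator can be
        // invoked with two input ports here and with one elsewhere.
        stream<rstring raw> Merged = Custom(A; B) {
            logic
                onTuple A: submit({raw = A.raw}, Merged);
                onTuple B: submit({raw = B.raw}, Merged);
        }
}
```

A caller must always connect exactly two input streams to MergeRaw; only the Custom invocation inside it was free to choose its port count.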

Composites can be type generic, which means that they can handle streams of any type. ParseMessages is not type generic, because the type of the stream Out is fully specified to have the type ServiceMessage.

The type for In, however, is partially type generic. The type is not fully specified, although we have made one assumption about it: that it contains an attribute named raw that is of type rstring. In the example application we previously developed, that attribute was the only attribute in the stream type, but that does not need to be true in general. For example, we could invoke ParseMessages in this way:

composite Example {
    type AugmentedRawMessage = timestamp processedTime, rstring networkName,
                               rstring raw;
    graph
        stream<AugmentedRawMessage> Augmented = AugmentedLogSource() {
            param file: '/var/log/messages';
        }
        stream<ServiceMessage> Messages = ParseMessages(Augmented) {}
}

Even though ParseMessages does not know about the attributes processedTime and networkName, it can still handle the type AugmentedRawMessage on its input stream because it has an rstring attribute named raw.

We can go a step further and make ParseMessages attribute generic, which removes the assumption that the attribute is named raw. We can do this by modifying the composite to take an attribute as a parameter:

type ServiceMessage = rstring date, rstring hostname,
                      rstring service, rstring message;

composite ParseMessages(input In; output Out) {
    param attribute $line;
    graph
        stream<ServiceMessage> Out = Custom(In) {
            logic onTuple In: {
                list<rstring> tokens = tokenize($line, " ", false);
                submit({date = flatten(tokens[0:2]),
                        hostname = tokens[3],
                        service = tokens[4],
                        message = flatten(tokens[5:])}, Out);
            }
        }
}

Composites that take attributes as parameters are attribute generic because they make no assumptions about an attribute’s name. The type of the attribute, however, cannot be generic. In the above version of ParseMessages, the attribute we provide upon invocation must have type rstring, like this example:

composite Example {
    type AugmentedRawMessage = timestamp processedTime, rstring networkName,
                               rstring raw;
    graph
        stream<AugmentedRawMessage> Augmented = AugmentedLogSource() {
            param file: '/var/log/messages';
        }
        stream<ServiceMessage> Messages = ParseMessages(Augmented) {
            param line: Augmented.raw;
        }
}

If we tried to provide an attribute that was not an rstring (such as processedTime), the compiler would raise an error the first time it tried to use the inappropriately typed attribute.
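As a sketch of that failure case, the invocation below passes processedTime instead of raw. The composite name BadExample is hypothetical, and the exact compiler message is not shown because it depends on the compiler version; the point is only that this program should be rejected at compile time:

```spl
composite BadExample {
    type AugmentedRawMessage = timestamp processedTime, rstring networkName,
                               rstring raw;
    graph
        stream<AugmentedRawMessage> Augmented = AugmentedLogSource() {
            param file: '/var/log/messages';
        }
        // Expected to fail to compile: processedTime is a timestamp,
        // but inside ParseMessages $line is passed to tokenize(),
        // which needs a string argument.
        stream<ServiceMessage> Messages = ParseMessages(Augmented) {
            param line: Augmented.processedTime;
        }
}
```

The error surfaces inside the composite's body rather than at the param clause, because the attribute parameter itself carries no type constraint.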

Using a similar idea, we can still make ParseMessages even more generic. While we want to ensure that the output stream has the specific attributes date, hostname, service and message, there is no reason for us to limit the output stream’s type to those attributes. However, because we fully specified the type name, we have forced that to be the case. We can remove that restriction by not fully specifying the type:

composite ParseMessages(input In; output Out) {
    param attribute $line;
          type $otype;
    graph
        stream<$otype> Out = Custom(In) {
            logic onTuple In: {
                list<rstring> tokens = tokenize($line, " ", false);
                mutable $otype outTuple = {};
                outTuple.date = flatten(tokens[0:2]);
                outTuple.hostname = tokens[3];
                outTuple.service = tokens[4];
                outTuple.message = flatten(tokens[5:]);
                submit(outTuple, Out);
            }
        }
}

Note that we can no longer create a tuple literal of our output tuple type – creating a tuple literal requires knowing the full type of a tuple, but we want to remain partially type generic. To do so, we require only that the output stream’s type contains the attributes date, hostname, service and message, and that they are of type rstring. We invoke this composite in this way:

composite Example {
    type AugmentedRawMessage = timestamp processedTime, rstring networkName,
                               rstring raw;
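         // The attribute list inside tuple<...> is an assumed reconstruction.
         AugmentedServiceMessage = tuple<timestamp processedTime, rstring networkName>,
                                   ServiceMessage;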
    graph
       stream<AugmentedRawMessage> Augmented = AugmentedLogSource() {
           param file: '/var/log/messages';
       }
       stream<AugmentedServiceMessage> Messages = ParseMessages(Augmented) {
           param line: Augmented.raw;
           otype: AugmentedServiceMessage;
       }
}
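Because the composite only requires that the output type contain the four rstring attributes, the plain ServiceMessage type should satisfy it as well. The following is a minimal sketch of such an invocation, meant to sit in the graph clause of a composite like Example above; it assumes the type-parameterized composite is the one being invoked under the name ParseMessages:

```spl
        // Same composite, but with the unextended ServiceMessage type
        // supplied as the output type.
        stream<ServiceMessage> Messages = ParseMessages(Augmented) {
            param line: Augmented.raw;
                  otype: ServiceMessage;
        }
```

The stream's declared type and the otype argument have to agree, since the composite declares Out as stream<$otype>.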

The remaining kind of genericity is operator genericity, which is possible when a composite takes an operator as its parameter. We will cover operator genericity in a future post.
