A fundamental task for a streaming application is to read data from an external source, and convert it to a structured format suitable for processing by the rest of the application. In Streams, this usually means using one of our adaptor operators (such as TCPSource or FileSource), which converts the data from an external format into tuples.

Reading from an external source—such as the network or filesystem—is often a performance bottleneck. When source operators are the performance bottleneck for a streaming application, we have a tendency to blame the reading from the external source. But, that is not always the case. Particularly for large tuples which have many attributes, the actual performance bottleneck can be parsing.

Consider the following tuple type from an SPL application:

type ManyAttributes = tuple <
    rstring str00, rstring str01, rstring str02, rstring str03, rstring str04,
    rstring str05, rstring str06, rstring str07, rstring str08, rstring str09,
    rstring str10, rstring str11, rstring str12, rstring str13, rstring str14,
    rstring str15, rstring str16, rstring str17, rstring str18, rstring str19,
    rstring str20, rstring str21, rstring str22, rstring str23, rstring str24,
    rstring str25, rstring str26, rstring str27, rstring str28, rstring str29,
    rstring str30, rstring str31, rstring str32, rstring str33, rstring str34,
    rstring str35, rstring str36, rstring str37, rstring str38, rstring str39,
    rstring str40, rstring str41, rstring str42, rstring str43, rstring str44,
    rstring str45, rstring str46, rstring str47, rstring str48, rstring str49,
    float64 num00, float64 num01, float64 num02, float64 num03, float64 num04,
    float64 num05, float64 num06, float64 num07, float64 num08, float64 num09,
    float64 num10, float64 num11, float64 num12, float64 num13, float64 num14,
    float64 num15, float64 num16, float64 num17, float64 num18, float64 num19,
    float64 num20, float64 num21, float64 num22, float64 num23, float64 num24,
    float64 num25, float64 num26, float64 num27, float64 num28, float64 num29,
    float64 num30, float64 num31, float64 num32, float64 num33, float64 num34,
    float64 num35, float64 num36, float64 num37, float64 num38, float64 num39,
    float64 num40, float64 num41, float64 num42, float64 num43, float64 num44,
    float64 num45, float64 num46, float64 num47, float64 num48, float64 num49
>;

The type ManyAttributes contains 100 attributes in total, with 50 rstring and 50 float64 attributes. While this tuple type is contrived, it is similar to tuples from the telecomunnications domain (representing call detail records) and from customer analytics (representing a customer profile).

Our goal is to read tuples of this type from a CSV file, demonstrate that it is the tuple parsing and creation that is the bottleneck, and improve its performance through parallelism.

Simple File Reading

Our first attempt uses the FileSource operator in the simplest way:

composite FileSourceOnly {
    graph
        stream<ManyAttributes> Parsed = FileSource() {
            param file: "data.csv";
                  format: csv;
                  parsing: fast;
        }

        () as Sink = CountProcessed(Parsed) {}
}

The operator CountProcessed is a simple sink that counts the number of tuples processed, and prints the total when the application is finished. This invocation of the FileSource operator does three things to produce a single tuple:

  1. Reads a line of text from the file data.csv.
  2. Tokenizes that line of text in CSV format.
  3. Constructs a tuple out of each field extracted from the line of CSV.

It is points 2 and 3 that are the expensive parts, as we will see in the next section. The average time to process a 1.4 GB file (1 million discrete lines with 100 fields each) over 5 runs using this method is about 70 seconds. This invocation of the FileSource is our performance baseline; we want to beat 70 seconds.

File Reading Only

We claimed that parsing and tuple creation are the most expensive parts of the FileSource invocation in FileSourceOnly. We can establish that by measuring how expensive it is to just read the file, but avoid parsing the CSV line and creating the tuples. Instead of invoking FileSource with the type ManyAttributes, we will invoke it with a tuple type that just contains a single rstring:

composite FileSourceString {
    graph
        stream<rstring line> Parsed = FileSource() {
            param file: "data.csv";
                  format: line;
                  parsing: fast;
        }

        () as Sink = CountProcessed(Parsed) {}
}

Technically, this invocation of FileSource still does do some parsing, but it does significantly less: it converts a single line of CSV text into a single tuple which contains a single line of text. This process is significantly less work than parsing out the full tuple of ManyAttributes, but we still have to process the entire file.

The time it takes this invocation of FileSource to process the same 1.4 GB file, averaged over 5 runs, is about 1.5 seconds. This is only about 2% of the time it takes to process the file and create ManyAttribute tuples, which is about 70 seconds. Because the only difference between these two invocations is the parsing and tuple creation, we can conclude that when we invoke the FileSource operator with ManyAttributes, no more than 2% of its total processing time is taken up by the actual file reading. Most of it, about 98%, is consumed by parsing and tuple creation.

What this also tells us is that if we can separate the file reading from the parsing and tuple creation, we have a great opportunity for improving performance.

The Parse Operator

The Parse operator from the Standard SPL Toolkit allows us to parse and create tuples in a separate operator. The Parse operator converts a stream of binary data into tuples. But, a problem we have is that the Parse operator is stateful. The Parse operator expects a stream of tuples which just contain blobs of data. The actual tuples it parses can go across the boundaries of an arbitrary number of tuples with blobs of data.

Why is this a potential problem for us? As the title of this post implies, we want to improve performance through parallelism. And in order to take advantage of parallelism, we will need to split our stream of tuples to multiple copies of Parse. We can get around the statefulness of Parse if we ensure that each tuple we send to it contains exactly one tuple’s worth of data. We can do this by using the FileSource which turns each line of CSV text into a tuple that contains just one rstring, converting that rstring into a blob, and then sending the blobs to Parse:

composite ParallelParse {
    graph
        stream<rstring line> Lines = FileSource() {
            param file: "data.csv";
                  format: line;
                  parsing: fast;
        }

        stream<blob bytes> LinesAsBytes = Functor(Lines) {
            output LinesAsBytes: bytes = convertToBlob(Lines.line + "\n");
        }

        @parallel(width=(int32)getSubmissionTimeValue("parallelism"))
        stream<ManyAttributes> Parsed = Parse(LinesAsBytes) {
            param format: csv;
                  parsing: fast;
        }

        () as Sink = CountProcessed(Parsed) {}
}

This code does the following things:

  1. Reads the file, line-by-line, producing one tuple for each line.
  2. Adds the end-line character, "\n", back onto the line, so that it can be correctly re-parsed.
  3. Converts the rstring that represents the line of the CSV file into a blob.
  4. Parses each blob, which represents exactly one line from the CSV file, into exactly one tuple.
  5. Performs the previous point in parallel, with the amount of parallelism controlled by the submission-time value "parallelism".

The only drawback with this approach is that the tuples will arrive at CountProcessed (or wherever else they need to go) out of order. That is, consumers of the Parsed stream may see tuples from later in the CSV file before they see tuples from earlier in the CSV file. You will need to evaluate your own application to determine if this out-of-order behavior is acceptable. The performance for this code depends on the value of "parallelism", so we present the results using a graph, instead of simply a single number:

parallel_parse_no_order

This performance graph also contains the prior two sequential results for comparison. (FileSourceOnly is our performance baseline; it’s the time we want to beat. FileSourceString is the cost of just reading the file; it’s the time we can’t beat.) Before we discuss these results, we should make a few notes about the experimental setup. The machine these experiments were executed on is a system that has 2 processors, each with 4 cores. Each core can support 2 threads each, which means that the operating systems sees 16 logical processors. However, as the system only has 8 physical cores, compute-intensive tasks will tend to not scale all the way to using all 16 logical processors. All SPL programs were run in standalone mode, which means that all parallelism was achieved using threads.

The first thing we can see looking at the data is that using 1 and 2 threads for ParallelParse is actually slower than FileSourceOnly. This result is due to the fact that we must copy the rstring into a blob. However, this conversion cost is fixed (that is, it does not scale with the number of parallel copies of Parse), which means that as we increase the parallelism, ParallelParse is eventually faster than FileSourceOnly. Once ParallelParse uses 8 threads, it takes about 26 seconds to complete, which is about 2.7 times faster than FileSourceOnly. Past 8 threads, scalability levels off, but this is likely due the limitations of our hardware, as explained above. With access to more parallel hardware, we could probably scale more.

Maintaining Order

If you do need the process the tuples in order, we can merge them so that the rest of the application only sees them in the same order that they were in the file. However, it does take a little more work. We introduce sequence numbers into each tuple, and we define a ParallelMerge operator that is responsible for putting tuples back into their proper order based on those sequence numbers:

composite ParallelMerge(input In; output Out) {
    param attribute $key;

    graph
        stream<In> Out = Custom(In) {
            logic
            state: {
                mutable map<uint64, tuple<In>> _tuples;
                mutable uint64 _next = 1;
            }

            onTuple In: {
                if (_next == $key) {
                    submit(In, Out);
                    ++_next;

                    while (_next in _tuples) {
                        submit(_tuples[_next], Out);
                        removeM(_tuples, _next);
                        ++_next;
                    }
                }
                else {
                    _tuples[$key] = In;
                }
            }
        }
}

composite ParallelParseOrdered {
    graph
        stream<rstring line> Lines = FileSource() {
            param file: "data.csv";
                  format: line;
                  parsing: fast;
        }

        stream<blob bytes> LinesAsBytes = Custom(Lines) {
            logic state: {
                mutable uint64 _seqnoGenerator = 0ul;
            }
            onTuple Lines: {
                ++_seqnoGenerator;
                submit({bytes=convertToBlob(rtrim(Lines.line, "\n") + 
                        "," + 
                        (rstring)_seqnoGenerator + 
                        "\n")}, LinesAsBytes);
            }
        }

        @parallel(width=(int32)getSubmissionTimeValue("parallelism"))
        stream<ManyAttributes, tuple<uint64 seqno>> Parsed = Parse(LinesAsBytes) {
            param format: csv;
                  parsing: fast;
        }

        stream<ManyAttributes, tuple<uint64 seqno>> Merged = ParallelMerge(Parsed) {
            param key: Parsed.seqno;
        }

        () as Sink = CountProcessed(Merged) {}
}

This code does some non-obvious things, which we will explain:

  1. First, we define a new composite operator, ParallelMerge. This operator puts tuples back into order, based on the key attribute provided to it when it is invoked.
  2. In the same operator we use to perform the rstring to blob conversion, we also maintain a sequence number. We will use this sequence number to remember what the order the lines of text were in the file. Every line gets a sequence number that is 1 more than the one before it.
  3. We insert this sequence number into the line of text, pretending it was part of the original CSV. (That’s what the unusual code inside the call to convertToBlob does: it removes the original end-line, adds a comma and the sequence number, then puts the end-line back in.)
  4. Parse our blobs in parallel as before, but now the tuple we produce is not just ManyAttributes, but ManyAttributes and a uint64 attribute named seqno.
  5. Pass our out-of-order tuples to ParallelMerge, telling it to use the seqno attribute to put our tuples back in order.

Our final performance graph contains all of the alternatives:

parallel_parse

At 8 threads, ParallelParseOrdered takes about 31.5 seconds to process the entire 1.4 GB file, compared to ParallelParse’s 26 seconds, which is about 17% slower. However, it is still about 2.2 times faster than FileSourceOnly.

Conclusion

By separating the reading of data from an external source from its parsing, we open the possibility to speedup the process through parallelism. In Streams, we can achieve this speedup through the Parse operator and parallelism. The stream programming model itself also aids us, because it makes it easy to decompose our problem into independent parts that we can parallelize easily.

The SPL source code and scripts used to run the experiments are publicly available at https://github.com/IBMStreams/samples/tree/master/Performance/ParallelParse.

1 comment on"Parallelized File Processing with the Parse Operator"

  1. Another option is to use AdaptiveParser – fast, stateless open source toolkit from IBMStreams github repository.

Join The Discussion