This document describes how to create a source stream with the Java Application API so that we can read files from HDFS. It first shows how to generate the source stream, and then how to build a reader that takes the input filename from a stream.

The Java Application API allows you to build applications entirely in Java; IBM Streams is used as the distributed runtime. If you haven't seen the Java Application API before, start with the Java Application API Introduction. Here's the development guide: Java Application API Development Guide.

Designing the HDFS File Reader

This post will build an HDFS file reader that reads text files line by line. Let's say you want to use the Java Application API to extract hashtags or keywords from Twitter messages. Your first step is creating a stream of Strings where each String is a line from the input file. Recall that with the Java Application API you can generate one stream from another by calling transform, but in this case we need to generate a stream without an existing stream to start from. For that, we call a method on the Topology object.

The Topology class has several methods for creating source streams, depending on the kind of source you need. To create a general TStream from a Topology object, the options are listed below, with a short sketch after the list:

  • source(Supplier<Iterable<T>> data) — A source that generates a finite number of tuples. The tuples are generated from the Iterable returned by the get method of data.
  • endlessSource(Supplier<T> data) — A source that never stops supplying tuples. The tuples are generated by calling the get method on data repeatedly.
  • endlessSourceN(Function<java.lang.Long,T> data) — A source that never stops generating tuples. The tuples are produced by calling the apply method of data on 0,1,2,3,…
  • limitedSource(Supplier<T> data, long count) — A source that generates only a fixed number of tuples. Tuples are generated by calling the get method of data count times.
  • limitedSourceN(Function<Long,T> data, long count) — A source that generates only count tuples, where the tuples are generated by calling the apply method of data on 0,1,…,count-1.
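
To make the differences concrete, here is a minimal sketch (not from the sample code) of two of these methods in use. It assumes Java 8 lambdas can stand in for the API's Supplier and Function interfaces:

Topology t = new Topology();

// limitedSourceN: exactly five tuples, "line 0" through "line 4", built from the index.
TStream<String> numbered = t.limitedSourceN(i -> "line " + i, 5);

// source: a finite stream built from whatever Iterable the Supplier's get() returns.
TStream<String> fixed = t.source(() -> Arrays.asList("a", "b", "c"));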

Since a file is not endless, and we don't know beforehand how many lines it has, the most natural fit in our case is the first option, source(Supplier<Iterable<T>> data). Let's call our reader HDFSFileReader. It must implement Supplier<Iterable<String>>, so that when the Supplier's get method is called, it returns an Iterable that can be used to get an Iterator that returns lines of the file. Here's what using it will look like:


Topology flow = new Topology();
TStream<String> lines = flow.source(new HDFSFileReader("myfilename"));

An iterator for HDFS Files

To make the code above work, HDFSFileReader needs to implement get() and return an Iterable for the given file. An Iterable is just a class that returns an Iterator. Let's dive down and write that iterator class first, since it is where the work is. (The code I give below is available for download from www.github.com/IBMStreams/samples.)

The iterator has to support two methods: hasNext() and next(). The method hasNext() should return true if there's another line of the file to be read, and next() should return the next line of the file. Here it is:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSFileReadIterator implements Iterator<String> {

    final String filename; 
    BufferedReader reader; 
    boolean inputDone=false;


    public HDFSFileReadIterator(String file) {
        filename = file;
        inputDone=false;
        Configuration conf = new Configuration();
        // Need to get the location of HDFS
        conf.addResource(new Path("/opt/ibm/biginsights/hadoop-conf/core-site.xml"));

        System.out.println("Configuration created");
        FileSystem fSystem;
        try {
            //Configuration.dumpConfiguration(conf, new PrintWriter(System.out));
            fSystem = FileSystem.get(conf);
            if (fSystem == null) {
                System.out.println("Problem getting FileSystem");
            }
            reader = new BufferedReader( new InputStreamReader (fSystem.open(new Path(filename))));
            System.out.println("Reader successfully initialized");
        } catch (IOException e) {
            System.out.println("Caught IOException ");
            e.printStackTrace();
            reader = null;
        }
    }

    @Override
    public boolean hasNext() {
        return !inputDone;
    }

    @Override
    public String next() {

        try {
            String line = reader.readLine();
            if (line == null) {
                // File is done, let's cleanup.
                reader.close();
                reader = null;
                inputDone = true;
                // nulls are ignored, so it's safe to return this.
                return null;
            }
            else {
                return line;
            }
        }
        catch (IOException e) {
            inputDone = true;
            e.printStackTrace();
            return null;
        }
    }
    // remove() is not supported; this is a read-only iterator.
    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}

Now let’s create the Iterable that will create the Iterator.

import java.util.Iterator;

public class HDFSFileIterable implements Iterable<String> {

    final String filename;
    public HDFSFileIterable(String file) {
        filename  = file;
    }
    @Override
    public Iterator<String> iterator() {
        return new HDFSFileReadIterator(filename);
    }

}

Finally, let's build HDFSFileReader. It just needs to return an Iterable when its get method is called.

// Supplier here is the Java Application API's serializable Supplier interface.
public class HDFSFileReader implements Supplier<Iterable<String>> {
   private static final long serialVersionUID = 1L;
   private final String filename;

   public HDFSFileReader(String filename) {
         this.filename = filename;
   }

  @Override
   public Iterable<String> get() {
      return new HDFSFileIterable(filename);
   }
}

The Application

Now, let’s build the application:

Topology flow = new Topology();
TStream<String> lines = flow.source(new HDFSFileReader("myfilename"));
lines.print();
// also have to set dependencies, see github for details.
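
To actually run this, the topology has to be submitted to a Streams context. Here's a minimal sketch, assuming the StreamsContextFactory entry point from the Java Application API (this part is not in the original snippet):

StreamsContextFactory
    .getStreamsContext("EMBEDDED")   // or "STANDALONE", "DISTRIBUTED"
    .submit(flow)
    .get();                          // block until the submission (or embedded run) completes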

And we're done! We've built a class for creating source streams from HDFS files.

multiTransform as an alternative to source

When I got to this point, I celebrated by starting some testing. That’s when I realized I’d never wanted a source at all. For timing measurements, I usually re-read the same source file multiple times. With this source operator, I couldn’t do that. Also, there was no convenient way to read multiple files without creating a TStream for each file. What if I wanted to read all the files in a directory?

What I really wanted was an HDFS reader that gets the input filename from a TStream. Then it could interpret each item on the stream as a filename, open the file, and then produce a String for each line of that file. This would give me the flexibility I needed.

We cannot use transform to do this, since transform allows at most one output tuple per input tuple, and in this case we have many lines per filename. Instead, we need to use multiTransform. The class passed to multiTransform needs to implement Function<String,Iterable<String>>, that is, an object with an apply method that takes the filename as a String and returns an Iterable that can be used to get an Iterator over the lines in the file.

We’d use it like this:

TStream<String> filenames = flow.strings("myfilename"); // or get this stream some other way
TStream<String> lines = filenames.multiTransform(new HDFSFileReader());

To make HDFSFileReader work this way, we need just a few minor additions. We have to add a constructor without arguments, add implements Function<String,Iterable<String>> to the declaration, and create the apply method in HDFSFileReader:

    @Override
    public Iterable<String> apply(String filename) {
        return new HDFSFileIterable(filename);
    }

Now we can use HDFSFileReader either to create a source stream or as an operator that takes its filenames from another stream, whichever works better for the application.

The lesson here is that building the Iterable is the core work. Once you have the Iterable, you can use it with either a multiTransform or a source. Moreover, with lambda syntax from Java 8, you can skip creating the HDFSFileReader class altogether and just use HDFSFileIterable in both cases.
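
For example, a sketch of the lambda-only version (assuming Java 8; the stream variables are the ones used above):

// Source version: no HDFSFileReader class needed.
TStream<String> lines = flow.source(() -> new HDFSFileIterable("myfilename"));

// multiTransform version: each filename on the stream expands into the lines of that file.
TStream<String> linesFromFiles = filenames.multiTransform(name -> new HDFSFileIterable(name));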

Running the sample

The code for this post is available in the samples repository on GitHub. The main method contains both the source version and the multiTransform version. In order for the project to build, you will need to adjust the classpath to point to your copies of the required libraries.

The example can run with either BigInsights or plain Hadoop, but the defaults assume BigInsights is installed in /opt/ibm/biginsights. If your BigInsights is in a different location, or if you are running Hadoop without BigInsights, you can change hadoopDir and hadoopConfDir (lines 16 and 19 in ReadAndPrint.java). It's also possible to run on a host that isn't running Hadoop at all, as long as the host has access to a Hadoop cluster. To do that, you'll need to copy the core-site.xml file from your Hadoop config directory to somewhere on your local host, and change hadoopConfDir to point to the local directory containing core-site.xml. You'll also need to copy the jars mentioned in addHdfsDependencies to the local host and adjust those lines to point to the now-local jars.
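
For the last case, the change amounts to pointing the reader's Configuration at the local copy of core-site.xml. A hypothetical sketch (the local path is an assumption; use wherever you copied the file):

// Hypothetical local directory holding the copied core-site.xml; adjust to your setup.
String hadoopConfDir = "/home/streamsadmin/hadoop-conf";
Configuration conf = new Configuration();
conf.addResource(new Path(hadoopConfDir + "/core-site.xml"));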
