Hadoop Streaming is a utility included with every Hadoop distribution that allows any executable that reads from standard input and writes to standard output to be used as the mapper or reducer of a MapReduce job.

Why use Hadoop Streaming?

There are 2 main reasons:

  1. It lets you write MapReduce applications in languages other than Java, so if you are not comfortable with Java this might be an option.
  2. Most importantly, it opens MapReduce execution to a large number of libraries, written by really smart people, that are only available in other languages, such as Python with pip, Node.js with npm, and Ruby with gems. Instead of porting the code to Java, you can simply run the libraries in the environment they were designed for.

Hadoop Streaming basics

The simplest example of a Hadoop Streaming job uses command line tools that are already available, such as cat and wc. In this example, we are going to use a slightly modified version of Ulysses by James Joyce as the input file:

hdfs dfs -put 4300-0.txt /tmp/ulyses.txt
yarn jar path/to/hadoop-streaming.jar -input /tmp/ulysses.txt -output /tmp/ulyses -mapper /bin/cat -reducer /usr/bin/wc

This creates the output file /tmp/ulyses/part-00000 containing:

15416  271956 148471

From the wc help you can see that this corresponds exactly to the newline, word, and byte counts of the input file.
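You can double-check this by running wc directly on the local copy of the file:

wc 4300-0.txt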

The requirements for this command are really simple:

  1. -input inputFilesOrDir: the input files
  2. -output OutputDir: the directory where the results are written
  3. -mapper /bin/cat: the mapper executable to call
  4. -reducer /usr/bin/wc: the reducer executable to call

By default, each line in the input files counts as a record, and each line written by the mapper is treated as a key/value pair separated by a TAB character. You can modify this behavior by specifying the -inputformat and -outputformat command line options to use custom InputFormat and OutputFormat Java classes, respectively.
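For instance, the first job could spell out the defaults explicitly. The following sketch uses the standard Hadoop text format classes and an illustrative output directory; it should be equivalent to omitting the two options altogether:

yarn jar path/to/hadoop-streaming.jar \
-inputformat org.apache.hadoop.mapred.TextInputFormat \
-outputformat org.apache.hadoop.mapred.TextOutputFormat \
-input /tmp/ulyses.txt -output /tmp/ulyses-explicit \
-mapper /bin/cat -reducer /usr/bin/wc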

Hadoop Streaming using custom scripts

There is also the -files command line option, which takes a comma-separated list of files and folders and makes them available to the task nodes running the job as temporary files.

You can exploit this option to pass custom scripts written in JavaScript, Ruby, Python, etc. Here is a really simple example passing mapper.js and reducer.js to Hadoop Streaming.

The mapper:

// mapper.js
var stdin = process.openStdin();
var stdout = process.stdout;
var input = '';

stdin.setEncoding('utf8');
stdin.on('data', function(data) {
  if (data) {
    input += data;
    while (input.match(/\r?\n/)) {
      input = RegExp.rightContext;
      proc(RegExp.leftContext);
    }
  }
});

stdin.on('end', function() {
  if (input) {
    proc(input);
  }
});

// Emit the word at index 8 of each line, with a count of 1
// (lines with fewer than nine words produce the key "undefined")
function proc(line) {
  var words = line.split(' ');
  stdout.write(words[8] + ',1\n');
}

The reducer:

// reducer.js
var stdin = process.openStdin();
var stdout = process.stdout;
var counter = {};
var input = '';

stdin.setEncoding('utf8');
stdin.on('data', function(data) {
  if (data) {
    input += data;
    while (input.match(/\r?\n/)) {
      input = RegExp.rightContext;
      proc(RegExp.leftContext);
    }
  }
});

stdin.on('end', function() {
  if (input) proc(input);
  for (var k in counter) {
    stdout.write(k + ':' + counter[k] + '\n');
  }
});

// Accumulate the count for each word
function proc(line) {
  var words = line.split(',');
  var word = words[0];
  var count = parseInt(words[1], 10);
  if (!counter[word]) counter[word] = count;
  else counter[word] += count;
}
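Before submitting the job, you can sanity-check the scripts locally by piping the input file through the mapper, a sort that stands in for the shuffle phase, and the reducer. This is just a local convenience, not part of the job itself:

node mapper.js < 4300-0.txt | sort | node reducer.js | sort -t: -k2 -n | tail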

The command:

yarn jar path/to/hadoop-streaming.jar -files mapper.js,reducer.js -input /tmp/ulyses.txt -output /tmp/ulyses-node -mapper 'node mapper.js' -reducer 'node reducer.js'

But you might run into problems if even a single node does not have the correct runtime installed. This situation might not be easy to solve if you have a cluster with a significant number of nodes, or if you don't have privileged access to the filesystem, situations that occur in many cloud environments.

You can solve this by getting a binary distribution of Node.js and passing the binary as one of the files.

#Get the binary distribution
wget https://nodejs.org/dist/v4.4.7/node-v4.4.7-linux-x64.tar.xz

#Extract the content to a familiar location
mkdir -p node
tar -xvJf node-v4.4.7-linux-x64.tar.xz -C ./node --strip-components=1

#Pass the node executable as one of the files
yarn jar path/to/hadoop-streaming.jar \
-files mapper.js,reducer.js,node/bin/node \
-input /tmp/ulyses.txt -output /tmp/ulyses-node -mapper 'node mapper.js' -reducer 'node reducer.js'

These are the last few lines of the result of that command when we sort by count (the undefined entries come from lines with fewer than nine words, for which the mapper emits words[8] as undefined):

he:175
his:193
undefined:215
to:282
in:283
a:307
and:380
of:516
the:740

Using scripts with additional requirements

Although this makes Node.js available on every task node, this solution only provides the core modules of Node.js, those that come precompiled into the executable. If you are using Node.js, it’s probably because you want to use some of the packages from its ecosystem.

There are two recommended options, depending on the number of extra packages that you are going to use:

  1. Install the packages one by one using npm install <package>.
  2. List the packages as dependencies in a package.json file and then install them using npm install.

The only requirement is that all the packages end up installed locally, in the ./node_modules directory.
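For example, either of the following leaves the installed packages in a local ./node_modules directory next to the scripts (the package name is just a placeholder):

#Option 1: install a package directly
npm install some-package

#Option 2: install everything listed as dependencies in package.json
npm install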

Now, instead of passing each file one by one, we are going to create a single package to distribute to each of the nodes:

#Create the package containing the node executable,
#the node modules, and the mapper and reducer script
tar -cvzf package.tar.gz node/bin/node node_modules mapper.js reducer.js

You also need a couple of scripts to extract the package in each task node:

1. For the mapper_init.sh use the command:

#!/bin/sh

#Extract the package
tar -xvzf package.tar.gz

#Execute the mapper with the included node binary
./node/bin/node mapper.js

2. For the reducer_init.sh use the command:

#!/bin/sh

#Extract the package
if [ ! -f node/bin/node ]; then
  tar -xvzf package.tar.gz
fi

#Execute the reducer with the included node binary
./node/bin/node reducer.js

The only thing left to do is execute the streaming job passing the package and the two scripts with the command:

yarn jar path/to/hadoop-streaming.jar \
-files package.tar.gz,mapper_init.sh,reducer_init.sh \
-input inputFilesOrDir -output OutputDir \
-mapper 'sh mapper_init.sh' -reducer 'sh reducer_init.sh'

Using HDFS to distribute the environment

Now, let’s put all the necessary files in HDFS and access them from there. The -files and -archives options accept hdfs://... addresses as well. We will also tell Node.js where to find its modules by using the -cmdenv option to set the NODE_PATH variable.

Note: The -archives option extracts the package for you, so you don’t need an additional script to perform the extraction.

# Create a new package that will contain only Node.js and its dependencies
tar -czvf node.tar.gz node node_modules

# Put everything on HDFS
hdfs dfs -mkdir node-example/
hdfs dfs -put mapper.js reducer.js node.tar.gz node-example/

# Run the job, specifying the necessary parameters:
# -D sets job configuration properties (here, the job name)
# -files passes the mapper and reducer scripts from HDFS
# -archives passes the Node.js package and creates a symlink named node to the extracted content
# -cmdenv NODE_PATH tells Node.js where to find additional packages
yarn jar path/to/hadoop-streaming.jar \
-D mapred.job.name=streaming_with_nodejs \
-files hdfs://cluster/user/me/mapper.js,hdfs://cluster/user/me/reducer.js \
-archives hdfs://cluster/user/me/node-example/node.tar.gz#node \
-mapper "node/node/bin/node mapper.js" -reducer "node/node/bin/node reducer.js" \
-cmdenv NODE_PATH=node/node_modules \
-input inputFilesOrDir -output outputDir

With this approach the job can be run from any machine with access to the Hadoop cluster, without needing to keep the files on your local filesystem. The above command also accepts the -conf option that, as with traditional MapReduce applications, lets you pass a configuration file to set the different options for the job instead of passing them one by one with the -D option.
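For example, the job name set above with -D could come from a small configuration file instead. The file name and its single property below are only illustrative:

# Write a minimal job configuration file
cat > streaming-job.xml <<'EOF'
<?xml version="1.0"?>
<configuration>
  <property>
    <name>mapred.job.name</name>
    <value>streaming_with_nodejs</value>
  </property>
</configuration>
EOF

# Pass it to the job with -conf instead of the -D option
yarn jar path/to/hadoop-streaming.jar \
-conf streaming-job.xml \
-files hdfs://cluster/user/me/mapper.js,hdfs://cluster/user/me/reducer.js \
-archives hdfs://cluster/user/me/node-example/node.tar.gz#node \
-mapper "node/node/bin/node mapper.js" -reducer "node/node/bin/node reducer.js" \
-cmdenv NODE_PATH=node/node_modules \
-input inputFilesOrDir -output outputDir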

You might ask, “Why don’t you put everything inside the package that we are delivering with the -archives option?”  You can, but separating the code from the framework makes the code more reusable. You can write another mapper and reducer and still use the same package that is already available for you inside HDFS.

You can take a similar approach to using Hadoop Streaming with Ruby or Python versions that do not come pre-installed on the system. The only thing to remember is that you need to provide everything the scripts need to run, including transitive dependencies.


