In my previous post, I covered general aspects of Hadoop and object stores. In this post, I’ll offer some insight on how MapReduce works with object stores, highlight potential issues related to object stores and Hadoop, and discuss ways to make this integration work better.

As I described in my previous post, Hadoop can work with object stores without having to copy data into a Hadoop Distributed File System (HDFS) cluster. Inspired by this, my team did a simple experiment with Apache Spark 2.0.1 and Hadoop 2.7.3. We persisted a collection of 9 elements as a data object in the object store.

// Create an RDD of 9 elements split across 9 partitions
val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
val myData = sc.parallelize(data, 9)
// Persist the RDD to the object store through the s3a connector
myData.saveAsTextFile("s3a://mybucket/data.txt")

The following table summarizes the HTTP requests that were sent to the object store.

Connector      API   GET   HEAD   PUT   DELETE
Hadoop (s3a)   S3    158   361    26    16

We were surprised to see such a high number of requests to the object store. Because we observed very similar results with other Hadoop connectors, we dug deeper to understand the cause. We concluded that Hadoop connectors are not optimized to work with object stores. The issues we discovered affect not only Apache Spark but also the broader Hadoop ecosystem, including Hadoop MapReduce and Hive.

What causes the inefficiency?

The primary reasons why Hadoop connectors work inefficiently with object stores are that the standard output committers aren’t adapted for object stores, that the cost of supporting FS shell operations is excessive, and that treating object stores as file systems adds unnecessary complexity.

Standard output committers are not adapted for object stores

Hadoop uses OutputCommitters to write data into the underlying storage. To achieve fault tolerance, each MapReduce task first writes its data to a temporary location. During the task commit phase, the data is renamed from the temporary location to its final destination. As previously explained, rename is not a native object store operation; it is typically implemented as a copy followed by a delete, so the approach with temporary files seriously reduces the overall performance. For more information, check out the “Stocator – Fast Lane for Connecting Object Stores to Spark” blog post and the “Exabytes, elephants, objects and Spark” blog post.
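To make the pattern concrete, here is a minimal sketch of the write-to-temporary-then-rename flow using the Hadoop FileSystem API. This is an illustration only, not the committer’s actual code; the bucket name, attempt ID, and data are placeholders.

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Placeholder paths for illustration only
val fs = FileSystem.get(new URI("s3a://mybucket/"), sc.hadoopConfiguration)
val tempPath  = new Path("s3a://mybucket/data.txt/_temporary/0/_temporary/attempt_0/part-0000")
val finalPath = new Path("s3a://mybucket/data.txt/part-0000")

// Step 1: the task writes its output to the temporary location
val out = fs.create(tempPath)
out.write("1\n".getBytes("UTF-8"))
out.close()

// Step 2: on task commit the temporary object is "renamed" to the final name.
// On an object store this is not atomic: the connector copies the data to the
// new key and then deletes the old one, multiplying the number of requests.
fs.rename(tempPath, finalPath)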

The excessive cost of supporting FS shell operations

File System (FS) shell operations in Hadoop are crucial for working with HDFS. Object stores, however, don’t need Hadoop FS shell operations; they provide their own CLI tools, which are adapted to work with object stores.

Let’s look back at the example showing that object stores have different semantics for directories:

./bin/hadoop fs -mkdir -p hdfs://myhdfs/a/b/c/

But this time, let’s see how it works.

The actual flow performs two loops. The first loops over “a/b/c/” and, for each entry, verifies that the entry doesn’t already exist as a file. The second loops over “a/b/c/” again and creates all three directories recursively. While this approach is natural for file systems, it’s inefficient for object stores and generates many REST calls that achieve essentially nothing. Reviewing the existing Hadoop object store connectors, we observed that all of them contained complex logic and a lot of code just to support shell operations.
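As an illustration only (not the actual connector code), here is a rough sketch of how a file-system-style mkdirs over “a/b/c/” can expand into object store requests when directories are emulated with zero-byte marker objects; the exact request pattern and marker convention vary between connectors and are simplifying assumptions here.

// Sketch: which requests a mkdirs("a/b/c/") could translate into
def mkdirsRequests(path: String): Seq[String] = {
  val parts = path.stripPrefix("/").stripSuffix("/").split("/").toList
  // ancestors: List("a", "a/b", "a/b/c")
  val ancestors = parts.scanLeft("")((acc, p) => if (acc.isEmpty) p else s"$acc/$p").tail

  // Loop 1: verify that no ancestor already exists as a plain object (a file)
  val checks = ancestors.map(p => s"HEAD $p")
  // Loop 2: check for and create a "directory marker" object at every level
  val markers = ancestors.flatMap(p => Seq(s"HEAD $p/", s"PUT $p/"))
  checks ++ markers
}

mkdirsRequests("a/b/c/").foreach(println)
// HEAD a
// HEAD a/b
// HEAD a/b/c
// HEAD a/
// PUT a/
// ... and so on for a/b/ and a/b/c/

None of these requests move any data, yet each one is a round trip to the object store.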

How does all this affect analytic flows? This time, let’s go back to our example of persisting a collection of 9 elements and see how it works behind the scenes.

Each task is responsible for persisting only its part of the data. For example, task 0 persists its partition as an object. However, it initially creates a temporary object:

/data.txt/_temporary/0/_temporary/attempt_201705040936_0000_m_000000_0/part-0000

During the task commit phase, this temporary object is renamed to its final name:

/data.txt/part-0000

The main issue here is that Hadoop connectors don’t distinguish between FS shell operations and analytic flows. They treat the temporary object URI as a directory _temporary/0/_temporary/attempt_201705040936_0000_m_000000_0 containing an object named part-0000, and apply file system semantics to it.

Creating _temporary/0/_temporary/attempt_201705040936_0000_m_000000_0 recursively generates a large number of HEAD, GET, and PUT requests, and this happens for each task. The connector then removes the temporary directories recursively, which produces a large number of DELETE requests. Without the need to support FS shell operations, a connector could achieve the same result with a single PUT request.
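To illustrate the last point, here is a minimal sketch, using the same placeholder bucket as above, of what a task’s output amounts to when the shell-style directory machinery is skipped: a single write directly under the final object name, which maps to one PUT request.

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: write the task output directly under its final name,
// skipping the temporary directories and the rename on commit
val fs = FileSystem.get(new URI("s3a://mybucket/"), sc.hadoopConfiguration)
val out = fs.create(new Path("s3a://mybucket/data.txt/part-0000"))
out.write("1\n".getBytes("UTF-8"))
out.close()

Of course, skipping the temporary location means fault tolerance has to be provided in another way, which is exactly the kind of problem a connector designed for object stores has to solve.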

The important observation here is that analytic flows don’t need shell-like operations at all, and the code that enables them significantly reduces the overall performance of the Hadoop connectors.

Complexity of treating object stores as HDFS

To move from HDFS to object stores, you might need to brush up some of your skills and perhaps change some existing scripts that were designed for file systems. For example, consider a script that creates a directory, uploads some data into HDFS, runs a word count application, and deletes the data from HDFS after the job is complete. This script is perfectly suited for HDFS, but doesn’t make sense for object stores.

First, there is no need to create a directory to upload an object. And if the object is already uploaded, there is no point in deleting it, since object storage is not a temporary storage solution. An object store is not a file system, and that means making some changes to existing scripts to use object stores efficiently.
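For instance, with an object store the same word count flow can read its input object directly and write the results back as objects, with no mkdir, upload, or cleanup steps around the job; the bucket and paths below are hypothetical.

// Read the input object directly from the object store, count words,
// and write the result back as objects; bucket and paths are placeholders
val text = sc.textFile("s3a://mybucket/input/data.txt")
val counts = text.flatMap(_.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("s3a://mybucket/output/wordcount")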

Our solution: Stocator

So, what can be done? The good news is that we have a solution. Stocator is a connector for object stores, developed at IBM Research and released as open source under the Apache License 2.0.

Stocator is explicitly designed for object stores and has a very different architecture from the existing Hadoop connectors. It doesn’t depend on the Hadoop modules and interacts directly with object stores. Stocator addresses the limitations we described here. For more information about Stocator, visit the project page.
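As a hedged sketch of how a Spark application might be pointed at Stocator, the snippet below registers Stocator’s FileSystem implementation in the Hadoop configuration; the scheme and property names are illustrative and may differ between Stocator versions and object store types, so check the project page for the exact configuration.

// Illustrative only: register Stocator as the FileSystem implementation
// for a URI scheme, then read data through it
sc.hadoopConfiguration.set("fs.swift2d.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
val rdd = sc.textFile("swift2d://mycontainer.myservice/data.txt")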

Where to find Stocator?

IBM Data Science Experience (DSX) is built on Stocator for object storage access: Stocator enables Spark in DSX to access IBM Cloud Object Storage more efficiently than the native Hadoop connectors. Running the same example again, we get the following results comparing Stocator with the Hadoop connector:

Connector      API   GET   HEAD   PUT   DELETE
Hadoop (s3a)   S3    158   361    26    16
Stocator       S3    1     2      11    0

Stocator in DSX can be used to analyze large amounts of data. In fact, SETI is using it to process large data volumes. I’ll be discussing the SETI use case with Graham Mackintosh in our presentation “Very large data files, object stores, and deep learning – lessons learned while looking for signs of extra-terrestrial life” at Spark Summit SF 2017.


I also invite you to attend the talk I will present with Trent Gray-Donald, “Hadoop and object stores: Can we do it better?”, at Strata 2017, 22-25 May, London.

And of course, I invite you to join IBM DSX and be part of the exciting experience of using Stocator while working with Spark and data stored in IBM Cloud Object Storage.
