
Basic data interchange techniques

Big data and SQL

“Big data” is a term that has been in regular use for almost a decade. It, along with technologies such as NoSQL, is often seen as the replacement for the long-successful RDBMS solutions that use SQL. Today, DB2®, Oracle, Microsoft® SQL Server, MySQL, and PostgreSQL dominate the SQL space and still make up a considerable proportion of the overall market. Big data and the database systems and services that go along with it have become additional cogs in the gears of modern systems. But how do you integrate your existing SQL-based data stores with Hadoop so you can take advantage of the different technologies when you need them? Let’s examine the basic architectural aspects of exchanging information and basic techniques for performing data interchange.

Data and querying considerations

The most important consideration when exchanging information between SQL and Hadoop is the data format of the information. The format should be driven entirely from the perspective of the information and the reason it is being exported.

Simply exporting your data and then importing it into Hadoop doesn’t solve any problems. You need to know exactly what you are importing, why you are importing it, and what you expect to get out of the process.

Before we look at the specifics of why you are exchanging the data to begin with, first consider the nature of the data exchange. Is it one-way? Or is it two-way?

One-way data exchange — from SQL to Hadoop, or Hadoop to SQL — is practical in situations where the data is being transported to take advantage of the query functionality, and the source is not the companion database solution. For example, pure-textual data, or the raw results of a computational or analysis program, might be stored in Hadoop, processed with MapReduce, and stored in SQL (see Figure 1).

Figure 1. Hadoop to SQL translations

The reverse is less common, where information is extracted from SQL into Hadoop, but it can be used to process SQL-based content that provides a lot of textual content, such as blogs, forums, CRM and other systems (see Figure 2).

Figure 2. SQL to Hadoop translations

Two-way data exchange is more common and provides the best of both worlds in terms of the data exchange and data processing (see Figure 3).

Figure 3. Bidirectional translations

Although there are many examples, the most common one is to take large, linear datasets and text datasets from the SQL store and have a Hadoop cluster reduce them to summarized information. The summarized information can then be imported back into your SQL store. This is particularly useful where the large dataset would take too long to process within an SQL query. An example would be a large corpus of review scores or word/term counts.

As a rule, there are three primary reasons for interfacing between SQL and Hadoop:

  1. Exporting for storage: Hadoop provides a practical solution for storing large amounts of infrequently used data in a format that can be queried, processed, and extracted. For example, usage logs, access logs, and error information are all practical for insertion into a Hadoop cluster to take advantage of the HDFS architecture. A secondary feature of this type of export is that the information can later be processed or parsed and converted into a format that can be used again.
  2. Exporting for analysis: Two common cases are exporting for reimport to SQL and exporting the analysis output to be used directly in your application (analyzing and storing the result in JSON, for example). Hadoop provides the advantage here by allowing for distributed large-scale processing of information rather than the single-table host processing provided in SQL. With the analysis route, the original information is generally kept, but the analysis process is used to provide summary or statistical bases that work alongside the original data.
  3. Exporting for processing: Processing-based exports are designed to take the original raw source information, process and reduce or simplify it, then store that information back to replace the original data. This type of exchange is most commonly used where the source information has been captured, but the raw original information is no longer required. For example, logging data of various forms can be easily resolved into a simpler structure, either by looking for specific event types or by summarizing the data to counts of specific errors or occurrences. The raw data is often not required here. Reducing that data through Hadoop and loading the summary stats back saves time and makes the content easier to query.
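
The third case, reducing raw logs to counts of specific occurrences, can be sketched as a map step followed by a reduce step. The following is a minimal local illustration in Python rather than a real Hadoop job. The log layout (a timestamp followed by an operation name) and the function names are assumptions made for the example.

```python
from collections import Counter
from typing import Iterable, Iterator, Tuple


def map_operations(lines: Iterable[str]) -> Iterator[Tuple[str, int]]:
    """Map step: emit (operation, 1) for each well-formed log line."""
    for line in lines:
        fields = line.split()
        if len(fields) < 2:
            continue  # skip blank or malformed lines
        # Assumed layout: field 0 is a timestamp, field 1 is the operation.
        yield fields[1], 1


def reduce_counts(pairs: Iterable[Tuple[str, int]]) -> Counter:
    """Reduce step: sum the emitted values for each operation."""
    totals: Counter = Counter()
    for operation, value in pairs:
        totals[operation] += value
    return totals


if __name__ == "__main__":
    sample = [
        "2013-08-15T09:37:00 search tuna",
        "2013-08-15T09:37:02 view 1234",
        "2013-08-15T09:37:05 search salad",
    ]
    summary = reduce_counts(map_operations(sample))
    for operation, count in sorted(summary.items()):
        print(f"{operation},{count}")
```

The summary is far smaller than the raw log, which is what makes it practical to load back into an SQL table.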

With these basic principles in mind, let’s look at the techniques for a basic data exchange between SQL and Hadoop.

Exporting data from SQL

When exporting from SQL, the biggest consideration is the format of the information that you generate. Because Hadoop is not a tabular database, it makes sense to choose a flexible format for data that will be processed in Hadoop. One option is the CSV format if you want to work with pure tabular information, but you can also use raw text with suitable separators or identifiers.

For complex structures, it can make more sense to output information in a structure that allows for easy separation and distribution. An example would be generating record data as JSON and exporting blocks of data, for example 10,000 records per file. Using a flexible encapsulation format like JSON solves many of the data interchange headaches.
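
As a rough sketch of the block-per-file idea, the following Python function writes records as JSON, one object per line, and starts a new file every 10,000 records. The function name, the `part-NNNNN.json` file naming, and the one-object-per-line layout are assumptions for illustration, not a format that Hadoop requires.

```python
import json
import os
import tempfile
from typing import Iterable, List


def write_json_chunks(records: Iterable[dict], out_dir: str,
                      chunk_size: int = 10000) -> List[str]:
    """Write records as JSON lines, chunk_size records per file."""
    paths: List[str] = []
    handle = None
    for index, record in enumerate(records):
        if index % chunk_size == 0:
            # Start a new output file at every chunk boundary.
            if handle is not None:
                handle.close()
            path = os.path.join(out_dir, f"part-{len(paths):05d}.json")
            handle = open(path, "w", encoding="utf-8")
            paths.append(path)
        handle.write(json.dumps(record) + "\n")
    if handle is not None:
        handle.close()
    return paths


if __name__ == "__main__":
    rows = ({"_id": i, "title": f"Recipe {i}"} for i in range(25))
    with tempfile.TemporaryDirectory() as tmp:
        for path in write_json_chunks(rows, tmp, chunk_size=10):
            print(os.path.basename(path))
```

Keeping each record on its own line means a file can be split at line boundaries without breaking a JSON object in two.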

Using a standard dump or query export

Most SQL databases and interfaces have a method for exporting data in specific formats. For example, within MySQL you can create a CSV file using the command line, as shown in Listing 1.

Listing 1. Creating a CSV file using the command line

SELECT title, subtitle, servings, description INTO OUTFILE 'result.csv'
    FIELDS TERMINATED BY ',' FROM recipes;

In DB2 the same solution exists (see Listing 2).

Listing 2. Creating a CSV file in DB2

EXPORT TO result.csv OF DEL SELECT title, subtitle,
    servings, description FROM recipes;

The resulting file can be loaded straight into Hadoop through HDFS. Generating the same output with a simple script in Perl, Python, or Ruby is just as straightforward.
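
A script-based export along those lines might look like the following Python sketch. It uses the standard `sqlite3` module with an in-memory database purely as a stand-in for your real database driver, and the `export_csv` function and the sample row are hypothetical.

```python
import csv
import io
import sqlite3


def export_csv(conn, query: str, out) -> int:
    """Run query and write every row to out as CSV; return the row count."""
    writer = csv.writer(out)
    cursor = conn.cursor()
    cursor.execute(query)
    count = 0
    for row in cursor:
        writer.writerow(row)
        count += 1
    return count


if __name__ == "__main__":
    # In-memory SQLite stands in for the real database in this sketch.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE recipes (title TEXT, subtitle TEXT, "
                 "servings INTEGER, description TEXT)")
    conn.execute("INSERT INTO recipes VALUES (?, ?, ?, ?)",
                 ("Tuna salad", "Warm, with potato", 4, "A quick main course"))
    buffer = io.StringIO()
    export_csv(conn,
               "SELECT title, subtitle, servings, description FROM recipes",
               buffer)
    print(buffer.getvalue(), end="")
```

The `csv` module quotes any field that contains a comma, which a hand-rolled join on "," would not do.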

Writing a custom program

Depending upon the dataset, using a custom application to export the data may be more practical. This is true particularly with structured data where the information you want to output is based on the content of multiple tables and structures.

In general, the easiest method is to take your structured data, agree on an output format or structure (so it can be parsed within Hadoop), and then dump that information out.

For example, when processing recipe data to look for common themes and threads, you can use the internal tool to load the recipe record, include the ingredients, description, method, and other data, then use the constructed recipe object to output the information for processing in Hadoop, storing each recipe as a JSON object (see Listing 3).

Listing 3. Exporting complex data

use JSON;
use Foodware;
use Foodware::Public;
use Foodware::Recipe;

my $fw = Foodware->new();

# Look up the active recipes, keyed by recipe ID
my $recipes = $fw->{_dbh}->get_generic_multi('recipe','recipeid',
    { active => 1});

my $js = new JSON;

foreach my $recipeid (keys %{$recipes})
{
    my $recipe = new Foodware::Recipe($fw,$recipeid,{ measgroup => 'Metric',
                                                      tempgroup => 'C',});

    # Build the record ID from the title, minus spaces and punctuation
    my $id = $recipe->{title};
    $id =~ s/[ ',()]//g;
    my $record = {
        _id => $id,
        title => $recipe->{title},
        subtitle => $recipe->{subtitle},
        servings => $recipe->{servings},
        cooktime => $recipe->{metadata_bytag}->{totalcooktime},
        preptime => $recipe->{metadata_bytag}->{totalpreptime},
        totaltime => $recipe->{metadata_bytag}->{totaltime},
        keywords => [keys %{$recipe->{keywordbytext}} ],
        method => $recipe->{method},
        ingredients => $recipe->{ingredients},
        comments => $recipe->{comments},
    };

    # Loop over a copy, because $record->{ingredients} is the same array
    my @ingredients = @{$recipe->{ingredients}};
    foreach my $ingred (@ingredients)
    {
        push(@{$record->{ingredients}},
             {
                 meastext => $ingred->{'measuretext'},
                 ingredient => $ingred->{'ingredonly'},
                 ingredtext => $ingred->{'ingredtext'},
             });
    }

    # Emit one JSON object per line
    print to_json($record),"\n";
}

The data is exported to a file that contains the recipe data (see Listing 4).

Listing 4. File containing the recipe data

{
   "_id" : "WarmpotatotunaandCheshiresalad",
   "comments" : null,
   "preptime" : "20",
   "servings" : "4",
   "keywords" : [
      "meal type@salads",
      "main ingredient@fish",
      "convenience@add bread for complete meal",
      ...
   ],
   "subtitle" : "A change from cold salads...",
   "totaltime" : "35",
   "cooktime" : "15",
   "ingredients" : [
      {
         "scaled_fromqty" : 100,
         "_error_ingredid" : 1,
         ...
      }
   ]
}

The result can be loaded directly into HDFS and processed by a suitable MapReduce job to extract the information required. One benefit of this structured approach is that it enables you to perform any required preprocessing on the output, including structuring the information in a format you can use within your Hadoop MapReduce infrastructure.
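
To give a feel for what such a job does with the JSON records, here is a minimal sketch of a map step written in Python in the style of a Hadoop Streaming mapper, which emits tab-separated key and value lines. It assumes one JSON object per input line with a `keywords` array, as in the recipe export; the function name is hypothetical.

```python
import json
from typing import Iterable, Iterator


def map_keywords(lines: Iterable[str]) -> Iterator[str]:
    """Emit 'keyword<TAB>1' for every keyword in every JSON record."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # ignore blank lines between records
        record = json.loads(line)
        # A record may carry no keywords at all (missing key or null).
        for keyword in record.get("keywords") or []:
            yield f"{keyword}\t1"


if __name__ == "__main__":
    sample = [
        '{"_id": "a", "keywords": ["meal type@salads", "main ingredient@fish"]}',
        '{"_id": "b", "keywords": ["meal type@salads"]}',
    ]
    for output_line in map_keywords(sample):
        print(output_line)
```

Run as a streaming mapper, the same function would be fed from `sys.stdin` instead of the sample list, and a reducer would sum the values for each keyword.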

The phrase “importing into Hadoop” really means you simply need to copy the information into HDFS for it to be available (see Listing 5).

Listing 5. Copying the information into HDFS

$ hdfs dfs -mkdir recipes
$ hdfs dfs -copyFromLocal recipes.json recipes

Once the files are copied in, they can be used by your Hadoop MapReduce jobs as required.

For better flexibility within HDFS, the output can be chunked into multiple files, and those files can be loaded. Depending upon your use case and processing requirements, extracting the data into individual files (one per notional record) may be more efficient for the distributed processing.

Using Sqoop to move data

Sqoop is an additional tool for Hadoop that connects to an existing database using a JDBC driver and imports tables or databases from the source JDBC connection directly into HDFS. For the vast majority of imports where raw data from the SQL tables is being imported into Hadoop verbatim without processing, Sqoop offers the simplest and most efficient process for moving the data. For example, all of the tables within a single database can be loaded using Listing 6.

Listing 6. Loading all tables within a single database

$ sqoop import-all-tables --connect jdbc:mysql:// 

For those drivers that support it, use the --direct option to directly read the data and then write it into HDFS. The process is much faster, as it requires no intervening files. When loading data in this way, directories are created within HDFS according to the table names. For example, within the recipe data set is the access log information in the access_log table, and the imported data is written into text files within the access_log directory (see Listing 7).

Listing 7. Viewing imported data from Sqoop

$ hdfs dfs -ls access_log
Found 6 items
-rw-r--r--   3 cloudera cloudera          0 2013-08-15 09:37 access_log/_SUCCESS
drwxr-xr-x   - cloudera cloudera          0 2013-08-15 09:37 access_log/_logs
-rw-r--r--   3 cloudera cloudera   36313694 2013-08-15 09:37 access_log/part-m-00000
-rw-r--r--   3 cloudera cloudera   36442312 2013-08-15 09:37 access_log/part-m-00001
-rw-r--r--   3 cloudera cloudera   36797470 2013-08-15 09:37 access_log/part-m-00002
-rw-r--r--   3 cloudera cloudera   36321038 2013-08-15 09:37 access_log/part-m-00003

By default, Sqoop runs four parallel map tasks, so the table is written as four files (about 36MB each in this case), and the fields are separated by commas (see Listing 8).

Listing 8. CSV converted Sqoop table data


To select individual tables, use the code in Listing 9.

Listing 9. Selecting individual tables

$ sqoop import --connect jdbc:mysql:// --username=cheffy --table access_log

And to select individual columns from that table, use the code in Listing 10.

Listing 10. Selecting individual columns

Rather than individually selecting tables and columns, a more practical approach is to use a query to specify the information to output. When using this method, you must use the $CONDITIONS variable in your statement and specify the column to use when dividing up the data into individual packets using the --split-by option as shown in Listing 11.

Listing 11. Specifying information to output

$ sqoop import --connect jdbc:mysql:// --username=cheffy \
    --query 'select recipeid,recipe,description from recipe WHERE $CONDITIONS' \
    --split-by id
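
To see why the query needs both `$CONDITIONS` and a split column, it helps to picture what happens to the placeholder: each parallel task runs the same query with `$CONDITIONS` replaced by a range test on the split column. The Python sketch below is only an approximation of that idea for an integer column; the function names are hypothetical, and the exact boundaries that Sqoop computes may differ.

```python
from typing import List


def split_conditions(column: str, low: int, high: int,
                     num_splits: int = 4) -> List[str]:
    """Return one range condition per split, covering low..high inclusive."""
    if num_splits < 1 or high < low:
        raise ValueError("need num_splits >= 1 and high >= low")
    total = high - low + 1
    conditions = []
    start = low
    for i in range(num_splits):
        # Spread any remainder over the first splits so sizes differ by at most 1.
        size = total // num_splits + (1 if i < total % num_splits else 0)
        if size == 0:
            continue  # more splits than values: skip the empty ones
        end = start + size
        conditions.append(f"{column} >= {start} AND {column} < {end}")
        start = end
    return conditions


def expand_query(query: str, condition: str) -> str:
    """Substitute one split's condition for the $CONDITIONS placeholder."""
    return query.replace("$CONDITIONS", condition)


if __name__ == "__main__":
    query = "select recipeid,recipe,description from recipe WHERE $CONDITIONS"
    for condition in split_conditions("id", 0, 1000):
        print(expand_query(query, condition))
```

Because every row falls into exactly one range, the tasks can run independently without importing any row twice.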

One limitation of Sqoop, however, is that it provides limited ability to format and construct the information. For complex data, the export and load functions of a custom tool may provide better functionality.

Extracting data from Hadoop

When bringing processed data back from Hadoop, you need to collect the files output by your Hadoop job. As with exporting, you should ensure that your Hadoop job outputs the information in a format that you can read back effectively.

Importing to SQL

Using CSV is simple and straightforward, but for more complex structures, you might want to consider the JSON route again because it makes the entire conversion and translation process so easy.

Getting the information out requires the HDFS tool to copy your output files back to a filesystem where you can perform a load, for example with $ hdfs dfs -copyToLocal processed_logs/*. Once you have the files, you can load the information using whatever method suits the source information and structure.
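
As one possible load method, the following Python sketch reads a job's output lines and inserts them into a summary table. It assumes each line holds a key and a count separated by a tab, the default layout of Hadoop's text output, and it uses an in-memory SQLite database as a stand-in for your SQL store. The `load_summary` function is hypothetical, and the table follows the `summary_logs` definition used in this article.

```python
import sqlite3
from typing import Iterable


def load_summary(conn, lines: Iterable[str]) -> int:
    """Insert 'operation<TAB>count' lines into summary_logs; return rows loaded."""
    rows = []
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue  # skip blank lines
        # Split on the last tab so the operation text may itself contain tabs.
        operation, _, count = line.rpartition("\t")
        rows.append((operation, int(count)))
    conn.executemany("INSERT INTO summary_logs VALUES (?, ?)", rows)
    conn.commit()
    return len(rows)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE summary_logs (operation CHAR(80), count int)")
    loaded = load_summary(conn, ["search\t12\n", "view\t3\n"])
    print(f"loaded {loaded} rows")
```

For very large outputs, a bulk loader supplied by the database is usually a better fit than row-by-row inserts.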

Exporting from Sqoop

As with the import process, Sqoop provides a simplified method for translating information from your Hadoop job back into an SQL table.

When outputting the resulting information from your Hadoop job, use the CSV format for the easiest export with Sqoop. Then to import the information, you will need to create a suitable table to accept the processed logs. For example, from our access logs, the Hadoop output has mapped the data into summaries of the number of operations, so it’s necessary to first create a suitable table: CREATE TABLE summary_logs (operation CHAR(80), count int). Then the information can be imported directly from Hadoop into your SQL table (see Listing 12).

Listing 12. Exporting from Hadoop into SQL

$ sqoop export --connect jdbc:mysql:// --username=root \
    --export-dir processed_log --table processed_log
13/08/15 10:04:34 INFO manager.MySQLManager: Preparing to use a MySQL streaming 
13/08/15 10:04:34 INFO tool.CodeGenTool: Beginning code generation
13/08/15 10:04:35 INFO manager.SqlManager: Executing SQL statement: SELECT t.
     FROM access_log AS t LIMIT 1
13/08/15 10:04:35 INFO manager.SqlManager: Executing SQL statement: SELECT t.
     FROM access_log AS t LIMIT 1
13/08/15 10:04:35 INFO orm.CompilationManager: HADOOP_MAPRED_HOME 
    is /usr/lib/hadoop‑mapreduce
13/08/15 10:04:35 INFO orm.CompilationManager: Found hadoop core jar at:
Note: /tmp/sqoop‑cloudera/compile/8034e8d9feb8c1b0f69a52fede8d1da7/ 
    uses or overrides a deprecated API.
Note: Recompile with ‑Xlint:deprecation for details.
13/08/15 10:04:37 INFO orm.CompilationManager: Writing jar file:
13/08/15 10:04:37 INFO mapreduce.ExportJobBase: Beginning export of access_log
13/08/15 10:04:39 WARN mapred.JobClient: Use GenericOptionsParser for parsing the 
    arguments. Applications should implement Tool for the same.
13/08/15 10:04:39 INFO input.FileInputFormat: Total input paths to process : 4
13/08/15 10:04:39 INFO input.FileInputFormat: Total input paths to process : 4
13/08/15 10:04:39 INFO mapred.JobClient: Running job: job_201308150649_0006
13/08/15 10:04:40 INFO mapred.JobClient:  map 0% reduce 0%
13/08/15 10:04:57 INFO mapred.JobClient:  map 2% reduce 0%
13/08/15 10:08:06 INFO mapred.JobClient:     
    CPU time spent (ms)=27470
13/08/15 10:08:06 INFO mapred.JobClient:     
    Physical memory (bytes) snapshot=317607936
13/08/15 10:08:06 INFO mapred.JobClient:     
    Virtual memory (bytes) snapshot=2076659712
13/08/15 10:08:06 INFO mapred.JobClient:     
    Total committed heap usage (bytes)
13/08/15 10:08:06 INFO mapreduce.ExportJobBase: Transferred 139.1333 MB in 
    207.5656 seconds (686.3975 KB/sec)
13/08/15 10:08:06 INFO mapreduce.ExportJobBase: Exported 2401906 records.

The process is complete. Even at the summarized level, we are looking at 2.4 million records of simplified data from a content store about 600 times that size.

With the imported information, we can now run some simple, quick queries against the data. For example, this summary of the key activities takes about 5 seconds (see Figure 4).

Figure 4. Summary operations

On the full data set, the process took almost an hour. Similarly, a query on the top search terms took less than a second, compared to over 3 minutes, a time savings that makes it possible to include a query on the homepage (see Figure 5).

Figure 5. Summary ingredient search

These are simplified examples of using Hadoop for external reduction processing, but they effectively demonstrate the advantage of the external interface.


Getting SQL-based data in and out of Hadoop is not complicated, provided you know the data, its format, and how you want the information internally processed and represented. The actual conversion, exporting, processing, and importing is surprisingly straightforward.

The solutions in this article have looked at direct, entire-dataset dumps of information that can be exported, processed, and imported to Hadoop. The process can be SQL to Hadoop, Hadoop to SQL, or SQL to Hadoop to SQL. In fact, the entire sequence can be scripted or automated, but that’s a topic for a future article in this series.

In Part 2, we look at more advanced examples of performing this translation and movement of content by using one of the SQL layers that sits on top of HDFS. We’ll also lay the foundation for providing a full live transmission of data for processing and storage.