(Updated March 10, 2016)

A columnar storage layout such as Parquet can speed up queries because it scans and computes over only the values of the required columns, reading just a small fraction of the data in a file or table. Parquet also supports flexible compression options, so on-disk storage can be reduced drastically.

If you have text-based data files or tables on HDFS and use Spark SQL to query them, converting those text files to Parquet is highly recommended for both performance and storage benefits. Yes, the conversion takes time, but the query performance boost can reach 30X or higher in some cases, with storage savings of up to 75%!

Similar performance gains have been reported for BigSQL, Hive, and Impala using Parquet storage. This blog shows you how to write a simple Scala application that converts existing text-based data files or tables to Parquet data files, and demonstrates the actual storage savings and query performance boost for Spark SQL.

Let’s convert to Parquet!

Spark SQL supports both reading and writing Parquet files while automatically preserving the schema of the original data. The Parquet schema makes data files "self-describing" to Spark SQL applications through the DataFrame APIs. Of course, Spark SQL can also read existing Hive tables that are already stored as Parquet, but you will need to configure Spark to use Hive's metastore to load that information. In our example, the Hive metastore is not involved.
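For completeness, if you did want to go through a Hive metastore, a minimal sketch would look like the following. This assumes Spark was built with Hive support, hive-site.xml is on the classpath, and a Parquet-backed table named catalog_page already exists in Hive; none of this is used in the rest of this post:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.hive.HiveContext

  // the rest of this post sticks to a plain SQLContext without a metastore
  val sc = new SparkContext(new SparkConf().setAppName("ReadHiveParquetTable"))
  val hiveContext = new HiveContext(sc)

  // table() loads the definition (including the Parquet schema) from the Hive metastore
  val df = hiveContext.table("catalog_page")
  df.printSchema()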

The following Scala code example reads from a text-based CSV table and writes it to a Parquet table:


  import org.apache.spark.sql.SQLContext
  import org.apache.spark.sql.types._

  def convert(sqlContext: SQLContext, filename: String, schema: StructType, tablename: String) {
      // import the text-based table into a DataFrame first.
      // make sure to use com.databricks:spark-csv version 1.3+,
      // which has consistent treatment of empty strings as nulls.
      val df = sqlContext.read
        .format("com.databricks.spark.csv")
        .schema(schema)
        .option("delimiter", "|")
        .option("nullValue", "")
        .option("treatEmptyValuesAsNulls", "true")
        .load(filename)
      // now simply write the DataFrame out as a Parquet table
      df.write.parquet("/user/spark/data/parquet1000g/" + tablename)
  }

  // usage example -- the TPC-DS table called catalog_page
  val schema = StructType(Array(
          StructField("cp_catalog_page_sk",        IntegerType,false),
          StructField("cp_catalog_page_id",        StringType,false),
          StructField("cp_start_date_sk",          IntegerType,true),
          StructField("cp_end_date_sk",            IntegerType,true),
          StructField("cp_department",             StringType,true),
          StructField("cp_catalog_number",         LongType,true),
          StructField("cp_catalog_page_number",    LongType,true),
          StructField("cp_description",            StringType,true),
          StructField("cp_type",                   StringType,true)))

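  // hadoopdsPath (defined elsewhere in the application) is assumed to point at
  // the HDFS directory that holds the text-format TPC-DS tables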
  convert(sqlContext,
          hadoopdsPath+"/catalog_page/*",
          schema,
          "catalog_page")

The code above reads the text-based CSV files under hadoopdsPath + "/catalog_page/*" and saves the converted Parquet files under /user/spark/data/parquet1000g/. In addition, the converted Parquet files are automatically compressed with gzip because the Spark configuration property spark.sql.parquet.compression.codec is set to gzip by default. You can also set the compression codec to uncompressed, snappy, or lzo.
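For example, to switch the codec to snappy before writing, one extra line on the same sqlContext is enough (a small sketch, not part of the original job):

  // switch the Parquet compression codec from the default gzip to snappy
  sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

The same property can also be passed on the command line with --conf spark.sql.parquet.compression.codec=snappy.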

Example spark-submit command used:
  spark-submit --master yarn-client --verbose \
    --name ConvertTextToParquet \
    --driver-memory 10g --executor-memory 5g --num-executors 80 \
    --packages com.databricks:spark-csv_2.10:1.3.0 \
    --class org.apache.spark.examples.sql.ConvertTextToParquet \
    /home/spark/tpcdssparksql_2.10-0.9.jar \
    /user/spark/data/text1000g

How long does it take to convert 1 TB worth of data?

About 50 minutes, yielding roughly 20 GB/min throughput on a 6-datanode Spark v1.5.1 cluster, with about 500 GB of total memory used. The resulting Parquet files on HDFS take the form of:

hdfs://bigaperf116.svl.ibm.com:8020/user/spark/data/parquet1000g/catalog_page/_SUCCESS
hdfs://bigaperf116.svl.ibm.com:8020/user/spark/data/parquet1000g/catalog_page/_common_metadata
hdfs://bigaperf116.svl.ibm.com:8020/user/spark/data/parquet1000g/catalog_page/_metadata
hdfs://bigaperf116.svl.ibm.com:8020/user/spark/data/parquet1000g/catalog_page/part-r-00000-a9341639-a804-45bd-b594-8e58220190f4.gz.parquet
hdfs://bigaperf116.svl.ibm.com:8020/user/spark/data/parquet1000g/catalog_page/part-r-00001-a9341639-a804-45bd-b594-8e58220190f4.gz.parquet
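
As a quick sanity check (not shown in the original conversion job), you can read one of the converted directories back and confirm that the schema was carried over:

  // reading the directory back picks the schema up from the Parquet metadata
  val check = sqlContext.read.parquet("/user/spark/data/parquet1000g/catalog_page")
  check.printSchema()
  println(check.count())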

Storage savings

The following output shows the on-HDFS size comparison between the TEXT and PARQUET copies of the data:

	% hadoop fs -du -h -s /user/spark/data/text1000g
	897.9 G  /user/spark/data/text1000g
	% hadoop fs -du -h -s /user/spark/data/parquet1000g
	231.4 G  /user/spark/data/parquet1000g

That works out to (897.9 - 231.4) / 897.9 ≈ 74%, close to a 75% saving on storage for 1 TB worth of data files!

Query performance gain

Parquet files are self-describing, so the schema is preserved when they are read back. To load a Parquet file into a DataFrame and register it as a temp table, do the following:

      // the schema is picked up automatically from the Parquet metadata
      val df = sqlContext.read.parquet(filename)
      df.show()
      // register the DataFrame so it can be queried with SQL
      df.registerTempTable(tablename)

To compare performance, you can then run the following query (assuming all the other TPC-DS tables have also been converted to Parquet) against the TEXT and PARQUET tables separately. The spark-sql-perf test kit can be used to drive the query testing. Take query #76 of the TPC-DS benchmark as an example (a small timing sketch follows the query text below):

("q76", """
            | SELECT
            |    channel, col_name, d_year, d_qoy, i_category, COUNT(*) sales_cnt,
            |    SUM(ext_sales_price) sales_amt
            | FROM(
            |    SELECT
            |        'store' as channel, ss_store_sk col_name, d_year, d_qoy, i_category,
            |        ss_ext_sales_price ext_sales_price
            |    FROM store_sales, item, date_dim
            |    WHERE ss_store_sk IS NULL
            |      AND ss_sold_date_sk=d_date_sk
            |      AND ss_item_sk=i_item_sk
            |    UNION ALL
            |    SELECT
            |        'web' as channel, ws_ship_customer_sk col_name, d_year, d_qoy, i_category,
            |        ws_ext_sales_price ext_sales_price
            |    FROM web_sales, item, date_dim
            |    WHERE ws_ship_customer_sk IS NULL
            |      AND ws_sold_date_sk=d_date_sk
            |      AND ws_item_sk=i_item_sk
            |    UNION ALL
            |    SELECT
            |        'catalog' as channel, cs_ship_addr_sk col_name, d_year, d_qoy, i_category,
            |        cs_ext_sales_price ext_sales_price
            |    FROM catalog_sales, item, date_dim
            |    WHERE cs_ship_addr_sk IS NULL
            |      AND cs_sold_date_sk=d_date_sk
            |      AND cs_item_sk=i_item_sk) foo
            | GROUP BY channel, col_name, d_year, d_qoy, i_category
            | ORDER BY channel, col_name, d_year, d_qoy, i_category
            | LIMIT 100
          """.stripMargin)
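
Outside the spark-sql-perf harness, a single query can also be timed with a simple wall-clock measurement. This is only a sketch: q76Sql is a hypothetical String holding the SQL text above, and all referenced tables are assumed to have been registered as temp tables as shown earlier.

  // q76Sql is assumed to hold the SQL text of query 76 shown above
  val start = System.nanoTime()
  sqlContext.sql(q76Sql).collect()
  val seconds = (System.nanoTime() - start) / 1e9
  println(f"q76 finished in $seconds%.1f seconds")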

Query times are as follows:

                                TEXT    PARQUET
            Query time (sec)     698         21

Query 76 speeds up from almost 12 minutes (698 seconds) to about 21 seconds, a more than 30X improvement!

12 comments on"How-to: Convert Text to Parquet in Spark to Boost Performance"

  1. […] is well-known that columnar storage saves both time and space when it comes to big data processing. Parquet, for example, is shown to boost Spark SQL performance […]

  2. Federico Ponzi March 03, 2016

    Nice comparison! I love the Parquet format, even if Spark seems really slow at creating the files (compared with Apache Drill).

  3. […] While CSV is great for readability, for working within Spark, Parquet is the choice to speed things up. More details on Apache Parquet can be found here. Essentially, the solution provides columnar storage that enables complex data to be encoded efficiently in bulk. The difference between a columnar structure and a traditional DB structure is in how data is fundamentally organized, so searches across large data sets and reads of large sets of data can be optimized. Parquet provides a performance advantage over CSV, especially when dealing with large data sets. Here is an excellent article that elegantly articulates the benefits […]

  4. Thank you for the great comparison. This is probably obvious, but are there any performance benefits of running those operations directly on DataFrame as opposed to registering the DataFrame as a TempTable?

    • Thanks John, for the kind comment. I have not done the comparison. My hypothesis would be that given the same amount of memory, their performance should be on-par, due to the fact that under the covers, they are all cached RDDs.

  5. Once the data gets converted to a DataFrame, from any format, there is a schema associated with the data. So where does the performance benefit come from? I think I am missing something here.

  6. Rudolf Farfan November 19, 2016

    Appreciating the hard work you put into your blog and the detailed information you provide. It’s awesome to come across a blog every once in a while that isn’t the same out-of-date rehashed material. Great read! I’ve bookmarked your site and I’m including your RSS feeds to my Google account.

  7. Noam Marianne December 04, 2016

    great blog, any chance pythoning the scripts?

  8. rohit pareek December 09, 2016

    great blog!!! helped me a lot 🙂
    Thanks

  9. How do we convert a text file to the Parquet file format in Java or Python? Are there currently any APIs to do that in these languages?

    Thanks.

  10. Raja Rajagopal May 15, 2018

    Great topic. I have a question. My use case is: I have a fixed-length file, and I need to tokenize some of the columns in that file, store the result in an S3 bucket, then read the same file back from the S3 bucket and push it into a NoSQL DB (it could be Cassandra or MongoDB). Please let me know the best way to do this using Spark and Scala.

    Note: The fixed-length file will be a huge file.

    Could you please help me on this?

    Thanks,
    Raja Rajagopal

  11. Raja Rajagopal May 15, 2018

    May I know, where can I get the catalog_page sample files?
