It is well-known that columnar storage saves both time and space when it comes to big data processing. Parquet, for example, is shown to boost Spark SQL performance by 10X on average compared to using text, thanks to low-level reader filters, efficient execution plans, and in Spark 1.6.0, improved scan throughput!
To see the power of Parquet, we pick 24 TPC-DS-derived queries from the spark-sql-perf test kit for comparison. We use 24 of the 99 queries because some queries simply fail to run against flat CSV data files at the 1TB scale factor (more on this below). The queries represent all TPC-DS categories: reporting, ad hoc, iterative and data mining. We also make sure to include the following:
- short-running queries (queries 12 and 91)
- long-running queries (queries 24a and 25)
- a query known to use 100% of the CPU (query 97)
We use a 6-node on-premises Cisco UCS cluster, configured along the lines of the Cisco Validated Designs. We tune the underlying hardware so that there are no network or disk IO bottlenecks in any test. The focus is on performance differences when these queries run against text versus Parquet storage formats, in Spark 1.5.1 and in the just-released Spark 1.6.0. The total Spark working memory is 500GB, and the TPC-DS scale factor is 1TB.
1. Spark SQL is much faster with Parquet!
The chart below compares the total execution time of all 24 queries in Spark 1.5.1. The queries take about 12 hours to complete using flat CSV files, compared with less than 1 hour using Parquet. That is an 11X performance improvement.
2. Spark SQL works better at large scale with Parquet
A poor choice of storage format often causes exceptions that are difficult to diagnose and fix. For example, at the 1TB scale factor at least 1/3 of all runnable queries failed to complete using flat CSV files. All of them completed using Parquet files.
Some of the errors and exceptions are very cryptic. Here are 3 examples:
Error example 1:
WARN scheduler.TaskSetManager: Lost task 145.0 in stage 4.0 (TID 4988, rhel8.cisco.com): FetchFailed(BlockManagerId(2, rhel4.cisco.com, 49209), shuffleId=13, mapId=47, reduceId=145, message= org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /data6/hadoop/yarn/local/usercache/spark/appcache/application_1447965002296_0142/blockmgr-44627d4c-4a2b-4f53-a471-32085a252cb0/15/shuffle_13_119_0.index (No such file or directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195)
Error example 2:
WARN scheduler.TaskSetManager: Lost task 1.0 in stage 13.1 (TID 13621, rhel7.cisco.com): FetchFailed(null, shuffleId=9, mapId=-1, reduceId=148, message= org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 9 at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460) at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
Error example 3:
ERROR cluster.YarnScheduler: Lost executor 59 on rhel4.cisco.com: remote Rpc client disassociated
Most of these failures force Spark to retry by re-queuing tasks or even restarting a stage. Things only get worse from there: eventually the application is killed and never completes.
By switching to Parquet, these problems simply went away without changing any other Spark configuration. Parquet helps in three ways:
- Compression minimizes the size of files.
- The columnar format allows reading only the selected columns a query needs.
- The reduced input data directly influences the Spark DAG scheduler's decisions on execution graphs (more details below).
All of these benefits of Parquet contribute to the speedy completion of query execution.
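The effect of a columnar layout on the amount of data read can be illustrated with a small sketch. It assumes a toy in-memory table loosely modeled on TPC-DS date_dim; the helper names (`to_columnar`, `row_scan`, `columnar_scan`) are hypothetical. It only counts values touched and does not model Parquet's actual encoding or compression. A row-oriented scan must parse every field of every row to extract two columns. A columnar scan touches only the two requested columns.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]

def to_columnar(rows: List[Row]) -> Dict[str, List[object]]:
    """Pivot row dicts into one list per column (a toy stand-in for column chunks)."""
    columns: Dict[str, List[object]] = {}
    for row in rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns

def row_scan(rows: List[Row], wanted: List[str]) -> Tuple[List[tuple], int]:
    """Row layout: every field of every row is read to extract the wanted ones."""
    values_read = 0
    out = []
    for row in rows:
        values_read += len(row)  # the whole row is parsed, like a CSV line
        out.append(tuple(row[c] for c in wanted))
    return out, values_read

def columnar_scan(columns: Dict[str, List[object]], wanted: List[str]) -> Tuple[List[tuple], int]:
    """Columnar layout: only the requested columns are read."""
    values_read = sum(len(columns[c]) for c in wanted)
    out = list(zip(*(columns[c] for c in wanted)))
    return out, values_read

# Hypothetical 30-row, 4-column table loosely modeled on date_dim.
rows = [
    {"d_date_sk": 100 + i, "d_month_seq": 1190 + i, "d_year": 1999, "d_day_name": "Monday"}
    for i in range(30)
]
columns = to_columnar(rows)
wanted = ["d_date_sk", "d_month_seq"]
row_out, row_read = row_scan(rows, wanted)
col_out, col_read = columnar_scan(columns, wanted)
print(row_read, col_read)  # 120 60
```

Both scans return the same result, but the columnar scan reads half as many values here. The ratio grows with table width: the real date_dim has 28 columns.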
3. Less disk IO
Parquet with compression reduces your data storage by 75% on average. In other words, your 1TB scale factor data files will occupy only about 250 GB on disk. This significantly reduces the input data needed by your Spark SQL applications. In addition, Parquet readers in Spark 1.6.0 use push-down filters to further reduce disk IO. Push-down filters allow data selection decisions to be made early, before the data is even read into Spark. For example, consider how the between clause in query 97 is handled:
select cs_bill_customer_sk customer_sk, cs_item_sk item_sk from catalog_sales,date_dim where cs_sold_date_sk = d_date_sk and d_month_seq between 1200 and 1200 + 11
Spark SQL shows the following scan statement in the query’s physical plan:
+- Scan ParquetRelation[d_date_sk#141,d_month_seq#144L] InputPaths: hdfs://rhel10.cisco.com/user/spark/hadoopds1tbparquet/date_dim/_SUCCESS, hdfs://rhel10.cisco.com/user/spark/hadoopds1tbparquet/date_dim/_common_metadata, hdfs://rhel10.cisco.com/user/spark/hadoopds1tbparquet/date_dim/_metadata, hdfs://rhel10.cisco.com/user/spark/hadoopds1tbparquet/date_dim/part-r-00000-4d205b7e-b21d-4e8b-81ac-d2a1f3dd3246.gz.parquet, hdfs://rhel10.cisco.com/user/spark/hadoopds1tbparquet/date_dim/part-r-00001-4d205b7e-b21d-4e8b-81ac-d2a1f3dd3246.gz.parquet, PushedFilters: [GreaterThanOrEqual(d_month_seq,1200), LessThanOrEqual(d_month_seq,1211)]]
With PushedFilters, only records whose d_month_seq value falls within the 1200 to 1211 range are returned, which is just a few records. Flat files, by contrast, require reading the entire table (every column and every row), as this physical plan shows:
[ Scan CsvRelation(hdfs://rhel10.cisco.com/user/spark/hadoopds1000g/date_dim/*,false,|,",null,PERMISSIVE,COMMONS,false,false,StructType(StructField(d_date_sk,IntegerType,false), StructField(d_date_id,StringType,false), StructField(d_date,StringType,true), StructField(d_month_seq,LongType,true), StructField(d_week_seq,LongType,true), StructField(d_quarter_seq,LongType,true), StructField(d_year,LongType,true), StructField(d_dow,LongType,true), StructField(d_moy,LongType,true), StructField(d_dom,LongType,true), StructField(d_qoy,LongType,true), StructField(d_fy_year,LongType,true), StructField(d_fy_quarter_seq,LongType,true), StructField(d_fy_week_seq,LongType,true), StructField(d_day_name,StringType,true), StructField(d_quarter_name,StringType,true), StructField(d_holiday,StringType,true), StructField(d_weekend,StringType,true), StructField(d_following_holiday,StringType,true), StructField(d_first_dom,LongType,true), StructField(d_last_dom,LongType,true), StructField(d_same_day_ly,LongType,true), StructField(d_same_day_lq,LongType,true), StructField(d_current_day,StringType,true), StructField(d_current_week,StringType,true), StructField(d_current_month,StringType,true), StructField(d_current_quarter,StringType,true), StructField(d_current_year,StringType,true)))[d_date_sk#141,d_date_id#142,d_date#143,d_month_seq#144L,d_week_seq#145L,d_quarter_seq#146L,d_year#147L,d_dow#148L,d_moy#149L,d_dom#150L,d_qoy#151L,d_fy_year#152L,d_fy_quarter_seq#153L,d_fy_week_seq#154L,d_day_name#155,d_quarter_name#156,d_holiday#157,d_weekend#158,d_following_holiday#159,d_first_dom#160L,d_last_dom#161L,d_same_day_ly#162L,d_same_day_lq#163L,d_current_day#164,d_current_week#165,d_current_month#166,d_current_quarter#167,d_current_year#168]]
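One way a columnar reader can act on pushed filters such as `GreaterThanOrEqual(d_month_seq,1200)` and `LessThanOrEqual(d_month_seq,1211)` is to consult per-row-group min/max statistics. Parquet files record these statistics when they are written. The reader can then skip any row group whose value range cannot overlap the predicate. The following is a minimal sketch of that idea. The row-group size and the sorted 0..2399 d_month_seq column are hypothetical, and real Parquet and Spark readers differ in detail. The rows in surviving row groups are still checked one by one, because statistics can only rule groups out, not in.

```python
from dataclasses import dataclass
from typing import List, Tuple

@dataclass
class RowGroup:
    """A toy row group holding one column's values plus min/max statistics."""
    values: List[int]

    # A real file stores these at write time; here they are computed on demand.
    @property
    def min_value(self) -> int:
        return min(self.values)

    @property
    def max_value(self) -> int:
        return max(self.values)

def write_row_groups(values: List[int], group_size: int) -> List[RowGroup]:
    """Split a column into fixed-size row groups."""
    return [RowGroup(values[i:i + group_size]) for i in range(0, len(values), group_size)]

def scan_with_pushdown(groups: List[RowGroup], low: int, high: int) -> Tuple[List[int], int]:
    """Skip row groups whose [min, max] cannot overlap [low, high], then filter exactly."""
    groups_read = 0
    matches: List[int] = []
    for group in groups:
        if group.max_value < low or group.min_value > high:
            continue  # statistics prove no row here can match: not read at all
        groups_read += 1
        matches.extend(v for v in group.values if low <= v <= high)
    return matches, groups_read

# Hypothetical sorted d_month_seq column (0..2399) in row groups of 100 values.
groups = write_row_groups(list(range(2400)), group_size=100)
matches, groups_read = scan_with_pushdown(groups, 1200, 1200 + 11)
print(len(groups), groups_read, len(matches))  # 24 1 12
```

In this sketch, only 1 of 24 row groups is read. Skipping works best when the filtered column is sorted or clustered. On randomly ordered data, most row groups would span the whole range and could not be skipped.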
4. Higher scan throughput in Spark 1.6.0
The Databricks Spark 1.6.0 release blog mentioned a significant improvement in Parquet scan throughput, because a “more optimized code path” is used. To show this in the real world, we ran query 97 in Spark 1.5.1 and in 1.6.0 and captured nmon data. The improvement is very obvious.
First, the query response time is cut by more than half: query 97 took 138 seconds in Spark 1.5.1 and 60 seconds in 1.6.0.
Second, CPU usage on worker nodes is much lower in Spark 1.6.0, mostly due to SPARK-11787.
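As a quick check on the query 97 response times, the 138-second and 60-second figures can be expressed both as a speedup and as a percentage reduction. The helper names below are purely illustrative.

```python
def speedup(before_s: float, after_s: float) -> float:
    """How many times faster the new run is."""
    return before_s / after_s

def reduction_pct(before_s: float, after_s: float) -> float:
    """Percentage of the original response time that was eliminated."""
    return 100.0 * (before_s - after_s) / before_s

# Query 97 response times reported for Spark 1.5.1 and 1.6.0 (seconds).
spark_151, spark_160 = 138, 60
print(round(speedup(spark_151, spark_160), 2))        # 2.3
print(round(reduction_pct(spark_151, spark_160), 1))  # 56.5
```

A 2.3X speedup corresponds to removing about 56.5% of the original response time, slightly more than half.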
5. Efficient Spark execution graph
In addition to enabling smarter readers such as Parquet's, data formats directly impact the Spark execution graph, because one major input to the scheduler is the RDD count. In our example, we ran the same query 97 on Spark 1.5.1 using text and Parquet, and got the following execution pattern for the stages.
With Parquet, there were more stages, but they executed very quickly, and only two longer-running stages appeared toward the end. This indicates cleaner parent-child stage boundaries. As a result, less intermediate data needs to be saved to disk and/or sent over the network between nodes, which speeds up end-to-end execution.
Parquet is great for use with Spark SQL. Not only is its compression rate desirable, it also allows reading only the records of interest, through column selection and low-level reader filters. So it is worth your time to convert your existing flat files to Parquet, even if multiple passes over the data may be required.
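Why stage boundaries matter for intermediate data can be sketched with a simplified model. Here a linear pipeline of operators is cut into stages wherever an operator needs a shuffle (wide) dependency on its input. This mirrors, in very reduced form, how Spark's DAG scheduler pipelines narrow transformations within a stage. The data that crosses each boundary is what gets written as shuffle output and fetched across the network. The `Op` type, operator names and output sizes are hypothetical. Real query plans are DAGs with multiple parents, and aggregations are typically split into partial and final steps, so this is only an illustration.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Op:
    name: str
    shuffle: bool      # True if this op needs a wide (shuffle) dependency on its input
    output_mb: float   # hypothetical size of this op's output

def split_into_stages(pipeline: List[Op]) -> List[List[str]]:
    """Cut a linear pipeline into stages at every shuffle boundary."""
    stages: List[List[str]] = []
    for op in pipeline:
        if not stages or op.shuffle:
            stages.append([])  # a wide dependency starts a new stage
        stages[-1].append(op.name)
    return stages

def shuffled_mb(pipeline: List[Op]) -> float:
    """Intermediate data crossing stage boundaries: output of each op feeding a shuffle."""
    return sum(prev.output_mb for prev, op in zip(pipeline, pipeline[1:]) if op.shuffle)

pipeline = [
    Op("scan", False, 1000.0),
    Op("filter", False, 50.0),
    Op("project", False, 20.0),
    Op("aggregate", True, 5.0),
    Op("join", True, 2.0),
]
print(split_into_stages(pipeline))  # [['scan', 'filter', 'project'], ['aggregate'], ['join']]
print(shuffled_mb(pipeline))        # 25.0
```

In this model, the narrow operators (scan, filter, project) run as one pipelined stage. Only the 20 MB entering the aggregate and the 5 MB entering the join cross stage boundaries. Shrinking data before the first shuffle, as column pruning and push-down filters do, reduces that intermediate volume.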
Disclaimer: The spark-sql-perf workload is derived from the TPC-DS Benchmark and as such is not comparable to published TPC-DS Benchmark results.