Data warehouse capabilities are increasingly important to the Spark data platform. To make these capabilities easy to use from Apache Spark, data modeling features such as star schema are available. Star schema is the simplest form of a data warehouse schema. Its tables are connected through primary key – foreign key relationships, and queries against a star schema are expected to run fast based on those established relationships. This blog presents the star schema enhancements in Apache Spark.

What is a data warehouse?

Typically, a data warehouse is a relational database that integrates data from heterogeneous data sources. It is designed for data modeling and analysis. The most important requirements are query performance and data simplicity. These needs are best addressed by a dimensional, or star schema, model. A star schema consists of a fact table referencing a number of dimension tables. The fact table contains the main data, or measurements, about a business. A dimension table, usually a smaller table, describes the different characteristics, or dimensions, of a business.

The TPC-DS benchmark is a proxy for a real data warehouse. It represents the standard benchmark for measuring the performance of decision support solutions, and it uses a retail product supplier as the underlying business model. The figure below shows part of the store_sales fact table diagram of the TPC-DS benchmark.

The diagram looks like a star with the fact table store_sales at the center. Surrounding the fact table are the dimension tables, which are related to the fact table by 1 – N joins. Fact and dimension tables are in a primary key – foreign key relationship.
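
To make the primary key – foreign key relationship concrete, here is a minimal sketch of a two-table star using Python's built-in sqlite3 module as a stand-in engine (it is not Spark). The column names follow TPC-DS, but the rows are made up for illustration.

```python
import sqlite3

# In-memory database: one dimension table and one fact table.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE date_dim (
        d_date_sk INTEGER PRIMARY KEY,   -- primary key of the dimension
        d_year    INTEGER,
        d_moy     INTEGER
    );
    CREATE TABLE store_sales (
        ss_sold_date_sk INTEGER REFERENCES date_dim(d_date_sk),  -- foreign key
        ss_net_profit   REAL
    );
    -- Hypothetical sample rows, not TPC-DS data.
    INSERT INTO date_dim VALUES (1, 2001, 3), (2, 2001, 5), (3, 2002, 5);
    INSERT INTO store_sales VALUES (1, 10.0), (2, 20.0), (2, 40.0), (3, 70.0);
""")

# 1 - N join: each date_dim row can match many store_sales rows.
row = conn.execute("""
    SELECT avg(ss_net_profit) AS store_sales_profit
    FROM store_sales JOIN date_dim ON d_date_sk = ss_sold_date_sk
    WHERE d_moy BETWEEN 4 AND 10 AND d_year = 2001
""").fetchone()
store_sales_profit = row[0]
print(store_sales_profit)  # 30.0
```

Only the date with key 2 passes the filter, so the average is taken over its two sales rows.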

About Spark’s Catalyst Optimizer

Catalyst is Spark’s query optimizer. A query optimizer is the component that determines the best way to execute a query: it chooses the implementation of relational operators and the order of their execution. Internally, queries are represented by trees of operators. There are logical trees and physical trees, corresponding to the different phases of optimization. Trees are manipulated by rules, and Catalyst is mainly a rule-based optimizer. Each query compiler phase shown below applies a different set of rules.

For example, in the Logical Optimization phase, rules transform logical plans into semantically equivalent ones for better performance (e.g. push down predicates close to the base table access, remove unreferenced columns, etc). The order of the joins is also decided in this phase. In the Physical Planning phase, logical operators are transformed into physical operators, which can be executed by Spark at runtime.
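
The idea of rules rewriting operator trees can be sketched in a few lines of Python. This is a toy model, not Catalyst's implementation: the Relation, Filter and Join classes and the push_down_filter rule are hypothetical names invented for the illustration. The rule moves a filter below a join, and the driver reapplies it until the plan stops changing.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Relation:
    name: str
    columns: frozenset

@dataclass(frozen=True)
class Filter:
    column: str      # column the predicate refers to
    predicate: str   # predicate text, kept opaque in this sketch
    child: object

@dataclass(frozen=True)
class Join:
    left: object
    right: object

def output_columns(plan):
    """Columns produced by a plan node."""
    if isinstance(plan, Relation):
        return plan.columns
    if isinstance(plan, Filter):
        return output_columns(plan.child)
    return output_columns(plan.left) | output_columns(plan.right)

def push_down_filter(plan):
    """Rule: move a Filter that sits on a Join onto the side owning its column."""
    if isinstance(plan, Filter) and isinstance(plan.child, Join):
        join = plan.child
        if plan.column in output_columns(join.left):
            return Join(Filter(plan.column, plan.predicate, join.left), join.right)
        if plan.column in output_columns(join.right):
            return Join(join.left, Filter(plan.column, plan.predicate, join.right))
    # Otherwise keep the node and try the rule on its children.
    if isinstance(plan, Filter):
        return Filter(plan.column, plan.predicate, push_down_filter(plan.child))
    if isinstance(plan, Join):
        return Join(push_down_filter(plan.left), push_down_filter(plan.right))
    return plan

def optimize(plan):
    """Apply the rule repeatedly until the plan no longer changes."""
    while True:
        rewritten = push_down_filter(plan)
        if rewritten == plan:
            return plan
        plan = rewritten

store_sales = Relation("store_sales", frozenset({"ss_sold_date_sk", "ss_net_profit"}))
date_dim = Relation("date_dim", frozenset({"d_date_sk", "d_year", "d_moy"}))
plan = Filter("d_year", "d_year = 2001", Join(store_sales, date_dim))
optimized = optimize(plan)
print(optimized)
```

After optimization the filter sits directly on date_dim, so fewer rows reach the join. The two plans are semantically equivalent, which is the property a logical optimization rule has to preserve.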

The EXPLAIN command can be used to visualize the internal query transformations. Consider the following query using the TPC-DS schema:

select avg(ss_net_profit) as store_sales_profit
from store_sales, date_dim
where d_date_sk = ss_sold_date_sk and
      d_moy between 4 and 10 and
      d_year = 2001

The output of the EXPLAIN command is shown below.

== Parsed Logical Plan ==
'Project ['avg('ss_net_profit) AS store_sales_profit#0]
+- 'Filter ((('d_date_sk = 'ss_sold_date_sk) && (('d_moy >= 4) && ('d_moy <= 10))) && ('d_year = 2001))
   +- 'Join Inner
      :- 'UnresolvedRelation `store_sales`
      +- 'UnresolvedRelation `date_dim`

== Analyzed Logical Plan ==
store_sales_profit: double
Aggregate [avg(ss_net_profit#25) AS store_sales_profit#0]
+- Filter (((d_date_sk#26 = ss_sold_date_sk#3) && ((d_moy#34L >= cast(4 as bigint)) && (d_moy#34L <= cast(10 as bigint)))) && (d_year#32L = cast(2001 as bigint)))
   +- Join Inner
      :- SubqueryAlias store_sales
      :  +- Relation[ss_sold_date_sk#3, ...] parquet
      +- SubqueryAlias date_dim
         +- Relation[d_date_sk#26,d_date_id#27,d_date#28, ...] parquet

== Optimized Logical Plan ==
Aggregate [avg(ss_net_profit#25) AS store_sales_profit#0]
+- Project [ss_net_profit#25]
   +- Join Inner, (d_date_sk#26 = ss_sold_date_sk#3)
      :- Project [ss_sold_date_sk#3, ss_net_profit#25]
      :     +- Relation[ss_sold_date_sk#3,ss_sold_time_sk#4, ...] parquet
      +- Project [d_date_sk#26]
         +- Filter (d_moy#34L >= 4)) && (d_moy#34L <= 10)) && ...
            +- Relation[d_date_sk#26,d_date_id#27, . . .] parquet

== Physical Plan ==
*HashAggregate(keys=[], functions=[avg(ss_net_profit#25)], output=[store_sales_profit#0])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_avg(ss_net_profit#25)], output=[sum#159, count#160L])
      +- *Project [ss_net_profit#25]
         +- *BroadcastHashJoin [ss_sold_date_sk#3], [d_date_sk#26], Inner, BuildRight
            :- *Project [ss_sold_date_sk#3, ss_net_profit#25]
            :  +- *Filter isnotnull(ss_sold_date_sk#3)
            :     +- *FileScan parquet tpcds1tb.store_sales[ss_sold_date_sk#3, ...]
            +- BroadcastExchange HashedRelationBroadcastMode
               +- *Project [d_date_sk#26]
                  +- *Filter (d_moy#34L >= 4)) && (d_moy#34L <= 10)) && (d_year#32L = 2001))
                     +- *FileScan parquet tpcds1tb.date_dim [d_date_sk#26, d_year#32L,d_moy#34L]
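
A plan listing of this kind can also be requested from PySpark. The helper below is a sketch under two assumptions: spark is an existing pyspark.sql.SparkSession, and the store_sales and date_dim tables are already registered in its catalog. The helper name explain_extended is made up for this example.

```python
# Assumption: the TPC-DS tables are already registered in the session catalog.
QUERY = """
select avg(ss_net_profit) as store_sales_profit
from store_sales, date_dim
where d_date_sk = ss_sold_date_sk and
      d_moy between 4 and 10 and
      d_year = 2001
"""

def explain_extended(spark, sql_text):
    """Print the plans of a query for every compiler phase.

    `spark` is assumed to be an existing pyspark.sql.SparkSession.
    """
    # explain(True) asks for the extended output: the parsed, analyzed and
    # optimized logical plans followed by the physical plan.
    spark.sql(sql_text).explain(True)
```

Calling explain_extended(spark, QUERY) prints the plans to standard output. The expression ids (for example #25) and the exact operators depend on the Spark version and the data.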

Spark’s Catalyst Optimizer uses a combination of heuristics and a cost model to optimize plans. The cost model is an evolving feature in Spark. Until the cost model is fully implemented, heuristics can be used. One such heuristic is star schema detection.

Star schema detection in Spark

Queries against a star schema are expected to run fast based on the established relationships among the tables. The above query is an example of a star schema join: it joins the fact table store_sales with the dimension table date_dim. In general, star schema joins are detected using one of the following conditions:

  1. Referential integrity constraints (reliable detection)
    • The dimension table contains a primary key that is joined to the fact table
    • The fact table contains foreign keys referencing multiple dimension tables
  2. Cardinality based heuristics
    • Usually, the table with the highest cardinality is the fact table
    • The table joined with the largest number of tables is the fact table
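
The cardinality based heuristics can be sketched as follows. This is a simplified illustration, not Spark's detection rule. It assumes that a join column is treated as a primary key when its distinct count equals the table's row count. The function names and all statistics are made up for the example.

```python
def is_unique(column, row_counts, distinct_counts):
    """Treat a column as a key if every row has a distinct value (assumption)."""
    table, _ = column
    return distinct_counts.get(column) == row_counts[table]

def detect_star(row_counts, distinct_counts, joins):
    """Return (fact_table, dimension_tables), or None if no star is found.

    row_counts:      {table: number of rows}
    distinct_counts: {(table, column): number of distinct values}
    joins:           list of ((table, column), (table, column)) equi-joins
    """
    # Heuristic: the table with the highest cardinality is the fact table.
    fact = max(row_counts, key=row_counts.get)
    dimensions = set()
    for left, right in joins:
        for fact_side, dim_side in ((left, right), (right, left)):
            # A dimension is joined to the fact table on its unique key.
            if (fact_side[0] == fact and dim_side[0] != fact
                    and is_unique(dim_side, row_counts, distinct_counts)):
                dimensions.add(dim_side[0])
    return (fact, sorted(dimensions)) if dimensions else None

# Hypothetical statistics, chosen only for illustration.
row_counts = {"store_sales": 1_000_000, "date_dim": 70_000,
              "store": 1_000, "item": 300_000}
distinct_counts = {("date_dim", "d_date_sk"): 70_000,
                   ("store", "s_store_sk"): 1_000,
                   ("item", "i_item_sk"): 300_000}
joins = [(("date_dim", "d_date_sk"), ("store_sales", "ss_sold_date_sk")),
         (("item", "i_item_sk"), ("store_sales", "ss_item_sk")),
         (("store", "s_store_sk"), ("store_sales", "ss_store_sk"))]

star = detect_star(row_counts, distinct_counts, joins)
print(star)  # ('store_sales', ['date_dim', 'item', 'store'])
```

A statistics-based check like this is less reliable than declared referential integrity constraints, which is why the list above marks only the first condition as reliable detection.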

In SPARK-17626, star schema joins are detected using table and column statistics. With this information, the Optimizer joins the tables of a star schema in an optimal way. Performance tests using the 1TB TPC-DS benchmark show a two to eight times improvement for star schema queries.

Optimizations using star schema

An example of a star schema optimization and its impact on query performance is shown next. A simplified version of TPC-DS query 25 joins tables from two different star schemas: one corresponding to the store_sales fact table, shown in blue, and one corresponding to the store_returns fact table, shown in green.

select i_item_id, s_store_id, avg(ss_net_profit) as store_sales_profit,
       avg(sr_net_loss) as store_returns_loss
from
  store_sales,
  store_returns,
  date_dim d1,
  date_dim d2,
  store,
  item
where
  d1.d_moy = 4 and
  d1.d_year = 1998 and
  d1.d_date_sk = ss_sold_date_sk and
  i_item_sk = ss_item_sk and
  s_store_sk = ss_store_sk and
  ss_customer_sk = sr_customer_sk and
  ss_item_sk = sr_item_sk and
  ss_ticket_number = sr_ticket_number and
  sr_returned_date_sk = d2.d_date_sk and
  d2.d_moy = 4 and
  d2.d_year = 1998
group by i_item_id, s_store_id
order by i_item_id, s_store_id

The following star schema diagram shows the relationship among the tables based on the join predicates.

The join reorder based on star schema detection is shown using the following plan transformation.

The tree on the left shows the execution without star schema detection. Without star schema detection, the Optimizer joins the two large fact tables, store_sales and store_returns, early in the execution plan. Because of the size of the inner table, Spark is forced to use a sort-merge join, and the output of both fact table scans requires sorting before the join operation.

The tree on the right shows the execution with star schema detection. The tables that belong to the store_sales star (store_sales, date_dim, store, and item) are joined together in an efficient star join plan that has the large fact table on the driving arm and the smaller dimensions on the inner. This execution not only results in efficient broadcast hash joins, but also significantly filters the data early in the execution plan. With the new plan, the number of input rows to the sort-merge join is reduced from 2.8 x 10^9 rows to 2.5 x 10^7 rows.
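
The effect of joining a filtered dimension to the fact table with a broadcast hash join can be sketched in plain Python. This is a single-process illustration of the technique, not Spark's operator. The function name and the sample rows are hypothetical.

```python
def broadcast_hash_join(fact_rows, dim_rows, fact_key, dim_key):
    """Inner equi-join: hash the small side, stream the large side once."""
    # Build phase: hash the small, already filtered dimension table.
    build = {}
    for dim in dim_rows:
        build.setdefault(dim[dim_key], []).append(dim)
    # Probe phase: fact rows without a matching dimension row are dropped,
    # so no sorting of the large input is needed.
    for fact in fact_rows:
        for dim in build.get(fact[fact_key], []):
            yield {**fact, **dim}

# Made-up sample rows using TPC-DS column names.
date_dim = [
    {"d_date_sk": 1, "d_year": 1998, "d_moy": 4},
    {"d_date_sk": 2, "d_year": 1998, "d_moy": 5},
    {"d_date_sk": 3, "d_year": 1999, "d_moy": 4},
]
store_sales = [
    {"ss_sold_date_sk": 1, "ss_net_profit": 5.0},
    {"ss_sold_date_sk": 2, "ss_net_profit": 7.0},
    {"ss_sold_date_sk": 1, "ss_net_profit": 9.0},
    {"ss_sold_date_sk": 3, "ss_net_profit": 2.0},
]

# Filter the dimension first, then join it to the fact table.
filtered_dates = [d for d in date_dim if d["d_moy"] == 4 and d["d_year"] == 1998]
joined = list(broadcast_hash_join(store_sales, filtered_dates,
                                  "ss_sold_date_sk", "d_date_sk"))
print(len(store_sales), "->", len(joined))  # 4 -> 2
```

Only the fact rows that match the selective dimension filter survive, which is how the star join shrinks the input of the later join between the two fact tables.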

When the query is run on a 1TB TPC-DS setup, the execution time drops from about 421 seconds to 147 seconds, roughly a three times improvement.
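
As a quick check of these figures, the row counts reported for the sort-merge join input (2.8 x 10^9 before, 2.5 x 10^7 after) and the two execution times give the following ratios:

```python
rows_before = 2.8e9   # sort-merge join input rows without star schema detection
rows_after = 2.5e7    # sort-merge join input rows with star schema detection
secs_before = 421     # execution time without star schema detection
secs_after = 147      # execution time with star schema detection

row_reduction = rows_before / rows_after
speedup = secs_before / secs_after

print(f"{row_reduction:.0f}x fewer rows, {speedup:.1f}x faster")
# 112x fewer rows, 2.9x faster
```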

Performance results using 1TB TPC-DS benchmark

More performance results are shown in the following table.

The tests were run using the following system configuration:

Cluster: 4-node cluster, each node having:
     12 x 2 TB disks,
     Intel(R) Xeon(R) CPU E5-2680 v3 @ 2.50GHz, 128 GB RAM, 10 Gigabit Ethernet
     Number of cores: 48
Apache Hadoop 2.7.3, Apache Spark 2.2 main (Mar 31, 2017)
Database info:
    Schema: TPCDS
    Scale factor: 1TB total space
    Storage format: Parquet with Snappy compression

With star schema detection, queries improve by a factor of two to eight. The improvements come from the Optimizer making better decisions by observing relationships among the tables. The new execution plans not only reduce the data early in the execution plan, but also favor broadcast hash joins over more expensive sort-merge joins.

How to enable star schema enhancements

A new Spark SQL configuration property, spark.sql.cbo.starSchemaDetection, was introduced in Spark 2.2. Its default value is false. It can be enabled using one of the methods specified in the Spark reference manual.
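
One possible way to set the property at runtime from PySpark is sketched below. It assumes spark is an existing pyspark.sql.SparkSession. The helper name enable_star_schema_detection is made up for this example.

```python
STAR_SCHEMA_KEY = "spark.sql.cbo.starSchemaDetection"

def enable_star_schema_detection(spark):
    """Turn on star schema detection for the given session.

    `spark` is assumed to be an existing pyspark.sql.SparkSession.
    Returns the value the session reports after the change.
    """
    spark.conf.set(STAR_SCHEMA_KEY, "true")
    return spark.conf.get(STAR_SCHEMA_KEY)
```

The same key can also be passed when the application is launched, for example with spark-submit --conf spark.sql.cbo.starSchemaDetection=true.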
