
Background and motivation

When Spark applications operate on distributed data coming from disparate data sources, they often have to directly query data sources external to Spark, such as backing relational databases or data warehouses. For that, Spark provides Data Source APIs, a pluggable mechanism for accessing structured data through Spark SQL. The Data Source APIs are tightly integrated with the Spark optimizer and provide optimizations such as filter push down to the external data source and column pruning. While these optimizations significantly speed up Spark query execution, depending on the data source, they cover only a subset of the functionality that could be pushed down and executed at the data source. As part of our ongoing project to provide a generic data source push down API, this blog describes our work on aggregate push down. The Spark JIRA SPARK-22390 was opened to address this issue, and the design doc is attached to the JIRA.

Filter push down implementation

In a SQL statement, a filter is often used to choose the rows that meet given criteria. In Spark, a filter is pushed down to the data source layer through the following steps:

  1. The filter in the logical plan is represented as a Catalyst Expression.
  2. The Catalyst Expression is translated into a data source Filter.
  3. If the Catalyst Expression cannot be translated into a data source Filter, or the resulting Filter is not supported by the data source, the filter is handled at the Spark layer.
  4. Otherwise, the filter is pushed down to the data source layer (see the sketch after this list).
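
To make steps 2–4 concrete, here is a minimal sketch of how a few Catalyst predicates map to the public data source Filter classes. It is illustrative only; Spark's actual translation logic lives inside the planner and covers many more expression types.

import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.sources

object FilterTranslationSketch {
  // Returns Some(filter) if the Catalyst expression has a data source
  // equivalent (steps 2 and 4), or None if it must stay at the Spark layer (step 3).
  def translate(expr: expressions.Expression): Option[sources.Filter] = expr match {
    case expressions.GreaterThan(a: expressions.Attribute, expressions.Literal(v, _)) =>
      Some(sources.GreaterThan(a.name, v))        // e.g. i > 3
    case expressions.IsNotNull(a: expressions.Attribute) =>
      Some(sources.IsNotNull(a.name))
    case expressions.And(left, right) =>
      for {
        l <- translate(left)                      // both sides must translate,
        r <- translate(right)                     // otherwise the whole conjunction
      } yield sources.And(l, r)                   // stays at the Spark layer
    case _ =>
      None
  }
}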

Aggregate push down

Aggregate functions are often used in SQL to compute a single result from a set of input values. The most commonly used aggregate functions are AVG, COUNT, MAX, MIN and SUM. If the aggregates in the SQL statement are supported by the data source with exactly the same semantics as Spark, these aggregates can be pushed down to the data source level to improve performance. The performance gains come mainly from two areas:

  • Network IO between Spark and the data source is dramatically reduced, since only the aggregated results are transferred instead of the raw rows.
  • The aggregate computation can be faster at the data source, for example because of the presence of indexes.

An aggregate is usually used in combination with a filter. For example:

SELECT j, SUM(i)
FROM T
WHERE i > 3
GROUP BY j
HAVING SUM(i) > 10

The following figure shows how we can push down the above Filter and Aggregate using Spark Data Source v2.

[Figure: process flow diagram of the Filter and Aggregate push down]

DSV: Data Source View
AGG: Aggregate
FilterPD: Filter Push Down
AGGPD: Aggregate Push Down
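
To give a feel for what the AGGPD step could look like at the API level, here is a purely hypothetical sketch of an aggregate push down mixin for a Data Source V2 reader, modeled after the existing filter push down support. The names below (SupportsPushDownAggregates, AggregateFunc, pushAggregation) are placeholders for illustration only; the actual interface is described in the design doc attached to the JIRA.

import org.apache.spark.sql.sources.Filter

// Hypothetical description of a single aggregate call, e.g. AggregateFunc("SUM", "i").
case class AggregateFunc(name: String, column: String)

// Hypothetical reader mixin: the data source is offered the aggregates and the
// grouping columns, and reports whether it can evaluate them with
// Spark-compatible semantics. If it cannot, Spark keeps the aggregate.
trait SupportsPushDownAggregates {
  def pushAggregation(aggregates: Seq[AggregateFunc], groupBy: Seq[String]): Boolean

  // Filters that were already pushed down (the FilterPD step in the figure).
  def pushedFilters(): Array[Filter]
}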

Logical and physical plans with and without aggregate push down

Let’s take a look at the logical and physical plans of the previous SQL statement, both with and without aggregate push down. In Spark, the previous SQL statement can be written as:

df.filter($"i" > 3).groupBy($"j").agg(sum($"i")).where(sum($"i") > 10)
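
For context, the plans below can be reproduced with a small self-contained program; the data source format name and setup here are placeholders for the test source used in the prototype:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().appName("agg-pushdown-plans").getOrCreate()
import spark.implicits._

// Placeholder: load table T (columns i, j) through a V2 source that supports push down.
val df = spark.read.format("com.example.AdvancedDataSourceV2").load()

val result = df.filter($"i" > 3).groupBy($"j").agg(sum($"i")).where(sum($"i") > 10)
result.explain(true)   // prints the logical and physical plans, including the ones shown below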

Before Aggregate Push Down, only the filter i > 3 is pushed down to the data source. Both the aggregate sum(i) and the aggregate filter sum(i) > 10 stay at the Spark layer. The Optimized Logical Plan and Physical Plan are shown below:

== Optimized Logical Plan ==
Project [j#1, sum(i)#10L]
+- Filter (isnotnull(sum(cast(i#0 as bigint))#21L) && (sum(cast(i#0 as bigint))#21L > 10))
   +- Aggregate [j#1], [j#1, sum(cast(i#0 as bigint)) AS sum(i)#10L, sum(cast(i#0 as bigint)) AS sum(cast(i#0 as bigint))#21L]
      +- DataSourceV2Relation (source=AdvancedDataSourceV2, schema=[i#0 int, j#1 int], filters=[isnotnull(i#0), (i#0 > 3)])


== Physical Plan ==
*Project [j#1, sum(i)#10L]
+- *Filter (isnotnull(sum(cast(i#0 as bigint))#21L) && (sum(cast(i#0 as bigint))#21L > 10))
   +- *HashAggregate(keys=[j#1], functions=[sum(cast(i#0 as bigint))], output=[j#1, sum(i)#10L, sum(cast(i#0 as bigint))#21L])
      +- Exchange hashpartitioning(j#1, 5)
         +- *HashAggregate(keys=[j#1], functions=[partial_sum(cast(i#0 as bigint))], output=[j#1, sum#24L])
            +- DataSourceV2Scan(source=AdvancedDataSourceV2, schema=[i#0 int, j#1 int], filters=[isnotnull(i#0), (i#0 > 3)])

After Aggregate Push Down, the query has the following Optimized Logical Plan and Physical Plan:

== Optimized Logical Plan ==
DataSourceV2Relation (source=AdvancedDataSourceV2, schema=[i#0 int, j#1 int], filters=[isnotnull(i#0), (i#0 > 3)], aggregates=[sum(cast(i#0 as bigint))], groupby=[j#1], havingClause=[sum(cast(i#0 as bigint)) > 10], options=Map())

== Physical Plan ==
DataSourceV2Scan(source=AdvancedDataSourceV2, schema=[i#0 int, j#1 int], filters=[isnotnull(i#0), (i#0 > 3)], aggregates=[sum(cast(i#0 as bigint))], groupby=[j#1], havingClause=[sum(cast(i#0 as bigint)) > 10], options=Map())
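
The pushed down scan now carries everything the data source needs: the pushed filters, the aggregate expressions, the grouping columns and the having clause. Assuming a JDBC-style relational source (an assumption for illustration; the test source here is AdvancedDataSourceV2), it could answer the whole scan with a single query along these lines:

// Illustrative only: the exact SQL depends on the data source and its dialect.
val queryAtTheSource =
  """SELECT j, SUM(CAST(i AS BIGINT)) AS sum_i
    |FROM T
    |WHERE i IS NOT NULL AND i > 3
    |GROUP BY j
    |HAVING SUM(CAST(i AS BIGINT)) > 10""".stripMargin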

Performance results using a TPC-DS 1 TB setup

The early prototype of this feature has shown great improvements in aggregate-heavy workloads. Below are some results:

Test 1 (with group by, completely pushed down, no partitioning)

SELECT SUM(cs_quantity), cs_sold_date_sk FROM catalog_sales GROUP BY cs_sold_date_sk

No aggregate push down: 782.287 sec
Aggregate push down: 250.331 sec
Performance improvement: ~3X

Test 2 (no group by, completely pushed down, no partitioning)

SELECT AVG(ss_quantity), AVG(ss_ext_sales_price), AVG(ss_ext_wholesale_cost), SUM(ss_ext_wholesale_cost) FROM store_sales

No aggregate push down: 2219.104 sec
Aggregate push down: 839.664 sec
Performance improvement: ~2.6X

Test 3 (with group by, completely pushed down, partition column is the same as the group by column)

SELECT SUM(cs_quantity), cs_sold_date_sk FROM catalog_sales GROUP BY cs_sold_date_sk
(the catalog_sales table is partitioned on cs_sold_date_sk)

No aggregate push down: 588.918 sec
Aggregate push down: 296.763 sec
Performance improvement: ~2X

Test 4 (with group by, completely pushed down, partition column is different from the group by column)

SELECT SUM(cs_quantity), cs_sold_date_sk FROM catalog_sales GROUP BY cs_sold_date_sk
(the catalog_sales table is partitioned on cs_sold_time_sk)

No aggregate push down: 344.509 sec
Aggregate push down: 225.186 sec
Performance improvement: ~1.5X