砥砺奋进谱新篇,且看旧貌换新颜。欢迎访问新的 IBM Developer 中文网站! 了解详情

IBM Developer 博客

通过 IBM Developer 关注最新动态并获取信息

目前,过滤器被下推到数据源层,以获得更好的性能。但是,聚合仍然是在 Spark 层完成的。我们可以将聚合下推到数据源,以便进一步提高 Spark 性能。


背景和动机

当 Spark 应用程序对来自不同数据源的分布式数据进行操作时,它们通常必须直接查询 Spark 外部的数据源,比如支持关系数据库或数据仓库。为此,Spark 提供了 Data Source API,这些 API 是一种即插即用机制(pluggable mechanism),用于通过 Spark SQL 访问结构化数据。Data Source API 与 Spark Optimizer 紧密集成。它们提供了诸多优化,比如将过滤器下推到外部数据源和列剪枝(column pruning)。虽然这些优化显著加快了 Spark 查询的执行速度,但根据数据源的不同,它们仅支持将部分功能下推到数据源并执行。我们正在开展一个提供通用数据源下推 API 的项目,作为该项目的一部分,本博客将介绍我们有关聚合下推的工作。我们开放了 Spark jira 22390 来解决此问题,设计文档已在 jira 中。

过滤器下推实现

在 SQL 语句中,过滤器通常用于选择满足给定条件的行。在 Spark 中,可以使用以下实现将过滤器下推到数据源层:

  1. 逻辑计划过滤器包含在 Catalyst Expression 中。
  2. 一个 Catalyst Expression 被转换为数据源过滤器
  3. 如果该 Catalyst Expression 无法转换为数据源过滤器,或者不受数据源支持,那么将在 Spark 层上处理它。
  4. 否则,会将它下推到数据源层。

聚合下推

SQL 中经常使用聚合函数利用一组输入值来计算单个结果。最常用的聚合函数是 AVG、COUNT、MAX、MIN 和 SUM。如果 SQL 语句中的聚合得到与 Spark 具有相同语义的数据源的支持,那么可以将这些聚合下推到数据源级别,以提升性能。性能提升主要表现在两个领域:

  • Spark 与数据源之间的网络 IO 显著减少。
  • 由于索引的存在,数据源中的聚合计算速度变得更快。

聚合通常与过滤器结合使用。例如:

Select sum(i)
From T
Where i > 3
Group by j
Having sum(i) > 10

下图展示了如何使用 Spark Data Source v2 下推上述过滤器并进行聚合。

流程图

DSV:数据源视图
AGG:聚合
FilterPD: 过滤器下推
AGGPD:聚合下推

下推聚合和无下推聚合的逻辑/物理计划

让我们看看前一个 SQL 语句对于下推和非下推情况的逻辑和物理计划。在 Spark 中,前面的 SQL 语句可以编写为

df.filter('i > 3).groupBy('j).agg(sum($"i")).where(sum('i) > 10)

在聚合下推前,只有过滤器 i> 3 被下推到数据源。但聚合函数 sum (i) 和聚合过滤器 sum(i) > 10 仍在 Spark 层。下面给出了经过优化的逻辑计划和物理计划:

== Optimized Logical Plan ==
Project [j#1, sum(i)#10L]
+- Filter (isnotnull(sum(cast(i#0 as bigint))#21L) && (sum(cast(i#0 as
      bigint))#21L > 10))
   +- Aggregate [j#1], [j#1, sum(cast(i#0 as bigint)) AS sum(i)#10L,
     sum(cast(i#0 as bigint)) AS sum(cast(i#0 as bigint))#21L]
      +- DataSourceV2Relation (source=AdvancedDataSourceV2,schema=[i#0 int,
       j#1 int],filters=[isnotnull(i#0), (i#0 > 3)]


== Physical Plan ==
*Project [j#1, sum(i)#10L]
+- *Filter (isnotnull(sum(cast(i#0 as bigint))#21L) && (sum(cast(i#0 as
      bigint))#21L > 10))
   +- *HashAggregate(keys=[j#1], functions=[sum(cast(i#0 as bigint))],
     output=[j#1, sum(i)#10L, sum(cast(i#0 as bigint))#21L])
       +- Exchange hashpartitioning(j#1, 5)
      +- *HashAggregate(keys=[j#1], functions=[partial_sum(cast(i#0 as
          bigint))], output=[j#1, sum#24L])
      +- DataSourceV2Scan(source=AdvancedDataSourceV2,
      schema=[i#0 int, j#1 int],filters=[isnotnull(i#0), (i#0 > 3)]

聚合下推后,它具有以下经过优化的逻辑计划和物理计划:

== Optimized Logical Plan ==
DataSourceV2Relation (source=AdvancedDataSourceV2,schema=[i#0 int, j#1 int],filters=[isnotnull(i#0), (i#0 > 3)] aggregates=[sum(cast(i#0 as bigint))], groupby=[j#1], havingClause==[sum(cast(i#0 as bigint))>10], options=Map())

== Physical Plan ==
DataSourceV2Scan(source=AdvancedDataSourceV2,schema=[i#0 int, j#1 int],filters=[isnotnull(i#0), (i#0 > 3)] aggregates=[sum(cast(i#0 as bigint))], groupby=[j#1], havingClause==[sum(cast(i#0 as bigint))>10], options=Map())

使用 TPCDS 1TB 设置的性能结果

在具有大量聚合的工作负载中,此功能的早期原型表现出很大的改进。下面是一些结果:

测试 1(含 group by、完全下推、无 partition)

select sum(cs_quantity), cs_sold_date_sk from catalog_sales group by cs_sold_date_sk

无聚合下推:782.287 秒
聚合下推:250.331 秒
性能提升:约 3 倍的提升

测试 2(无 group by、完全下推、无 partition)

SELECT avg(ss_quantity), avg(ss_ext_sales_price), avg(ss_ext_wholesale_cost), sum(ss_ext_wholesale_cost) FROM store_sales

无聚合下推:2219.104 秒
聚合下推:839.664 秒
性能提升:约 2.6 倍的提升

测试 3(含 group by、完全下推、partition 列与 group by 列相同)

select sum(cs_quantity), cs_sold_date_sk from catalog_sales group by cs_sold_date_sk partition by cs_sold_date_sk

无聚合下推:588.918 秒
聚合下推:296.763 秒
性能提升:约两倍的提升

测试 4(含 group by、完全下推、partition 列与 group by 列不同)

select sum(cs_quantity), cs_sold_date_sk from catalog_sales group by cs_sold_date_sk partition by cs_sold_time_sk

无聚合下推:344.509 秒
聚合下推:225.186 秒
性能提升:约 1.5 倍的提升

本文翻译自:Data Source V2 aggregate push down(2018-10-23)