开源数据和 AI 技术中心 (CODAIT) 团队在 Apache Spark 中实现了对信息参照完整性约束的支持。这项工作开辟了一个依赖于参照完整性约束语义的查询优化技术领域。我们看到性能得到显著改善,其中包括在某些 TPC-DS 基准查询中的性能提高了 8 倍。在本博客中,我们将回顾这项工作中的一些重要方面。

在数据仓库领域,参照完整性 (RI) 约束用来强制执行一个实体的某些属性。SQL 约束指定了应用于数据的各种规则。其用途有两种:

  • 数据完整性
  • 查询优化

主键约束用于防止多个行在同一列或同一个列组合中包含相同的值,并避免值为 null。外键约束(也称为参照完整性约束)需要一列或一个列组合中的值与其父表中的主键值相匹配。

RI 约束可以强制执行或仅提供信息。

  • 强制执行:确保应用到一个表的所有数据修改都符合该约束。
  • 已验证:确保一个表中目前包含的所有数据都符合该约束。
  • 信息或统计:用于优化查询和提高性能。已知满足给定约束的条件,所以不需要验证或强制执行该约束。

我们在 Spark 中添加了对信息性主键和外键(参照完整性)的支持,以实现企业级 TPC-DS 性能。

性能改善

这些约束用于优化 Spark 优化器的查询处理性能。有两个方面得到了改善:查询重写转换,以及优化器和运行时增强。其中一些改善包括:

  • 查询重写转换
    • 消除了 RI 联接
    • 消除了冗余联接
    • 通过联接将 group by 下移
    • 消除了排序
    • 要联接的现有子查询
  • 优化器和运行时改善
    • 证明最大基数
    • 提前退出联接
    • 星型模式检测

使用 1TB TPC-DS 基准测试的性能结果

下表显示了使用星型模式检测、对要执行内联接的现有子查询进行转换重写,以及使用 RI 通过联接优化对 group by 下移的结果。

IBM microservices

我们使用以下系统配置运行这些测试:

  • 集群:4 节点集群,每个节点拥有:
    • 12 个 2 TB 磁盘
    • 英特尔 (R) 至强 (R) CPU E5-2680 v3 @ 2.50GHz,128 GB RAM,10 Gb 以太网
    • 核心数量:48
  • Apache Hadoop 2.7.3、Apache Spark 2.2 main
  • 数据库信息:
    • 模式:TPCDS
    • 扩展系数:1TB 总空间
    • 存储格式:采用 Snappy 压缩的 Parquet

对社区的贡献

为了实现这些约束,我们添加了数据模型来定义和存储这些约束。我们添加了对新 DDL 的支持,以便可使用 `ALTER TABLE ..ADD CONSTRAINT` 命令添加主键和外键约束。

我们向 Apache Spark 社区提交了以下性能结果(PR,Performance Result)。

我们提交了设计规范,其中涵盖约束规范、元存储内容的存储、约束验证,以及维护和优化特性,参见此处。总体 JIRA 为 SPARK-19842。 欢迎对这个 JIRA 或这些 PR 提供任何反馈或意见。

联接消除重写 (Join elimination rewrite)

该转换可以检测 RI 联接,如果查询中没有引用父/主键 (PK) 表的列(PK 列除外),则不会查询该表。

获取代码

要使用并尝试联接消除优化 (Join elimination optimization),可从 PR 获取代码:https://github.com/apache/spark/pull/20868

构建

从此 PR 获取代码后,可以构建源代码。前提是您已设置好系统来构建 Spark。 如果满足此前提,构建此分支的命令与 Spark 类似。

mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -Phive -Phive-thriftserver -DskipTests clean package

启用 RI 优化联接消除重写规则。

spark.conf.set("spark.sql.riJoinElimination", "true")

测试

在 spark-shell 中运行以下 SQL 命令。在 spark 主目录下,运行 `./bin/spark-shell`,然后复制以下命令并粘贴到 shell 中。


sql("create table dim1 (dim1c1 int, dim1c2 int, dim1c3 int, dim1c4 int)")
sql("create table dim2 (dim2c1 int, dim2c2 int, dim2c3 int, dim2c4 int)")
sql("create table fact (factc1 int, factc2 int, factc3 int, factc4 int)")

sql("alter table dim1 add constraint pk1 primary key(dim1c1)")
sql("alter table dim2 add constraint pk1 primary key(dim2c1)")
sql("alter table fact add constraint fk1 foreign key (factc1) references dim1(dim1c1)")
sql("alter table fact add constraint fk2 foreign key (factc2) references dim2(dim2c1)")

sql("insert into dim1 values (1,1,1,1)")
sql("insert into dim1 values (2,2,2,2)")

sql("insert into dim2 values (10,1,1,1)")
sql("insert into dim2 values (20,2,2,2)")

sql("insert into fact values (1,10,1,1)")
sql("insert into fact values (1,20,1,1)")
sql("insert into fact values (2,20,2,2)")
sql("insert into fact values (2,20,2,2)")

spark.sql("select factc1 from fact, dim1, dim2 where factc1 = dim1c1 and factc2 = dim2c1 and dim1c1= 1 and dim2c1=20").explain(true)
spark.conf.set("spark.sql.riJoinElimination", "true")
spark.sql("select factc1 from fact, dim1, dim2 where factc1 = dim1c1 and factc2 = dim2c1 and dim1c1= 1 and dim2c1=20").explain(true)

启用 `spark.conf.set(“spark.sql.riJoinElimination”, “true”)` 后,物理计划和优化后的计划类似于:

== Optimized Logical Plan ==
Project [factc1#200] +- Filter ((((factc1#200 = 1) && (factc2#201 = 2)) && isnotnull(factc1#200)) && isnotnull(factc2#201))
+- HiveTableRelation `default`.`fact`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [factc1#200, factc2#201, factc3#202, factc4#203]

== Physical Plan ==
*(1) Project [factc1#200] +- *(1) Filter ((((factc1#200 = 1) && (factc2#201 = 2)) && isnotnull(factc1#200)) && isnotnull(factc2#201))
+- HiveTableScan [factc1#200, factc2#201], HiveTableRelation `default`.`fact`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [factc1#200, factc2#201, factc3#202, factc4#203]

应该可以看到,fact、dim1 和 dim2 之间的联接已重写,并优化为对 fact 表执行一次查询。

结束语

我们在 Spark 中添加了对信息性主键和外键(参照完整性)约束的支持,以实现企业级 TPC-DS 性能。这项工作开辟了一个依赖于参照完整性约束语义的查询优化技术领域。我们在这次 Apache 峰会演讲中展示了 8 倍的性能提升。还有一篇相关的博客解释了星型模式增强。

我们已将初始 PR 提交给社区。我们需要合并该 DDL 和数据模型 PR,才能提交其他 DDL 和优化的 PR。欢迎在社区中对总体 JIRA 或这些 PR 提供反馈。

本文翻译自:Performance enhancements in Spark using informational referential integrity constraints(2018-10-24)

加入讨论