Apache Spark MLlib’s 基于数据帧的 API 提供简单而又灵活的出色框架,用于创建端到端的机器学习管道。借助 Spark 的数据帧和 SQL 引擎之力,Spark ML 管道可帮助轻松连接机器学习工作流的各个阶段,包括数据处理到特征提取和工程,再到模型训练和评估。

然而,虽然 Spark SQL 可以为 ML 工作流的某些部分提供显著的性能提升,但在其他方面却存在重大缺陷。其中之一就是,许多最常用的 Spark ML 组件一次只能运行一列。这尤其会影响“宽”数据集的常见用例,这些数据集中包含许多变量或特征,它们通常需要以相同的方式进行处理(例如,对许多分类特征列进行编码或对许多数值特征列进行离散化处理)。由此产生的 ML 管道必须对每个特征列具有一个独立且分开运行的转换器,这会使管道构建流程稍显繁琐,导致编码如下:

val indexers = categoricalColumns.map { inputColumn =>
  new StringIndexer()
    .setInputCol(inputColumn)
    .setOutputCol(s"${inputColumn}_idx")
}

val encoders = indexers.map { indexer =>
  new OneHotEncoder()
    .setInputCol(indexer.getOutputCol)
    .setOutputCol(s"${indexer.getOutputCol}_enc")
}

val stages: Array[PipelineStage] = indexers ++ encoders
val pipeline = new Pipeline().setStages(stages)

更重要的是,由于每个阶段独立的查询计划和优化所产生的开销,这种方法会使管道执行效率低下,错失通过一次性处理多个数据帧列获得任何性能提升的机会。最后,随着管道中阶段数量的增加,分析独立查询计划的成本也会逐步增加(请参阅 SPARK-20392SPARK-19846 ,了解详细信息)。最终结果就是,在管道中变换许多数据帧列时,尤其是在行数相对较少时,性能欠佳(因为相对于计算时间来说,数据量越大,查询计划开销就越低)。

多列转换器

相反,具备能够一次性处理多个输入列的转换器更有意义。这会简化管道构建流程,避免因具有用于各列的多个独立转换器而产生的开销。上述代码变为:

val indexer = new StringIndexer()
  .setInputCols(categoricalColumns)
  .setOutputCols(categoricalColumns.map(c => s"${c}_idx"))

val encoder = new OneHotEncoderEstimator()
  .setInputCols(indexer.getOutputCols)
  .setOutputCols(categoricalColumns.map(c => s"${c}_enc"))

val stages: Array[PipelineStage] = Array(indexer, encoder)
val pipeline = new Pipeline().setStages(stages)

最近,CODAIT机器学习团队与 Spark 社区中的其他人员携手合作,致力于为使用最广泛的 Spark ML 转换器(包括 StringIndexer, QuantileDiscretizer, Bucketizer, 以及全新的 OneHotEncoderEstimator)添加多列支持。这些增强功能(尚未完成的 StringIndexer 除外)都已经在最近发布的 Apache Spark 2.3.0中同步上线。

性能测试

为证明这些变更的影响,我们将具备多列支持和不具备多列支持的示例管道的执行时间进行了比较。我们随机生成 100 万行的输入数据帧,其中包含 100 个数值列。管道包括将这些数值列离散化为 10 个存储区(将它们转换为分类特征指标),然后使用独热编码将这些指标编码为特征向量。对于离散化步骤,使用了 QuantileDiscretizer(用于单列或多列输入),而对于独热编码,则将遗留 OneHotEncoder 用于单列输入,将全新 OneHotEncoderEstimator 用于多列输入(修复遗留 OneHotEncoder 的其他问题,创建同样支持多列的新类遗留编码器用于单列情况,可更准确地显示性能影响)。

性能测试在具备 4 个执行器(每个 48 个核心,100 GB 分配的 RAM)的集群上运行。总执行时间按秒计算,取三次运行的平均值。

Multi- vs single-column performance

可在上图中查看结果。多列管道的运行速度整体上是单列版本的 2.7 倍。性能提升主要来自于拟合阶段,此处出现了大部分计算工作。但是,甚至在转换阶段,多列版本的速度也加快了 30%,充分体现了消除多列查询计划开销的影响。

赶快使用这些全新的可扩展转换器,下载 Apache Spark 2.3.0 ,然后在分发版和 Mllib 文档中查看示例代码吧。管道构建愉快!

本文翻译自:Improve performance of ML pipelines for wide DataFrames in Apache Spark 2.3(2018-04-23)

加入讨论