Apache Spark MLlib’s DataFrame-based API provides a simple, yet flexible and elegant framework for creating end-to-end machine learning pipelines. Leveraging the power of Spark’s DataFrames and SQL engine, Spark ML pipelines make it easy to link together the phases of the machine learning workflow, from data processing, to feature extraction and engineering, to model training and evaluation.

However, while Spark SQL can provide significant performance gains to some parts of the ML workflow, in other areas there are important shortcomings. One of these is that many of the most commonly used Spark ML components operate on a single column at a time. This particularly impacts the common use case of “wide” datasets, where there are many variables or features that typically need to be processed in the same manner (for example, encoding many categorical feature columns or discretizing many numerical feature columns). The resulting ML pipeline must have a separate and independent transformer for each feature column, which makes constructing the pipeline somewhat cumbersome, leading to code such as:

val indexers = categoricalColumns.map { inputColumn =>
  new StringIndexer()
    .setInputCol(inputColumn)
    .setOutputCol(s"${inputColumn}_idx")
}

val encoders = indexers.map { indexer =>
  new OneHotEncoder()
    .setInputCol(indexer.getOutputCol)
    .setOutputCol(s"${indexer.getOutputCol}_enc")
}

val stages: Array[PipelineStage] = indexers ++ encoders
val pipeline = new Pipeline().setStages(stages)

More importantly, this approach makes executing the pipeline inefficient, due to the overhead introduced by independent query planning and optimization of each stage, together with the missed opportunity to extract any performance gains from processing multiple dataframe columns in a single pass. Finally, as the number of stages in the pipeline grows, the cost of analyzing the independent query plans also grows incrementally (see SPARK-20392 and SPARK-19846 for details). The net result is poor performance when transforming many dataframe columns in a pipeline, especially when the number of rows is relatively small (because with larger data sizes the query planning overhead is lower relative to the computation time).

Multi-column transformers

Instead, it makes sense to have a transformer that is able to process multiple input columns at once. This makes pipeline construction simpler and eliminates the overhead associated with having multiple independent transformers for each column. The previous code becomes:

val indexer = new StringIndexer()
  .setInputCols(categoricalColumns)
  .setOutputCols(categoricalColumns.map(c => s"${c}_idx"))

val encoder = new OneHotEncoderEstimator()
  .setInputCols(indexer.getOutputCols)
  .setOutputCols(categoricalColumns.map(c => s"${c}_enc"))

val stages: Array[PipelineStage] = Array(indexer, encoder)
val pipeline = new Pipeline().setStages(stages)

The CODAIT machine learning team together with others in the Spark community have recently been working on adding support for multiple columns to the most widely used Spark ML transformers, including StringIndexer, QuantileDiscretizer, Bucketizer, and the new OneHotEncoderEstimator. These enhancements (with the exception of StringIndexer, which is still in progress) are available in the recently released Apache Spark 2.3.0.

Performance tests

To demonstrate the impact of these changes, we compared the execution time for an example pipeline, with and without multi-column support. We randomly generated an input dataframe of 1 million rows, with 100 numeric columns. The pipeline consisted of discretizing the numerical columns into 10 buckets (turning them into categorical feature indices) followed by one-hot encoding these indices into feature vectors. For the discretization step, the QuantileDiscretizer was used (with either single- or multi-column input), while for one-hot-encoding the legacy OneHotEncoder was used for single-column input and the new OneHotEncoderEstimator was used for multi-column input (to fix other issues with the legacy OneHotEncoder, a new class was created that also supported multiple columns. The legacy encoder is used for the single column case to more accurately show the performance impact).

The performance tests were run on a cluster with 4 executors (each with 48 cores and 100 GB of allocated RAM). Total execution times are measured in seconds and averaged over 3 runs.

Multi- vs single-column performance

The results can be seen in the previous chart. The multi-column pipeline is 2.7x faster overall than the single-column version. Much of the performance gain comes from the fitting phase, which is where most of the computation occurs. However, even in the transform phase, the multi-column version is 30% faster, showing the impact of eliminating the overhead of query planning for multiple columns.

To use these new, scalable transformers, download Apache Spark 2.3.0 and take a look at the example code in the distribution and in the MLlib documentation. Happy pipelining!

Join The Discussion

Your email address will not be published. Required fields are marked *