Tuning a Spark ML model with cross-validation can be an extremely computationally expensive process. As the number of hyperparameter combinations increases, so does the number of models being evaluated. When running this process sequentially with a large number of models, any underutilization of cluster resources during a single model's training and evaluation is compounded across every model, leading to long run times.

Beginning with Apache Spark 2.3.0, it will be possible to enable model parallelism when performing model selection over a parameter grid with CrossValidator and TrainValidationSplit. Model parallelism means that more than one model, each corresponding to a different hyperparameter combination, can be trained and evaluated at once. This can be a substantial improvement over sequentially training one model at a time. By enabling model parallelism, you can better utilize all available cluster resources when running a large Spark ML job.

New Spark API to set model parallelism

A new parameter called parallelism is introduced in CrossValidator and TrainValidationSplit. It takes an integer value that must be >= 1; the default of 1 trains models serially, just like before, while a value greater than 1 roughly sets the number of models that can be evaluated in parallel. Here is how to set this parameter on a TrainValidationSplit with a parallelism level of 4:

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.TrainValidationSplit

val tvs = new TrainValidationSplit()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8)
  .setParallelism(4)  // evaluate up to 4 models at once

It is important to understand that the parallelism level is an upper bound, not a guaranteed speedup factor: it controls the maximum number of models that are trained and evaluated at once. The actual speedup you will see depends on many factors, and models will only run in parallel if there are enough resources available in the cluster. Otherwise, models will be queued in the Spark scheduler and have to wait for the current jobs to complete before being run.

Benefits of model parallelism

When tuning ML models, Spark will naturally perform data parallelization, so what additional benefit does model parallelism give? When training and evaluating models sequentially, the parallelism achieved depends on data partitioning and how the stages of each model execute, which might not use all resources all the time. To run your cluster at maximum efficiency, you will want to make sure that all of the available resources remain fully utilized for the duration of the workload. By enabling model parallelism, you can ensure that as soon as the required resources become available, the next model is trained and evaluated, instead of simply waiting for the previous model to complete.

Example with cross-validation

Let’s work through a simple example that demonstrates how model parallelism can pay off. This example creates a linear regression estimator and builds a 6×2 parameter grid to select values for the parameters regParam and fitIntercept using cross-validation. With this grid, 12 models must be trained and evaluated for each fold during cross-validation. The parallelism parameter of CrossValidator is set to 3, which allows up to 3 models to be trained and evaluated in parallel, if there are enough resources available in the cluster.

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val lr = new LinearRegression()

// 2 values of fitIntercept x 6 values of regParam = 12 models per fold
val paramMaps = new ParamGridBuilder()
    .addGrid(lr.fitIntercept)
    .addGrid(lr.regParam, Array(0.001, 0.003, 0.01, 0.03, 0.1, 0.3))
    .build()

val eval = new CrossValidator()
    .setEstimator(lr)
    .setEvaluator(new RegressionEvaluator())
    .setEstimatorParamMaps(paramMaps)
    .setNumFolds(4)
    .setParallelism(3)  // evaluate up to 3 models at once

val cvModel = eval.fit(dataset)
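Once fitting completes, the returned CrossValidatorModel exposes the winning model and the averaged metric for each parameter combination. A short sketch of how the results above could be inspected (assuming the `cvModel` from the example; the cast to LinearRegressionModel reflects that the estimator here is a bare LinearRegression):

```scala
import org.apache.spark.ml.regression.LinearRegressionModel

// avgMetrics holds one cross-validated metric per entry in the parameter
// grid, in the same order as getEstimatorParamMaps.
cvModel.getEstimatorParamMaps.zip(cvModel.avgMetrics).foreach {
  case (params, metric) => println(s"$params -> avg metric: $metric")
}

// bestModel is refit on the full dataset with the winning parameters.
val best = cvModel.bestModel.asInstanceOf[LinearRegressionModel]
println(s"best regParam = ${best.getRegParam}, " +
  s"fitIntercept = ${best.getFitIntercept}")
```

Note that model parallelism changes only how fast fitting runs; the contents of `avgMetrics` and the chosen `bestModel` are the same as in a serial run.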

Performance comparison tests

Taking the above example, performance tests were done on a cluster to compare running cross-validation in serial (parallelism set to 1) vs. a level of parallelism of 3. The input data has 10 features of randomly generated double values, with the number of records ranging from 100,000 to 5,000,000, and is split into 10 partitions.
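The exact data-generation code is not shown in this post, but input like that described above could be produced along these lines (the function name `makeData` and the random label column are illustrative assumptions):

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.rand

// Build n records with 10 random double features assembled into a
// "features" vector, a random label, and 10 partitions.
def makeData(spark: SparkSession, n: Long): DataFrame = {
  val featureCols = (0 until 10).map(i => s"f$i")
  var df = spark.range(n).toDF("id")
  featureCols.foreach { c => df = df.withColumn(c, rand()) }
  df = df.withColumn("label", rand())
  new VectorAssembler()
    .setInputCols(featureCols.toArray)
    .setOutputCol("features")
    .transform(df)
    .repartition(10)
}
```

Each record count in the test range (100,000 to 5,000,000) would then be generated once and reused for both the serial and the parallel run.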

Cluster details

The cluster used for the tests consists of 4 nodes with 16 GB memory each and 30 total available cores. The memory for the driver and executors is set to 4 GB each. Spark was run using the standalone cluster manager, with the master and driver running on one node. The remaining configuration uses Spark defaults.

Test results

The data below shows the elapsed wall clock time in milliseconds to complete one run of cross-validation.

Since the cluster has 30 available cores and the input data has 10 partitions, running cross-validation in serial will only make use of up to 10 cores at once, while with a level of parallelism of 3, it can run 3 models at once and utilize up to all 30 cores. The actual speedup seen in these tests is ~2.4 on average, which falls short of the configured level of 3 due to overhead beyond data processing.

Best practices

Since the parallelism parameter is configured with just a single integer, it might be difficult to know what value to set. Setting the value too low, or using the default of 1, might not fully utilize the cluster. Setting the value extremely high might result in excessive memory usage or overload the Spark scheduler. For a given workload, there will be a point of diminishing returns on speedup as the parallelism value is increased. For the example above, this value is 3, and increasing it further will show little improvement, if any. If you are able to run some experiments, start with a low value and increase it until there is not much improvement. A good rule of thumb is to choose a value of no more than 10 for a mid-sized cluster.
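The suggested experiment can be sketched as a simple timing loop over candidate parallelism values (assuming the `lr`, `paramMaps`, and `dataset` from the earlier example; the helper name `timeFit` is an assumption):

```scala
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.CrossValidator

// Time one full cross-validation run at a given parallelism level.
def timeFit(parallelism: Int): Long = {
  val cv = new CrossValidator()
    .setEstimator(lr)
    .setEvaluator(new RegressionEvaluator())
    .setEstimatorParamMaps(paramMaps)
    .setNumFolds(4)
    .setParallelism(parallelism)
  val start = System.nanoTime()
  cv.fit(dataset)
  (System.nanoTime() - start) / 1000000L  // elapsed milliseconds
}

// Start low and increase until the speedup levels off.
Seq(1, 2, 3, 4, 6).foreach { p =>
  println(s"parallelism=$p took ${timeFit(p)} ms")
}
```

In practice you would run each level more than once and average, since cluster scheduling introduces run-to-run variance.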

Learn more

Nick Pentreath and Bryan Cutler will be giving a talk at Spark+AI Summit in June to dive deeper into this new functionality, discuss performance implications and best practices, as well as preview ongoing work in progress to reduce the amount of computation required when tuning ML pipelines by eliminating redundant transformations and intelligently caching intermediate datasets. Get the details for this event.
