Explore best practices for Spark performance optimization
Learn some performance optimization tips to keep in mind when developing your Spark applications.
I am a senior software engineer working with IBM’s CODAIT team. We work on open source projects and advocacy activities. I have been working on open source Apache Spark, focused on Spark SQL, and I have also helped customers and clients optimize their Spark applications. Apache Spark is a distributed open source computing framework for large-scale analytic computations. In this blog post, I want to share some performance optimization guidelines to keep in mind when programming with Spark. I assume that you have some understanding of writing Spark applications.
I find it useful to keep the following goals in mind when developing and tuning your applications:
- Reduce network I/O
- Reduce disk I/O
- Improve/optimize CPU utilization by reducing any unnecessary computation, including filtering out unnecessary data, and ensuring that your CPU resources are getting utilized efficiently
- Benefit from Spark’s in-memory computation, including caching when appropriate
Let’s look at some characteristics of Spark that help us improve performance.
Lazy loading behavior
Apache Spark has two kinds of operations: transformations and actions.
Spark has lazy loading behavior (lazy evaluation) for transformations. That means a transformation does not trigger any computation; Spark only keeps track of the transformation requested. When you write transformations that produce another dataset from an input dataset, you can structure the code for readability. You do not need to squeeze everything into one line to optimize it, because Spark optimizes the flow under the covers for you. It’s good to write the transformations using intermediate variables with meaningful names so your code is easier to read.
Spark actions are eager: each one triggers the computation needed to produce its result. So make sure that you call a Spark action only when it is needed. For example, count() on a dataset is a Spark action. A common issue I have seen is multiple count() calls that were added to a Spark application during debugging and never removed. It’s a good idea to look for Spark actions and remove any that are not necessary, because we don’t want to use CPU cycles and other resources when not required.
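As a minimal sketch of the behavior described above, the function below builds a pipeline from named transformations and runs a single action at the end. It assumes an existing SparkSession passed in as `spark`; the function name, the path, and the `level` and `service` columns are hypothetical and only serve as illustration.

```python
def error_counts_by_service(spark, path):
    """Return (service, count) rows for ERROR events.

    `spark` is an existing SparkSession. `path`, `level`, and `service`
    are hypothetical names used only for illustration.
    """
    # Transformations: Spark records these steps but does not process the data yet.
    events = spark.read.parquet(path)
    errors = events.filter("level = 'ERROR'")
    # groupBy(...).count() is also a transformation: it returns a DataFrame.
    per_service = errors.groupBy("service").count()

    # Action: the one place where the whole pipeline actually executes.
    return per_service.collect()
```

A leftover debugging call such as `errors.count()` between these steps would be a second action and would rerun the read and the filter.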
When you are designing your datasets for your application, ensure that you are making the best use of the file formats available with Spark. Some things to consider:
- Spark is optimized for Apache Parquet and ORC for read throughput. Spark has vectorization support that reduces disk I/O. Columnar formats work well.
- Use the Parquet file format and make use of compression.
- There are different file formats and built-in data sources that can be used in Apache Spark. Use splittable file formats.
- Ensure that there are not too many small files. If you have many small files, it might make sense to do compaction of them for better performance.
- Increase the number of Spark partitions to increase parallelism based on the size of the data. Make sure cluster resources are utilized optimally. Too few partitions could result in some executors being idle, while too many partitions could result in overhead of task scheduling.
- Tune the partitions and tasks. Spark can handle tasks of 100ms+ and recommends at least 2-3 tasks per core for an executor.
- Spark decides on the number of partitions based on the file size input. At times, it makes sense to specify the number of partitions explicitly.
- The read API takes an optional number of partitions.
- Set spark.sql.files.maxPartitionBytes (available since Spark v2.0.0) to cap the number of bytes packed into a single input partition for Parquet, ORC, and JSON.
- The shuffle partitions may be tuned by setting spark.sql.shuffle.partitions, which defaults to 200. This is really small if you have large dataset sizes.
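The partition settings mentioned in this list could be applied roughly as follows. This is a sketch under assumptions: both function names are made up for illustration, `spark` is an existing SparkSession, and the default of three tasks per core is just one point in the recommended 2-3 range.

```python
def target_partition_count(num_executors, cores_per_executor, tasks_per_core=3):
    """Partition count that gives every core `tasks_per_core` tasks."""
    return num_executors * cores_per_executor * tasks_per_core


def apply_partition_settings(spark, shuffle_partitions, max_partition_bytes):
    """Set the two partition-related SQL options on a SparkSession."""
    # Upper bound on the bytes packed into one input partition for file sources.
    spark.conf.set("spark.sql.files.maxPartitionBytes", str(max_partition_bytes))
    # Number of partitions produced by a shuffle (joins, aggregations).
    spark.conf.set("spark.sql.shuffle.partitions", str(shuffle_partitions))
```

For example, with 10 executors of 4 cores each, `apply_partition_settings(spark, target_partition_count(10, 4), 128 * 1024 * 1024)` would request 120 shuffle partitions and keep input partitions at 128 MB or less.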
Shuffle is an expensive operation as it involves moving data across the nodes in your cluster, which involves network and disk I/O. It is always a good idea to reduce the amount of data that needs to be shuffled. Here are some tips to reduce shuffle:
- Tune spark.sql.shuffle.partitions.
- Partition the input dataset appropriately so each task size is not too big.
- Use the Spark UI to study the plan and look for opportunities to reduce the shuffle as much as possible.
- Formula recommendation for spark.sql.shuffle.partitions:
- For large datasets, aim for a task target size of 100MB to less than 200MB per partition (use a target size of 100MB, for example).
- spark.sql.shuffle.partitions = quotient((shuffle stage input size / target size) / total cores) * total cores.
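To make the formula concrete, here is a small worked sketch in plain Python. The function name is hypothetical, sizes are whole megabytes, and the floor of one task per core is my own assumption to keep tiny inputs from yielding zero partitions; it is not part of the formula.

```python
def shuffle_partitions(shuffle_input_mb, total_cores, target_mb=100):
    """quotient((input size / target size) / total cores) * total cores."""
    if shuffle_input_mb <= 0 or total_cores <= 0 or target_mb <= 0:
        raise ValueError("sizes and core count must be positive")
    tasks = shuffle_input_mb // target_mb   # number of tasks at the target size
    rounds = tasks // total_cores           # whole rounds of tasks across all cores
    # Assumption (not in the formula): never go below one task per core.
    return max(1, rounds) * total_cores


# 210 GB of shuffle input (215040 MB), 64 cores, 100 MB target per task:
# 215040 // 100 = 2150 tasks, 2150 // 64 = 33 rounds, 33 * 64 = 2112 partitions.
print(shuffle_partitions(215040, 64))  # 2112
```

Rounding down to a multiple of the core count means every round of tasks keeps all cores busy, with no partially filled final round.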
Filter/reduce dataset size
Look for opportunities to filter out data as early as possible in your application pipeline. If you are only interested in analyzing a subset of the data, apply the filter early so that every later step works on a smaller dataset. Use selective filter predicates in your SQL queries so Spark can push them down to the underlying data source, and use partition filters where they are applicable.
Spark supports the caching of datasets in memory. There are different options available:
- Use caching when the same operation is computed multiple times in the pipeline flow.
- Use the persist API to choose the cache setting you need (persist to disk or not; serialized or not).
- Be aware of lazy loading and prime the cache up front if needed. Some APIs are eager and some are not.
- Check out the Spark UI’s Storage tab to see information about the datasets you have cached.
- It’s good practice to unpersist your cached datasets when you are done using them in order to release resources, particularly when other people are using the cluster as well.
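The caching tips above can be combined into one pattern: persist, optionally prime, use, and always unpersist. The helper below is a sketch, not an established API. `run_with_cache` and its parameters are hypothetical names, `df` is assumed to be a Spark DataFrame, and `compute` is any function of yours that runs several actions on it.

```python
def run_with_cache(df, compute, storage_level=None, prime=True):
    """Cache `df`, run `compute(cached_df)`, then always release the cache."""
    # persist() is lazy: it only marks the DataFrame for caching.
    cached = df.persist() if storage_level is None else df.persist(storage_level)
    try:
        if prime:
            cached.count()  # an action, so the cache is filled up front
        return compute(cached)
    finally:
        cached.unpersist()  # release executor memory and disk for other jobs
```

Passing a level such as `pyspark.StorageLevel.MEMORY_ONLY` or `pyspark.StorageLevel.DISK_ONLY` as `storage_level` selects where the cached data lives. While `compute` is running, the dataset should appear in the Spark UI’s Storage tab.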
Join is, in general, an expensive operation, so pay attention to the joins in your application to optimize them.
BroadcastHashJoin is most performant for cases where one of the relations is small enough that it can be broadcast. Below are some tips:
- Join order matters; start with the most selective join. For relations smaller than spark.sql.autoBroadcastJoinThreshold, you can check whether BroadcastHashJoin is picked up.
- Use SQL hints if needed to force a specific type of join.
- Example: When joining a small dataset with a large dataset, a broadcast join may be forced to broadcast the small dataset.
- Confirm that Spark is picking up the broadcast hash join; if not, you can force it using the SQL hint.
- Avoid cross-joins.
- BroadcastHashJoin is most performant, but may not be applicable if both relations in the join are large.
- Collect statistics on tables for Spark to compute an optimal plan.
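One way to force and then confirm a broadcast join from the DataFrame API is sketched below. The function name is hypothetical, and `large_df`, `small_df`, and `key` stand for your own DataFrames and join column. `hint("broadcast")` is the DataFrame counterpart of the SQL hint.

```python
def join_with_broadcast(large_df, small_df, key):
    """Join two DataFrames, asking Spark to broadcast the smaller one."""
    # hint("broadcast") marks small_df as the side to broadcast.
    joined = large_df.join(small_df.hint("broadcast"), on=key, how="inner")
    # Prints the physical plan; look for BroadcastHashJoin in the output.
    joined.explain()
    return joined
```

If the printed plan shows a sort-merge join instead, the hint may not have been applicable to that join, which is worth investigating before assuming the broadcast took effect.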
Tune cluster resources
- Tune the resources on the cluster depending on the resource manager and version of Spark.
- Tune the available memory to the driver: spark.driver.memory.
- Tune the number of executors and the memory and core usage based on resources in the cluster: executor-memory, num-executors, and executor-cores.
Check out the configuration documentation for the Spark release you are working with and use the appropriate parameters.
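As an illustration of how these settings fit together, the sketch below assembles a `spark-submit` command line from the four resource options named above. The function name and all values are hypothetical, and `--num-executors` only applies on resource managers that support it (YARN, for example), so check the options for your own setup.

```python
def spark_submit_command(app, driver_memory, num_executors,
                         executor_memory, executor_cores):
    """Build a spark-submit argument list with explicit resource settings."""
    return [
        "spark-submit",
        "--driver-memory", driver_memory,          # same setting as spark.driver.memory
        "--num-executors", str(num_executors),
        "--executor-memory", executor_memory,
        "--executor-cores", str(executor_cores),
        app,
    ]


# Hypothetical sizing: 10 executors with 4 cores and 8 GB each, 4 GB driver.
print(" ".join(spark_submit_command("my_job.py", "4g", 10, "8g", 4)))
```

The resulting list can be passed to `subprocess.run(cmd, check=True)` or printed and pasted into a shell.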
Avoid expensive operations
- Avoid order by if it is not needed.
- When you are writing your queries, instead of using select * to get all the columns, only retrieve the columns relevant for your query.
- Don’t call count unnecessarily.
- Ensure that the partitions are equal in size to avoid data skew and low CPU-utilization issues.
- As an example: If you have data coming in from a JDBC data source in parallel, and each of those partitions is not retrieving a similar number of records, this will result in unequal-size tasks (a form of data skew). Maybe one partition is only a few KB, whereas another is a few hundred MB. Some tasks will be larger than others, and while the executors on larger tasks will be busy, the other executors, which are handling the smaller task, will finish and be idle.
- If data at the source is not partitioned optimally, you can also evaluate the tradeoffs of using repartition to get a balanced partition and then use caching to persist it in memory if appropriate.
- Repartition will cause a shuffle, and shuffle is an expensive operation, so this should be evaluated on an application basis.
- Use the Spark UI to look for the partition sizes and task duration.
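Besides the Spark UI, partition balance can be checked in code. The sketch below counts rows per partition and repartitions only when the largest partition is well above average. All three function names are hypothetical, `df` is assumed to be a Spark DataFrame, and the threshold of 2.0 is an arbitrary illustration rather than a recommended value.

```python
def partition_row_counts(df):
    """Rows per partition of a Spark DataFrame (this runs a Spark job)."""
    return df.rdd.mapPartitions(lambda rows: [sum(1 for _ in rows)]).collect()


def skew_ratio(row_counts):
    """Largest partition divided by the average partition size."""
    if not row_counts or sum(row_counts) == 0:
        return 1.0
    return max(row_counts) / (sum(row_counts) / len(row_counts))


def rebalance_if_skewed(df, num_partitions, max_ratio=2.0):
    """Repartition (which causes a shuffle) only when skew exceeds max_ratio."""
    if skew_ratio(partition_row_counts(df)) > max_ratio:
        return df.repartition(num_partitions)
    return df
```

Counting rows is itself an action over the whole dataset, so this check is best used while investigating a slow job rather than left in every run.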
Spark has a number of built-in functions available. Before writing a user-defined function (UDF), check whether one of the built-in functions does the job, since they are good for performance. Custom UDFs in the Scala API are more performant than Python UDFs. If you have to use the Python API, use pandas UDFs, which were introduced in Spark 2.3. Pandas UDFs (vectorized UDFs) offer significant performance improvements over a custom row-at-a-time Python UDF. See the Spark documentation for more information about writing a pandas UDF.
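A pandas UDF could look roughly like the sketch below. The function names and the temperature example are hypothetical. The wrapper imports `pandas_udf` lazily and needs pyspark, pandas, and pyarrow installed when it is called. This particular conversion could also be written with built-in column arithmetic, so treat it purely as an illustration of the mechanics.

```python
def fahrenheit_to_celsius(temps):
    """Vectorized conversion; Spark passes `temps` as a pandas Series."""
    return (temps - 32) * 5.0 / 9.0


def as_pandas_udf(func, return_type="double"):
    """Wrap `func` as a scalar pandas UDF."""
    # Imported here so the plain function above stays usable without Spark.
    from pyspark.sql.functions import pandas_udf
    return pandas_udf(func, returnType=return_type)
```

With a DataFrame `df` that has a numeric `temp_f` column (both assumed), usage would be along the lines of `df.withColumn("temp_c", as_pandas_udf(fahrenheit_to_celsius)(df["temp_f"]))`. Because the function operates on a whole Series at a time, Spark can process values in batches instead of calling Python once per row.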
I hope this was helpful to you as you go about writing your Spark applications. Happy developing! In an upcoming blog, I will show how to get the execution plan for your Spark job.