On June 18, the Apache Spark community announced the release of Spark 3.0, the first major release of the 3.x series. The release contains many new features and improvements, the result of more than 3,400 fixes and enhancements from more than 440 contributors worldwide. The IBM Center for Open-Source Data and AI Technologies (CODAIT) focuses on a select set of open source technologies spanning machine learning, AI workflows, trusted AI, metadata, and big data processing platforms, and delivered approximately 200 commits to this Spark 3.0 release, including a couple of key features.
Spark 3.0 highlights
The 3.0 release announcement calls out a number of important features and improvements:
- Adaptive query execution — Reoptimizing and adjusting query plans based on runtime statistics collected during query execution
- Dynamic partition pruning — Optimized execution during runtime by reusing the dimension table broadcast results in hash joins
- SQL compatibility — A number of enhancements to ANSI SQL compliance, including syntax, functions, and the store assignment policy
- Language — 3.0 moved to Python 3, Scala 2.12, and JDK 11
- PySpark — Significant improvements in pandas APIs
- New UI for structured streaming — The web UI now contains a Structured Streaming tab, which provides statistics about running and completed queries
- Performance — Up to 40x speedups for calling R user-defined functions and a 2x performance improvement in TPC-DS benchmark
- Accelerator-aware scheduler — Users can now specify and use hardware accelerators (e.g. GPUs) to improve performance on tasks such as deep learning
- SQL reference documentation — Detailed, easily navigable Spark SQL reference documentation includes syntax, semantics, keywords, and examples for common SQL usage
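Adaptive query execution and dynamic partition pruning are both controlled through session configuration. A minimal sketch in Spark SQL, assuming a Spark 3.0 session (the table and column names are illustrative):

```sql
-- Adaptive query execution is disabled by default in 3.0; turn it on per session:
SET spark.sql.adaptive.enabled = true;

-- Dynamic partition pruning is enabled by default; this flag controls it:
SET spark.sql.optimizer.dynamicPartitionPruning.enabled = true;

-- A star-schema join like this can now prune fact-table partitions at runtime,
-- reusing the broadcast result of the dimension-table filter:
SELECT f.amount
FROM fact_sales f
JOIN dim_date d ON f.date_key = d.date_key
WHERE d.year = 2020;
```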
IBM contribution to Spark 3.0
Among highlighted features, IBM has made 190+ commits and contributed to Spark SQL, MLlib, PySpark, K8s, and R. Here is a brief summary of IBM contributions to Spark 3.0.
Spark SQL is the data processing component used by all the high-level libraries, including MLlib, Structured Streaming, and PySpark.
Enhanced explain command
The SQL explain command is very important because it shows the logical/physical plans for an input statement. Prior to 3.0, the explain output was not very user-friendly and did not contain all the detailed information. In Spark 3.0, IBM worked on the following features to improve the explain command:
- SPARK-29366 — Print subqueries for Dynamic Partition Pruning (DPP) in explain.
- SPARK-29092 — Report additional information about a scan such as pushed-down filters, partition filters, location in Explain
- SPARK-27395 — Implement new explain format so output is more user-friendly
- SPARK-27480 — Improve EXPLAIN DESC QUERY to show the input SQL statement
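As an illustration of the new format from SPARK-27395, EXPLAIN FORMATTED splits the output into a compact numbered operator tree followed by a per-operator detail section (the tables t1 and t2 here are hypothetical):

```sql
EXPLAIN FORMATTED
SELECT t1.key, COUNT(*) AS cnt
FROM t1
JOIN t2 ON t1.key = t2.key
WHERE t2.flag = true
GROUP BY t1.key;
```

The detail section is also where the scan-level information from SPARK-29092, such as pushed-down filters and partition filters, shows up.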
Spark SQL is the most active component in the Apache Spark ecosystem: it provides a robust SQL implementation, underlies the MLlib, Structured Streaming, and GraphX functionality, and is evolving rapidly. However, Spark SQL differs from standard SQL syntax in places, and it can be difficult for new users to learn — sometimes the only recourse was to read the Spark source code, which is not feasible for all users. It was therefore crucial for Spark 3.0 to ship detailed, easily navigable Spark SQL reference documentation, featuring exact syntax and detailed examples, so that end users no longer need to consult the Spark source when learning Spark SQL syntax. IBM drove the work of building a comprehensive Spark SQL reference for end users under the umbrella JIRA SPARK-28588, which contains about 100 subtasks.
Other important SQL fixes and enhancements
- Add support for EVERY and ANY (SOME) aggregates, which are part of the SQL standard (SPARK-19851)
- Support FOR ALL COLUMNS in ANALYZE TABLE to collect the statistics of all the columns without specifying the column names (SPARK-25458)
- Make important SQL commands go through the same catalog/table resolution framework as V2 commands. This includes ALTER TABLE (RECOVER PARTITIONS) (SPARK-29612), ALTER TABLE (DROP PARTITIONS) (SPARK-29643), ALTER TABLE (RENAME PARTITIONS) (SPARK-29676), ALTER TABLE (SerDe properties) (SPARK-29695), and ALTER VIEW QUERY (SPARK-29730), among others
- Pushing left semi and left anti joins below operators such as project, aggregate, window, union etc. (SPARK-19712)
- Use # to mark the expression ID embedded in the subquery name in the explain output
- Enforce idempotence on the PullupCorrelatedPredicates optimizer rule (SPARK-28375)
- Make JDBC query option work with Oracle database (SPARK-27596)
- Convert .sql tests into UDF integrated test base, which includes outer-join.sql (SPARK-28285), pgSQL/join.sql (SPARK-28393), except.sql (SPARK-28277), and having.sql (SPARK-28281)
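Two of the items above can be shown directly in SQL; the table and column names below are made up for the example:

```sql
-- FOR ALL COLUMNS (SPARK-25458): collect statistics for every column
-- without listing them by name.
ANALYZE TABLE sales COMPUTE STATISTICS FOR ALL COLUMNS;

-- Standard boolean aggregates (SPARK-19851): EVERY is true when all values
-- in the group are true; ANY/SOME is true when at least one is.
SELECT dept,
       EVERY(passed) AS all_passed,
       SOME(passed)  AS any_passed
FROM exam_results
GROUP BY dept;
```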
Spark Core is the fundamental unit of Spark, and IBM worked on a couple of important improvements there as well.
Spark ML is an important component in Spark. It provides common machine learning algorithms for large datasets on distributed systems. Spark 3.0 continues adding new important algorithms and improving existing ones.
Instance weight support
Previously, the majority of algorithms in Spark ML did not support sample weights and treated all training samples with equal importance. Spark 3.0 adds sample weight support to many of the common algorithms; among these, IBM provided the weight support for KMeans (SPARK-29967) and BisectingKMeans (SPARK-30351).
ML transformer multi-column support
It is convenient and efficient for feature transformers to transform multiple input columns into multiple output columns simultaneously. Spark 3.0 added multi-column support to several transformers. Among these, IBM provided multi-column support for StopWordsRemover (SPARK-29808) and Python multi-column support for QuantileDiscretizer (SPARK-22796).
ML function parity between Scala and Python
Prior to Spark 3.0, there were many structure and hierarchy differences between the ML Scala and Python APIs. These differences broke parity and made the source code hard to maintain. IBM contributed to the Scala and Python parity work across all Spark ML algorithm families (classification, clustering, evaluation, feature, frequent pattern mining, recommendation, and regression) (SPARK-28958).
Power iteration clustering support in SparkR
Power iteration clustering is a simple and scalable graph clustering method that is very fast on large datasets. IBM contributed the port of this algorithm to SparkR (SPARK-19827).
Python version of fit with validation set to Gradient Boosted Trees
IBM added fit with a validation set to Gradient Boosted Trees in Python (SPARK-24333).
IBM continues contributing to PySpark, especially in Arrow and pandas. This includes the following important improvements in Spark 3.0:
- Upgrade Apache Arrow to 0.12.0 (SPARK-26566)
- Increase the minimum pandas version to 0.23.2 (SPARK-28041)
- Improve pandas with Arrow by sending out-of-order record batches to improve performance (SPARK-25274)
- Cleanup and consolidate pandas UDF functionality (SPARK-27163)
- Prevent unnecessary copies of data in Arrow to pandas conversion with timestamps (SPARK-30640)
- Make pandas grouped UDFs skip over empty partitions to improve performance (SPARK-28128)
In addition to working on Spark 3.0 features and improvements, IBM also presented three sessions at the Spark + AI Summit 2020:
- Scaling up Deep Learning by Scaling Down
- Fine Tuning and Enhancing Performance of Apache Spark Jobs
- SQL Performance Improvements at a Glance in Apache Spark 3.0
IBM is strongly involved in the advancement of AI, machine learning, big data, and analytics tools globally, and actively supports ongoing improvements in Apache Spark.