The Spark community in the open source Apache Software Foundation recently announced the release of Apache Spark v2.4.0. This is the fifth major release in the 2.x code stream, comprising approximately 1,135 JIRAs (new features and bug fixes) from nearly 200 contributors worldwide. Compared with earlier releases in the 2.x stream, the volume of JIRA activity in v2.4.0 shows that the 2.x code stream is maturing and stabilizing.

Over the course of IBM's active participation, up to and including the v2.4.0 release, we have contributed more than 1,300 commits and 69,000+ lines of new code in the areas of Spark Core, SQL, MLlib, Streaming, PySpark, and SparkR. With the accelerated development of machine learning and artificial intelligence, the Spark Technology Center at IBM has expanded its mission to deep learning and AI and relaunched as the Center for Open-Source Data and AI Technologies (CODAIT). Even with this shift toward deep learning, CODAIT remains committed to Spark core technology enhancements, with a number of developers and committers, as well as to developer advocacy work that continues to accelerate the business value of Spark and bring intelligence to business applications. In Spark v2.4.0, IBM contributed approximately 100 commits and nearly 6,000 lines of new code. You can find the latest data on our contributions on the CODAIT Spark JIRA Dashboard.

As described in the release notes, Apache Spark v2.4.0 focuses largely on usability, stability, and performance improvements, with major features such as Barrier Execution Mode for better integration with deep learning frameworks, more than 30 new built-in higher-order functions for working with complex data types, and improved Kubernetes integration and ORC file reading. Other major updates include a built-in Avro data source, an image data source, flexible streaming sinks, elimination of the 2 GB block size limitation during transfers, and Pandas UDF improvements. Among the many features highlighted in the release notes, several were delivered by the IBM Spark team. The following is a brief summary of these features.

Support of EXCEPT ALL/INTERSECT ALL (SPARK-21274)

In SQL, set operations combine the results of multiple queries into a single result set. The set operators are UNION, INTERSECT, and EXCEPT. The ALL keyword, when used with a set operator, preserves duplicate rows in the result set. In Spark 2.4, we added support for EXCEPT ALL and INTERSECT ALL to make the Spark engine compliant with the SQL standard. For example, the following EXCEPT ALL query returns a result set of (NULL, 1, 1, 4).

SELECT a FROM VALUES (1), (1), (1), (null), (null), (4) AS t1(a)
EXCEPT ALL
SELECT b FROM VALUES (1), (null), (3) AS t2(b)

The following INTERSECT ALL query returns a result set of (NULL, 1, 1).

SELECT a FROM VALUES (1), (1), (1), (null), (null), (4) AS t1(a)
INTERSECT ALL
SELECT b FROM VALUES (1), (1), (null), (3) AS t2(b)

Because this is a general enhancement to Spark SQL, it benefits all Spark users. More specifically, customers of the IBM SQL Service can take advantage of these enhanced set operators.

Adding a new option for specifying the query to read from JDBC (SPARK-24423)

Spark provides APIs to read from and write to JDBC data sources. When using the read API against a JDBC table, various JDBC options can be specified to limit the data transmitted over the network. Spark 2.4.0 introduces an additional JDBC option, query, that lets you specify a table expression identifying the source table. Like other data-limiting options, such as user-supplied predicates, this option can be used to limit the data exchanged between the underlying JDBC source and Spark. For example:

val first_ten_rows_query = "select * from tableName limit 10"
val jdbcDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host:port/database")  // placeholder JDBC connection URL
  .option("query", first_ten_rows_query)
  .load()

Implement precedence rule for set operations (SPARK-24966)

Before Spark 2.4, when multiple set operators were chained together, Spark gave them all equal precedence and executed them from left to right as they appeared in the query. This behavior is inconsistent with the SQL standard, which gives the INTERSECT operator higher precedence than the UNION and EXCEPT operators. This inconsistency is addressed in Spark 2.4.0.
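
To illustrate (a minimal PySpark sketch with inline test data), the following query is now evaluated as t1 UNION (t2 INTERSECT t3) rather than strictly left to right:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("setop-precedence").getOrCreate()

# INTERSECT binds tighter than UNION, so t2 INTERSECT t3 is evaluated first
# (giving {3}), and the final result is {1, 2, 3}. Under the old left-to-right
# behavior, the result would have been {3}.
spark.sql("""
    SELECT a FROM VALUES (1), (2) AS t1(a)
    UNION
    SELECT b FROM VALUES (2), (3) AS t2(b)
    INTERSECT
    SELECT c FROM VALUES (3), (4) AS t3(c)
""").show()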

Support column resolution of fully qualified column name (SPARK-19602)

This feature adds support for resolving columns by their fully qualified three-part name, in the form database.table.column.
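
For example (a sketch using hypothetical database and table names, db1 and t1), a column can now be referenced through its fully qualified name:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical database and table created only for illustration.
spark.sql("CREATE DATABASE IF NOT EXISTS db1")
spark.sql("CREATE TABLE IF NOT EXISTS db1.t1 (c1 INT) USING parquet")

# c1 is resolved from its fully qualified three-part name db1.t1.c1.
spark.sql("SELECT db1.t1.c1 FROM db1.t1").show()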

Cache function name from the external catalog for lookupFunctions (SPARK-23486)

Before this improvement, if a function was referenced multiple times in a SQL query, Spark made unnecessary metastore accesses to resolve the function name each time, which could significantly slow down query processing. With this change, Spark caches the function names resolved from the external catalog, which improves the performance of such queries.

Support ANSI-SQL-compliant syntax for GROUPING SET (SPARK-24424)

As per the SQL standard, the column list is optional when GROUPING SETS is specified. Before release 2.4, the column list had to be explicitly specified even when GROUPING SETS was present. This restriction is removed in 2.4, making the grouping specification standard-compliant. For example:

SELECT c1, c2, max(c3)
FROM t1
GROUP BY GROUPING SETS ((c1), (c1, c2))

In the previous example, no column list is specified after the GROUP BY clause.

Support FOR ALL COLUMNS in ANALYZE TABLE (SPARK-25458)

In Spark 2.4, the ANALYZE command is extended to collect statistics for all of the columns of a table. In the absence of this syntax, you must specify the column list explicitly.

ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR ALL COLUMNS;

In the previous example, table- and column-level statistics are computed and recorded for all of the columns of the specified table. Care must be taken to not use this command for very wide tables because computation of statistics is expensive and resource-intensive.

Add Python API for PIC to spark.ml (SPARK-19826)

Power Iteration Clustering (PIC) is a scalable graph clustering algorithm. It finds a very low-dimensional embedding of a data set using truncated power iteration on a normalized pair-wise similarity matrix of the data. This embedding turns out to be an effective cluster indicator, consistently outperforming widely used spectral methods such as NCut on real data sets. PIC is very fast on large data sets, running over 1,000 times faster than an NCut implementation based on the state-of-the-art IRAM (implicitly restarted Arnoldi method) eigenvector computation technique. Spark 2.4 adds a Python API for PIC in spark.ml.
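
A minimal PySpark sketch of the new API, using a tiny made-up similarity graph of (src, dst, weight) edges:

from pyspark.sql import SparkSession
from pyspark.ml.clustering import PowerIterationClustering

spark = SparkSession.builder.appName("pic-example").getOrCreate()

# Toy pair-wise similarities: two disjoint triangles (0-1-2 and 3-4-5).
similarities = spark.createDataFrame(
    [(0, 1, 1.0), (0, 2, 1.0), (1, 2, 1.0),
     (3, 4, 1.0), (4, 5, 1.0), (3, 5, 1.0)],
    ["src", "dst", "weight"])

pic = PowerIterationClustering(k=2, maxIter=20, weightCol="weight")

# assignClusters returns a DataFrame of (id, cluster) assignments.
pic.assignClusters(similarities).show()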

Add EXCEPT ALL and INTERSECT ALL support in R (SPARK-25117)

In Spark 2.4, the SparkR API is enhanced with two new methods, intersectAll and exceptAll, to support the INTERSECT ALL and EXCEPT ALL operations, respectively. For example:

exceptAllDf <- exceptAll(df1, df2)

Add R partitionBy API in DataFrame (SPARK-21291)

This feature adds partitionBy support to the SparkR DataFrame write API, which partitions the output by the given columns on the file system. When specified, the output is laid out on the file system similar to Hive's partitioning scheme. Partitioning is one of the most widely used techniques to optimize physical data layout. It provides a coarse-grained index for skipping unnecessary data reads when queries have predicates on the partitioned columns. For partitioning to work well, the number of distinct values in each column should typically be less than tens of thousands.
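
The resulting layout can be sketched with the DataFrame writer (shown here in Python, with made-up columns and an example output path; the new SparkR API produces the same layout):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partitionby-example").getOrCreate()

df = spark.createDataFrame(
    [("2018", "11", 1.0), ("2018", "12", 2.0)],
    ["year", "month", "value"])

# Files land under .../year=2018/month=11/ and .../year=2018/month=12/,
# so queries with predicates on year or month can skip whole directories.
df.write.partitionBy("year", "month").parquet("/tmp/partitioned_output")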

Add R version of array operators

  • Add array_intersect/array_except/array_union/shuffle to SparkR (SPARK-25007)
  • Add array_remove / array_zip / map_from_arrays / array_distinct to SparkR (SPARK-24537)
  • Add array_join function to SparkR (SPARK-24187)
  • Add flatten function to SparkR (SPARK-24185)

Decrease memory consumption with toPandas() collection using Arrow (SPARK-23030)

This feature improves the performance and memory consumption of Pandas interoperability by using the Arrow streaming format both when creating a Spark DataFrame from a Pandas DataFrame and when collecting a Spark DataFrame with toPandas().
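
A sketch of the round trip; note that the Arrow path is disabled by default in 2.4 and is turned on with the spark.sql.execution.arrow.enabled setting:

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow-topandas").getOrCreate()

# Enable Arrow-based data transfers between Spark and Pandas (requires pyarrow).
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

pdf = pd.DataFrame({"x": range(1000)})

# Pandas -> Spark: the local DataFrame is shipped using the Arrow stream format.
sdf = spark.createDataFrame(pdf)

# Spark -> Pandas: toPandas() collects the result as Arrow record batches,
# reducing memory pressure on the driver.
result = sdf.selectExpr("x", "x * 2 AS y").toPandas()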

Pandas Grouped Map UserDefinedFunction mixes column labels (SPARK-24324)

This change improves the usability of Pandas grouped map UDFs in Spark by assigning the columns of the returned pandas DataFrame to the output schema by name rather than by position.
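
A short sketch, using a hypothetical per-group centering function, of a grouped map UDF whose result columns are matched to the declared schema by name:

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.appName("grouped-map-udf").getOrCreate()

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ["id", "v"])

# The columns of the returned pandas.DataFrame are assigned to the declared
# schema by name (when the labels are strings), so their order no longer matters.
@pandas_udf("id long, v double, v_centered double", PandasUDFType.GROUPED_MAP)
def center(pdf):
    return pdf.assign(v_centered=pdf.v - pdf.v.mean())

df.groupby("id").apply(center).show()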

Upgrade apache/arrow to 0.10.0 (SPARK-23874)

Apache Arrow in Spark is upgraded to version 0.10.0, which includes a number of improvements and fixes and also enables the addition of BinaryType support in Pandas operations (SPARK-23555).

Summary

In addition to IBM's continued collaboration with the Apache Spark community to improve usability, stability, and performance, we are also working on features that will improve Spark's data federation capabilities and achieve higher performance. To gather community feedback, we have submitted the designs and initial performance results of these features to the community.

Furthermore, we have submitted two pull requests, with 3,000+ new lines of code, that lay the groundwork for query optimization techniques that rely on referential integrity constraints (SPARK-21784, SPARK-23750).