For Apache Spark 1.6, I’ve been working to add Pearson correlation aggregation functionality to Spark SQL. The aggregation function is one of the expressions in Spark SQL. It can be used with the GROUP BY clause within SQL queries or DSL syntax within DataFrame/Dataset APIs. The common aggregation functions are sum, count, etc.
At first glance, Pearson correlation might not seem connected to the Spark SQL aggregation function. However, since we want to compute a single value from a group of data, we’ll see that it does natively fit to the nature of the aggregation function.
All aggregation functions as expressions are located in the file sql/catalyst/src/main/ scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala. The definition is:
case class Corr( left: Expression, right: Expression, mutableAggBufferOffset: Int = 0, inputAggBufferOffset: Int = 0)
In the snippet, left and right represent expressions (typically two columns in your DataFrame) that we can use for the Pearson correlation. Similarly, mutableAggBufferOffset and inputAggBufferOffset are parameters specified for the Spark SQL aggregation framework. They’re related to the positions of this expression in the aggregation buffer object during execution. Typically, you can leave the default values.
Spark SQL has two kinds of aggregation functions: ImperativeAggregate and DeclarativeAggregate. ImperativeAggregate provides a fixed interface including initialize(), update(), and merge() functions that work on aggregation buffers. DeclarativeAggregate is implemented with Catalyst expressions. Regardless of the kind of aggregation functions you choose to implement, your aggregation function must define the schema (aggBufferSchema) and attributes (aggBufferAttributes) that indicate the schema and attributes for the aggregation buffer used by this aggregation function.
For the Corr aggregation function, the schema and attributes are
def aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes) val aggBufferAttributes: Seq[AttributeReference] = Seq( AttributeReference("xAvg", DoubleType)(), AttributeReference("yAvg", DoubleType)(), AttributeReference("Ck", DoubleType)(), AttributeReference("MkX", DoubleType)(), AttributeReference("MkY", DoubleType)(), AttributeReference("count", LongType)())
The code specifies six columns in the aggregation buffers and defines their attribute names and data types.
Because Corr implements the ImperativeAggregate interface, it needs to implement the initialize(), update(), and merge() functions.
The initialize() function has the signature: def initialize(buffer: MutableRow): Unit
Its goal is to initialize the aggregation buffer. You can see that the aggregation buffer is actually a row. We put the initial values into the buffer. The signature for the update() function is:
def update(buffer: MutableRow, input: InternalRow): Unit
It’s responsible for updating the content of the aggregation buffer with an input row. Corr’s update function follows the algorithm of Pearson correlation to update the buffer. In other words, we compute the co-variance for all rows in the same partition.
Then in the merge() function, we merge the two aggregation buffers from the two partitions:
def merge(buffer1: MutableRow, buffer2: InternalRow): Unit
The final evaluation calls the eval() function to compute the final result for this function based on the content of aggregation buffer:
def eval(buffer: InternalRow): Any
With the implementation of Pearson correlation aggregation function, we now can compute this measure between two columns in a DataFrame:
val df = Seq.tabulate(10)(i => (1.0 * i, 2.0 * i)).toDF("a", "b") val corr = df.groupBy().agg(corr("a", “b”)).collect()