Apache Spark 2.2 introduces new extension points that allow you to customize the Spark session with your own optimizer, parser, analyzer, and physical planning strategy rules.

This post is intended for developers who want to customize their Spark application with their own optimizer, parser, analyzer, or physical planning strategy rules. We’ll look at the extension points that are available today and show you how to use them to extend the optimizer with a new rule. You’ll need a basic understanding of Spark’s Catalyst engine and access to the Spark source code and jars.

Spark extensions

The extension points have been added in SPARK-18127. Let’s take a look at the API and some of the internals of how Spark exposes these extension points.
Note: This is an experimental API, so it could change across releases.

  • New class: SparkSessionExtensions
    This class has all the methods that you need to store and retrieve the customized rules. This new class is in Spark SQL’s core component, in the org.apache.spark.sql package. Customized rules are stored in instance member variables. Let’s look at the relevant methods in this class so that you know which method to use to pass in your customized rule.
    • Pass in customized rules
      This class has methods with the inject prefix that allow you to pass in your customized rules. It has the following inject methods:
      • injectOptimizerRule – Adds Optimizer rules
      • injectParser – Adds parser extensions rules
      • injectPlannerStrategy – Adds physical planning strategy rules
      • injectResolutionRule – Adds analyzer rules
      • injectPostHocResolutionRule – Adds postHoc resolution rules in the analyzer
      • injectCheckRule – Adds check rules in the analyzer during the analysis phase
      For example: To add an optimizer rule, use the injectOptimizerRule method to inject your customized optimizer rule.
    • Retrieve customized rules
      The following methods, prefixed with ‘build’, are used by Spark internals to retrieve the customized rules and pass them to the corresponding Spark components.
      • buildOptimizerRules
      • buildParser
      • buildPlannerStrategies
      • buildResolutionRules
      • buildPostHocResolutionRules
      • buildCheckRules
  • Pass customized rules to SparkSession
    There are two ways that you can pass customized rules from the Spark application:
    1. Use the new method withExtensions on SparkSession.Builder. The withExtensions method takes in a function that accepts a SparkSessionExtensions.
      def withExtensions(f: SparkSessionExtensions => Unit): Builder
      You can inject the customizable rules by providing a function that calls the inject methods on the passed SparkSessionExtensions object. SparkSession.builder instantiates a SparkSessionExtensions object and calls the user function on it. When the SparkSession is created, the builder passes it the SparkSessionExtensions object that holds the injected rules.
    2. Use the Spark configuration parameter.
      You can pass the customized rules by using the configuration parameter ‘spark.sql.extensions’. Define a class that takes a SparkSessionExtensions object and calls the inject methods on it to pass in the customized rules, set the Spark configuration parameter ‘spark.sql.extensions’ to that class name, and then create your SparkSession.
      Note: The walkthrough in this post uses the withExtensions approach. A minimal sketch of the configuration-based approach follows, and the unit test suite in the Spark SQL codeline at org.apache.spark.sql.SparkSessionExtensionSuite shows a complete example of this scenario.
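
      Here is a minimal sketch of the configuration-based approach, assuming the configurator class is applied by Spark as a SparkSessionExtensions => Unit function. The class name MyExtensions and the package com.example are placeholders; the test suite mentioned above shows the canonical pattern.

      package com.example

      import org.apache.spark.sql.SparkSessionExtensions
      // Also import your rule class, for example the CollapseSorts rule developed later in this post.

      // Configurator class referenced by the 'spark.sql.extensions' configuration parameter.
      class MyExtensions extends (SparkSessionExtensions => Unit) {
        override def apply(extensions: SparkSessionExtensions): Unit = {
          extensions.injectOptimizerRule(CollapseSorts)
        }
      }

      You would then set spark.sql.extensions=com.example.MyExtensions (for example, with --conf on spark-submit or with .config() on SparkSession.builder) before creating the SparkSession.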

Extend Spark with a new Optimizer rule

Let’s now look into using the extension points API to add a new Optimizer rule that collapses adjacent sorts in the plan into one sort. The following examples are written in Scala.

Step 1: Create an Optimizer rule class

To implement the collapse sorts optimizer rule, use the CollapseSorts class as shown in the following example:


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Sort}
import org.apache.spark.sql.catalyst.rules.Rule

/**
 * A new Optimizer rule: CollapseSorts
 * Collapses two adjacent Sort operators into one if possible, keeping the last (outer) sort.
 * This rule applies when the global flag is the same for both Sort nodes and either
 * a) the sorts are adjacent, or b) a Filter or a Project sits between the two Sort nodes.
 */
case class CollapseSorts(spark: SparkSession) extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Adjacent sorts: keep the outer sort and drop the inner one
    case ss @ Sort(_, globalOrder, Sort(_, g, grandChild)) if globalOrder == g =>
      ss.copy(child = grandChild)
    // Sort over a Project over a Sort: drop the inner sort
    case ss @ Sort(_, globalOrder, p @ Project(_, Sort(_, g, grandChild))) if globalOrder == g =>
      ss.copy(child = p.copy(child = grandChild))
    // Sort over a Filter over a Sort: drop the inner sort
    case ss @ Sort(_, globalOrder, f @ Filter(_, Sort(_, g, grandChild))) if globalOrder == g =>
      ss.copy(child = f.copy(child = grandChild))
  }
}


Note: You’ll need to understand the Spark Catalyst internals to write your own Optimizer rule. The Sort, Project, and Filter classes used above are logical operators that extend LogicalPlan; these are Spark internal classes. You’ll need the Spark jars on your compile classpath to build the new CollapseSorts rule.

Step 2: Create a function with the Extension Points API to add a new rule

Create a function that takes the SparkSessionExtensions object and injects the CollapseSorts optimizer rule, as shown in the following example:


type ExtensionsBuilder = SparkSessionExtensions => Unit
val f: ExtensionsBuilder = { e => e.injectOptimizerRule(CollapseSorts)}


Step 3: Enable the customized rule (CollapseSorts) in SparkSession

Now pass the function created in Step 2 to the SparkSession.builder’s withExtensions method to create the new Spark session.

val spark = SparkSession.builder().master("local[1]").withExtensions(f).getOrCreate()


Step 4: Verify the new optimizer rule

Let’s now verify that your new optimizer rule gets picked up in the Spark application. To test the new optimizer rule:

  1. Inject the optimizer rule and create a Spark session.
  2. Run a query that triggers the new rule.
  3. Get the optimized plan by using explain and verify that the two adjacent sorts were collapsed into one.

    Note: This rule runs in the Optimizer, so the change shows up in the optimized logical plan. Because of that change, the physical plan also contains only one Sort.

    Test the code

    The following test code creates a simple dataset and runs a query with two sorts separated by a filter. Run ‘explain’ to see the query execution plan and then check the optimized plan; it will contain only one Sort.

    
    // Create a SparkSession passing in the extensions, injecting the CollapseSorts 
    // optimizer rule 
    val session = 
    SparkSession.builder().withExtensions(e => {e.injectOptimizerRule(CollapseSorts)})
                .master("local[1]")
                .getOrCreate()
    import session.implicits._
    // Simple data
    val data = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("a", "b")
    // query with 2 orderBy
    val query = data.select('a, 'b).orderBy('b.asc).filter("b == 2").orderBy('a.asc)
    // Let us get the execution plan for the query
    query.explain(true)
    query.show
    // Stop the underlying SparkContext in this session and clear out the active and
    // default session
    session.stop()
    SparkSession.clearActiveSession()
    SparkSession.clearDefaultSession()
    

    Verify the plan

    If you compare the same query without adding any extensions and run an ‘explain’ on it, you’ll see it has two sorts in the optimized plan. Let’s look at the relevant differences in the optimized plan in the following image.

    [Figure: Side-by-side comparison of the optimized plans, verifying that the CollapseSorts rule was exercised]

    As you can see, after injecting the new rule, there is only one Sort in the optimized plan, whereas before adding the rule there were two Sorts. We have successfully verified that our rule is working.

    Execution Plan output

    The complete explain output for the query is shown below for the two scenarios.

    • Default behavior – This does not have the CollapseSorts rule
    • With the CollapseSorts rule added to the SparkSession

    ************* DEFAULT BEHAVIOR - Begin ******************
    == Parsed Logical Plan ==
    'Sort ['a ASC NULLS FIRST], true
    +- Filter (b#6 = 2)
       +- Sort [b#6 ASC NULLS FIRST], true
          +- Project [a#5, b#6]
             +- Project [_1#2 AS a#5, _2#3 AS b#6]
                +- LocalRelation [_1#2, _2#3]
    
    == Analyzed Logical Plan ==
    a: string, b: int
    Sort [a#5 ASC NULLS FIRST], true
    +- Filter (b#6 = 2)
       +- Sort [b#6 ASC NULLS FIRST], true
          +- Project [a#5, b#6]
             +- Project [_1#2 AS a#5, _2#3 AS b#6]
                +- LocalRelation [_1#2, _2#3]
    
    == Optimized Logical Plan ==
    Sort [a#5 ASC NULLS FIRST], true
    +- Sort [b#6 ASC NULLS FIRST], true
       +- Project [_1#2 AS a#5, _2#3 AS b#6]
          +- Filter (_2#3 = 2)
             +- LocalRelation [_1#2, _2#3]
    
    == Physical Plan ==
    *Sort [a#5 ASC NULLS FIRST], true, 0
    +- Exchange rangepartitioning(a#5 ASC NULLS FIRST, 200)
       +- *Sort [b#6 ASC NULLS FIRST], true, 0
          +- Exchange rangepartitioning(b#6 ASC NULLS FIRST, 200)
             +- *Project [_1#2 AS a#5, _2#3 AS b#6]
                +- *Filter (_2#3 = 2)
                   +- LocalTableScan [_1#2, _2#3]
    ************* DEFAULT BEHAVIOR - End ********************
    
    ************* WITH COLLAPSESORTS OPTIMIZER RULE -  Begin ******************
    == Parsed Logical Plan ==
    'Sort ['a ASC NULLS FIRST], true
    +- Filter (b#23 = 2)
       +- Sort [b#23 ASC NULLS FIRST], true
          +- Project [a#22, b#23]
             +- Project [_1#19 AS a#22, _2#20 AS b#23]
                +- LocalRelation [_1#19, _2#20]
    
    == Analyzed Logical Plan ==
    a: string, b: int
    Sort [a#22 ASC NULLS FIRST], true
    +- Filter (b#23 = 2)
       +- Sort [b#23 ASC NULLS FIRST], true
          +- Project [a#22, b#23]
             +- Project [_1#19 AS a#22, _2#20 AS b#23]
                +- LocalRelation [_1#19, _2#20]
    
    == Optimized Logical Plan ==
    Sort [a#22 ASC NULLS FIRST], true
    +- Project [_1#19 AS a#22, _2#20 AS b#23]
       +- Filter (_2#20 = 2)
          +- LocalRelation [_1#19, _2#20]
    
    == Physical Plan ==
    *Sort [a#22 ASC NULLS FIRST], true, 0
    +- Exchange rangepartitioning(a#22 ASC NULLS FIRST, 200)
       +- *Project [_1#19 AS a#22, _2#20 AS b#23]
          +- *Filter (_2#20 = 2)
             +- LocalTableScan [_1#19, _2#20]
    ************* WITH COLLAPSESORTS OPTIMIZER RULE -  End ********************
    

    How did our new rule get to the Optimizer?

    To learn about the internal flow, let’s see how the new rule was invoked by the Spark Catalyst Optimizer.

    1. The Optimizer has a hook to add new rules.
      Spark’s Catalyst Optimizer organizes its rules into batches, and one of the fixed-point batches has a placeholder for any added optimizer rules. The relevant classes are org.apache.spark.sql.execution.SparkOptimizer and org.apache.spark.sql.catalyst.optimizer.Optimizer in Spark SQL.

      Batch("Operator Optimizations", fixedPoint, Seq(..)

      The rules from the extendedOperatorOptimizationRules method get added to the batch.
      def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil

    2. We passed in the new rule by using the SparkSession.builder.withExtensions interface. SparkSession.builder passes the SparkSessionExtensions object to the SparkSession, which stores it in a transient variable named ‘extensions’.
    3. The SparkOptimizer instance is created during SessionState creation for the SparkSession. When the SparkOptimizer is instantiated, the extendedOperatorOptimizationRules method is overridden to return the customized rules, which are retrieved by calling buildOptimizerRules on the extensions object. You can take a look at the optimizer method in the org.apache.spark.sql.internal.BaseSessionStateBuilder class, paraphrased in the sketch below.
      Note: The customized rules are injected into one specific batch that is predefined by Spark. The Optimizer has several batches, but the customizable rules are only added to that one batch. In general, it is good to be aware of where your customized rules are plugged in and in what order they run.
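
      For reference, here is a paraphrased sketch of that wiring in BaseSessionStateBuilder (Spark 2.2); the exact code may differ across releases:

      // Paraphrased from org.apache.spark.sql.internal.BaseSessionStateBuilder (Spark 2.2):
      // the optimizer is built with extendedOperatorOptimizationRules overridden to include
      // the rules injected through SparkSessionExtensions.
      protected def optimizer: Optimizer = {
        new SparkOptimizer(catalog, conf, experimentalMethods) {
          override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
            super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
        }
      }

      // The custom rules come from the SparkSessionExtensions object stored on the session.
      protected def customOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = {
        extensions.buildOptimizerRules(session)
      }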

    Summary

    In this post, we looked at the extension points that were introduced with the SparkSessionExtensions API in Apache Spark 2.2, and we walked through the steps of adding a new optimizer rule by using this API. You can use the same API to implement customizations with parser extensions, analyzer rules, or physical planning strategy rules; the steps are similar to those presented for the optimizer rule. Write a class with your customized rule, create a function that injects the rule using the appropriate inject method, and pass that function to the SparkSession.builder.withExtensions method. Make sure you know where the rules get called, because the order might have an impact on the implementation of your customized rule.
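
    To illustrate another rule type, here is a minimal sketch of injecting a physical planning strategy. MyStrategy is a hypothetical strategy that matches no plans and exists only to show the pattern:

    import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, Strategy}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.SparkPlan

    // Hypothetical planner strategy that matches no plans; it only illustrates the shape
    // of a strategy that can be injected with injectPlannerStrategy.
    case class MyStrategy(spark: SparkSession) extends Strategy {
      override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
    }

    // Inject it the same way as the optimizer rule, then pass this function to
    // SparkSession.builder().withExtensions(...).
    val withMyStrategy: SparkSessionExtensions => Unit =
      extensions => extensions.injectPlannerStrategy(MyStrategy)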
