Spark SQL integrates natively with a large number of input sources through the Spark SQL Data Sources API. Spark introduced the new Data Sources API V2 in its 2.3 release; it has a cleaner design and addresses a number of limitations of V1. Like V1, its reader path supports performance optimizations such as local predicate push-down and column pruning. As explained in Databricks’ Spark 2.3 release introduction talk, here are the motivations for and features of the new Data Sources API V2.

Weakness with V1

  • Leaks upper-level APIs (DataFrame/SQLContext) into the data source
  • Hard to extend the Data Sources API for more optimizations
  • No transaction guarantee in the write APIs

V2 design goals

  • Written in the Java™ programming language
  • No dependency on upper-level APIs (DataFrame/RDD, etc.)
  • Easy to extend, can add new optimizations while keeping backward-compatibility
  • Can report physical information like size, partition, etc.
  • Streaming source/sink support
  • Flexible, powerful, and transactional write API
  • No change to end users

V2 features in Spark 2.3

  • Support for row-based scan and columnar scan
  • Column pruning and filter push-down
  • Can report basic statistics and data partitioning
  • Transactional write API
  • Streaming source and sink support for micro-batch and continuous mode

Data source developers are encouraged to try out V2, or to migrate from V1 to V2, to take advantage of the benefits listed above. Let’s look at a simple example that demonstrates some of the reader path differences between V1 and V2. The sample is a dummy data source with two integer columns, i and j, where i ranges from 0 to 9. Simple as it is, it is designed to eliminate unneeded columns and filter out rows at the source using push-down optimizations, which reduces the amount of data read from the data source and improves performance.
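
For intuition only, here is a rough in-memory equivalent of what the dummy source exposes and of the test query used later in this post. This is just a sketch, not part of the data source implementation; it assumes an active SparkSession named spark. (Note that in the sample code below, column j is derived from i with a sign flip.)

import spark.implicits._

// Roughly the data the dummy source exposes: i = 0..9, j = -i
val equivalent = (0 to 9).map(i => (i, -i)).toDF("i", "j")

// The shape of the test query used later: prune to column i, filter i > 3
equivalent.select("i").where("i > 3").show()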

How to define a data source

Users can implement the Data Sources API to read only the required columns as well as to push local predicates down to the data source.

Sample data source implementing Data Sources API V1 (Reference: FilteredScanSuite)

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.sql.sources.{BaseRelation, Filter, GreaterThan, PrunedFilteredScan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

case class AdvancedDataSource(from: Int, to: Int)(@transient val sparkSession: SparkSession)
  extends BaseRelation
  with PrunedFilteredScan {

  override def sqlContext: SQLContext = sparkSession.sqlContext

  /**
    * Define dummy data source schema with two integer columns, i and j
    * Overrides method in [[BaseRelation]]
    *
    * @return data source schema
    */
  override def schema: StructType =
    StructType(
      StructField("i", IntegerType, nullable = false) ::
      StructField("j", IntegerType, nullable = false) :: Nil)

  /**
    * Returns the list of [[Filter]]s that this datasource may not be able to handle.
    * Overrides method in [[BaseRelation]]
    *
    * @param filters
    * @return unhandled filters
    */
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
    def unhandled(filter: Filter): Boolean = {
      filter match {
        // Dummy data source is able to handle
        // GreaterThan local predicate on column 'i' only
        case GreaterThan("i", _: Int) => true
        case _ => false
      }
    }
    filters.filter(unhandled)
  }

  /**
    * Select required projection columns and push local predicates down to data source
    * then return data back to Spark.
    * Overrides method in [[PrunedFilteredScan]]
    *
    * @param requiredColumns
    * @param filters
    * @return an RDD containing all matching tuples as Row objects
    */
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // construct row data with required column(s) only
    val rowBuilders = requiredColumns.map {
      case "i" => (i: Int) => Seq(i)
      case "j" => (j: Int) => Seq(-j)
    }
    // Local predicate test on integer column 'i'
    def translateFilterOnI(filter: Filter): Int => Boolean = filter match {
      case GreaterThan("i", v: Int) => (i: Int) => i > v
      case _ => (i: Int) => true
    }
    // apply local predicate
    def eval(a: Int) = {
      filters.forall(translateFilterOnI(_)(a))
    }
    // fetch row data
    sparkSession.sparkContext.parallelize(from to to).filter(eval).map(i =>
      Row.fromSeq(rowBuilders.map(_(i)).reduceOption(_ ++ _).getOrElse(Seq.empty)))
  }
}


The sample data source based on V1 needs to extend BaseRelation with PrunedFilteredScan. PrunedFilteredScan is a mix-in interface for BaseRelation. Here are descriptions of the main API methods used in the sample code above.

def buildScan(requiredColumns: Array[String],
              filters: Array[Filter]): RDD[Row]

  • Selects a subset of columns, pushes local predicates down to the data source, and returns the data back to Spark

  PARAMETERS

  • requiredColumns: Array[String]
    • Reflects the project list column pruning optimization
    • Selects a subset of columns from the data source
    • For SELECT count(*), the array is empty
  • filters: Array[Filter]
    • Reflects the local predicate push-down optimization
    • Applies local predicates at the data source
    • The actual filter should be the conjunction of all filters (i.e., they should be joined with “and”)

  RETURN VALUES

  • An RDD containing all matching tuples as Row objects (a direct call to buildScan is sketched after these method descriptions)

def unhandledFilters(filters: Array[Filter]): Array[Filter]

  • Not all predicates can be pushed down. It is the data source’s job to define which predicates it is able to handle. Unhandled filters are returned to Spark and evaluated by Spark SQL after data is output by a scan.

  PARAMETERS

  • filters: Array[Filter]
    • Local predicates that Spark SQL is capable of pushing down
    • If no predicate is eligible for push-down, the array is empty

  RETURN VALUES

  • filters: Array[Filter]
    • Local predicates that the data source is unable to handle

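To make the buildScan contract concrete, here is a minimal sketch that calls it directly on the AdvancedDataSource defined above, with column i as the only required column and i > 3 as the pushed-down predicate. It assumes an active SparkSession named spark; in a real query, Spark supplies these arguments itself.

import org.apache.spark.sql.sources.GreaterThan

val ds = AdvancedDataSource(0, 9)(spark)
// Only column 'i' is required, and the predicate i > 3 is pushed to the source
val rdd = ds.buildScan(Array("i"), Array(GreaterThan("i", 3)))
rdd.collect().foreach(println)  // [4] [5] [6] [7] [8] [9]
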
Sample data source implementing Data Sources API V2 (Reference: DataSourceV2Suite)

    import java.util.{ArrayList, List => JList}

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.{Filter, GreaterThan}
    import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
    import org.apache.spark.sql.sources.v2.reader._
    import org.apache.spark.sql.types.StructType

    class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
    
      class Reader extends DataSourceReader
        with SupportsPushDownRequiredColumns with SupportsPushDownFilters {
    
        // dummy data source schema with two integer columns, i and j
        var requiredSchema = new StructType().add("i", "int").add("j", "int")
        var filters = Array.empty[Filter]
    
        /**
          * Push down required columns to the data source
          * Implements method in [[SupportsPushDownRequiredColumns]]
          *
          * @param requiredSchema
          */
        override def pruneColumns(requiredSchema: StructType): Unit = {
          this.requiredSchema = requiredSchema
        }
    
        /**
          * Pushes down local predicates to data source
          * Implements method in [[SupportsPushDownFilters]]
          *
          * @param filters
          * @return filters that need to be evaluated after scanning
          */
        override def pushFilters(filters: Array[Filter]): Array[Filter] = {
          val (supported, unsupported) = filters.partition {
            // Dummy data source is able to handle
            // GreaterThan local predicate on column 'i' only
            case GreaterThan("i", _: Int) => true
            case _ => false
          }
          this.filters = supported
          unsupported
        }
    
        override def pushedFilters(): Array[Filter] = filters
    
        override def readSchema(): StructType = {
          requiredSchema
        }
    
        /**
          * Returns a list of reader factories.
          * Each factory is responsible for creating a data reader to output data for one RDD partition.
          * Implements method in [[DataSourceReader]]
          *
          * @return a list of reader factories
          */
        override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
          // Local filter on integer column 'i'
          val lowerBound = filters.collect {
            case GreaterThan("i", v: Int) => v
          }.headOption
    
          val res = new ArrayList[DataReaderFactory[Row]]
          // retrieve data
          if (lowerBound.isEmpty) {
            res.add(new AdvancedDataReaderFactory(0, 5, requiredSchema))
            res.add(new AdvancedDataReaderFactory(5, 10, requiredSchema))
          } else if (lowerBound.get < 4) {
            res.add(new AdvancedDataReaderFactory(lowerBound.get + 1, 5, requiredSchema))
            res.add(new AdvancedDataReaderFactory(5, 10, requiredSchema))
          } else if (lowerBound.get < 9) {
            res.add(new AdvancedDataReaderFactory(lowerBound.get + 1, 10, requiredSchema))
          }
    
          res
        }
      }
    
      /**
        * Creates a [[DataSourceReader]] to scan the data from this data source.
        * @param options the options for the returned data source reader, which is an immutable case-insensitive string-to-string map
        * @return reader for this data source
        */
      override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
    }
    


The sample data source based on V2 needs to extend DataSourceV2 with ReadSupport. It also needs to define a Reader that extends DataSourceReader with SupportsPushDownRequiredColumns with SupportsPushDownFilters. SupportsPushDownRequiredColumns and SupportsPushDownFilters are push-down optimization interfaces that together are equivalent to PrunedFilteredScan in V1; you can mix in whichever of them you need, or none at all (a minimal no-push-down reader is sketched after the factory code below). The Reader encapsulates the logic for retrieving data from the data source. It in turn uses AdvancedDataReaderFactory, which performs the actual scan of the dummy data source.

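    // For simplicity, this class acts as both the factory and the reader:
    // createDataReader returns a fresh instance that emits rows in [start, end).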
    class AdvancedDataReaderFactory(start: Int, end: Int, requiredSchema: StructType)
      extends DataReaderFactory[Row] with DataReader[Row] {
    
      private var current = start - 1
    
      override def createDataReader(): DataReader[Row] = {
        new AdvancedDataReaderFactory(start, end, requiredSchema)
      }
    
      override def close(): Unit = {}
    
      override def next(): Boolean = {
        current += 1
        current < end
      }
    
      override def get(): Row = {
        val values = requiredSchema.map(_.name).map {
          case "i" => current
          case "j" => -current
        }
        Row.fromSeq(values)
      }
    }
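
As noted above, the push-down mix-ins are optional. The following is a minimal sketch of a reader that mixes in no push-down interfaces at all; Spark would then read the full schema and evaluate every predicate itself after the scan. PlainReader is a hypothetical name, and the sketch reuses the AdvancedDataReaderFactory defined above.

    import java.util.{ArrayList, List => JList}

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
    import org.apache.spark.sql.types.StructType

    class PlainReader extends DataSourceReader {

      // The full schema is always read: no column pruning, no filter push-down
      override def readSchema(): StructType =
        new StructType().add("i", "int").add("j", "int")

      override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
        val res = new ArrayList[DataReaderFactory[Row]]
        // Reuse the factory defined above to emit rows 0 through 9
        res.add(new AdvancedDataReaderFactory(0, 10, readSchema()))
        res
      }
    }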
    


pushFilters in V2 plays the same role as unhandledFilters in V1: it pushes supported filters down to the data source and returns the unhandled ones to Spark. pruneColumns in V2 corresponds to the requiredColumns parameter of buildScan in V1.
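
To see these hooks in action without running a full query, here is a minimal sketch that drives the V2 reader by hand, in roughly the order Spark would call it during planning. The EqualTo predicate on j is only there to show an unsupported filter being handed back; spark is not needed since the reader is exercised directly.

    import java.util.{HashMap => JHashMap}
    import org.apache.spark.sql.sources.{EqualTo, GreaterThan}
    import org.apache.spark.sql.sources.v2.DataSourceOptions
    import org.apache.spark.sql.sources.v2.reader.{SupportsPushDownFilters, SupportsPushDownRequiredColumns}
    import org.apache.spark.sql.types.StructType

    val reader = new AdvancedDataSourceV2().createReader(
      new DataSourceOptions(new JHashMap[String, String]()))

    // Spark pushes the predicates; only GreaterThan on 'i' is kept by the source
    val unhandled = reader.asInstanceOf[SupportsPushDownFilters]
      .pushFilters(Array(GreaterThan("i", 3), EqualTo("j", -5)))
    // unhandled: Array(EqualTo(j,-5))

    // Spark prunes the columns down to the project list
    reader.asInstanceOf[SupportsPushDownRequiredColumns]
      .pruneColumns(new StructType().add("i", "int"))
    println(reader.readSchema())  // now contains only column 'i'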

Instantiate a new data source

Let’s instantiate the sample data source defined above with rows ranging from 0 to 9, then test a simple query against the data source.

Instantiate sample data source implementing Data Sources API V1

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}

    class AdvancedDataSourceProvider extends RelationProvider {
      override def createRelation(
          sqlContext: SQLContext,
          parameters: Map[String, String]): BaseRelation = {
        AdvancedDataSource(parameters("from").toInt, parameters("to").toInt)(sqlContext.sparkSession)
      }
    }
    


Then create an instance of the new data source using AdvancedDataSourceProvider and try a simple query, select i from ds1 where i > 3, against it.

    
    spark.sql(
      """
        |CREATE TEMPORARY VIEW ds1
        |USING org.apache.spark.sql.sources.AdvancedDataSourceProvider
        |OPTIONS (
        |  from '0',
        |  to '9'
        |)
      """.stripMargin).show()
    
    val df = spark.sql("select i from ds1 where i > 3")
    df.explain(true)
    df.show()
    


Here is the query plan for the V1 data source.

Query plan for sample data source implementing Data Sources API V1

    == Parsed Logical Plan ==
    'Project ['i]
    +- 'Filter ('i > 3)
       +- 'UnresolvedRelation `ds1`
    
    == Analyzed Logical Plan ==
    i: int
    Project [i#0]
    +- Filter (i#0 > 3)
       +- SubqueryAlias ds1
          +- Relation[i#0,j#1] AdvancedDataSource(0,9)
    
    == Optimized Logical Plan ==
    Project [i#0]
    +- Filter (i#0 > 3)
       +- Relation[i#0,j#1] AdvancedDataSource(0,9)
    
    == Physical Plan ==
    *(1) Filter (i#0 > 3)
    +- *(1) Scan AdvancedDataSource(0,9) [i#0] PushedFilters: [GreaterThan(i,3)], ReadSchema: struct<i:int>
    


If we take a closer look at the Optimized Logical Plan and the Physical Plan, we notice the following (a programmatic check is sketched after the list):

  • The local predicate i > 3 from our test query has been pushed down to the data source.
  • ReadSchema: struct<i:int> reflects the project list column pruning optimization. Although two columns (i and j) are defined in the data source, only column i is read from it.
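
If you prefer to verify these observations programmatically rather than by eyeballing the explain output, a quick sketch is to inspect the physical plan text of the V1 query above. This relies on the plan string format shown in the output, so treat it as a debugging aid rather than a stable API.

    val planText = df.queryExecution.executedPlan.toString
    // Both the pushed-down predicate and the pruned read schema show up in the scan node
    assert(planText.contains("PushedFilters: [GreaterThan(i,3)]"))
    assert(planText.contains("ReadSchema: struct<i:int>"))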

Instantiate sample data source implementing Data Sources API V2

    val cls = classOf[AdvancedDataSourceV2]
    
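    // withClue comes from ScalaTest (this snippet is adapted from DataSourceV2Suite);
    // outside a test you can simply drop the withClue wrapper.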
    withClue(cls.getName) {
      val ds = spark.read.format(cls.getName).load()
      ds.createOrReplaceTempView("ds1")
      val df = spark.sql("select i from ds1 where i > 3")
      df.explain(true)
      df.show()
    }
    


Running the same query select i from ds1 where i > 3 against the sample data source implementing V2 produces the following plan.

Query plan for sample data source implementing Data Sources API V2

    == Parsed Logical Plan ==
    'Project ['i]
    +- 'Filter ('i > 3)
       +- 'UnresolvedRelation `ds1`
    
    == Analyzed Logical Plan ==
    i: int
    Project [i#0]
    +- Filter (i#0 > 3)
       +- SubqueryAlias ds1
          +- RelationV2 AdvancedDataSourceV2[i#0, j#1]
    
    == Optimized Logical Plan ==
    Filter isnotnull(i#0)
    +- RelationV2 AdvancedDataSourceV2[i#0] (Filters: [GreaterThan(i,3)])
    
    == Physical Plan ==
    *(1) Filter isnotnull(i#0)
    +- *(1) ScanV2 AdvancedDataSourceV2[i#0] (Filters: [GreaterThan(i,3)])
    


As you can see, the query plans from V1 and V2 look almost identical. Just as we observed for V1, both local predicate push-down and project list column pruning have been applied to the data source implementing Data Sources API V2. One difference to note is that push-down is implemented as an optimizer rule in V2, whereas in V1 push-down does not happen until the physical planning stage. The query result for select i from ds1 where i > 3 is, as expected, exactly the same for both V1 and V2 (a quick programmatic check follows the output):

    +---+
    |  i|
    +---+
    |  4|
    |  5|
    |  6|
    |  7|
    |  8|
    |  9|
    +---+
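
If you want to check this equivalence in code rather than by reading the output, a minimal sketch is below. It assumes you registered the V1 and V2 sources under the hypothetical view names ds1_v1 and ds1_v2 (in the snippets above, both were registered as ds1, one at a time).

    val v1Rows = spark.sql("select i from ds1_v1 where i > 3").collect().map(_.getInt(0)).sorted
    val v2Rows = spark.sql("select i from ds1_v2 where i > 3").collect().map(_.getInt(0)).sorted
    // Both implementations return 4, 5, 6, 7, 8, 9
    assert(v1Rows.sameElements(v2Rows))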

Conclusion

The examples above demonstrate some of the basic features of Data Sources API V2, and there are more for you to explore. Download Spark 2.3 to give the new Data Sources API V2 a try: start creating your own data source, explore the various push-down optimization techniques available, and stay tuned for future enhancements. Happy coding!
