Spark currently supports the JDBC Data Source, which works with DB2, Oracle, Derby, MS SQL Server, MySQL, Postgres, and Teradata. If you use a different database, you will likely run into problems with the Spark JDBC Data Source. There may be data type mapping inconsistencies between your database and Spark; that is, some of the data types Spark uses are not supported by your database, and vice versa. Additionally, other database-specific properties may not match the Spark JDBC Data Source defaults. For example, the Spark JDBC Data Source uses double quotes to delimit identifiers, but some databases use a different quoting character, such as backticks in MySQL.

To make your database work properly with the Spark JDBC Data Source, you may need to implement a dialect specific to your database. This article shows you, step by step, how to do that.

In the Spark code (https://github.com/apache/spark.git), inside the package org.apache.spark.sql.jdbc, there is an abstract class JdbcDialect. It handles the SQL dialect of a certain database or JDBC driver. The following dialects have been implemented in Spark:

  • DB2Dialect
  • DerbyDialect
  • MsSqlServerDialect
  • MySQLDialect
  • OracleDialect
  • PostgresDialect
  • TeradataDialect

If you have a different database, you might need to implement your own database dialect. I will use SQLite as an example to show how to implement your own database dialect.

Let’s take a look at the methods in JdbcDialect.

canHandle

/**
 * Check if this dialect instance can handle a certain jdbc url.
 * @param url the jdbc url.
 * @return True if the dialect can be applied on the given jdbc url.
 * @throws NullPointerException if the url is null.
 */
def canHandle(url : String): Boolean

You will need to override this API for SQLite. Since SQLite JDBC URLs start with jdbc:sqlite, override canHandle in SQLiteDialect as follows:


override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlite")
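
The overrides discussed in this article live in a dialect object that extends JdbcDialect. Here is a minimal sketch of that skeleton; the name SQLiteDialect is our own choice, not a class that ships with Spark:

import org.apache.spark.sql.jdbc.JdbcDialect

// Skeleton of the custom dialect; the remaining overrides are added in the
// sections below. Only URLs that start with jdbc:sqlite are handled.
case object SQLiteDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlite")
}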

getCatalystType

  /**
   * Get the custom datatype mapping for the given jdbc meta information.
   * @param sqlType The sql type (see java.sql.Types)
   * @param typeName The sql type name (e.g. "BIGINT UNSIGNED")
   * @param size The size of the type.
   * @param md Result metadata associated with this type.
   * @return The actual DataType (subclasses of [[org.apache.spark.sql.types.DataType]])
   *         or null if the default type mapping should be used.
   */
  def getCatalystType(
    sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = None

getCatalystType is used when reading data from a Spark JDBC Data Source. First, check the method JdbcUtils.getCatalystType to see if the mapping between the JDBC type and Spark Catalyst type covers all your database types correctly. Here is the mapping:

  /**
   * Maps a JDBC type to a Catalyst type.  This function is called only when
   * the JdbcDialect class corresponding to your database driver returns null.
   *
   * @param sqlType - A field of java.sql.Types
   * @return The Catalyst type corresponding to sqlType.
   */
  private def getCatalystType(
      sqlType: Int,
      precision: Int,
      scale: Int,
      signed: Boolean): DataType = {
    val answer = sqlType match {
      // scalastyle:off
      case java.sql.Types.ARRAY         => null
      case java.sql.Types.BIGINT        => if (signed) { LongType } else { DecimalType(20,0) }
      case java.sql.Types.BINARY        => BinaryType
      case java.sql.Types.BIT           => BooleanType // @see JdbcDialect for quirks
      case java.sql.Types.BLOB          => BinaryType
      case java.sql.Types.BOOLEAN       => BooleanType
      case java.sql.Types.CHAR          => StringType
      case java.sql.Types.CLOB          => StringType
      case java.sql.Types.DATALINK      => null
      case java.sql.Types.DATE          => DateType
      case java.sql.Types.DECIMAL
        if precision != 0 || scale != 0 => DecimalType.bounded(precision, scale)
      case java.sql.Types.DECIMAL       => DecimalType.SYSTEM_DEFAULT
      case java.sql.Types.DISTINCT      => null
      case java.sql.Types.DOUBLE        => DoubleType
      case java.sql.Types.FLOAT         => FloatType
      case java.sql.Types.INTEGER       => if (signed) { IntegerType } else { LongType }
      case java.sql.Types.JAVA_OBJECT   => null
      case java.sql.Types.LONGNVARCHAR  => StringType
      case java.sql.Types.LONGVARBINARY => BinaryType
      case java.sql.Types.LONGVARCHAR   => StringType
      case java.sql.Types.NCHAR         => StringType
      case java.sql.Types.NCLOB         => StringType
      case java.sql.Types.NULL          => null
      case java.sql.Types.NUMERIC
        if precision != 0 || scale != 0 => DecimalType.bounded(precision, scale)
      case java.sql.Types.NUMERIC       => DecimalType.SYSTEM_DEFAULT
      case java.sql.Types.NVARCHAR      => StringType
      case java.sql.Types.OTHER         => null
      case java.sql.Types.REAL          => DoubleType
      case java.sql.Types.REF           => StringType
      case java.sql.Types.REF_CURSOR    => null
      case java.sql.Types.ROWID         => LongType
      case java.sql.Types.SMALLINT      => IntegerType
      case java.sql.Types.SQLXML        => StringType
      case java.sql.Types.STRUCT        => StringType
      case java.sql.Types.TIME          => TimestampType
      case java.sql.Types.TIME_WITH_TIMEZONE
                                        => TimestampType
      case java.sql.Types.TIMESTAMP     => TimestampType
      case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
                                        => TimestampType
      case -101                         => TimestampType // Value for Timestamp with Time Zone in Oracle
      case java.sql.Types.TINYINT       => IntegerType
      case java.sql.Types.VARBINARY     => BinaryType
      case java.sql.Types.VARCHAR       => StringType
      case _                            =>
        throw new SQLException("Unrecognized SQL type " + sqlType)
      // scalastyle:on
    }

    if (answer == null) {
      throw new SQLException("Unsupported type " + JDBCType.valueOf(sqlType).getName)
    }
    answer
  }

If any of the type mappings in the above code don’t work for your specific database, you will need to customize the mapping by overriding the JdbcDialect.getCatalystType API in your dialect class. In SQLite, one of the data type storage classes is NULL. When a value is null, the SQLite JDBC driver reports java.sql.Types.NULL for a few types, such as BIT and BYTE. This causes an Unsupported Type exception in Spark JDBC, so we need to map BIT and BYTE to the correct Spark Catalyst types by overriding SQLiteDialect.getCatalystType:

  override def getCatalystType(
              sqlType: Int,
              typeName: String,
              size: Int,
              md: MetadataBuilder): Option[DataType] = sqlType match {
    case Types.NULL =>
      typeName match {
        case "BIT" => Option(BooleanType)
        case "BYTE" => Option(IntegerType)
        case _ => None
      }
    case _ => None
  }
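
To see the effect of this mapping, you can read a SQLite table that contains BIT and BYTE columns and inspect the inferred schema. This is a rough sketch; the SparkSession (spark), database file, table name, and columns are assumptions:

import java.util.Properties
import org.apache.spark.sql.jdbc.JdbcDialects

// With SQLiteDialect registered, BIT and BYTE columns that the SQLite JDBC
// driver reports as java.sql.Types.NULL map to BooleanType and IntegerType.
JdbcDialects.registerDialect(SQLiteDialect)
val sensors = spark.read.jdbc("jdbc:sqlite:/tmp/test.db", "SENSORS", new Properties())
sensors.printSchema()  // a BIT column appears as boolean, a BYTE column as integer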

getJDBCType

  /**
   * Retrieve the jdbc / sql type for a given datatype.
   * @param dt The datatype (e.g. [[org.apache.spark.sql.types.StringType]])
   * @return The new JdbcType if there is an override for this DataType
   */
  def getJDBCType(dt: DataType): Option[JdbcType] = None

Check the data types that Spark uses to write to the JDBC Data Source and make sure all of them are supported by your database. If any of them are not, you will need to map them to types your database does support by overriding the getJDBCType method. First, go to the JdbcUtils class inside the org.apache.spark.sql.execution.datasources.jdbc package; there you will find the getCommonJDBCType method shown below:

def getCommonJDBCType(dt: DataType): Option[JdbcType] = {
  dt match {
    case IntegerType => Option(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case LongType => Option(JdbcType("BIGINT", java.sql.Types.BIGINT))
    case DoubleType => Option(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
    case FloatType => Option(JdbcType("REAL", java.sql.Types.FLOAT))
    case ShortType => Option(JdbcType("INTEGER", java.sql.Types.SMALLINT))
    case ByteType => Option(JdbcType("BYTE", java.sql.Types.TINYINT))
    case BooleanType => Option(JdbcType("BIT(1)", java.sql.Types.BIT))
    case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
    case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
    case TimestampType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))
    case DateType => Option(JdbcType("DATE", java.sql.Types.DATE))
    case t: DecimalType => Option(
      JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
    case _ => None
  }
}

By comparing the data types in the above method with the data types supported by SQLite, you will find that SQLite supports all of them. However, since SQLite also supports SMALLINT, we can map ShortType to SMALLINT instead of INTEGER by overriding the getJDBCType API.

override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
  case ShortType => Option(JdbcType("SMALLINT", Types.SMALLINT))
  case _ => None
}
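
With this override in place, a DataFrame column of ShortType is written out as SMALLINT rather than the default INTEGER. The following is a hedged usage sketch; the SparkSession (spark), database file, and table name are assumptions, and the SQLite JDBC driver must be on the classpath:

import java.util.Properties
import org.apache.spark.sql.Row
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types._

// Build a small DataFrame with a ShortType column and write it through JDBC.
JdbcDialects.registerDialect(SQLiteDialect)
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("qty", ShortType)))
val rows = spark.sparkContext.parallelize(Seq(Row(1, 5.toShort), Row(2, 7.toShort)))
val df = spark.createDataFrame(rows, schema)
// The qty column is created as SMALLINT because of SQLiteDialect.getJDBCType.
df.write.jdbc("jdbc:sqlite:/tmp/test.db", "ORDERS", new Properties())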

quoteIdentifier

/**
 * Quotes the identifier. This is used to put quotes around the 
 * identifier in case the column name is a reserved keyword, or in
 * case it contains characters that require quotes (e.g. space).
 */

def quoteIdentifier(colName: String): String = {
  s""""$colName""""
}

SQLite can quote identifiers with double quotes, square brackets, or backticks, so the default double-quote behavior works and there is no need to override this API.
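
As a quick illustration, the inherited default simply wraps the column name in double quotes, which SQLite accepts (the column name here is made up):

// Reserved words and names containing spaces stay safe in generated SQL.
val quoted = SQLiteDialect.quoteIdentifier("order id")  // yields: "order id" (including the quotes)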

getTableExistsQuery and getSchemaQuery

/**
 * Get the SQL query that should be used to find if the given table
 * exists. Dialects can override this method to return a query
 * that works best in a particular database.
 * @param table  The name of the table.
 * @return The SQL query to use for checking the table.
 */

def getTableExistsQuery(table: String): String = {
  s"SELECT * FROM $table WHERE 1=0"
}

/**
 * The SQL query that should be used to discover the schema of a
 * table. It only needs to ensure that the result set has the same
 * schema as the table, such as by calling
 * "SELECT * ...". Dialects can override this method to return a
 * query that works best in a particular database.
 * @param table The name of the table.
 * @return The SQL query to use for discovering the schema.
 */
@Since("2.1.0")
def getSchemaQuery(table: String): String = {
  s"SELECT * FROM $table WHERE 1=0"
}

Spark uses “SELECT * FROM $table WHERE 1=0” for both getTableExistsQuery and getSchemaQuery. Check whether this SQL statement works with your database; if not, you will need to override these APIs in your dialect implementation. SQLite supports this statement, so there is no need to do anything here.
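
If your database does reject this form, the override is straightforward. The following is a purely hypothetical sketch for a database that prefers LIMIT 0; SQLite itself does not need it:

// Hypothetical overrides for a database that does not accept "WHERE 1=0".
override def getTableExistsQuery(table: String): String =
  s"SELECT * FROM $table LIMIT 0"

override def getSchemaQuery(table: String): String =
  s"SELECT * FROM $table LIMIT 0"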

After implementing your own database dialect, SQLiteDialect in this case, there are two ways to use it. You can open a JIRA issue and submit a pull request to integrate the new class into Spark. You will need to put the class under the org.apache.spark.sql.jdbc package, alongside DB2Dialect, OracleDialect, PostgresDialect, and the other database dialects, and register SQLiteDialect in JdbcDialects using registerDialect(SQLiteDialect). Once the PR is merged, SQLiteDialect will be part of the Spark code.

If you don’t want to integrate your database dialect into Spark, you can instead include it in your Spark JDBC Data Source program. In that case, register your database dialect before using it, and unregister it afterwards. For example:

JdbcDialects.registerDialect(SQLiteDialect)
val df = spark.read.jdbc(urlWithUserAndPass, "TESTDB", new Properties())
df.filter($"Col1" > 21).show()
...... 
JdbcDialects.unregisterDialect(SQLiteDialect)
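
For reference, here is the complete SQLiteDialect assembled from the overrides developed above; treat it as a sketch rather than an official Spark class:

import java.sql.Types

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
import org.apache.spark.sql.types._

case object SQLiteDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlite")

  // Read path: map BIT/BYTE columns reported as java.sql.Types.NULL to Catalyst types.
  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] =
    sqlType match {
      case Types.NULL =>
        typeName match {
          case "BIT" => Option(BooleanType)
          case "BYTE" => Option(IntegerType)
          case _ => None
        }
      case _ => None
    }

  // Write path: create ShortType columns as SMALLINT instead of the default INTEGER.
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case ShortType => Option(JdbcType("SMALLINT", Types.SMALLINT))
    case _ => None
  }
}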
