George Wang

 

Abstract:

Spark can be used in conjunction with DB2 for z/OS to perform analytical tasks on vast amounts of data without impacting transactional performance. This chapter provides a scenario that shows how to deliver this capability to users running XML analytics with the Spark framework and Spark SQL. The concept makes use of in-memory cluster computing without storing the data outside of the existing database.

This is a proof-of-concept prototype that showcases moving heavy-lifting analytical workloads outside of the relational database. The XML data is transferred from DB2 for z/OS into Spark memory structures, enabling fast computation and advanced analytics through the Spark system.

Background:

DB2 is a powerful database product designed for storing, analyzing, and retrieving data efficiently. However, it is sometimes not the desired solution for running analytics applications against massive amounts of data that require repetitive large table scans. A modernized technology for analytics and machine learning can mitigate these challenges as cost-effectively as possible.

This project was initiated to address a requirement shared by a number of IBM DB2 for z/OS customers globally. Today, XML data can be manipulated using query languages such as XQuery and XPath. A DB2 for z/OS user can write query expressions to navigate through XML's hierarchical data structures and receive sequences of XML documents. But streaming real-time analytics is becoming more pervasive, and there will be growing demand to process data in XML documents using analytical query processing support.

Use case scenario:

This is an example from a financial institution using DB2 queries on 400 TB of taxpayers' profile information, which is stored as XML documents within the database. The XML data can be analyzed interactively by the user from multiple dimensions via online analytical processing tools.

The main goal of this use case is to enable advanced analytics on this business data without impacting the existing Online Transaction Processing (OLTP). Such analytics allows users such as data scientists to run queries on XML data that originates on the mainframe for various purposes such as IT monitoring, fraud detection, hotspot analysis, etc.

This DB2 use case introduces a number of challenging issues. From a system perspective, no technology is available to analyze data in XML format with a real-time query engine on top of the Hadoop Distributed File System (HDFS). Secondly, a large-scale data offload from a DB2 XML table to Hadoop clusters is a performance bottleneck and would impose a completely new set of data currency issues if the XML data needed to be maintained continuously. Furthermore, DB2 has support for XML manipulation and functions for parsing. These are very effective for transactional processing, like finding specific sub-nodes in an XML document. On the other hand, they are not designed for XML analytics, such as shredding the data and running analytical applications or tooling.

Processing large amounts of XML data is a challenge in DB2 for z/OS today. Parsing XML on the z/OS platform is costly in terms of CPU consumption and potentially impractical for very large documents. In addition, it might consume significant amounts of memory and potentially result in storage issues. The problem gets more severe if the XML document is large and deeply nested or has many optional fields.

Addressing these requirements is key to success for DB2 in terms of enabling large-scale XML analytical support. Starting with DB2 for z/OS Version 10, OLAP capability on the mainframe is generally supported by the IBM DB2 Analytics Accelerator. However, the XML data type is currently not supported in this combination, which requires a solution outside of this well-known accelerator.

Solution:

The solution to this concrete business problem is to incorporate Apache Spark with DB2 for processing mainframe-hosted XML data by developing a Spark application. The overall design consists of two steps: first, enable fast data pipelining from DB2 into a Spark RDD; second, once the data is loaded into Spark as collections of data partitions, transform the XML data for aggregation and analytics. The purpose is to push heavy-lifting analytical processing workloads outside of DB2 to reduce CPU cost and to enable new machine learning capabilities through the Spark framework. Data scientists can then query the data by scanning the entire XML table in DataFrame form, where complex queries now run faster and more efficiently than before, while the DB2 engine retains its transactional lookup performance for specific XML nodes.

Spark on DB2 for z/OS Data:

To achieve this goal, the middleware support is provided by streaming data from DB2 via a Spark API called JdbcRDD. This function call is used to load data from a DB2 database via a JDBC driver. The data is kept in Spark memory as needed, instead of being persisted on HDFS. The function creates multiple tasks for JDBC connections, each executing a sub-query for a portion of the source table data. The number of tasks is determined by Spark using a key range. As a result, data is streamed into Spark memory through parallel scans of the different DB2 partitions of the XML table.
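For illustration, the same kind of partitioned load can also be expressed through the DataFrame JDBC reader. The following is a minimal sketch assuming Spark 1.x; the key range and partition count are assumptions, while the URL and table name are taken from the walkthrough below:

    import java.util.Properties

    // Connection properties for the DB2 JDBC driver (illustrative credentials)
    val prop = new Properties()
    prop.setProperty("user", "db2user")
    prop.setProperty("password", "secret")

    // Each of the 8 tasks issues its own sub-query over a slice of the ID1
    // key range, so different DB2 partitions are scanned in parallel.
    val xmlDF = sqlContext.read.jdbc(
      "jdbc:db2://theDB2Server:446/DBLOC", // JDBC URL
      "tbdk157a",                          // source table holding the XML column
      "ID1",                               // numeric partitioning column
      1L,                                  // lower bound of the key range (assumed)
      1000000L,                            // upper bound of the key range (assumed)
      8,                                   // number of parallel tasks (assumed)
      prop)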

Apache Spark supports building standalone applications in Scala, Java, and other programming languages for advanced analysis; a standalone skeleton is sketched after the list below. The following processing flow shows the steps of the application:

1. Load a DataFrame with the JDBC data source, where the XML is received as a string (CLOB)

2. Parse the XML into Scala XML document format

3. Load all records from high-level XML nodes, where all node values are normalized and mapped to a record/case class. This essentially maps the unstructured XML data to structured records of the values that are interesting for later analytical processing

4. Register the DataFrame as a temporary table

5. Optionally cache the registered table

6. Run queries using Spark SQL or use ML algorithms to detect correlations
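For a standalone application, as opposed to the interactive shell walkthrough that follows, these steps can be packaged as in the following minimal sketch. It assumes Spark 1.x APIs and reuses the illustrative connection details, table name, and field mapping from this chapter:

    import java.util.Properties
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Case class mirroring the fields extracted from each XML document
    case class Record(countryID: String, gender: String, age: String,
                      time: String, obsValue: Double)

    object XmlAnalyticsApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("DB2 XML Analytics")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        val prop = new Properties()
        prop.setProperty("user", "db2user")
        prop.setProperty("password", "secret")

        // Step 1: load the XML column as strings over JDBC
        val xmlDF = sqlContext.read.jdbc(
          "jdbc:db2://theDB2Server:446/DBLOC", "tbdk157a", prop)

        // Steps 2 and 3: parse each document and flatten its Series nodes into Records
        val records = xmlDF.select("MYXML1").collect()
          .map(row => scala.xml.XML.loadString(row.getString(0)))
          .flatMap(doc => (doc \\ "Series").map(s => s \\ "@value")
            .map(n => Record(n(0).text, n(2).text, n(3).text, n(4).text, n(7).text.toDouble)))

        // Steps 4 and 5: register the DataFrame as a temporary table and cache it
        val recordDF = sc.parallelize(records).toDF()
        recordDF.registerTempTable("recordtable")
        recordDF.cache()

        // Step 6: run a Spark SQL query
        sqlContext.sql("select avg(obsValue) from recordtable where gender='MEN'")
          .collect().foreach(println)
      }
    }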

 


 

The following sample code demonstrates the feasibility of the Spark application. The source data processed in this example was retrieved from the Organization for Economic Cooperation and Development (OECD). The data consists of matrices showing the employment rates of European countries on a quarterly basis, broken down by gender. This data was loaded into a DB2 for z/OS table using the XML data type. The initial data is pipelined into Spark memory as shown in the following example:

  • Connect to DB2 via JDBC
  • Construct JDBC URL:

    val url="jdbc:db2://theDB2Server:446/DBLOC"
    url: String = jdbc:db2://theDB2Server:446/DBLOC

    Create a connection properties object with username/password:

    val prop = new java.util.Properties
    prop: java.util.Properties = {}

    prop.setProperty("user","db2user")
    prop.setProperty("password","secret")

    res158: Object = null
    res159: Object = null

  • Load XML as Strings (sample output)
  • Get a DataFrame with JDBC data-source (url, table_name, connection_properties)

    val xmlRecord = sqlContext.read.jdbc(url,"tbdk157a",prop)
    xmlRecord: org.apache.spark.sql.DataFrame = [ID1: int, MYXML1: string]

    +---+--------------------+
    |ID1|              MYXML1|
    +---+--------------------+
    |  1|<DataSet keyFamil...|
    |  2|<DataSet keyFamil...|
    +---+--------------------+

  • Creating a case class to store the XML attribute values in relational form
  • case class Record(countryID: String, gender: String, age: String, time: String, obsValue: Double)
    defined class Record

  • Parsing the XML into documents
  • val xmleveryDoc = xmlRecord.select("MYXML1").collect.map( _.getString( 0 ) ).map( scala.xml.XML.loadString( _ ) )
    xmleveryDoc: Array[scala.xml.Elem] = Array(
    <DataSet keyFamilyURI="http://stats.oecd.org/RestSDMX/sdmx.ashx/GetKeyFamily/GENDER_EMP">
    <KeyFamilyRef>GENDER_EMP</KeyFamilyRef>
    <Series>
    <SeriesKey>
    <Value value="AUS" concept="COU"/>
    <Value value="EMP4_E" concept="IND"/>
    <Value value="MEN" concept="SEX"/>
    <Value value="15PLUS" concept="AGE"/>
    <Value value="Q1-2007" concept="TIME"/>
    </SeriesKey>
    <Attributes> <Value value="Percentage" concept="UNIT"/>
    <Value value="0" concept="POWERCODE"/>
    </Attributes>
    <Obs>
    <ObsValue value="69.3"/>
    </Obs>
    </Series>
    <Series>
    <SeriesKey>
    <Value value="AUS" concept="COU"/>
    <Value value="EMP4_E" concept="IND"/>
    <Value value="MEN" concept="SEX"/>
    <Value value="15PLUS" concept="AGE"/>
    <Value value="Q1-2008" concept="TIME"/>
    </SeriesKey>
    <Attributes>
    <Value value="Percentage" concept="UNIT"/...

    Load all records from the Series nodes, where all node values are normalized and mapped to the Record class. Here only country, gender, age, time, and the employment rate value are selected as the case class attributes. Note that the mapping scheme s => s \\ "@value" assumes that the values always appear in the same order; therefore the transformation can always be mapped by position from the array n to the Record case class.

    val allSeriesRecord = (xmleveryDoc(0) \\ "Series").map(s => s \\ "@value" ).toArray.map{ n => Record(n(0).text, n(2).text, n(3).text, n(4).text, n(7).text.toDouble) }
    allSeriesRecord: Array[Record] = Array(Record(AUS,MEN,15PLUS,Q1-2007,69.3),
    Record(AUS,MEN,15PLUS,Q1-2008,69.9), Record(AUS,MEN,15PLUS,Q1-2009,68.6),
    Record(AUS,MEN,15PLUS,Q1-2010,68.4), Record(AUS,MEN,15PLUS,Q1-2011,69.1),
    Record(AUS,MEN,15PLUS,Q1-2012,68.3), Record(AUS,MEN,15PLUS,Q1-2013,67.7),
    Record(AUS,MEN,15PLUS,Q2-2007,69.7), Record(AUS,MEN,15PLUS,Q2-2008,69.8),
    Record(AUS,MEN,15PLUS,Q2-2009,68.1), Record(AUS,MEN,15PLUS,Q2-2010,68.4),
    Record(AUS,MEN,15PLUS,Q2-2011,68.6), Record(AUS,MEN,15PLUS,Q2-2012,68.2),
    Record(AUS,MEN,15PLUS,Q2-2013,67.5), Record(AUS,MEN,15PLUS,Q3-2007,69.7),
    Record(AUS,MEN,15PLUS,Q3-2008,69.8), Record(AUS,MEN,15PLUS,Q3-2009,68.0),
    Record(AUS,MEN,15PLUS,Q3-2010,68.8), Record(AUS,MEN,15PLUS,Q3-2011,68.4),
    Record(AUS,MEN,15PLUS,Q3-2012,67.8), Record(AUS,MEN,15P...

  • Make the data accessible for queries
  • Create an RDD from all the records and convert it into a DataFrame

    val allrecordDF = sc.parallelize(allSeriesRecord).toDF()
    allrecordDF: org.apache.spark.sql.DataFrame = [countryID: string, gender: string, age: string, time: string, obsValue: double]

    Register this DataFrame as a temporary table named “recordtable”

    allrecordDF.registerTempTable("recordtable")

    Optionally cache the table data

    allrecordDF.cache

    res182: allrecordDF.type = [countryID: string, gender: string, age: string, time: string, obsValue: double]

  • Sample query
  • An example that selects the average employment rate across all countries for men only

    sql ("select avg(obsValue) from recordtable where gender='MEN'").collect.foreach(r => println("gender: MEN " + " rate= " + r(0)))
    gender: MEN rate= 35.00280561122244
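    Other aggregations follow the same pattern. As a further illustrative sketch (assuming the same cached "recordtable" as above), the average rate for men can be grouped by country:

    sqlContext.sql("select countryID, avg(obsValue) as avgRate from recordtable where gender='MEN' group by countryID").collect.foreach(r => println("country: " + r(0) + " rate= " + r(1)))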

Data visualization with Spark:

As shown above, once the data is loaded in Spark memory, the XML data is transformed into the normalized records defined by the case class structure. Apache Zeppelin is used in this project to visualize the data, allowing data scientists to perform data exploration on a cloud-based platform. As an open source effort, Zeppelin is also supported by the IBM Open Platform and provides many language interpreters, such as Scala and Spark SQL. In the example shown below, a query is issued in Spark SQL to find the employment rates of each country by gender. The result is presented in graphical charts.
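For example, a Zeppelin paragraph using the %sql interpreter against the temporary table registered above might look like the following sketch; Zeppelin renders the result set as a table or chart directly in the notebook:

    %sql
    select countryID, gender, avg(obsValue) as employmentRate
    from recordtable
    group by countryID, gender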

     


Conclusion:

DB2 XML analytics is extended by incorporating Spark technology and pipelining the raw data from a DB2 XML table into a Spark RDD. This saves CPU cost compared to executing queries directly against the XML documents within DB2. The only CPU cost on the mainframe side is associated with the data pipelining when reading the XML table, and this pipelining process is required only once to query the entire XML table. After the XML documents are converted to the normalized data form, data aggregation and analytics can be performed. Since the data parsing and transformation are done on the Spark side, memory consumption and storage issues are less of a concern. The use of Spark SQL with cached DataFrames allows repeated querying, which increases the efficiency of the process. With the incorporation of the Spark framework, a whole new machine learning capability is extended to DB2 for analytics on masses of XML documents, in addition to DB2's existing extraordinary built-in XML capability for processing single documents.

Special thanks to Oliver Draese for guidance and direction, which made this project possible.

Contact Info

Email: fgwang@us.ibm.com
