
Migrating legacy applications to Apache Spark

Many enterprises are transforming core systems with Apache Spark to gain agility, performance, and access to a broader ecosystem. In this article, I share some of my team’s experiences from migrating a legacy application to Apache Spark while working with enterprise clients. Apache Spark is a distributed computing framework suited to big data analytics applications, but in this project it was used beyond the typical analytics use case and was co-located with transactional systems. We transformed the core processing by using parallelism, in-memory techniques, and integration with systems of record, and we applied a range of performance design and tuning techniques to achieve a 4X performance improvement from start to finish.

When we began, some work was already in place to execute the business logic as SQL queries against a proprietary transactional system. For example, without going into the details of the SQL queries, the queries would compute aggregated data based on business logic and store the result in intermediate tables. There were four intermediate tables, and join operations between those intermediate tables produced a final target table with the results for one of the phases, which I’ll call phase 1. Phase 1 consisted of a number of steps with similar logic: in each step, SQL queries were run, many intermediate tables were created, and a final target table held the results for that particular step. At the end of phase 1, the results from all of the steps were combined and used in the next phase of processing; there were several phases in all. In this article, I only discuss the Spark-specific calls that were used to support this migration.

The process

The first step was to identify which parts of the processing could be done in Spark. Some proprietary business logic was already built into the existing downstream systems. The client wanted to keep that logic in those systems and move the processing logic for the phases into Spark to achieve their service level agreement (SLA) targets.

Step 1: Connecting to the transaction system and fetching data into Spark

The data had to be fetched from the external systems of record into Spark. We used a performant driver from Rocket, the MDS JDBC driver, through the Spark data source API to connect to the transactional system and fetch the data.

spark.read.format("jdbc").options(Map(
  "driver" -> Driver,
  "url" -> url,
  "dbtable" -> sql))
  .load())

Instead of using the numPartitions option of the JDBC data source, we split the load across n tasks by specifying a URL for each of the n partitions. The Rocket driver has a specific way to specify the partition that you want to read from the underlying system, so we used that and specified all of the properties it expected in the connection URL.

Parallelizing the load for performance

Spark can run multiple tasks in parallel to achieve better performance and make use of cluster resources. By default, parallelism is based on the input partitions; if the underlying data can be split, the number of partitions used depends on the number of splits. There are also APIs that take an explicit number of partitions. We wanted to read the data in parallel to take advantage of Spark’s parallelism. In our scenario, the performant external driver could read data in parallel in a certain fashion, so we architected the load to read chunks of data from the input data source and then combine them using the union API available in Spark.

We had N JDBC calls to fetch the data from the transactional system, and we combined the results with the Spark union call, as shown in the following code.

// Split the load across n Spark tasks => n JDBC connections in parallel
val dfs = List.tabulate(cnt)(n =>
  spark.read.format("jdbc").options(Map(
    "driver" -> Driver,
    "url" -> (url + mrcn + "=" + (n + 1)),
    "dbtable" -> sql))
    .load())
// Combine the results into a single DataFrame
var df = dfs.reduce(_ union _)

The data is fetched as part of the load, and this data was used for the entire phase 1. As performance was critical for this application, we ran performance tests as we developed new code. We ran some tests to measure the load time, but it was not where we wanted it to be. Therefore, we increased N to make use of all the available resources by increasing the number of cores and executors for the Spark application. This was achieved by increasing the number of parallel tasks for the load and also increasing the cores. For example:

  • 16 executors
  • 5 cores/executor
  • 4 GB JVM heap/executor
  • -Xgcthreads20 for executors
  • 10 GB JVM heap for Spark driver
# 16 executors x 5 cores with 4 GB per executor
executor_cores="--executor-cores 5"
executor_total="--total-executor-cores 80"
executor_memory="--executor-memory 4G"
executor_jvmopt="--conf spark.executor.extraJavaOptions=-Xgcthreads20"
driver_memory="--conf spark.driver.memory=10G"
executor_options="$executor_total $executor_cores $executor_memory $driver_memory $executor_jvmopt"

./bin/spark-submit --class xyz --jars $jars $executor_options --master spark://1.11.11.158:xxxx /apps/myapp.jar

The business logic could be expressed in SQL and was already available as SQL queries. Because Spark SQL supports running SQL queries, we used it to create in-memory views for the pipeline. We ran each query with the sql method on the SparkSession to create a DataFrame. After we had the DataFrame, we created an in-memory view using the createOrReplaceTempView API on the Dataset.

Note: A DataFrame is simply a Dataset of org.apache.spark.sql.Row, and it is created by running the query with the sql method on the SparkSession. The queries used joins, aggregates, subqueries, and other operations that built on the intermediate temporary views that were created. The SQL queries had to be slightly modified to make them work in Spark; Spark SQL has some limited capability in terms of subqueries, but we were able to adapt the business logic to Spark SQL.

df.createOrReplaceTempView(table)

So, in phase 1, for the four different intermediate tables, we created in-memory temporary views using the previous Spark call. The SQL queries were changed to refer to these temporary views and query against them.
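To make the flow concrete, the following minimal sketch shows how one step might be wired together: run a query with the sql method on the SparkSession and register the resulting DataFrame as a temporary view for the next query. The query, column, and view names here are hypothetical.

// Hypothetical business-logic query against a previously registered temporary view
val stepDF = spark.sql(
  """SELECT acct_id, SUM(txn_amt) AS total_amt
    |FROM loaded_view
    |GROUP BY acct_id""".stripMargin)

// Register the result as an in-memory temporary view for the next query in the step
stepDF.createOrReplaceTempView("intermediate_1")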

Reduce the data set size/predicate pushdown

We noticed that we were reading a lot of data, but not all of it was relevant to the processing. There were three areas where we could reduce the data set size.

  1. We optimized the pipeline by adding the required predicates (that is, filters) to the query that is sent to the external data source. This decreased the amount of data moved from the external system to Spark and improved performance significantly because Spark now had less data to process. I have seen this issue multiple times in different customer cases. Identifying what your pipeline is doing and pushing any filters closer to the source, and as early in the pipeline as possible, is critical to most Spark applications.

  2. The same principle of reducing the data set size applies to column pruning as well. We optimized the queries to fetch only the columns that were needed instead of using a select * query that fetches all of the columns. The tables were very wide, that is, they had a lot of columns. So, we looked at all of the queries used in each of the phases and added column projections instead of selecting all of the columns, which eliminated the fetching of any column that was not required downstream.

  3. We optimized the queries used to create the intermediate tables in each phase and step by adding filter predicates so that no unneeded data was fetched. A short sketch combining these ideas follows this list.
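In the sketch below, the query passed to the JDBC data source carries both the column projection and the filter predicate, so only the relevant rows and columns leave the source system. The table, column, and predicate values are hypothetical.

// Only the columns and rows needed downstream are requested from the source system
// (table, column, and date predicate are hypothetical)
val prunedSql = """(SELECT acct_id, branch_id, txn_amt
                   |  FROM accounts
                   |  WHERE txn_date >= '2021-01-01') AS pruned""".stripMargin

val prunedDF = spark.read.format("jdbc").options(Map(
  "driver" -> Driver,
  "url" -> url,
  "dbtable" -> prunedSql))
  .load()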

Caching

Spark has support for caching results in memory, which can significantly improve performance. There are different levels of caching available. The following APIs can be used on the DataFrame: the cache() call uses the default MEMORY_AND_DISK storage level, and the persist() call can be used to specify the storage level that your application requires.

def persist(newLevel: StorageLevel)

There is also another API that you can use on the SparkSession through the catalog API.

spark.catalog.cacheTable(tableOrViewName)

tableOrViewName stands for the table or the view that you want to cache, and spark is the SparkSession instance in your application.

The MEMORY_AND_DISK storage level caches the results in memory and also spills to a disk if there is not enough memory. There is a MEMORY_ONLY option that you can use to cache the results in memory and not write to disk. For our application, we had enough memory available, and we used the persist API so that we could specify the StorageLevel that we needed. We cached our temporary views that were used multiple times further down the pipeline in join operations to achieve better performance.

It is a good practice to call unpersist() to clean up resources when you are finished, especially on a shared cluster. Spark uses an LRU cache eviction policy.

We used caching in a number of places in the application. The first place was the load: after we fetched the data into Spark and performed a union of all of the data for a given table, we cached it. The N steps all used the same data from the load, and it is better not to perform the fetch every time. This improved performance greatly.

df = df.cache()

We also used the persist call for the intermediate DataFrames, with StorageLevel.MEMORY_ONLY, because we only needed these intermediate DataFrames while computing the target result for that particular step. After the step processing completed, these intermediate DataFrames never needed to be computed again. Another reason was that there was sufficient memory available on the cluster, so we could use the MEMORY_ONLY option and avoid the cost of writing to disk.

p1DF.persist(StorageLevel.MEMORY_ONLY)
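For completeness, here is a sketch of that pattern with the required import and the cleanup call; placing unpersist() right after the step finishes is an assumption about where the cleanup naturally fits.

import org.apache.spark.storage.StorageLevel

// Keep the intermediate DataFrame in memory only while the step is being computed
p1DF.persist(StorageLevel.MEMORY_ONLY)

// ... step processing that reuses p1DF ...

// Release the cached blocks once the step's target result has been produced
p1DF.unpersist()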

Optimize the queries

Typically, joins are expensive, and the general rule is to optimize your join operations as much as you can. There were several joins and aggregate (group by) operations in the SQL queries, and most of these required a shuffle operation in Spark. A shuffle moves data across the nodes, which is expensive because network and disk I/O are involved. It is critical to watch out for shuffles, to reduce the amount of data that needs to be shuffled, and to optimize your queries to avoid shuffles where possible. One key parameter to tune is spark.sql.shuffle.partitions. The default is 200, but it must be tuned for your application; we tuned it to help improve our performance.
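For example, the shuffle partition count can be set on the SparkSession (or passed as a --conf option to spark-submit); the value of 80 below is only illustrative and should be sized to your cluster and data volume.

// Number of partitions used for shuffles in joins and aggregations
// (80 is illustrative; match it to your executor cores and data volume)
spark.conf.set("spark.sql.shuffle.partitions", "80")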

‘Order by’ operations are expensive because they involve a shuffle operation in Spark. We had some queries with an ‘order by’ whose ordering was never used later. We optimized those queries to remove the ‘order by’ when it added no value and was not needed for the business logic.
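The following sketch shows the kind of change this involved; the query and table name are hypothetical. The result of the intermediate query only feeds a later join, so dropping the ‘order by’ removes a sort and its shuffle without changing the business result.

// Before: the ORDER BY forces a sort (and shuffle) whose ordering is never used
// val totalsDF = spark.sql("SELECT acct_id, SUM(txn_amt) AS total_amt FROM txns GROUP BY acct_id ORDER BY acct_id")

// After: same result for the downstream join, without the extra sort
val totalsDF = spark.sql("SELECT acct_id, SUM(txn_amt) AS total_amt FROM txns GROUP BY acct_id")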

Task imbalance/data skew

We tuned the number of cores and the memory on the executors so that we were making the best use of the available hardware resources. However, we were not able to drive CPU usage to an optimal level, and we noticed a saw-tooth pattern in the CPU usage. Looking at the Spark UI, we saw that the task sizes were imbalanced: out of all the input partitions, only a few tasks had a reasonable size, and the rest were almost empty. This indicated an imbalance, which we narrowed down to the parallel threads reading the data from the input data source. The input data at the source systems is partitioned a certain way and cannot be changed, and when we used the query to fetch results into Spark, we could not control the partition balance. Spark has a repartition call that can be used to partition your data evenly, so we used repartition(n) to equally size the input partitions in Spark. This reduced our elapsed time by half, and we were able to use the CPU efficiently. The task imbalance is a version of a data skew problem, and we were able to fix it using the repartition call. Note that repartition involves a shuffle, so it is an expensive operation. In our scenario, we incurred a cost for the repartition during the load, but the significant improvements during the pipeline processing were well worth it.

Further, we cached the results after the repartition call to boost performance. This data gets used multiple times in the pipeline processing, so we did not want the load, union, and repartition to be executed each time. This was a perfect place to add caching to improve performance.

// Eliminate data skew
df = df.repartition(n)
// cache the load as it is used multiple times in the pipeline
df = df.cache()
// To prime the cache as part of the load, call Spark action
val dfcount = df.count()
// Create the in memory temporary view to be used further in processing
df.createOrReplaceTempView(table)

Output results

The Parquet file format is a recommended file format for Spark. It is columnar, supports compression, and is highly performant. We used Parquet with compression to write out the intermediate results in our pipeline that were used in the next phase of the processing.

df.write.mode("overwrite").parquet(savePath)

Update operation

The application involved updating a certain row based on a key. This was challenging because Spark’s DataFrames are immutable: when you apply a transformation, Spark processes the input data set and creates a new data set; it does not change the input data set. Initially, the requirement was to do one update for one set of data. We could do this with the Spark SQL APIs, although it is a bit convoluted. One way we achieved it using the Spark SQL API is shown below.

The requirement was to update a certain row that had a max value for a certain column. It was possible to have multiple rows that had the max value, and in that case, only one of the rows needed to be updated to a certain amount.

The pseudocode for one of the scenarios that had multiple matched rows is as follows:

  1. Find the row that you want to update by filtering out using the matching condition for the row you want to update.
  2. If there is more than one matched row that satisfied the update criteria, we added a unique key column to uniquely identify the row that needed to be updated.

    a. Pick only one of the rows that you need to update.
    b. Create a data set with the rest of the matched rows.
    c. Update the row you picked in step (a) and create a new data set.
    d. Union back the data set from b and c.

  3. Create a data set with rows that don’t match the update criteria.

  4. Union back the data set from 3 and output from step 2(d).

It was possible that there could be multiple rows that matched the update condition. So, to uniquely identify a row, we needed to add a column using the monotonically_increasing_id function to keep an ID for each row.

The following sample Spark code using Spark SQL calls achieved the update that was needed.

import org.apache.spark.sql.functions.monotonically_increasing_id

// Find all rows that match the update condition (the max value for colAMT)
val allMaxRows = p4DF.filter(s"$maxValue == colAMT")
val p4DFWithoutMax = p4DF.except(allMaxRows)
// Add a unique ID so that a single row can be picked from the matches
val allMaxRowsWithId = allMaxRows.withColumn("myid", monotonically_increasing_id())
// Get just one row
val fixMaxRow = allMaxRowsWithId.limit(1)
// Get the other rows apart from the row we will update
val allOtherMaxRows = allMaxRowsWithId.except(fixMaxRow).selectExpr(
  "col1",
  "col2",
  "col3",
  "colAMT")

// Update the row
val fixRow = fixMaxRow.selectExpr(
  "col1",
  "col2",
  "col3",
  s"(colAMT + $diffAmt) as colAMT")

// Union them together
val result = p4DFWithoutMax.union(allOtherMaxRows.union(fixRow))

The performance of this was reasonable given that there were only a few updates. Later, some requirements changed and we had to perform many updates. In that scenario, processing the entire data set to do one update breaks down in terms of performance; it is no longer feasible. We could not afford to use the Spark SQL API for our custom updates because it would involve multiple passes over the data. We needed a way to do the updates in a single pass over the data set.

Spark’s RDD API came to the rescue.

We optimized the updates using the RDD API so that we could do all of the updates in one pass over the data. The RDD API gave us a lot of flexibility because we could use groupBy and then pass in a function that transformed the data for us. The rest of the processing used DataFrames, so when it was time to do the update, we moved to the RDD API by calling the rdd method on the DataFrame. We also kept track of the schema for the DataFrame by using its schema method.

DataFrame.rdd gave us the RDD for that DataFrame/DataSet.

After we had a handle to the RDD, we could use the RDD operations. We used groupBy to do the required grouping and then passed the custom logic to mapValues to perform the updates and return the results. After applying the transformations to the RDD, we used createDataFrame to get a DataFrame back. Therefore, we were able to go from DataFrame to RDD and back to DataFrame quite easily.

The following code shows sample Spark code that computes the custom business logic and performs an update using the mapValues.

import org.apache.spark.sql.Row

val p4rdd = PhaseDF.rdd
val schema = PhaseDF.schema
// Group the rows by the business key (a tuple of the key columns)
val resultRDD = p4rdd.groupBy(row =>
    (row.getString(0),
     row.getString(1),
     row.getString(3),
     row.getString(4),
     row.getString(5),
     row.getString(6),
     row.getString(7),
     row.getString(8),
     row.getString(9),
     row.getString(10),
     row.getString(11),
     row.getString(13),
     row.getString(14),
     row.getString(15),
     row.getString(16),
     row.getString(17),
     row.getString(18),
     row.getString(19),
     row.getString(21),
     row.getString(22),
     row.getString(23),
     row.getString(24)
    )).mapValues(it => {
  val rows = it.toSeq
  val len = rows.size
  val idx2 = 28
  val idx1 = 29
  // Update the last row with some custom business logic
  val amt = ??? // some custom logic (elided)
  val lastRow = rows(len - 1)
  // Copy the columns of the last row and overwrite the amount column
  val cols = lastRow.toSeq
  val n = cols.length
  val values = new Array[Any](n)
  var i = 0
  while (i < n) {
    values.update(i, lastRow(i))
    i = i + 1
  }
  values.update(idx2, amt)
  val newRow = Row.fromSeq(values.toSeq)
  // Return all the rows from this group, with the last row replaced
  rows.init :+ newRow
})
// Retrieve all the rows back
val result = resultRDD.flatMap(_._2)
val output = spark.createDataFrame(result, schema)
output.createOrReplaceTempView(targetTable)

One thing worth mentioning is that the key function passed to groupBy runs into Scala’s limit of 22 fields in a tuple. In the previous code snippet, the tuple has 22 fields, but a couple of the other update functions we had to add exceeded 22 fields. To solve that, we concatenated some of the fields to reduce the number of fields in the tuple. The following code shows an example of this.

// get around the scala tuple 22 field limit
val resultRDD = cachedRDD.groupBy(row =>
  (row.getString(1) +","+ row.getString(3)+"," + row.getString(4) +"," +row.getString(5),
    row.getString(6),
    row.getString(7),
    row.getString(8),
    row.getString(9),
    row.getString(10),
    row.getString(11),
    row.getString(12),
    row.getString(13),
    row.getString(14),
    row.getString(15),
    row.getString(16),
    row.getString(17),
    row.getString(18),
    row.getString(19),
    row.getString(20),
    row.getString(21),
    row.getString(22),
    row.getString(24),
    row.getString(28)))

This was great! We achieved the performance that was needed by making the best use of the available APIs. We used the DataFrame and SQL APIs where they were a good fit, and when we needed to do the update for each group, we used the RDD API to implement our custom update logic. Note that the DataFrame and Spark SQL APIs use the same execution path, so we could switch between them freely. The RDD API is lower level and does not go through the Catalyst optimizer, but it gives a lot of flexibility. We made the best use of both worlds, DataFrame/Dataset and RDD, to achieve optimal performance.

Tuning

We also tuned the number of GC threads. We ensured that we were using all of the available hardware resources by tuning the number of executors, the number of cores, and the memory. We had to increase the memory for the driver (spark.driver.memory) as well.

--conf spark.driver.memory=10G
--conf spark.executor.extraJavaOptions=-Xgcthreads20

Performance analysis/debugging

We used the explain API on the DataFrame to check the execution plan for a query. We also used the Spark UI to investigate task sizes and caching information. Our cluster had the Spark history server set up so that we could analyze the different runs. We also collected CPU and memory usage, which was useful for pinpointing issues.
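For example, the physical plan for a query can be printed directly from the DataFrame; passing true also prints the parsed, analyzed, and optimized logical plans.

// Print the physical plan for the query
df.explain()

// Also include the parsed, analyzed, and optimized logical plans
df.explain(true)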

Transformations and actions

Spark has two kinds of operations: transformations and actions. Spark evaluates transformations lazily for performance reasons: when you apply a transformation, Spark does not execute it right away, but only when it sees an action that triggers the execution. While we were developing our code, we added count actions to double-check and verify results. When we finalized the code, we removed the count calls that were not necessary so that we did not incur the cost of executing them.
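A minimal illustration of the difference: a transformation such as filter only builds up the execution plan, while an action such as count triggers the actual work. The filter condition below is hypothetical.

// Transformation: nothing runs yet; Spark only records the operation in the plan
val filtered = df.filter("colAMT > 0")

// Action: triggers execution of the whole pipeline up to this point
val rowCount = filtered.count()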

Summary

We were able to successfully migrate our application to Apache Spark and meet our SLA goals for performance. It is critical to understand how Spark works and the features that are available to you as a Spark developer and architect. Being able to switch easily between the different APIs available in Spark, from the high-level SQL API to the RDD API, is a great advantage. This article should have given you an overview of some of the areas that are critical to achieving good performance with Spark.