Spark is a component of IBM® Open Platform with Apache Spark and Apache Hadoop. Apache Spark is a fast, general-purpose cluster computing system that lets you process massive amounts of data using your favorite programming languages, including Java, Scala, and Python. It also supports a rich set of higher-level tools, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for combined data-parallel and graph-parallel computations, and Spark Streaming for streaming data processing.

As with any new system, running example code on small sets of data is easy, but there are gotchas that can crash your workload as you scale up data volume and the number of concurrent users. Here is a quick list of common problems and solutions we discovered when testing Spark on terabytes of data, with multiple concurrent users, on large Hadoop clusters with 20 or more nodes. For basic troubleshooting as a beginner Spark user, refer to this blog: A Beginner’s Guide to Spark Troubleshooting.

“No space left on device”

 stage 89.3 failed 4 times, most recent failure: 
Lost task 38.4 in stage 89.3 (TID 30100, node4.test.com): java.io.IOException: No space left on device
        at java.io.FileOutputStream.writeBytes(Native Method)
        at java.io.FileOutputStream.write(FileOutputStream.java:326)
        at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)

What space is this? Spark writes temporary output files from “map” tasks and RDDs to external storage called “scratch space”, and by default the scratch space is the local machine’s /tmp directory. /tmp is usually the operating system’s (OS) temporary output directory, shared by all OS users, and it is typically small and on a single disk. So when Spark runs lots of jobs, long jobs, or complex jobs, /tmp can fill up quickly, causing Spark to throw “No space left on device” exceptions.

Because Spark constantly writes to and reads from its scratch space, disk I/O can be heavy and can slow down your workload. The best way to resolve this issue and boost performance is to give Spark as many disks as possible for scratch space I/O. To achieve both, explicitly define the spark.local.dir parameter in the spark-defaults.conf configuration file, as follows:

 spark.local.dir /data1/tmp,/data2/tmp,/data3/tmp,/data4/tmp,/data5/tmp,/data6/tmp,/data7/tmp,/data8/tmp

The comma-delimited setting above spreads Spark scratch space across 8 disks, under the /data*/tmp directories (make sure each /data* directory is configured on a separate physical data disk). You can use any subdirectory name instead of ‘tmp’.
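If you only want to change the scratch location for a single application rather than cluster-wide, the same property can be set programmatically before the SparkContext starts. Below is a minimal sketch in Scala, assuming the /data*/tmp directories already exist on every node; the application name is hypothetical.

import org.apache.spark.sql.SparkSession

// Minimal sketch: spread scratch space across multiple disks for one application.
// The /dataN/tmp paths are assumptions -- they must exist on every node and be
// writable by the user running the executors. Note that on YARN, the directories
// set by the cluster manager (yarn.nodemanager.local-dirs) take precedence over
// spark.local.dir.
val spark = SparkSession.builder()
  .appName("scratch-space-example")
  .config("spark.local.dir", "/data1/tmp,/data2/tmp,/data3/tmp,/data4/tmp")
  .getOrCreate()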

“serialized results of 381610 tasks (5.0 GB) is bigger than spark.driver.maxResultSize”

stream5/query_05_22_77.sql.out:Error: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 381610 tasks (5.0 GB) is bigger than spark.driver.maxResultSize (5.0 GB) (state=,code=0))

When you call collect() in Spark, you are telling it to compute the result of a transformation on your data set and return it to the driver. The result has to be serialized and gathered from all partitions. A large data set forces Spark to run a huge number of tasks whose combined serialized results can easily exceed spark.driver.maxResultSize, which defaults to 1G.

This type of error tends to occur when running a SQL query in Spark SQL where the result set is not limited, i.e., without a limit 100 clause in the query itself. When you run the query on a small data set, it works fine, but as soon as you run the same query on a larger data set, you will likely hit this exception. To raise the limit, add the following line to the spark-defaults.conf file:

  spark.driver.maxResultSize     5G

Note that the default value is 1G. You can also set it to 0, which means unlimited, but such a setting may cause an OutOfMemory error on the driver. There are other ways to avoid this exception, but they require code changes in your application. For example (a sketch follows the list below):

– Avoid using collect(), as this attempts to pull data from all worker nodes down to the single driver node
– Instead, use filter() to limit the amount of data coming back to the driver
– Use a limit clause (for example, limit 10) in the SQL statement to limit query results
– Use saveAsParquetFile(), saveAsTextFile(), etc., to write the data to HDFS for downstream consumption
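To make those options concrete, here is a minimal sketch in Scala; the table name, column name, and output path are hypothetical, and the DataFrame writer API is used in place of the older saveAsParquetFile():

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("avoid-collect-example").getOrCreate()

// Hypothetical table and column names -- substitute your own data source.
val sales = spark.table("sales")

// Bring back only a small, filtered preview instead of collect()-ing everything.
val preview = sales.filter("amount > 1000").limit(10).collect()

// Write the full result set to HDFS for downstream consumption rather than
// pulling it onto the driver; the output path is also hypothetical.
sales.filter("amount > 1000").write.parquet("hdfs:///user/spark/output/filtered_sales")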

“org.apache.spark.sql.catalyst.errors.package$TreeNodeException”

16/06/27 18:44:32 ERROR thriftserver.SparkExecuteStatementOperation: Error executing query, currentState RUNNING,
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
...
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:113)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
        at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:138)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:361)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
...
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
        at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)

When running complex SQL queries (e.g., some of the TPCDS queries) concurrently (e.g., 4 or more users running queries against the same data set at the same time) on Spark over terabytes of input data, the above exceptions occur often. On the surface, the exceptions look query-plan related because they show up as “sql.catalyst.errors” in the stack, but further down you will see the internal timeout error thrown as java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]. Further analysis showed that the physical plan of the query calls for broadcast joins, and awaitResult has a default timeout of 300 seconds for the broadcast wait time in broadcast joins. Our concurrent query test exceeded this time.

To resolve this exception, we found that the spark.sql.broadcastTimeout parameter controls this internal timeout. We then set it to be longer than the longest query time observed when the query runs to completion with a single user, for example:

   spark.sql.broadcastTimeout     1200
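If you cannot edit spark-defaults.conf, the same property can also be set at the session level (for example, from the spark-shell or your application code). A minimal sketch, with a hypothetical application name:

import org.apache.spark.sql.SparkSession

// Minimal sketch: raise the broadcast-join wait time for the current session only,
// mirroring the spark-defaults.conf value above. Runtime SQL settings like this
// take effect for queries submitted after the call.
val spark = SparkSession.builder().appName("broadcast-timeout-example").getOrCreate()
spark.conf.set("spark.sql.broadcastTimeout", "1200")

You can confirm that a query’s physical plan really contains a broadcast join by calling explain() on the DataFrame or query before tuning.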

“org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]”

org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [800 seconds]. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:143)

When heavy workloads run in Spark, disk and network utilization can, at times, be maxed out. When those conditions occur, timeout exceptions can surface in many Spark operations: storage, RPC, heartbeat waits, shuffle connection waits. The exception above is related to RPC communication in Spark, whose timeout is 120 seconds by default. Spark provides an “umbrella” timeout setting, spark.network.timeout, that controls all of the following:

spark.core.connection.ack.wait.timeout
spark.akka.timeout
spark.storage.blockManagerSlaveTimeoutMs
spark.shuffle.io.connectionTimeout
spark.rpc.askTimeout
spark.rpc.lookupTimeout

It may be easier to tune spark.network.timeout so that you don’t have to spend time trying to set each of these “correctly”. You need to increase the value for heavy workloads, sometimes by a large margin. For SQL queries processing terabytes of data and supporting multiple concurrent users, we tuned it to 800 seconds, as shown below:

spark.network.timeout     800
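The same umbrella timeout can also be passed per application through SparkConf instead of cluster-wide in spark-defaults.conf. A minimal sketch, with a hypothetical application name:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Minimal sketch: set the umbrella network timeout for a single application.
// A bare number is interpreted as seconds; "800s" makes the unit explicit.
// Any of the individual timeouts listed above still win if set explicitly.
val conf = new SparkConf()
  .setAppName("heavy-workload-example")
  .set("spark.network.timeout", "800s")

val spark = SparkSession.builder().config(conf).getOrCreate()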

We are actively writing more blogs for Spark. Please leave comments and check back soon!

3 comments on “Troubleshooting and Tuning Spark for Heavy Workloads”

  1. This is great, thanks

  2. Great article, thanks. If we increase the parameter “spark.network.timeout” to, say, 10 minutes, does it mean that if a slave goes down, other executors will know about it only after 10 * 3 (default number of retries) = 30 minutes?

    • Thanks, Satheesh. Yes, in the worst case it is possible to wait 3X the timeout. But the purpose of setting this value is that your executors never hit a timeout, since all tasks complete within the network.timeout value.
