
Best practices using Spark SQL streaming, Part 1

Get best practices for using Apache Spark Structured Streaming





Archived content

Archive date: 2024-05-31

This content is no longer being updated or maintained. The content is provided “as is.” Given the rapid evolution of technology, some content, steps, or illustrations may have changed.

Apache Spark Structured Streaming (a.k.a. the latest form of Spark streaming or Spark SQL streaming) is seeing increased adoption, and it's important to know some best practices and how things can be done idiomatically. This blog is the first in a series based on interactions with developers from different projects across IBM. This blog discusses:

  • Problems with processing multiple streaming sources with the same schema
  • What happens when each of the streams has a different schema -- and there are hundreds of them

Often, there is a request to add an Apache Spark SQL Streaming connector for a message queue or a streaming source. Sometimes the need for the connector is unavoidable, for example, if a cloud provider offers its own implementation of a feature-rich, scalable, distributed message queue that comes with support. In this situation, the customer gets numerous benefits by using the provider's message queue rather than running another software stack with its own operational overhead. Two examples of such offerings are Amazon Kinesis and IBM Message Hub.

Message Hub has an excellent integration with Apache Spark, and Kinesis support is also receiving a lot of attention from developers. Apart from special cases like these, it is not always necessary to write a connector for a message queue in order to use it. Each message queue comes with its own feature set and might not match others in throughput characteristics or reliability. Its use for a particular application may be well justified, but it might not have all of the features that Spark needs to offer its full capabilities. For example, some message queues might not provide an API to read from a range of offsets or might not have a concept of distributed/parallel readers. In such cases, it is a best practice to route the messages to Spark through an already well-integrated and supported message queue like Apache Kafka. By trying to directly implement a connector for such a message queue, you can lose the reliability and performance guarantees that Apache Spark offers, or the connector might turn out to be complex to develop and maintain.

By routing the messages through another well-integrated queue, an application does not need a new connector or the overhead of implementing one. In fact, most message queues can easily be routed through a Kafka message queue. One example is MQTT, which has good characteristics for lightweight messaging on IoT devices but does not have distributed support or persistence -- at least not at the specification level. Trying to turn MQTT into a complex, reliable, distributed message queue is much more effort than letting it do what it does best and routing the low-traffic messages coming from all of the individual MQTT queues through a scalable, distributed message queue like Kafka for processing with Apache Spark SQL Streaming. There are many other message queues, such as ActiveMQ and RabbitMQ, that provide AMQP compliance but might not have the same kind of integration that Kafka has.

Q. What's the overhead of an additional layer? Or does routing through another queue add to the overall processing latency?

It does add to the latency, and the amount depends on the setup. Typically it is insignificant, but it can matter if you are tuning to the order of milliseconds. Adding an additional message queue also adds maintenance and cost overhead, although the wide availability of Kafka as a service makes it convenient for cloud deployments. Therefore, the benefits must be carefully examined. Ask yourself whether adding Spark SQL Streaming this way really solves the problem at hand, or whether writing a connector (maybe with limited capability) and maintaining it over time is better suited to your needs.

This blog gives you some real-world examples of routing via a message queue (using Kafka as an example).

Example 1: Classic word count using Spark SQL Streaming for messages coming from a single MQTT queue and routing through Kafka

What happens when there are multiple sources to which the same processing must be applied? One solution is to have a different reader for each source and then perform a join. But this can get complex as the number of sources increases or if a source needs continuous streaming support. To keep things simple, the following example illustrates using a single MQTT stream with Spark SQL Streaming and Kafka. The same example can easily be extended to support many streams, each directed through a single Kafka queue.

Create the Spark session

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession
      .builder.master("local[4]")
      .appName("SparkKafkaStreamingJob")
      .getOrCreate()
  1. Set up an MQTT-to-Kafka service by registering a callback with MQTT. (A sketch of what such a callback bridge might look like follows this list.)

     import spark.implicits._
     val mqttTopic = "test"
     val mqttBrokerUrl = "tcp://localhost:1883"
     val kafkaTopic = "test"
     val kafkaServer = "localhost:9092"
     // Publish every message received from MQTT to Kafka.
    
     MqttToKafkaService.mqttToKafka(mqttTopic, mqttBrokerUrl, kafkaTopic, kafkaServer, Array[Byte]())
     // For more streaming sources, one can go on adding a sourceToKafka service.
    
  2. Calculate the word counts on a streaming data frame connected to a Kafka queue with the topic test.

     import java.util.UUID

     // Create a Dataset representing the stream of input lines from Kafka
     val lines = spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", kafkaServer)
       .option("subscribe", kafkaTopic)
       .load()
       .selectExpr("CAST(value AS STRING)")
       .as[String]
    
     // Calculate word counts
     val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
    
     // Start running the query that prints the running counts to the console
     val query = wordCounts.writeStream
       .outputMode("complete")
       .format("console")
       .option("checkpointLocation", "/tmp/temporary-" + UUID.randomUUID.toString)
       .start()
    
     query.awaitTermination()
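
The MqttToKafkaService helper used in step 1 lives in the sample repository and is not reproduced in this blog. As a rough sketch of the idea only, assuming the Eclipse Paho MQTT client and the standard Kafka producer (the names and details here are illustrative, not the repository's actual implementation), a callback-based bridge might look like this:

    import java.util.Properties

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.eclipse.paho.client.mqttv3.{IMqttDeliveryToken, MqttCallback, MqttClient, MqttMessage}

    // Illustrative sketch only: forward every message arriving on an MQTT topic
    // to a Kafka topic, optionally prepending a marker so that the origin of the
    // message can be identified downstream.
    object MqttToKafkaBridgeSketch {
      def mqttToKafka(mqttTopic: String, mqttBrokerUrl: String,
                      kafkaTopic: String, kafkaServer: String,
                      marker: Array[Byte]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", kafkaServer)
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
        val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)

        val client = new MqttClient(mqttBrokerUrl, MqttClient.generateClientId())
        client.setCallback(new MqttCallback {
          override def messageArrived(topic: String, message: MqttMessage): Unit = {
            // Republish the MQTT payload (with the marker prepended) to Kafka.
            producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
              kafkaTopic, marker ++ message.getPayload))
          }
          override def connectionLost(cause: Throwable): Unit = ()
          override def deliveryComplete(token: IMqttDeliveryToken): Unit = ()
        })
        client.connect()
        client.subscribe(mqttTopic)
      }
    }

A production version would also need to handle reconnection, errors, and shutdown, which this sketch omits.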
    

Test the application

To test the application:

  1. Download and extract or clone the repository from the GitHub link. Follow the instructions on how to build using sbt. (You can get and install sbt by following the instructions here.) If you prefer an Apache Maven build system, you can also build the project using Maven because the project comes with a pom.xml build file. For brevity, this blog only mentions usage instructions for sbt users.

    You need three open command-line terminals. (This assumes that a Kafka service is already available and running locally with the topic "test" created. See the Quickstart guide for more information.)

    • On terminal one, run the MQTT server.

      cd streaming-integration-pattern
      sbt "run-main org.codait.streaming.mqtt.MQTTServer"
      
    • On terminal two, run the Spark job.

      sbt "run-main org.codait.streaming.SparkJobSimple <kafka broker address> <MQTT broker address>"
      

       For example, substituting the default values for the Kafka and MQTT broker addresses:

      sbt "run-main org.codait.streaming.SparkJobSimple localhost:9092 tcp://localhost:1883"
      

      Note that the recommended method for submitting jobs to Spark is through a spark-submit script. For brevity and convenience, we are using sbt to run locally.

    • On terminal three, publish to MQTT.

      sbt "run-main org.codait.streaming.mqtt.MQTTPublisher"
      
  2. On the console of terminal two, observe the word counts that appear.

      -------------------------------------------
      Batch: 1
      -------------------------------------------
      +-----+-----+
      |value|count|
      +-----+-----+
      |World|    1|
      |Hello|    1|
      +-----+-----+
    

    See SparkJobSimple.scala for a complete example.

Example 2: Processing multiple streams using Spark SQL Streaming

How do you handle two streams with different schemas, produced at different rates, and possibly coming from disparate sources? You might be tempted to say by having a different Spark SQL streaming job for each stream. But how would you then handle situations where there are hundreds or even thousands of these "ultra low throughput streams" that, put together, result in a significant throughput? By having a separate job for each stream, you would waste resources on scheduling overhead, or jobs would hold up resources even when they are not in use.

The example SparkJobMultiStreams.scala illustrates the point with just two streams and can therefore serve as a starting point for implementing multiple streams.

In this example, you first start two parallel tasks, each of which forwards the messages received at the MQTT receiver to the Kafka queue. The messages are distinguished from each other through a prepended marker ("0" or "1"). Each message stream is then separated and processed differently based on its schema. One stream counts the unique entries based on the year for car records, and the other computes the max, min, and average age for the customer records. The output is written to the console, but this can easily be configured to be directed to another database or queue.

  1. Launch the parallel tasks to read the input from the MQTT sources and relay each message to the respective Kafka message queue topics.

    After the Spark session is created as shown in the previous example and is available as spark, you can proceed as follows:

     // Launch two long-running tasks on two different partitions. Each task simply
     // relays the messages arriving on an MQTT stream to a Kafka topic. The two
     // tasks run in parallel, based on the availability of cores.
     spark.sparkContext.parallelize(Seq(0, 1), 2).mapPartitionsWithIndex { (index, _) =>
       if (index % 2 == 0)
         MqttToKafkaService.mqttToKafka(mqttTopic, mqttBrokerUrl, kafkaTopic, kafkaServer,
           "0".getBytes())
       else
         MqttToKafkaService.mqttToKafka(mqttTopic2, mqttBrokerUrl, kafkaTopic2, kafkaServer,
           "1".getBytes())
       // Return a dummy iterator so the partition produces a result for collectAsync.
       Seq(true).toIterator
     }.collectAsync()
    

    Here you have used the older RDD API to run two tasks in parallel inside Spark executors on two different partitions. collectAsync ensures that the driver does not block while these long-running tasks run. An important point to note is that each message is prepended with a "0" or "1" to distinguish its origin. There are better ways to do this (for example, using the Kafka topic), but this is the simplest and most general.

  2. Next, create the data set reading from the Kafka topics. In this example, Kafka does not interpret the messages and deals with them as bytes. To create a string from those bytes, use .selectExpr("CAST(value AS STRING)"). This step would look very different if the format of the incoming messages were something other than plain strings converted to bytes; how to deal with various deserialization techniques is outside the scope of this blog.

    Note that after the messages have entered the Kafka queue, you do not need to deal with connecting to the MQTT queues.

     // Create a Dataset representing all of the incoming messages from the different Kafka topics.
     val mainInputStream = spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", kafkaServer)
       .option("subscribe", s"$kafkaTopic,$kafkaTopic2")
       .load()
       .selectExpr("CAST(value AS STRING)")
       .as[String]
    
  3. After you have the streaming Dataset called mainInputStream, you can separate the two incoming streams for different processing schemes by checking the marker prepended in step 1.

    On the two different streams, parse the CSV strings and apply the following schemas according to their topic of origin.

    Schema for topic 1, customerRecords:

    | Field names   | Field types   |
    | ------------- | ------------- |
    | Id            | String        |
    | Name          | String        |
    | Age           | Number        |
    | Gender        | String        |

    Schema for topic 2, carRecords:

    | Field names   | Field types   |
    | ------------- | ------------- |
    | Id            | String        |
    | Make          | String        |
    | Model         | String        |
    | Year          | Number        |

     // Aggregate functions used below. spark.implicits._ is assumed to be imported,
     // as in the previous example.
     import org.apache.spark.sql.DataFrame
     import org.apache.spark.sql.functions.{avg, count, max, min}

     // First stream has the schema for customer records.
     val topic1Messages = mainInputStream.filter(x => x.startsWith("0")).flatMap {
       x => val array = x.replaceFirst("0", "").split(",")
         if (array.length == 4) {
           Some(array)
         } else {
           None
         }
     }.map(x => (x(0), x(1), Integer.parseInt(x(2).trim), x(3)))
       .as[(String, String, Int, String)]
       .toDF("Id", "Name", "Age", "Gender")
       .groupBy($"Gender").agg(min("age"), max("age"), avg("age"), count("Name"))

     // Second stream has the schema for car records.
     val topic2Messages: DataFrame = mainInputStream.filter(x => x.startsWith("1")).flatMap {
       x => val array = x.replaceFirst("1", "").split(",")
         if (array.length == 4) {
           Some(array)
         } else {
           None
         }
     }.map(x => (x(0), x(1), x(2), Integer.parseInt(x(3))))
       .as[(String, String, String, Int)]
       .toDF("Id", "Make", "Model", "Year")
       .groupBy($"Make", $"Model", $"Year").agg($"Make", $"Model", count("Year"))
  4. Finally, these streams can be started as follows.

     // Start running the query for topic1 that prints the running counts to the console
     val query = startConsoleStream(topic1Messages)
    
     // Start running the query for topic2 that prints the running counts to the console
     val query2 = startConsoleStream(topic2Messages)
    

    In this example, they are configured to output on the console. You can easily change this to send the output to another message queue or even a database.
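
The startConsoleStream helper also comes from the sample repository and is not shown above. A minimal version, assuming it simply starts a console sink with a throwaway checkpoint location as in Example 1, could look like this:

    import java.util.UUID

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.streaming.StreamingQuery

    // Illustrative sketch: start a streaming query that prints the aggregated
    // result table to the console after every trigger.
    def startConsoleStream(df: DataFrame): StreamingQuery =
      df.writeStream
        .outputMode("complete")
        .format("console")
        .option("checkpointLocation", "/tmp/temporary-" + UUID.randomUUID.toString)
        .start()

The complete output mode is used here because both queries produce aggregations.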

Testing the application

To test this application:

  1. Download or clone the repository from the GitHub link.

    You need three open command-line terminals. (This assumes that a Kafka service is already available and running locally with topic auto-creation enabled.)

    • On terminal one, run the MQTT server.

      cd streaming-integration-pattern
      sbt "run-main org.codait.streaming.mqtt.MQTTServer"
      
    • On terminal two, run the Spark job.

      sbt "run-main org.codait.streaming.SparkJobMultiStreams localhost:9092 tcp://localhost:1883"
      

      This lets the Spark job connect to the Kafka broker at localhost:9092 and the MQTT receiver at tcp://localhost:1883. If required, replace these values based on your setup.

      Note that the recommended method for submitting jobs to Spark is through a spark-submit script. For brevity and convenience, we are using sbt to run locally.

    • On terminal three, publish to MQTT.

      sbt "run-main org.codait.streaming.mqtt.MQTTPublisher customerRecords 1,PS,29,M localhost:1883 1"
      

       This command pushes one record to the MQTT queue with the topic customerRecords, the name PS, and the age 29.

      sbt "run-main org.codait.streaming.mqtt.MQTTPublisher carRecords 2,Ford,Escort,2000 localhost:1883 1"
      

       This command pushes one record to the MQTT queue with the topic carRecords, the make Ford, the model Escort, and the year 2000. (A minimal sketch of the underlying publish call appears at the end of this example.)

  2. On the Spark console, observe that the output is different for the two separate analyses.

           -------------------------------------------
           Batch: 3
           -------------------------------------------
    
           +------+--------+--------+--------+-----------+
           |Gender|min(age)|max(age)|avg(age)|count(Name)|
           +------+--------+--------+--------+-----------+
           |M     |29      |29      |29.0    |1          |
           +------+--------+--------+--------+-----------+
    
           -------------------------------------------
           Batch: 3
           -------------------------------------------
    
           +----+------+----+----+------+-----------+
           |Make|Model |Year|Make|Model |count(Year)|
           +----+------+----+----+------+-----------+
           |Ford|Escort|2000|Ford|Escort|2          |
           +----+------+----+----+------+-----------+
    

See SparkJobMultiStreams.scala for the full example.
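
For reference, the MQTTPublisher invoked in the test steps essentially publishes a single CSV payload to an MQTT topic. A minimal sketch of such a publish call, again assuming the Eclipse Paho client rather than the repository's exact implementation, is:

    import org.eclipse.paho.client.mqttv3.{MqttClient, MqttMessage}

    // Illustrative sketch: publish one CSV record to an MQTT topic.
    val publisher = new MqttClient("tcp://localhost:1883", MqttClient.generateClientId())
    publisher.connect()
    publisher.publish("customerRecords", new MqttMessage("1,PS,29,M".getBytes("UTF-8")))
    publisher.disconnect()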

The previous examples can form the basis for applying Spark streaming to setups that are complicated by a large number of different sources. In this way, many low-throughput jobs can be multiplexed into a single Spark streaming job. Coming next in this series is a discussion of fair scheduling, serialization, dealing with erroneous records, databases as sinks, and more.