One of Apache Spark’s key advantages is its flexibility in working with many different data sources and formats, from unstructured data such as text or CSV files to well-structured data such as relational databases. Apache Solr, on the other hand, is a fast enterprise search platform built on top of Apache Lucene. Both Apache Spark and Apache Solr are key parts of IBM Open Platform (IOP).

One of the biggest challenges in big data, and therefore also one of Spark’s biggest use cases, is building reliable and scalable ETL pipelines over a myriad of different data sources and formats. In this blog, we will discuss how to use the open source LucidWorks Spark-Solr connector to read Solr data into a Spark DataFrame that can then be queried, transformed, and processed using either Spark SQL or the DataFrame API.

Prerequisites:
– IBM Open Platform 4.3 (currently in technical preview) installed, with all components running in a healthy state.
– The open source LucidWorks Spark-Solr connector v3.0.0-beta jar file downloaded from Maven Central to the host where you will run your Spark shell:

wget http://repo1.maven.org/maven2/com/lucidworks/spark/spark-solr/3.0.0-beta/spark-solr-3.0.0-beta-shaded.jar

Create a Solr collection:
Apache Solr comes with a sample movie dataset and we will use it to populate a sample collection. A little bit about the dataset:

This data consists of the following fields:
* “id” – unique identifier for the movie
* “name” – Name of the movie
* “directed_by” – The person(s) who directed the making of the film
* “initial_release_date” – The earliest official initial film screening date in any country
* “genre” – The genre(s) that the movie belongs to

To create a Solr collection named “movies” with 1 shard, using the data_driven_schema_configs config set, and with a replication factor of 1, navigate to the /usr/iop/current/solr-server directory and run the following command as the user “solr” (the default user for Solr):

[solr@yourhost solr-server]$ bin/solr create -c movies

Connecting to ZooKeeper at yourhostname1.com:2181, yourhostname2.com:2181, yourhostname3.com:2181/solr ...
Uploading /usr/iop/current/solr-server/server/solr/configsets/data_driven_schema_configs/conf for config movies to ZooKeeper at yourhostname1.com:2181, yourhostname2.com:2181, yourhostname3.com:2181/solr

Creating new collection 'movies' using command:
http://localhost:8983/solr/admin/collections?action=CREATE&name=movies&numShards=1&replicationFactor=1&maxShardsPerNode=1&collection.configName=movies

{
  "responseHeader":{
    "status":0,
    "QTime":6247},
  "success":{"172.16.167.243:8983_solr":{
      "responseHeader":{
        "status":0,
        "QTime":1793},
      "core":"movies_shard1_replica1"}}}
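As an optional sanity check (not part of the original walkthrough), you can confirm the collection exists by listing all collections through the Collections API; the host and port below assume the default Solr setup on localhost:

```shell
# List all collections; "movies" should appear in the "collections" array
curl "http://localhost:8983/solr/admin/collections?action=LIST&wt=json"
```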

One word of caution: when Solr tries to infer the schema from the data, it may not always interpret every field correctly. In our case, we need to issue the following command to update the schema:

curl http://localhost:8983/solr/movies/schema -X POST -H 'Content-type:application/json' --data-binary '{
"add-field" : {
"name":"name",
"type":"text_general",
"multiValued":false,
"stored":true
},
"add-field" : {
"name":"initial_release_date",
"type":"tdate",
"stored":true
}
}'
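If you want to confirm the schema change took effect (an optional check, not in the original steps), the Schema API can return a single field definition by name:

```shell
# Should report type "tdate" for the field we just added
curl "http://localhost:8983/solr/movies/schema/fields/initial_release_date"
```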

To index the dataset and store the results in the “movies” collection, navigate to the /usr/iop/current/solr-server directory and issue the post command as user “solr”:

[solr@yourhost solr-server]$ bin/post -c movies example/films/films.json
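A quick, hypothetical way to verify the indexing step before moving on to Spark: run an empty query and read numFound from the response, which should match the size of the sample dataset:

```shell
# rows=0 returns only the match count, not the documents themselves
curl "http://localhost:8983/solr/movies/select?q=*:*&rows=0&wt=json"
```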

Launch the Spark shell with the LucidWorks Spark-Solr connector v3.0.0-beta jar:

spark-shell --jars spark-solr-3.0.0-beta-shaded.jar
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
         
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Run the following lines of Scala code to connect to Solr and load the data from the “movies” collection:

scala> val options = Map("collection" -> "movies", "zkhost" -> "your-solr-host:2181/solr")

scala> val df = spark.read.format("solr").options(options).load

Verify that we have successfully loaded the whole dataset from Solr:

scala> df.count()
res0: Long = 1100

scala> df.show(5)
+--------------+---------------+-------------------+--------------------+----------------+
|   directed_by|          genre|                 id|initial_release_date|            name|
+--------------+---------------+-------------------+--------------------+----------------+
|    John Lafia|  Disaster Film|/en/10_5_apocalypse|2006-03-17 16:00:...|10.5: Apocalypse|
|Robert Moresco|        Mystery|      /en/10th_wolf|2006-08-17 17:00:...|     10th & Wolf|
|   Gary Winick|Romantic comedy| /en/13_going_on_30|2004-04-13 17:00:...|  13 Going on 30|
| John Herzfeld|       Thriller|     /en/15_minutes|2001-02-28 16:00:...|      15 Minutes|
|    Aparna Sen|       Art film| /en/15_park_avenue|2005-10-26 17:00:...|  15 Park Avenue|
+--------------+---------------+-------------------+--------------------+----------------+
only showing top 5 rows

scala> df.printSchema()
root
 |-- directed_by: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- id: string (nullable = false)
 |-- initial_release_date: timestamp (nullable = true)
 |-- name: string (nullable = true)
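The connector can also push work down into Solr at read time rather than pulling the whole collection. The `query` and `fields` options below are documented Spark-Solr connector options; the specific values are illustrative:

```scala
// Read only Thriller films, fetching just three stored fields from Solr
val pushdownOpts = Map(
  "collection" -> "movies",
  "zkhost" -> "your-solr-host:2181/solr",
  "query" -> "genre:Thriller",        // Solr query applied server-side
  "fields" -> "id,name,directed_by"   // restrict the fields returned
)
val thrillers = spark.read.format("solr").options(pushdownOpts).load
thrillers.printSchema()
```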

From here, you can use Spark SQL to query the data:

scala> df.createOrReplaceTempView("movies")

scala> spark.sql("SELECT name FROM movies WHERE directed_by = 'Ridley Scott'").show()
+-----------------+
|             name|
+-----------------+
|  Black Hawk Down|
|        Gladiator|
|         Hannibal|
|      A Good Year|
|American Gangster|
+-----------------+

Alternatively, you can use the DataFrame API to get the same results:

scala> df.filter($"directed_by".like("Ridley Scott")).select("name").show()
+-----------------+
|             name|
+-----------------+
|  Black Hawk Down|
|        Gladiator|
|         Hannibal|
|      A Good Year|
|American Gangster|
+-----------------+
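Because the collection is now an ordinary DataFrame, the full DataFrame API is available. For example, a quick sketch of an aggregation over the same data, showing which directors appear most often in the dataset:

```scala
// Count films per director and show the most prolific ones first
df.groupBy("directed_by")
  .count()
  .orderBy($"count".desc)
  .show(5)
```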

Wrap up

Apache Spark provides a powerful and versatile framework for working with vastly different data sources and formats, and in this post I’ve shown how to ingest data from Solr, a key open source enterprise-grade search engine, as a Spark DataFrame. From there, you can combine it with data from other enterprise systems, from DB2 to Hive to real-time streaming data, and perform advanced data analytics at scale. Another advantage of using Spark on Solr data is the ability to use any language supported by Spark. For example, there is no native R interface to Solr, but with Spark, a data scientist can pull Solr data into an R job seamlessly.
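As a closing note, the connector also supports the reverse direction: writing a DataFrame back into Solr. The sketch below assumes a target collection named “movies_enriched” already exists; `soft_commit_secs` is a documented connector option that controls how quickly written documents become searchable:

```scala
// Hypothetical write-back: index a filtered DataFrame into another collection
df.filter($"genre" === "Thriller")
  .write
  .format("solr")
  .option("collection", "movies_enriched")
  .option("zkhost", "your-solr-host:2181/solr")
  .option("soft_commit_secs", "10")
  .save()
```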
