Hands-on Exercises

Spark SQL is the newest component of Spark and provides a SQL-like interface. Spark SQL is tightly integrated with the various Spark programming languages, so we will start by launching the Spark shell from the root directory of the provided USB drive:

usb/$ spark/bin/spark-shell
usb/$ spark/bin/pyspark

Once you have launched the Spark shell, the next step is to create a SQLContext. A SQLContext wraps the SparkContext, which you used in the previous lesson, and adds functions for working with structured data.

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@52955821
scala> sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
>>> from pyspark.sql import SQLContext
>>> sqlCtx = SQLContext(sc)
>>> sqlCtx.sql("SET spark.sql.parquet.binaryAsString=true")

Now we can load in a set of data that is stored in the Parquet format. Parquet is a self-describing columnar format. Since it is self-describing, Spark SQL will automatically be able to infer all of the column names and their datatypes. The spark.sql.parquet.binaryAsString flag tells Spark SQL to treat binary-encoded data as strings. For this exercise we have provided a set of data that contains all of the pages on Wikipedia that contain the word “berkeley”. You can load this data using the parquetFile method provided by the SQLContext.

scala> val wikiData = sqlContext.parquetFile("data/wiki_parquet")
wikiData: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
ParquetTableScan [id#0,title#1,modified#2L,text#3,username#4], (ParquetRelation data/wiki_parquet), []
>>> wikiData = sqlCtx.parquetFile("data/wiki_parquet")
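
If you want to double-check what Spark SQL inferred from the Parquet metadata, you can print the schema. This is an optional sanity check rather than part of the exercise; the column list should match the id, title, modified, text, and username fields shown in the query plan above.

scala> wikiData.printSchema()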

The result of loading in a Parquet file is a SchemaRDD. A SchemaRDD has all of the functions of a normal RDD. For example, let's figure out how many records are in the data set.

scala> wikiData.count()
res9: Long = 39365
>>> wikiData.count()
39365L
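
Because a SchemaRDD is just an RDD of Row objects, ordinary RDD transformations work on it directly as well. As a rough sketch (this assumes the column order shown in the query plan above, where title is the second column), you could pull out a few page titles without writing any SQL:

scala> wikiData.map(row => row.getString(1)).take(3)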

In addition to standard RDD operations, SchemaRDDs also have extra information about the names and types of the columns in the dataset. This extra schema information makes it possible to run SQL queries against the data after you have registered it as a table. Below is an example of counting the number of records using a SQL query.

scala> wikiData.registerTempTable("wikiData")
scala> val countResult = sqlContext.sql("SELECT COUNT(*) FROM wikiData").collect()
countResult: Array[org.apache.spark.sql.Row] = Array([39365])
>>> wikiData.registerTempTable("wikiData")
>>> result = sqlCtx.sql("SELECT COUNT(*) AS pageCount FROM wikiData").collect()
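
If you plan to run several queries against the same table, you can also ask Spark SQL to cache it. This step is optional for the exercise; cacheTable keeps the table in an in-memory columnar format so later queries avoid re-reading the Parquet files:

scala> sqlContext.cacheTable("wikiData")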

The result of a SQL query is always a collection of Row objects. From a Row object you can access the individual columns of the result.

scala> val sqlCount = countResult.head.getLong(0)
sqlCount: Long = 39365
>>> result[0].pageCount
39365

SQL can be a powerful tool for performing complex aggregations. For example, the following query returns the top 10 usernames by the number of pages they created.

scala> sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10").collect().foreach(println)
[Waacstats,2003]
[Cydebot,949]
[BattyBot,939]
[Yobot,890]
[Addbot,853]
[Monkbot,668]
[ChrisGualtieri,438]
[RjwilmsiBot,387]
[OccultZone,377]
[ClueBot NG,353]
>>> sqlCtx.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10").collect()
[{u'username': u'Waacstats', u'cnt': 2003}, {u'username': u'Cydebot', u'cnt': 949}, {u'username': u'BattyBot', u'cnt': 939}, {u'username': u'Yobot', u'cnt': 890}, {u'username': u'Addbot', u'cnt': 853}, {u'username': u'Monkbot', u'cnt': 668}, {u'username': u'ChrisGualtieri', u'cnt': 438}, {u'username': u'RjwilmsiBot', u'cnt': 387}, {u'username': u'OccultZone', u'cnt': 377}, {u'username': u'ClueBot NG', u'cnt': 353}]
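
The same table supports arbitrary WHERE clauses as well. As a purely illustrative sketch (the search term here is hypothetical and not part of the provided exercise), you could count how many of these pages also mention another word by filtering on the text column:

scala> sqlContext.sql("SELECT COUNT(*) FROM wikiData WHERE text LIKE '%university%'").collect()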

NOTE: If you see a java.lang.OutOfMemoryError, you will need to restart the Spark shell with the following command-line option:

usb/$ spark/bin/spark-shell --driver-memory 1G
usb/$ spark/bin/pyspark --driver-memory 1G

This increases the amount of memory allocated for the Spark driver. Since we are running Spark in local mode, all operations are performed by the driver, so the driver memory is all the memory Spark has to work with.
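
To confirm how much heap the driver actually received, you can check the JVM's reported maximum memory from the Scala shell. This is only a rough check; maxMemory returns the heap ceiling in bytes, so dividing gives an approximate value in megabytes:

scala> Runtime.getRuntime.maxMemory / (1024 * 1024)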
