Hands-on Exercises

What is a stochastic learning algorithm?

Stochastic learning algorithms are a broad family of algorithms that process a large dataset by sequentially processing random samples of it. Since their per-iteration computation cost is independent of the overall size of the dataset, stochastic algorithms can be very efficient in the analysis of large-scale data. Examples of stochastic algorithms include stochastic gradient descent (SGD), stochastic dual coordinate ascent (SDCA) and Markov chain Monte Carlo (MCMC) methods such as Gibbs sampling.
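
To make the shared pattern concrete, here is a minimal sketch in plain Scala (not Splash code) of the sequential template these algorithms follow; the function name, the fixed seed and the step count are illustrative only:

import scala.util.Random

// A generic sequential stochastic learner: repeatedly draw one random
// element and update the model state using that element alone. Each
// step touches a single sample, so its cost does not depend on the
// size of the dataset.
def stochasticLearn[T, S](data: IndexedSeq[T], init: S, steps: Int)
                         (update: (S, T) => S): S = {
  val rng = new Random(0)
  var state = init
  for (_ <- 0 until steps) {
    val sample = data(rng.nextInt(data.length))
    state = update(state, sample)
  }
  state
}

For SGD, for instance, update would apply one gradient step computed from the single sampled element.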


What is Splash?

Stochastic learning algorithms are generally defined as sequential procedures, and as such they can be difficult to parallelize. Splash is a general framework for parallelizing stochastic learning algorithms on multi-node clusters. You can develop a stochastic algorithm using the Splash programming interface without worrying about issues of distributed computing; the parallelization is automatic and communication-efficient. Splash is built on Scala and Apache Spark, so you can use it to process Resilient Distributed Datasets (RDDs).

On large-scale datasets, Splash can be substantially faster than existing data analytics packages built on Apache Spark. For example, to fit a 10-class logistic regression model on the mnist8m dataset, stochastic gradient descent (SGD) implemented with Splash is 25x faster than MLlib’s L-BFGS and 75x faster than MLlib’s mini-batch SGD at achieving the same value of the loss function. All algorithms were run on a 64-core cluster.


Download the example package

First, download the Splash Example package and extract it to any directory. The source code is located at /src/main/scala/. The Splash library file is in /lib/, where sbt picks it up automatically and puts Splash on your project classpath.

For your convenience, we have provided a pre-built JAR named splashexample.jar in the root of the splash directory. To recompile the code after making changes, type:

../sbt/sbt package

This generates a jar file at ./target/scala-2.10/splashexample.jar. To run the code, submit this jar file as a Spark job from within the splash directory:

splash/$ ../spark/bin/spark-submit --class ExampleName \
  --driver-memory 4G \
  --jars lib/splash-0.2.0.jar target/scala-2.10/splashexample.jar \
  [data files] > output.txt

Here, ExampleName should be replaced by the name of the example (see the following sections). The file splash-0.2.0.jar is the Splash library, and splashexample.jar is the compiled code to be executed. The [data files] argument should be replaced by the path to the data files (see the following sections). The result is written to output.txt.
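
For instance, the fully substituted command for the first example below (DocumentStatistics run on data/covtype.txt) would be:

splash/$ ../spark/bin/spark-submit --class DocumentStatistics \
  --driver-memory 4G \
  --jars lib/splash-0.2.0.jar target/scala-2.10/splashexample.jar \
  data/covtype.txt > output.txt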


Example 1: document statistics

The Document Statistics example computes the number of lines, words and characters of a text file. It illustrates how to write an application using Splash’s programming interface. Before running this example, let’s take a look at the source code:

import org.apache.spark.{SparkConf,SparkContext}
import splash.core.ParametrizedRDD

object DocumentStatistics {
  def main(args: Array[String]) {
    val path = args(0)
    val sc = new SparkContext(new SparkConf())
    
    val paramRdd = new ParametrizedRDD(sc.textFile(path))
    val sharedVar = paramRdd.setProcessFunction((line, weight, sharedVar, localVar) => {
      sharedVar.add("lines", weight)
      sharedVar.add("words", weight * line.split(" ").length)
      sharedVar.add("characters", weight * line.length)
    }).run().getSharedVariable()
    
    println("Lines: " + sharedVar.get("lines").toLong)
    println("words: " + sharedVar.get("words").toLong)
    println("Characters: " + sharedVar.get("characters").toLong)
  }
}

Using this piece of code as a guide, we can summarize the procedure for writing a Splash program:

  1. Declare a ParametrizedRDD object called paramRdd. A ParametrizedRDD is a wrapper around a standard RDD: it maintains both the RDD elements and the variables to be updated by the stochastic algorithm.
  2. Implement the stochastic algorithm by providing a data processing function to the setProcessFunction method. In this example, the algorithm is the body of that function (the three sharedVar.add calls).
  3. Start the algorithm by calling the run method, which makes the data processing function take a full pass over the dataset. You can call the run method multiple times to take multiple passes over the dataset (see the sketch after this list). If multiple cores are available, the data processing is automatically parallelized.
  4. After the algorithm terminates, collect the variable values using the getSharedVariable method.
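
As a minimal sketch of step 3, several passes can be taken by calling run repeatedly. This assumes, as the chained calls in the example suggest, that setProcessFunction and run update the ParametrizedRDD in place and return it; the pass count of 5 is arbitrary:

// Take five full passes over the dataset.
for (pass <- 1 to 5) {
  paramRdd.run()
}
// Collect the shared variables after the final pass.
val sharedVar = paramRdd.getSharedVariable()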

Now let’s take a closer look at the data processing function:

(line, weight, sharedVar, localVar) => {
  sharedVar.add("lines", weight)
  sharedVar.add("words", weight * line.split(" ").length)
  sharedVar.add("characters", weight * line.length)
}

The data processing function takes four arguments: (1) an element of the RDD; (2) the weight of this element; (3) the set of shared variables; and (4) the set of local variables. These arguments are named (line, weight, sharedVar, localVar). The job of the data processing function is to read these inputs and perform updates on the variable sets, which is precisely what a stochastic algorithm does. The weight of the element is generated automatically by the system and tells the algorithm how important the element is: a weight of x indicates that the element has been observed x consecutive times. For example, a line arriving with weight 2 should contribute 2 to the line count, which is why the code adds weight rather than 1.

Shared variables are global variables shared across the entire dataset; a local variable is associated only with the current element. This example uses only shared variables. To read or write a variable set, the algorithm uses operators. This example uses the add operator, whose two arguments provide the key of the variable to be modified and the quantity to add. There are three other operators: get, multiply and delayedAdd. The get operator returns the value of a variable. The multiply operator scales a variable by a constant factor. The delayedAdd operator declares an add operation but defers its execution to a later time. See the Splash API for more details.
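
As a hedged illustration, the following fragment shows all four operators inside a data processing function; the keys and quantities are made up for this sketch and are not part of the example above:

(line, weight, sharedVar, localVar) => {
  // get: read the current value of a shared variable
  // (read here only to illustrate the operator)
  val linesSoFar = sharedVar.get("lines")
  // add: add a quantity to a shared variable immediately
  sharedVar.add("lines", weight)
  // multiply: scale a shared variable by a constant factor
  sharedVar.multiply("decayingCount", 0.5)
  // delayedAdd: declare an add operation now; the system executes it later
  sharedVar.delayedAdd("pendingCount", weight)
}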

To output the results, the following code uses the get operator to read the final values of the shared variables and prints them to the console:

println("Lines: " + sharedVar.get("lines").toLong)
println("words: " + sharedVar.get("words").toLong)
println("Characters: " + sharedVar.get("characters").toLong)

To run the example, choose ExampleName = DocumentStatistics and [data files] = data/covtype.txt. The output should look like the following:

Lines: 581012
Words: 7521450
Characters: 70024080

After working through this example, you should be able to write applications of your own. See the Logistic Regression example for another concrete illustration of how to write a data processing function.


Example 2: machine learning package

Splash contains a collection of pre-built packages for machine learning. One of them is the SGD algorithm for optimization, which provides an API similar to that of MLlib’s optimization package. Although MLlib supports mini-batch SGD, it does not support the native sequential version of SGD, which is usually much faster. Here is an example of using the SGD package of Splash to learn a logistic regression model:

import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionModel

object SGDExample {
  def main(args: Array[String]) {
    val path = args(0)
    
    val sc = new SparkContext(new SparkConf())
    val data = MLUtils.loadLibSVMFile(sc, path).repartition(sc.defaultParallelism)
    val numFeatures = data.take(1)(0).features.size
    
    // Split data into training (60%) and test (40%).
    val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    
    // Append 1 into the training data as intercept.
    val training = splits(0).map(x => (x.label, MLUtils.appendBias(x.features))).cache()
    val test = splits(1).cache()
    
    println("Splash Optimization Example")
    println(training.count() + " samples for training and " + test.count() + " samples for testing.")
    println("Feature dimension = " + numFeatures)
    
    // Train a logistic regression model
    val NumIterations = 10
    val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))
    val weightsWithIntercept = (new splash.optimization.StochasticGradientDescent())
      .setGradient(new splash.optimization.LogisticGradient())
      .setNumIterations(NumIterations)
      .optimize(training, initialWeightsWithIntercept)
    
    val model = new LogisticRegressionModel(
      Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
      weightsWithIntercept(weightsWithIntercept.size - 1))
    
    // Clear the default threshold.
    model.clearThreshold()
    
    // Compute raw scores on the test set.
    val scoreAndLabels = test.map { point =>
      val score = model.predict(point.features)
      (score, point.label)
    }
    
    // Get evaluation metrics.
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    val auROC = metrics.areaUnderROC()
    
    println("Area under ROC = " + auROC)
  }
}

If you compare this code with the MLlib example, you will see that the only difference is the following lines:

// Train a logistic regression model
val NumIterations = 10
val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))
val weightsWithIntercept = (new splash.optimization.StochasticGradientDescent())
  .setGradient(new splash.optimization.LogisticGradient())
  .setNumIterations(NumIterations)
  .optimize(training, initialWeightsWithIntercept)

While the MLlib example uses batch L-BFGS to optimize the loss function, this code uses stochastic gradient descent. The code above first declares a StochasticGradientDescent object, then sets the gradient function and the number of iterations. The optimize method returns the vector that minimizes the logistic loss. In practice, the SGD implementation can be much faster than batch L-BFGS when the dataset is large. Apart from this difference, the input and output of the SGD package are the same as MLlib’s, so it can easily be integrated as part of a machine learning pipeline.
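
For reference, the corresponding call in the MLlib example looks roughly like the following (adapted from MLlib’s optimization guide; the parameter values shown here are illustrative):

import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}

// Batch L-BFGS from MLlib: same input (an RDD of (label, features)
// pairs plus an initial weight vector) and same kind of output (a
// weight vector) as the Splash SGD package above.
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
  training,
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept)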

To run this example, choose ExampleName = SGDExample and [data files] = data/covtype.txt. The output should look like the following:

Splash Optimization Example
348579 samples for training and 232433 samples for testing.
Feature dimension = 54
Area under ROC = 0.8266852123167724

The machine learning package provides efficient implementations of Optimization, Collaborative Filtering and Topic Modelling. See the LDA example for another concrete example of how to use the machine learning package, in this case to learn an LDA model.
