Hands-on Exercises

Time Series Analysis on Surrogate Data

https://github.com/bellettif/sparkGeoTS

Getting Started

In the shell, from the usb directory, please enter

usb/$ ./spark/bin/spark-shell --master "local[4]" --jars timeseries/sparkgeots.jar --driver-memory 2G

and then copy and paste the following into the Spark shell:

import breeze.linalg._
import breeze.stats.distributions.Gaussian
import breeze.numerics.sqrt
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import main.scala.overlapping._
import containers._
import timeSeries._

// Implicit signed distance between two timestamps, in milliseconds.
implicit def signedDistMillis = (t1: TSInstant, t2: TSInstant) => (t2.timestamp.getMillis - t1.timestamp.getMillis).toDouble

In this tutorial we will practice on artificially generated data and show that we can successfully identify autoregressive models.

Data Specification

Let’s generate some data with an order 3 Autoregressive Model.

In such a model, X_t = A_1 * X_{t-1} + A_2 * X_{t-2} + A_3 * X_{t-3} + iid noise.
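To make the recursion concrete, here is a minimal standalone sketch of that update (plain Breeze, independent of the library; the dimensions and coefficient values are made up for illustration):

import breeze.linalg.{DenseMatrix, DenseVector}
import breeze.stats.distributions.Gaussian

// Illustrative AR(3) recursion with 2 spatial dimensions and toy coefficients.
val A = Array(
  DenseMatrix((0.4, 0.1), (0.0, 0.3)),  // A_1
  DenseMatrix((0.2, 0.0), (0.1, 0.2)),  // A_2
  DenseMatrix((0.1, 0.0), (0.0, 0.1)))  // A_3
val noise = Gaussian(0.0, 1.0)
// Start from three zero vectors, then unroll the recursion.
val path = scala.collection.mutable.ArrayBuffer.fill(3)(DenseVector.zeros[Double](2))
for (t <- 3 until 20) {
  // X_t = A_1 * X_{t-1} + A_2 * X_{t-2} + A_3 * X_{t-3} + iid noise
  path += A(0) * path(t - 1) + A(1) * path(t - 2) + A(2) * path(t - 3) +
    DenseVector.fill(2)(noise.draw())
}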

val actualP = 3

We have 3 spatial dimensions (3 sensors or data feeds).

val d = 3

Let’s generate 10000 samples (you can also try a million, for instance).

val N = 10000L

Time Series Configuration

Let’s specify that there is one millisecond between each sample.

val deltaTMillis = 1L

Let’s have an overlap between partitions of 100 ms. The data in our partitions will overlap as follows:

------------------------------------
                                -----------------------------------
                                                              ---------------------------------

This is necessary to estimate our models without shuffling data between nodes. With this setup, we will be able to calibrate models of any order lower than 100 ms / 1 ms = 100.

val paddingMillis = 100L
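
As a quick sanity check (a short sketch using the values defined above):

// The padding must cover p lags of deltaTMillis each, so the highest
// calibratable order is paddingMillis / deltaTMillis = 100.
val maxOrder = (paddingMillis / deltaTMillis).toInt
assert(actualP <= maxOrder)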

We choose to have 8 partitions.

val nPartitions = 8

We gather all that information into the implicit val config, which will be used later on in all the calls we make.

implicit val config = TSConfig(deltaTMillis, d, N, paddingMillis.toDouble)

Data Generation

(1) We generate the coefficients of the model randomly and try to enforce causality.

val ARCoeffs: Array[DenseMatrix[Double]] = Array.fill(actualP){DenseMatrix.rand[Double](d, d) - (DenseMatrix.ones[Double](d, d) * 0.5)}
Stability.makeStable(ARCoeffs)
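
Stability.makeStable is meant to keep the generated model from blowing up. As an optional, hand-rolled check (a sketch, not part of the library), we can verify stability directly: a VAR(p) is stable exactly when every eigenvalue of its (d * p) x (d * p) companion matrix has modulus strictly below 1.

import breeze.linalg.eig

// Sketch of a stability check (not part of sparkGeoTS).
def isStable(coeffs: Array[DenseMatrix[Double]]): Boolean = {
  val p = coeffs.length
  val dim = coeffs(0).rows
  val companion = DenseMatrix.zeros[Double](dim * p, dim * p)
  for (i <- 0 until p) // top block row holds A_1 ... A_p
    companion(0 until dim, i * dim until (i + 1) * dim) := coeffs(i)
  for (i <- 1 until p) // identity blocks shift the state down by one lag
    companion(i * dim until (i + 1) * dim, (i - 1) * dim until i * dim) := DenseMatrix.eye[Double](dim)
  val decomp = eig(companion)
  val moduli = (0 until decomp.eigenvalues.length).map { i =>
    math.sqrt(math.pow(decomp.eigenvalues(i), 2) + math.pow(decomp.eigenvaluesComplex(i), 2))
  }
  moduli.max < 1.0
}

println("Stable: " + isStable(ARCoeffs))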

We have the same amount of noise everywhere.

val noiseMagnitudes = DenseVector.ones[Double](d)

Let’s generate the surrogate data.

val rawTS = Surrogate.generateVAR(
    ARCoeffs,
    d,
    N.toInt,
    deltaTMillis,
    Gaussian(0.0, 1.0),
    noiseMagnitudes,
    sc)

And put it into the overlapping data structure:

val (timeSeriesRDD: RDD[(Int, SingleAxisBlock[TSInstant, DenseVector[Double]])], _) =
    SingleAxisBlockRDD((paddingMillis, paddingMillis), nPartitions, rawTS)

We can inspect the parameters and plot some data:

PlotTS.showModel(ARCoeffs, Some("Actual parameters"))
PlotTS(timeSeriesRDD, Some("In Sample Data"))

Note: If at this point the plot shows NaN values, the model we have randomly generated is numerically unstable. This can happen; just regenerate the coefficients of the model and the data by going back to (1).
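
Instead of eyeballing the plot, you can also test for NaN values programmatically (a sketch, assuming rawTS is an RDD[(TSInstant, DenseVector[Double])], as its use in SingleAxisBlockRDD above suggests):

// A non-zero count means the generated model blew up: go back to (1).
val nanCount = rawTS.filter { case (_, v) => v.data.exists(_.isNaN) }.count()
println(s"Samples containing NaN: $nanCount")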

Let’s take a look at the auto-correlation structure of the data we have generated. If that correlation vanishes to 0 after lag q, the data we see is most likely generated by an MA(q) model.

val (correlations, _) = CrossCorrelation(timeSeriesRDD, 4)
PlotTS.showModel(correlations, Some("Cross correlation"))
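
To see this cut-off rule numerically as well (a sketch, assuming correlations is an Array[DenseMatrix[Double]] like the ARCoeffs passed to PlotTS.showModel above):

// Print the overall magnitude of the estimated correlation matrix at each
// lag; for our AR-generated data it should taper off rather than cut to 0.
correlations.zipWithIndex.foreach { case (corrMatrix, lag) =>
  println(s"lag $lag: Frobenius norm = " + math.sqrt(corrMatrix.data.map(x => x * x).sum))
}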

Let’s take a look at the partial auto-correlation structure of the data we have generated. If that correlation vanishes to 0 after lag p, the data we see is most likely generated by an AR(p) model.

val (partialCorrelations, _) = PartialCrossCorrelation(timeSeriesRDD, 4)
PlotTS.showModel(partialCorrelations, Some("Partial cross correlation"))

Monovariate Analysis

Let’s fit a univariate AR model on each spatial dimension of the data.

val p = actualP
val mean = MeanEstimator(timeSeriesRDD)
val vectorsAR = ARModel(timeSeriesRDD, p, Some(mean)).map(_.covariation)

Now we compute the prediction residuals (in sample).

val residualsAR = ARPredictor(timeSeriesRDD, vectorsAR, Some(mean))

We also compute the variance-covariance matrix of the residuals.

val residualSecondMomentAR = SecondMomentEstimator(residualsAR)

Let’s inspect the result:

PlotTS.showUnivModel(vectorsAR, Some("Monovariate parameter estimates"))
PlotTS(residualsAR, Some("Monovariate AR residual error"))

Multivariate Analysis

Let’s fit a multivariate AR model that takes the information from all sensors into account jointly.

val (estVARMatrices, _) = VARModel(timeSeriesRDD, p)

We compute the prediction residuals.

val residualVAR = VARPredictor(timeSeriesRDD, estVARMatrices, Some(mean))

Let’s take a look at the variance-covariance matrix of the residuals. It should be closer to a diagonal matrix than in the univariate analysis.

val residualSecondMomentVAR = SecondMomentEstimator(residualVAR)
PlotTS.showModel(estVARMatrices, Some("Multivariate parameter estimates"))
PlotTS.showCovariance(residualSecondMomentAR, Some("Monovariate residual covariance"))
PlotTS.showCovariance(residualSecondMomentVAR, Some("Multivariate residual covariance"))

Let’s compare the errors between the univariate and multivariate models. We compute mean squared errors, which sum all squared errors along the sensing dimensions. The average error we make when predicting is therefore obtained by dividing by the number of sensors and taking the square root of the result.

println("AR in sample error = " + trace(residualSecondMomentAR))
println("Monovariate average error magnitude = " + sqrt(trace(residualSecondMomentAR) / d))
println("VAR in sample error = " + trace(residualSecondMomentVAR))
println("Multivariate average error magnitude = " + sqrt(trace(residualSecondMomentVAR) / d))
println()