Evaluation of time series forecasting using Spark windowing

Friday, December 18, 2015
Evaluation metrics play a critical role in the machine learning ecosystem. For machine learning products in particular, evaluation metrics are like a heartbeat: they show how healthy the model is and how well it performs in real life, and they are the only numbers that decision makers care about.
The definition and implementation of evaluation metrics depend heavily on the application and change from one data product to another. In this post, I introduce Mean Directional Accuracy (MDA) and show how to calculate it in Spark.
MDA is used in time series prediction, where it compares the forecast direction (upward or downward) to the actual realized direction. It is interpreted as the probability that the forecasting method under study detects the correct direction of the time series. It is a widely used metric [1] in economics applications, where economists are often interested only in the directional movement of the variable of interest. For example, in macroeconomics a monetary authority wants to know the direction of inflation so that it can raise interest rates if inflation is predicted to rise, or lower them if it is predicted to drop. Another example is financial planning, where the user wants to know whether demand is trending upward or downward.
For a time series \(A_t\) and its prediction \(F_t\), the Mean Directional Accuracy is calculated as follows:
\[\mathrm{MDA} = \frac{1}{N} \sum_t \mathbf{1}_{\operatorname{sign}(A_t - A_{t-1}) = \operatorname{sign}(F_t - F_{t-1})}\]
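To make the formula concrete, here is a minimal plain-Python sketch. The function name `mean_directional_accuracy` is hypothetical, and two conventions are assumptions of this sketch rather than part of the definition above: a zero change is treated as "not upward", and the average is taken over the number of consecutive pairs. The sample values are the first five USA rows shown later in the post, with the predictions rounded.

```python
def mean_directional_accuracy(actual, forecast):
    """Fraction of steps where actual and forecast move in the same direction."""
    if len(actual) != len(forecast) or len(actual) < 2:
        raise ValueError("need two equally long series with at least two points")

    def sign(x):
        # Assumption: a zero or negative change counts as "not upward".
        return 1 if x > 0 else -1

    hits = 0
    for t in range(1, len(actual)):
        if sign(actual[t] - actual[t - 1]) == sign(forecast[t] - forecast[t - 1]):
            hits += 1
    # Assumption: average over the number of consecutive pairs.
    return hits / (len(actual) - 1)


actual = [4.470303, 4.734335, 4.826502, 4.981746, 4.79081]
forecast = [5.012966, 4.404831, 4.978600, 5.035932, 4.853806]
mda = mean_directional_accuracy(actual, forecast)
print(mda)  # 0.75: the forecast misses only the 1950 -> 1951 move
```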

Toy example

As a toy example, let’s use a GDP time series dataset that contains the GDP of nine major countries from 1950 to 1983 [2]. For each country and each year, a GDP prediction for that year is also available. For example, the following plot shows the yearly actual GDP and the predicted GDP for the United States:

The goal is to calculate the MDA metric for the GDP prediction per country in Spark. Note that the data is not so big that one needs Spark for the calculation; this is just a toy example to show how to use Spark windows for such a problem. After loading the data into a Spark dataframe, we have the following structure, where each row holds the GDP and the GDP prediction for one year and one country.
|country|     gdp|        predicted|year|
|    USA|4.470303|5.012966057409855|1950|
|    USA|4.734335|4.404831278549317|1951|
|    USA|4.826502|4.978599656728077|1952|
|    USA|4.981746|5.035932340179457|1953|
|    USA| 4.79081|4.853806067158911|1954|
Looking at the MDA formula, we need to partition the dataframe by country and then, for each year, compare the direction of the GDP and of its prediction from the previous year to the year under study. For Spark dataframes, the window functionality in pyspark.sql.window is the tool for this operation [3].
We can divide the MDA metric calculation into 4 steps.
Step 1: window configuration
As mentioned before, the partitioning is per country. We then order the data in each partition by year, since we would like to compare each year's GDP to the previous year's.
from pyspark.sql import window
# Partition by country and order each partition by year
windowSpec = window.Window.partitionBy('country')\
                          .orderBy('year')
For MDA, we need the GDP and its prediction from the previous year. Therefore, the moving window only needs size two: the current entry and the one before it. This is coded on top of windowSpec as follows:
mywindow = windowSpec.rowsBetween(-1,0)
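To see which rows such a frame contains, the following plain-Python sketch emulates a partition by country, an ordering by year, and a row frame from offset -1 to 0. The helper `frames` is hypothetical and is not part of the Spark API; it only illustrates that the frame is clipped at the start of a partition, so the first row's frame contains the row itself and nothing else.

```python
from itertools import groupby
from operator import itemgetter

# Deliberately unsorted input rows (hypothetical subset of the data).
rows = [
    {"country": "USA", "year": 1952, "gdp": 4.826502},
    {"country": "USA", "year": 1950, "gdp": 4.470303},
    {"country": "USA", "year": 1951, "gdp": 4.734335},
]


def frames(rows, start=-1, end=0):
    """Yield (row, frame) pairs; frame holds the rows at offsets start..end."""
    ordered = sorted(rows, key=itemgetter("country", "year"))
    for _, group in groupby(ordered, key=itemgetter("country")):
        part = list(group)
        for i, row in enumerate(part):
            lo = max(0, i + start)            # clip at the partition start
            hi = min(len(part), i + end + 1)  # clip at the partition end
            yield row, part[lo:hi]


result = [(row["year"], [r["year"] for r in frame]) for row, frame in frames(rows)]
print(result)
# [(1950, [1950]), (1951, [1950, 1951]), (1952, [1951, 1952])]
```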
Step 2: add last year column to each row
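To calculate the direction of the GDP and of its prediction, we need to add the previous year's GDP and prediction to each row of the dataframe. To do so, in each window we pick the first element (corresponding to last year's data) and bind it to the current row. For the first year of each country the window contains only the current row, so the previous-year columns simply repeat the current values.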
import pyspark.sql.functions as func
df_w = df.withColumn("gdp_last_year", func.first(df.gdp).over(mywindow))\
         .withColumn("predicted_last_year", func.first(df.predicted).over(mywindow))
Let’s look at the new dataframe:
|country|     gdp|    predicted|year|gdp_last_year|predicted_last_year|
|    USA|4.470303|5.01296605740|1950|     4.470303|  5.012966057409855|
|    USA|4.734335|4.40483127854|1951|     4.470303|  5.012966057409855|
|    USA|4.826502|4.97859965672|1952|     4.734335|  4.404831278549317|
|    USA|4.981746|5.03593234017|1953|     4.826502|  4.978599656728077|
|    USA| 4.79081|4.85380606715|1954|     4.981746|  5.035932340179457|
Step 3: find the sign of GDP and GDP prediction for each year
Now we need to compare each year's GDP and its prediction with the previous year's values and find the direction. Since the df_w dataframe holds all of this information in each row, the operation can be done row by row. We just need to define a Spark User Defined Function (UDF) and apply it to all rows:
from pyspark.sql import types
def f(x):
    # 1.0 for an upward move, -1.0 otherwise (a zero change counts as -1.0)
    if x > 0:
        return 1.0
    else:
        return -1.0
sign_udf = func.UserDefinedFunction(lambda x: f(x), types.DoubleType())
df_w = df_w.withColumn("sign_gdp", sign_udf(df_w.gdp - df_w.gdp_last_year))\
           .withColumn("sign_predicted", sign_udf(df_w.predicted - df_w.predicted_last_year))
The updated df_w dataframe is as follows:
|country|     gdp|        predicted|year|gdp_last_year|predicted_last_year|sign_gdp|sign_predicted|
|    USA|4.470303|5.012966057409855|1950|     4.470303|  5.012966057409855|    -1.0|          -1.0|
|    USA|4.734335|4.404831278549317|1951|     4.470303|  5.012966057409855|     1.0|          -1.0|
|    USA|4.826502|4.978599656728077|1952|     4.734335|  4.404831278549317|     1.0|           1.0|
|    USA|4.981746|5.035932340179457|1953|     4.826502|  4.978599656728077|     1.0|           1.0|
|    USA| 4.79081|4.853806067158911|1954|     4.981746|  5.035932340179457|    -1.0|          -1.0|
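One detail worth noticing in the table above: the 1950 row is compared with itself, so both differences are zero, both signs are -1.0, and the row counts as a correct directional prediction. The plain-Python sketch below reproduces this on the five rows shown and compares the average with and without that first row. It only illustrates the effect; whether to keep or drop the first row of each country is a judgment call.

```python
# First five USA rows from the table above: (gdp, predicted).
rows = [
    (4.470303, 5.012966057409855),
    (4.734335, 4.404831278549317),
    (4.826502, 4.978599656728077),
    (4.981746, 5.035932340179457),
    (4.79081, 4.853806067158911),
]


def sign(x):
    # Same convention as in the post: a zero change maps to -1.0.
    return 1.0 if x > 0 else -1.0


hits = []
for i, (gdp, pred) in enumerate(rows):
    # The first row has no predecessor, so it is compared with itself.
    prev_gdp, prev_pred = rows[max(0, i - 1)]
    hits.append(sign(gdp - prev_gdp) == sign(pred - prev_pred))

with_first_row = sum(hits) / len(hits)
without_first_row = sum(hits[1:]) / len(hits[1:])
print(hits)  # [True, False, True, True, True]
print(with_first_row, without_first_row)  # 0.8 0.75
```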
Step 4: calculate the MDA
Finally, for each country we need to apply the indicator function for each directional prediction and take the average. This can be done as follows:
def indicator_function(x):
    if x:
        return 1.0
    else:
        return 0.0

## apply indicator function to each directional prediction
mda_udf = func.UserDefinedFunction(lambda x: indicator_function(x), types.DoubleType())
df_w = df_w.withColumn("MDA", mda_udf(df_w.sign_gdp == df_w.sign_predicted))

## MDA calculation
## average only the MDA column so the result has one value per country
mda_result = df_w.groupBy('country').mean('MDA')
mda_result = mda_result.withColumnRenamed("AVG(MDA)", "MDA")
The MDA result for all nine countries is given below:
|country|               MDA|
| GREECE|0.7058823529411765|
|     UK|0.6470588235294118|
| CANADA|0.6470588235294118|
|    USA|0.7058823529411765|
|  ITALY|0.7352941176470589|
| SWEDEN|0.7352941176470589|
| FRANCE|0.7941176470588235|
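As a quick sanity check on these numbers, the sketch below converts each MDA value back into a count of matching directions. It assumes one row per country per year from 1950 to 1983, that is, 34 rows per country; that row count is an assumption and is not stated explicitly in the post. Under that assumption every value above corresponds to a whole number of matches out of 34.

```python
# Assumption: one row per year from 1950 to 1983, i.e. 34 rows per country.
n_years = 1983 - 1950 + 1

mda = {
    "GREECE": 0.7058823529411765,
    "UK": 0.6470588235294118,
    "CANADA": 0.6470588235294118,
    "USA": 0.7058823529411765,
    "ITALY": 0.7352941176470589,
    "SWEDEN": 0.7352941176470589,
    "FRANCE": 0.7941176470588235,
}

# Number of rows where the actual and predicted directions agree.
matches = {country: round(value * n_years) for country, value in mda.items()}
print(matches)
# {'GREECE': 24, 'UK': 22, 'CANADA': 22, 'USA': 24, 'ITALY': 25, 'SWEDEN': 25, 'FRANCE': 27}
```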







Copyright © 2015 • Ensemble Blogging