I’ve previously written about doing cross-validation with sparklyr. This post serves as an update, given the changes that have been made to sparklyr since then. The first half of my previous post may still be worth reading, but the section on cross-validation is now wrong: the function it provided no longer works. If you want an overview of sparklyr, and how it compares to SparkR, see this post. Bear in mind, however, that that post was written in December 2017, and both packages have added functionality since then.

Here, we’re going to start off by writing the necessary functions for cross-validation ourselves. The hope is that this will make clear how CV works, as well as introduce some sparklyr functions. At the end, the in-built functions for cross-validation are discussed. A future post will show how these in-built functions can be used to do feature selection by cross-validation.


First we do some setup by loading in some packages and creating a local Spark connection.


library(sparklyr)
library(dplyr)
library(purrr)
library(tibble)
library(ggplot2)

sc <- spark_connect(master = 'local')

Here we’re going to use the famous, if a little dull, titanic dataset. We’re getting this from the titanic package, which you’ll need to install if you don’t have it. To keep things simple, we’re selecting a subset of columns and making sure every column is numeric.

# get the training data
df_titanic <- titanic::titanic_train %>%
  # select some columns
  select(Survived, Pclass, Sex, Age, SibSp, Parch, Fare) %>%
  # make the col names lower case
  rename_all(tolower) %>%
  # turn sex into an integer
  mutate(sex = ifelse(sex == 'male', 1, 0)) %>%
  # remove NAs
  na.omit()
Now we’ve cleaned the data, we can copy it to our Spark connection, creating a Spark DataFrame. In practice, we could have started by copying the data to Spark. We then could have used Spark’s string indexer function (ft_string_indexer()) to clean the sex column. This function will automatically index a column of strings with integers.

# create a Spark DataFrame for the data
tbl_titanic <- copy_to(sc, df_titanic)
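As an aside, the string-indexer route mentioned above might look something like the following. This is a sketch rather than something run in this post, and the output column name sex_indexed is made up for illustration.

```r
# copy the raw data to Spark first, then index the
# string column on the Spark side
tbl_raw <- copy_to(sc, titanic::titanic_train, overwrite = TRUE)

# ft_string_indexer() replaces each distinct string with an
# integer index (ordered by frequency by default)
tbl_indexed <- tbl_raw %>%
  ft_string_indexer(input_col = "Sex", output_col = "sex_indexed")
```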


We can now turn to doing some cross-validation. As noted above, we’re doing this ‘long-hand’ to begin with, before looking at the functions built into sparklyr.

Using user-defined functions

In order to do cross-validation, the first thing we need to do is split our data into different folds. To do this we’ll use sdf_partition(). To use this function we need to specify our data and then some weights to use when partitioning the data. These weights should sum to 1, and specify the proportion of data to put into each partition. For \(k\)-fold validation we want \(k\) equal folds, meaning our weights should just be \(1 / k\).

# number of folds
k = 5

# create the weights
weights <- rep(1 / k, times = k)

# give names to the weights so that our partitions are named
# sdf_partition will fail if we don't do this
names(weights) <- paste0("fold", 1:k)

# create the partitioned data
tbl_titanic_cv_split <- sdf_partition(
  tbl_titanic,
  weights = weights, 
  seed = 2018
)

We now have our data split into 5 equally sized folds. For cross-validated training and testing we don’t really want the data in this form. Instead we want \(k\) test sets, each containing the data from one fold, and \(k\) training sets, each containing the combined data from the other folds. We can get these training sets by creating a list with \(k\) elements, where each element contains the data from \(k - 1\) folds joined together.

To get this data, the first thing we need to do is get the indices for the folds that will make up these different training sets.

# get the indices for our different training sets
# e.g. a DataFrame for partitions 1, 2, 3, and 4
K <- 1:k
(indices <- map(K, ~ K[-.x]))
## [[1]]
## [1] 2 3 4 5
## [[2]]
## [1] 1 3 4 5
## [[3]]
## [1] 1 2 4 5
## [[4]]
## [1] 1 2 3 5
## [[5]]
## [1] 1 2 3 4

It may not be obvious how this code works, so let’s talk through it. In the above code, K is the vector c(1, 2, 3, 4, 5). The first argument to map() says that K is the thing we want to loop over. The bit after the ~ is the function we want to apply. K[-.x] says subset K, removing a particular element; .x is used inside map() to refer to the element we’re currently iterating over. On the first iteration, .x will be 1, the first element of K. This means that K[-.x] = K[-1] = c(2, 3, 4, 5). In other words, K without its first element.

Importantly, K[-1] is saying ‘remove the first element of K’, not ‘remove the element of K that equals 1’. It just so happens here that the first element of K is 1. For example, if K were instead c(5, 4, 3, 2, 1) then K[-1] would leave c(4, 3, 2, 1).
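A quick base-R check of that distinction:

```r
K <- c(5, 4, 3, 2, 1)

# negative indices drop by position, not by value,
# so the 5 (first element) is removed, not the 1
K[-1]
## [1] 4 3 2 1
```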

Next we use these indices to create our \(k\) training data sets. We create these training sets by looping over our indices and binding together the partitions of tbl_titanic_cv_split that match those indices. This gives us back a list where each element contains 4 out of the 5 partitions from tbl_titanic_cv_split. We’re using sdf_bind_rows() here, which is analogous to dplyr::bind_rows() but for Spark DataFrames.

# create our training sets by binding together partitions 
# of tbl_titanic_cv_split. We get back a list where each 
# element is a DataFrame of the partitions indexed by each 
# element of indices combined together
tbl_titanic_cv_train <-
  map(indices, ~ sdf_bind_rows(tbl_titanic_cv_split[.x]))

# name the elements
names(tbl_titanic_cv_train) <- paste0("train", 1:k)

Now that we’ve been through the process of creating our training sets, let’s make it into a function. The arguments to this function are the data, the number of cross-validation folds, and an optional seed for the partitioning of the data. Following sdf_partition(), if the seed is not specified then it is generated by sample(.Machine$integer.max, 1).

sdf_cross_validation_train_sets <- 
  function(tbl,
           k = 5,
           seed = sample(.Machine$integer.max, 1)) {
  # create weights for partitioning the data
  weights <- rep(1 / k, times = k)
  # name the elements of weights
  names(weights) <- paste0("fold", 1:k)
  # partition the data using weights
  tbl_cv_split <-
    sparklyr::sdf_partition(tbl, weights = weights, seed = seed)
  # get the indices for our different training sets
  # e.g. a DataFrame for partitions 1, 2, 3, and 4
  K <- 1:k
  indices <- purrr::map(K, ~ K[-.x])
  # create our training sets by binding together 
  # partitions of tbl_cv_split. We get back a list where 
  # each element is a DataFrame of the partitions
  # indexed by each element of indices combined together
  tbl_cv_train <-
    purrr::map(indices, ~ sparklyr::sdf_bind_rows(tbl_cv_split[.x]))
  # name and return the training sets
  names(tbl_cv_train) <- paste0("train", 1:k)
  tbl_cv_train
}

The next thing we’ll want is the test sets to go with each of our training sets. The way our indices worked, the first fold was left out of the first training set, the second out of the second, and so on. Thus, the first training set contains the 2nd, 3rd, 4th and 5th partitions of the data. This means that the original partitioned data is exactly what we’d use as test data: the \(i^{th}\) element of tbl_cv_split is the partition not in the \(i^{th}\) element of tbl_cv_train. We can add this into our function.

sdf_cross_validation_train_test_sets <-
  function(data,
           k = 5,
           seed = sample(.Machine$integer.max, 1)) {
    # first off create weights for partitioning the data
    weights <- rep(1 / k, times = k)
    # name the elements of weights
    names(weights) <- paste0("fold", 1:k)
    # partition the data using weights
    data_cv_split <-
      sparklyr::sdf_partition(data, weights = weights, seed = seed)
    # get the indices for our different training sets
    # e.g. a DataFrame for partitions 1, 2, 3, and 4
    K <- 1:k
    indices <- purrr::map(K, ~ K[-.x])
    # create our training sets by binding together 
    # partitions of data_cv_split. We get back a list where each 
    # element is a DataFrame of the partitions
    # indexed by each element of indices combined together
    data_cv_train <-
      purrr::map(indices, ~ sparklyr::sdf_bind_rows(data_cv_split[.x]))
    # name the training sets
    names(data_cv_train) <- paste0("train", 1:k)
    # rename the test sets
    names(data_cv_split) <- paste0("test", 1:k)
    # return a tibble with list columns
    # for the training and test sets
    tibble::tibble(cv_fold = K,
                   train = data_cv_train,
                   test = data_cv_split)
  }
The function gives us back a data frame with list columns for the training and test data. By putting the data in this format, we can use some of the cool techniques for combining purrr with list columns. Let’s take a look at what the function returns.

# use the function
tbl_titanic_cv <- sdf_cross_validation_train_test_sets(
  tbl_titanic,
  seed = 2018
)

# print the result
tbl_titanic_cv

## # A tibble: 5 x 3
##   cv_fold train        test        
##     <int> <named list> <named list>
## 1       1 <tbl_sprk>   <tbl_sprk>  
## 2       2 <tbl_sprk>   <tbl_sprk>  
## 3       3 <tbl_sprk>   <tbl_sprk>  
## 4       4 <tbl_sprk>   <tbl_sprk>  
## 5       5 <tbl_sprk>   <tbl_sprk>

In order to actually do our modelling, we’re going to need to create a formula to pass into the modelling functions. We can do this with a bit of string manipulation. First off, let’s specify our response/outcome column, and the features we want to feed in as predictors.

# the response columns
response = 'survived'

# the colnames of our data
columns <- colnames(tbl_titanic)

# get the col names that aren't the response
features <- columns[columns != response]

Now we have these, we can build up our formula. First we’ll collapse the features into a single string separated by ' + '. We’ll then paste the response and ' ~ ' on the front, and turn the whole string into a formula.

model_formula <- features %>%
  paste0(collapse = ' + ') %>%
  paste0(response, ' ~ ', .) %>%
  as.formula()

# print the formula
model_formula

## survived ~ pclass + sex + age + sibsp + parch + fare
## <environment: 0x7fa5ebf76c58>

Again, let’s wrap this up into a function. We’re setting it up so that a vector of features can optionally be passed in. If it isn’t, then every column in the data except the response is used as a predictor.

get_model_formula <- function(data, response, features = NULL) {
  # if a vector of feature names hasn't been specified
  if (is.null(features)) {
    # get the column names of the data
    columns <- colnames(data)
    # create the feature names by using all 
    # the columns except the response
    features <- columns[columns != response]
  }
  # build the formula string and convert it
  features %>%
    paste0(collapse = ' + ') %>%
    paste0(response, ' ~ ', .) %>%
    as.formula()
}

Now we can use the function to verify we get the same result back.

# use the function
get_model_formula(tbl_titanic, 'survived')
## survived ~ pclass + sex + age + sibsp + parch + fare
## <environment: 0x7fa5ebfb4f48>

Now we have a model formula, we can actually do some modelling. To keep this simple we’re going to be using logistic regression. This is provided for us by Spark, and can be called with the ml_logistic_regression() function.

Firstly, we’re going to add a new list column that contains model fits for each training set. We do this by putting map() inside a mutate(). We create the new column fit_lr by mapping over the train column, passing each element as the first argument to ml_logistic_regression(). Additionally, we pass the formula that we created to the formula argument of ml_logistic_regression().

tbl_titanic_cv <- tbl_titanic_cv %>%
  mutate(fit_lr = map(train,
                      ~ ml_logistic_regression(
                        .x, formula = model_formula)))

Now we have some trained models, we can add the predictions on the test data into another list column. We’re using map2() as we want to loop over our fits and test data simultaneously.

tbl_titanic_cv <- tbl_titanic_cv %>%
  mutate(pred_lr = map2(fit_lr, test, ml_predict))

Now we’ve got our predictions, we can add in a column with some evaluation metrics. Here we’re using map_dbl() to indicate that we want a vector of double values back. If we didn’t do this, we’d get a list where each element is a single value.

tbl_titanic_cv <- tbl_titanic_cv %>%
  mutate(eval_lr = map_dbl(pred_lr, 
                           ml_binary_classification_evaluator,
                           label_col = response))

# print the result
tbl_titanic_cv

## # A tibble: 5 x 6
##   cv_fold train        test         fit_lr       pred_lr      eval_lr
##     <int> <named list> <named list> <named list> <named list>   <dbl>
## 1       1 <tbl_sprk>   <tbl_sprk>   <ml_mdl__>   <tbl_sprk>     0.839
## 2       2 <tbl_sprk>   <tbl_sprk>   <ml_mdl__>   <tbl_sprk>     0.837
## 3       3 <tbl_sprk>   <tbl_sprk>   <ml_mdl__>   <tbl_sprk>     0.855
## 4       4 <tbl_sprk>   <tbl_sprk>   <ml_mdl__>   <tbl_sprk>     0.888
## 5       5 <tbl_sprk>   <tbl_sprk>   <ml_mdl__>   <tbl_sprk>     0.848
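To see the map() vs map_dbl() distinction in isolation, here’s a plain-R comparison that doesn’t need Spark:

```r
library(purrr)

# map() always returns a list, even when each
# element is a single number
map(1:3, ~ .x ^ 2)

# map_dbl() applies the same function but simplifies
# the result to a double vector
map_dbl(1:3, ~ .x ^ 2)
## [1] 1 4 9
```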

Rather than doing this manually we could wrap the whole process into a function, making use of some of the helpers we’ve written along the way. We have a model_fun argument to our function, which allows for different algorithms to be passed in.

ml_classification_cross_validation <- 
  function(tbl, # a tbl_spark
           response, # the response column as a character
           model_fun, # the modelling function to use (unquoted)
           k = 5, # number of folds
           seed = sample(.Machine$integer.max, 1), # optional seed for partitioning
           features = NULL, # character vector or else all the columns except response
           ...) { # additional arguments passed on to model_fun
  # quote the dots
  dots <- rlang::quos(...)
  # get the dataframe with train and test sets
  tbl_cv <- sdf_cross_validation_train_test_sets(tbl, k, seed)
  # get the model formula
  model_formula <- get_model_formula(tbl, response, features)
  # fit, predict, evaluate
  tbl_cv %>%
    dplyr::mutate(
      fit = purrr::map(.x = train, ~ model_fun(.x, formula = model_formula, !!!dots)),
      pred = purrr::map2(fit, test, ml_predict),
      eval = purrr::map(pred, ml_binary_classification_evaluator, label_col = response),
      eval = unlist(eval)
    )
}

Now let’s use this function to see that we get the same result.

tbl_titanic_cv <-
  ml_classification_cross_validation(
    tbl = tbl_titanic,
    response = 'survived',
    model_fun = ml_logistic_regression,
    k = 5,
    seed = 2018
  )

# print out the result
tbl_titanic_cv
## # A tibble: 5 x 6
##   cv_fold train        test         fit          pred          eval
##     <int> <named list> <named list> <named list> <named list> <dbl>
## 1       1 <tbl_sprk>   <tbl_sprk>   <ml_mdl__>   <tbl_sprk>   0.839
## 2       2 <tbl_sprk>   <tbl_sprk>   <ml_mdl__>   <tbl_sprk>   0.837
## 3       3 <tbl_sprk>   <tbl_sprk>   <ml_mdl__>   <tbl_sprk>   0.855
## 4       4 <tbl_sprk>   <tbl_sprk>   <ml_mdl__>   <tbl_sprk>   0.888
## 5       5 <tbl_sprk>   <tbl_sprk>   <ml_mdl__>   <tbl_sprk>   0.848

We can average the eval column to get a summary of the performance.

# take the average of the eval metric
mean(tbl_titanic_cv$eval)
## [1] 0.853543

Our function has a ... argument to allow us to pass additional arguments to the various modelling functions offered by Spark. Let’s demonstrate how this works using a random forest.

tbl_titanic_cv_rf <-
  ml_classification_cross_validation(
    tbl = tbl_titanic,
    response = 'survived',
    model_fun = ml_random_forest_classifier,
    k = 5,
    seed = 2018,
    num_trees = 50
  )

# print the result
tbl_titanic_cv_rf

## # A tibble: 5 x 6
##   cv_fold train        test         fit          pred          eval
##     <int> <named list> <named list> <named list> <named list> <dbl>
## 1       1 <tbl_sprk>   <tbl_sprk>   <ml_md___>   <tbl_sprk>   0.840
## 2       2 <tbl_sprk>   <tbl_sprk>   <ml_md___>   <tbl_sprk>   0.837
## 3       3 <tbl_sprk>   <tbl_sprk>   <ml_md___>   <tbl_sprk>   0.870
## 4       4 <tbl_sprk>   <tbl_sprk>   <ml_md___>   <tbl_sprk>   0.893
## 5       5 <tbl_sprk>   <tbl_sprk>   <ml_md___>   <tbl_sprk>   0.856

We could now plot the performance of the two different models we’ve tried.

# pull out the evaluation metrics from the two tibbles
tbl_model_eval <- tibble(
  model = rep(c('Logistic regression', 'Random forest'), each = 5),
  auc = c(tbl_titanic_cv$eval, tbl_titanic_cv_rf$eval)
)

# boxplots of the auc over the 5 folds
ggplot(tbl_model_eval, aes(model, auc, fill = model)) +
  coord_cartesian(ylim = c(0.75, 1)) +
  geom_boxplot(alpha = .75, show.legend = FALSE) +
  scale_fill_viridis_d(option = 'C', end = 0.4) +
  theme_minimal(base_size = 16, base_family = 'mono') +
  theme(panel.grid.major.x = element_blank()) +
  labs(x = 'Model', y = 'Area under the ROC curve')

Using in-built functions

Now we’re happy with the logic of how cross-validation works, we can leverage some in-built functions to do cross-validation. The cross-validation functions built into Spark / sparklyr are designed for parameter tuning. However, we can use them to get a cross-validated estimate of performance for a single model. To make use of the in-built functions we can use Spark pipelines. Pipelines describe a number of stages we want to apply to some input data, such as feature transformations and fitting a model.

Below we’re creating a pipeline by initialising it for our Spark connection, specifying a model formula, and specifying an algorithm. ft_r_formula() takes an R formula and uses it to tell Spark how our model should be fit.

pipeline <- ml_pipeline(sc) %>%
  ft_r_formula(survived ~ .) %>%
  ml_logistic_regression()

In order to use the cross-validation function, we need to provide a grid of tuning parameters to try. As we only want to fit a single model, we can just provide a single grid of parameters containing the default values.

grid <-
  list(logistic_regression = list(elastic_net_param = 0, reg_param = 0))

Next we specify how we want our cross-validation to work. We are not actually doing the cross-validation here, just specifying its parameters.

cv <- ml_cross_validator(
  sc,
  estimator = pipeline, # use our pipeline to estimate the model
  estimator_param_maps = grid, # use the params in grid
  evaluator = ml_binary_classification_evaluator(sc), # how to evaluate the CV
  num_folds = 5, # number of CV folds
  seed = 2018
)

So far all we’ve done is lay out a pipeline. Nothing has actually ‘happened’ in terms of fitting a model to our data. Instead we’ve laid out a number of steps that we ultimately want to be carried out.

  1. ml_pipeline(sc): create a pipeline associated with our Spark connection (sc)
  2. ft_r_formula: use the Spark equivalent of this R formula when it comes to fitting the model
  3. ml_logistic_regression: use logistic regression when fitting the model
  4. grid: these are parameters to try out when doing the cross-validation
  5. ml_cross_validator: create a cross-validator where the estimator is our pipeline, combining a model formula and algorithm. Try out the parameters in grid and use ml_binary_classification_evaluator to evaluate performance.

Now we have all these steps, we can actually carry out the cross-validation.

cv_model <- ml_fit(cv, tbl_titanic)

This gives us back a list containing a range of elements.

names(cv_model)
##  [1] "uid"                  "param_map"            "estimator"           
##  [4] "evaluator"            "estimator_param_maps" "best_model"          
##  [7] "num_folds"            "metric_name"          "avg_metrics"         
## [10] "avg_metrics_df"       "sub_models"           ".jobj"

There are some useful bits we can pull out, like a data frame of performance metrics. This would be even more useful if we’d actually tried a range of parameters.

cv_model$avg_metrics_df
##   areaUnderROC elastic_net_param_1 reg_param_1
## 1    0.8539799                   0           0

One thing to note is that the in-built cross-validation function is a lot faster than the one we wrote. However, it does not return performance metrics for each fold, just an overall average. If we were fitting a lot of models we’d have to use the in-built functions for performance reasons. In addition, Spark 2.3 and sparklyr 0.8 support doing cross-validation in parallel, which can bring considerable performance gains if you have access to a cluster.
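For example, a parallel version of the cross-validator above might look like the following. This is a sketch: the parallelism argument is available from sparklyr 0.8 onwards, and a sensible value depends on your cluster’s resources.

```r
# same cross-validator as before, but evaluating
# several models concurrently
cv_parallel <- ml_cross_validator(
  sc,
  estimator = pipeline,
  estimator_param_maps = grid,
  evaluator = ml_binary_classification_evaluator(sc),
  num_folds = 5,
  parallelism = 4, # number of models fit in parallel
  seed = 2018
)
```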

In the next post we’ll explore using the in-built functions some more to do feature selection. We’ll also look at how you can use ml_cross_validator without using pipelines, while discussing some of the advantages of pipelines.