Functional programming #rstats: closure hack for parallel processing on Windows!

This post describes a useful application of functional programming. Parallel worker processes on Windows don't have access to your global environment. Closures can encapsulate dependencies in the functions you pass, saving you from "could not find function" errors.

Your situation:

  • you have a big data frame
  • you want to apply a (pretty complex) function to each row
  • you are on a Windows server

For example, you know baby names are much cooler when they have no vowels and no uppercase letters.

# a dataset of babynames
library(babynames)

# a function that drops vowels from names
drop_vowels <- function(text) {
  gsub("[aeiou]", "", text) 
}
mutate_names <- function(tbl) {
  # names are cooler with no vowels and no capital letters.
  dplyr::mutate(tbl, name = drop_vowels(tolower(name)))
}

mutate_names(babynames[1:10, ])
## # A tibble: 10 x 5
##     year sex   name      n   prop
##    <dbl> <chr> <chr> <int>  <dbl>
##  1 1880. F     mry    7065 0.0724
##  2 1880. F     nn     2604 0.0267
##  3 1880. F     mm     2003 0.0205
##  4 1880. F     lzbth  1939 0.0199
##  5 1880. F     mnn    1746 0.0179
##  6 1880. F     mrgrt  1578 0.0162
##  7 1880. F     d      1472 0.0151
##  8 1880. F     lc     1414 0.0145
##  9 1880. F     brth   1320 0.0135
## 10 1880. F     srh    1288 0.0132

But babynames has 1,858,689 rows! It may take a long time to process everything. So your first idea is to start parallel processes, split the data frame, parLapply the function to each table in the split list, then bind_rows them back together.

library(parallel)
cl <- makeCluster(detectCores())
dplyr::bind_rows(parLapply(cl, split(babynames, babynames$year), mutate_names))
stopCluster(cl)
## Error in checkForRemoteErrors(val) : 
##    2 nodes produced errors; first error: 
##    Evaluation error: could not find function "drop_vowels". 

On Mac, you can choose between parLapply and mclapply. mclapply forks the current R process, so each child starts with a copy of your workspace. Windows can't fork (there, mclapply only runs serially, with mc.cores = 1), so in practice you only have parLapply. The workers that makeCluster starts for parLapply are fresh R sessions that don’t have access to your global environment (see this Stack Overflow Q for technical details on parallel processing on Windows vs Mac), so drop_vowels isn’t passed to the workers that are trying to run mutate_names. They don’t know anything other than what you give them (they also don’t know where to find mutate unless you use dplyr::mutate or library(dplyr) inside the function). (Even then, if your package libraries aren’t on the workers' library path, they can’t find them no matter what, but I haven’t solved that problem yet.)

One option is to use parallelsugar (via nathanvan), which approximates the Mac version of mclapply, where children have access to the global environment. parallelsugar works by using parallel::clusterExport to export objects to the child processes. However, if your data frame is large (or if you have other large things in the global environment, or lots of unrelated packages), it can take forever to set up the cluster.

If you like, you can use the same logic to pick and choose a few things to export until it starts to work. Here we could use:

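cl <- makeCluster(detectCores())
# copy drop_vowels from the global environment to every worker
clusterExport(cl, "drop_vowels")
dplyr::bind_rows(parLapply(cl, split(babynames, babynames$year), mutate_names))
stopCluster(cl)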
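
If you want to check what the workers can actually see, you can ask them directly. This is a minimal sketch, assuming a two-worker cluster is enough to make the point; the variable names before and after are only for illustration. It uses exists() on each worker before and after a clusterExport call.

```r
library(parallel)

drop_vowels <- function(text) gsub("[aeiou]", "", text)

cl <- makeCluster(2)

# ask each worker whether drop_vowels is visible from its own global environment
before <- unlist(clusterEvalQ(cl, exists("drop_vowels")))

# copy the function over explicitly, then ask again
# (envir = environment() exports from wherever this code is running)
clusterExport(cl, "drop_vowels", envir = environment())
after <- unlist(clusterEvalQ(cl, exists("drop_vowels")))

stopCluster(cl)

before  # FALSE FALSE
after   # TRUE TRUE
```

The workers start out knowing nothing about drop_vowels, and only the names you export show up on the other side.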

But you don’t want to keep track of all the dependencies every time you write a new function, and you can’t do it at all when you write a wrapper around parLapply that takes a function like mutate_names as an input (because the wrapper function won’t know what the argument function depends on).

Closure to the rescue!

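Instead, you can use a closure to encapsulate the drop_vowels function inside the mutate_names function, so that mutate_names always has access to it without you exporting it to every new child. This works because a function's enclosing environment is serialized and sent to the worker along with the function, unless that environment is the global environment.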

# this returns a function! 
mutate_names_ <- function() { 
  # "save" the `drop_vowels` from the global environment 
  # into this function's environment,
  # which then gets passed to each child process
  drop_vowels <- drop_vowels
  function(tbl) {
    # names are cooler with no vowels and no capital letters.
    dplyr::mutate(tbl, name = drop_vowels(tolower(name)))
  }
}

# then **call** the mutate_names_() function to return the
# function with an environment that contains the function
# you need, drop_vowels
mutate_names <- mutate_names_()

# then start the cluster
cl <- makeCluster(detectCores())
dplyr::bind_rows(parLapply(cl, split(babynames, babynames$year), mutate_names))
stopCluster(cl)

Done! Now your baby names are cooler in half the time!
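
To see what the closure carries with it, you can inspect its environment and round-trip it through serialize() and unserialize(), which is roughly what happens when parLapply ships a function to a worker. This is a minimal sketch that needs no cluster; make_cleaner, cleaner and shipped are hypothetical names used only here.

```r
drop_vowels <- function(text) gsub("[aeiou]", "", text)

# factory: copies drop_vowels into its own environment,
# then returns a function that closes over that environment
make_cleaner <- function() {
  drop_vowels <- drop_vowels
  function(x) drop_vowels(tolower(x))
}
cleaner <- make_cleaner()

# the closure's environment holds its dependency
ls(environment(cleaner))   # "drop_vowels"

# serialize and unserialize, as a stand-in for sending it to a worker
shipped <- unserialize(serialize(cleaner, NULL))
ls(environment(shipped))   # "drop_vowels"
shipped("Mary")            # "mry"
```

A function defined at the top level has the global environment as its enclosure, and that one is not copied across, which is why the original mutate_names arrived at the workers without drop_vowels.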

More complex use case

The real payoff comes when you write a function that performs all the cluster steps together (split, start the cluster, apply, combine, stop):

parLapply_tbl <- function(.x, .f) {
  # randomly split the tbl into parts
  n_cl <- detectCores()
  .x <- split(.x, sample.int(n_cl, nrow(.x), replace = TRUE)) 
  
  # then cluster apply things on that list of split tbls
  cl <- makeCluster(n_cl)
  tryCatch({
    .x <- dplyr::bind_rows(parLapply(cl, .x, .f))
  }, finally = {
    stopCluster(cl)   
  })
  .x
}

In this case, the wrapper has no way of knowing which objects .f depends on, so you can’t clusterExport things one at a time until you get it right. The only alternative is the full parallelsugar approach (passing everything in globalenv, big and small). But the closure passes exactly what you need!

(N.B. the tryCatch with finally makes sure stopCluster runs even if parLapply throws an error, so you aren't left with hanging child processes and no way to kill them.)
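
Putting the wrapper and the closure together might look like the sketch below. It is a base-R variant written for illustration, not the function above: par_apply_rows, make_mutate_names and the toy data frame are hypothetical, the worker count is fixed at two, rbind stands in for bind_rows, and on.exit is used as an alternative to tryCatch for the cleanup.

```r
library(parallel)

# hypothetical base-R variant of the wrapper
par_apply_rows <- function(.x, .f, n_cl = 2) {
  cl <- makeCluster(n_cl)
  # stop the cluster when the function exits, even after an error
  on.exit(stopCluster(cl), add = TRUE)

  # randomly split the rows into at most n_cl parts
  parts <- split(.x, sample.int(n_cl, nrow(.x), replace = TRUE))

  # apply .f to each part on the workers, then stack the results
  out <- do.call(rbind, parLapply(cl, parts, .f))
  rownames(out) <- NULL
  out
}

drop_vowels <- function(text) gsub("[aeiou]", "", text)

# closure factory: the returned function carries drop_vowels with it
make_mutate_names <- function() {
  drop_vowels <- drop_vowels
  function(tbl) {
    tbl$name <- drop_vowels(tolower(tbl$name))
    tbl
  }
}

# a few rows in the shape of babynames
toy <- data.frame(
  name = c("Mary", "Anna", "Emma", "Elizabeth"),
  n = c(7065L, 2604L, 2003L, 1939L),
  stringsAsFactors = FALSE
)

res <- par_apply_rows(toy, make_mutate_names())
res  # row order depends on the random split
```

The wrapper never mentions drop_vowels, and nothing is exported by hand; the dependency travels inside the function you pass in.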