Introduction

Steve Weston’s ‘foreach’ package is a remarkable parallel/distributed computing framework for the R language. Similarly to the parLapply, clusterMap, and related functions from R’s ‘parallel’ package, foreach enables computation across multiple CPU cores and computers. Parallel/distributed computation is engaged by using adapter packages that bridge the interface between R and particular back end computing frameworks like MPI or those used by R’s ‘parallel’ package.

Code using foreach runs sequentially in the absence of a parallel adapter, and works uniformly, more or less, across different adapters. Importantly, adapters are registered at run-time, allowing programmers and R package authors to write programs independent of specific parallel computing implementations. Package authors decide which parts of their programs can run in parallel, and users decide how to run in parallel depending on their available resources. This philosophy is shared by the closely-related ‘future’ package.

In the high-performance computing vernacular, the foreach package defines an application programming interface (API) for a remote procedure call, map/reduce framework. We will see specific examples of these concepts below.

Despite the long history of the foreach package, developed in 2008 by Steve and his team at Revolution Computing (including me), and the availability of many supported adapter packages, the internal foreach adapter interface is not very explicitly documented. This guide documents the foreach API and serves as a basic introduction to writing adapter packages.

It should be noted that there are neat ideas in the foreach package aside from the topics covered here, including loop composition and set comprehension-style syntax inspired by Python and Haskell (see the %:% operator and when function).

The structure of a foreach loop

Basic foreach loops that can take advantage of parallel adapters consist of three parts:

foreach(…) %dopar% { R expression }


  1. A foreach function call
    Returns a special object of class foreach that specifies what is to be mapped to the R expression, and how the results should be reduced.
  2. An R expression
    An R expression typically involves the for loop variables and optionally any other valid R object from the enclosing loop environment. It is the responsibility of the adapter to iterate over loop variables and map them to the expression.
  3. The %dopar% operator
    Combines the foreach call output with the R expression along with a version of its calling environment, and submits the work to a registered parallel adapter function for evaluation. If no adapter is registered, a default sequential evaluation one is used.

Unlike traditional for loops, each loop iteration within a foreach loop should be considered independent of the others, similar to the expression evaluations in replicate or function evaluations in Map or lapply. Also unlike traditional for loops, but again like replicate (with simplify=FALSE), the result from each iteration is returned, by default in a list but optionally combined through a reduction function.

Consider the following really simple example:

system.time(print(
  foreach(j = seq(2), .combine=sum) %dopar% {
    Sys.sleep(1)
    j
  }
))
## Warning: executing %dopar% sequentially: no parallel backend registered
## [1] 3
##    user  system elapsed 
##   0.056   0.000   2.061

Note the warning that no parallel backend adapter was registered and the loop has been evaluated sequentially. This loop iterates over a parameter variable j and the loop expression is evaluated twice, once for each value of j. The result of each expression evaluation is collected and passed to the sum function specified by the .combine option. In other words, the two possible values of j are mapped to the expression and the returned results are reduced through the sum function.

Now let’s try a simple parallel adapter using two local CPU cores:

library(doParallel, quietly = TRUE)
registerDoParallel(2)
system.time(print(
  foreach(j = seq(2), .combine=sum) %dopar% {
    Sys.sleep(1)
    j
  }
))
## [1] 3
##    user  system elapsed 
##   0.020   0.020   1.028

Note that the run time has been roughly halved as expected thanks to concurrent evaluation of each expression evaluation.

The foreach function is responsible for setting up important loop details. It returns an R object of class foreach containing details including:

  • How many loop iterations to run and, optionally, parameterized loop variable values
  • How to reduce the results
  • Whether the results need to be ordered or not
  • How to handle errors
  • Identifying R objects and packages required by the expression

The %dopar% operator is the de facto foreach parallel adapter API. The %dopar% operator is responsible for looking up a function associated with the registered parallel adapter and invoking that function with arguments:

  • The output of the foreach function of class foreach
  • The expression
  • The enclosing environment of the expression
  • Initialization data specific to the parallel adapter

It’s the responsibility of the parallel adapter function to map the data to the R expression, retrieve the results, and feed the results into an accumulator (reduction) function defined by the foreach object to return the result.

Run-time and ‘compile’-time options

Foreach adapters supply a registration function to register the adapter with %dopar%. These functions are usually called registerDo*, replacing * with the name of the adapter. The registration function allows users to configure run-time specific adapter details.

Arguments supplied to the foreach function itself, by comparison, are so-called ‘compile’-time options and are independent of run-time specific behavior.

The foreach (and future package) philosophy is that computed results should be independent of run-time options. Be aware that in practice that can’t always be achieved. Numerical computations, for instance, are subject to the non-associativity of floating point arithmetic and bounded integer values, among other potential pitfalls.

Writing a simple adapter

This guide proceeds by building a simple adapter called ‘doR’ designed to be as simple of an example as I could think of. The adapter does not compute its results in parallel, but does illustrate, very simply, most of the important details needed to implement an adapter.

The doR adapter launches worker R processes on the local machine using the ‘callr’ package. Each worker R process evaluates, in sequence, one or more loop iterations depending on a run-time option.

Foreach adapters consist of two main parts: a registration function, and an evaluation function.

Registration

Registration declares the name of the evaluation function that %dopar% will end up calling, along with any extra run-time options your adapter requires. Our simple doR adapter has a single option, chunkSize that determines the number of loop iterations run per task.

Registration proceeds by calling the setDoPar function which takes three arguments:

  1. The adapter evaluation function
  2. A list of adapter-specific run-time options
  3. An optional ‘info’ function that reports information about the adapter

We don’t use the ‘info’ function in our simple example.

registerDoR <- function(chunkSize = 1) {
  setDoPar(doR, data = list(chunkSize = chunkSize))
}

We still need to write the doR evaluation function, covered in the next section.

Evaluation function

The heart of every foreach adapter is the evaluation function. The %dopar% API invokes this function with four arguments:

  • An object of class foreach created by the foreach function call
  • The R expression to evaluate
  • The local environment enclosing the R expression
  • A list of optional run-time parameters from the registration function

It’s pretty simple!

The evaluation function is responsible for mapping loop iteration variables to the R expression, and for setting up a foreach ‘accumulator’ function to combine the output and return a result.

The adapter evaluation function needs to perform the following tasks:

  1. Set up a reduction function, called an ‘accumulator’ by foreach, that processes results based on foreach options as they arrive.
  2. Construct a list of iterator variable values specified by the foreach loop.
  3. Set up an environment, based on the local expression enclosing environment, for export to the workers that the expression can be evaluated in. The environment should contain all R objects required by the expression and make sure that functions within the expression have valid environments on the workers.
  4. Construct an evaluation wrapper to run on the worker that:
    1. loads any required packages
    2. assigns loop iteration variables in the exported environment
    3. optionally sets a parent namespace for the exported environment
    4. evaluates the expression on the worker within the exported environment
    5. returns evaluated results

The example below passes the iteration variables, environment and expression to the workers, and retrieves results from the workers, in serialized form. Note that this is not required by the ‘callr’ package which can do that for us under the hood. We explicitly use serialized values because typical real-world foreach adapters will need to do that anyway.

Here is the full evaluation function with lots of in-line comments explaining each step:

# The %dopar% API:
# obj    foreach object
# expr   the expression to evaluate
# envir  the local enclosing environment
# data   run-time options
doR <- function(obj, expr, envir, data)
{
  if (!inherits(obj, "foreach")) stop("obj must be a foreach object")
  it <- iter(obj)
# Set up the accumulator function that will process returned results
  accumulator <- makeAccum(it)
# Construct a list of iterator variable values
  argsList <- as.list(it)

# Set up an environment for export within with to evaluate expr Just in case we're called from within a function,
# we need an environment will all the optional `...` arguments (if any).
  exportenv <- tryCatch({
    qargs <- quote(list(...))
    as.environment(eval(qargs, envir))
  }, error = function(e) {
    new.env(parent = emptyenv())
  })

# The foreach `getexports` function automagically determines a list of R objects required by 'expr'.  `getexports`
# is not functional, it modifies its 'exportenv' argument. Beta version of foreach use the `getGlobalsAndPacakges`
# function from the future package. We use that explicitly below to discover required packages.  We combine
# those required packages with explicitly listed packages in 'obj' and the optional package parent environment for
# a complete list of require packages.
  noexport <- union(obj$noexport, obj$argnames)
  getexports(expr, exportenv, envir, bad=noexport)
  packages <- unique(c(obj$packages,
    future::getGlobalsAndPackages(expr, envir)$packages, obj$parentenv))

# There may still be some additional *explicitly* specified objects to  export. These require some additional
# processing, similar to what  `getexports` does under the hood. It would be nice to expand the  foreach
# `getexports` function to handle these objects too!
  export <- setdiff(obj$export, unique(c(ls(exportenv), obj$noexport)))
  for (sym in export) {
    if (!exists(sym, envir, inherits=TRUE))
      stop(sprintf('unable to find variable "%s"', sym))
    val <- get(sym, envir, inherits=TRUE)
    if (is.function(val) &&
        (identical(environment(val), .GlobalEnv) ||
         identical(environment(val), envir))) {
      # Changing this function's environment to exportenv allows it to
      # access/execute any other functions defined in exportenv.
      environment(val) <- exportenv
    }
    assign(sym, val, pos = exportenv, inherits = FALSE)
  }


# A simple initialization + evaluation function run by the worker  This loads required packages, optionally
# sets a parent package  environment (when called from *within* a package namespace),  and evaluates the
# expression. It does not set a random seed, something you probably would want to do in an actual adapter
# for real-world use (via, for example, R's L'Ecuyer RNG stream).
  taskInit <- function(expr, envir, packages, parentenv, args)
  {
    function() {
      for (p in packages) library(p, character.only = TRUE)
      parent.env(envir) <- if(is.null(parentenv)) globalenv() else
        getNamespace(parentenv)
      Map(function(a) {
        Map(function(n) assign(n, a[[n]], pos = envir), names(a))
        serialize(eval(expr, envir), NULL)  # the result for this set of iterations
      }, args)
    }
  }

# Proceed through the loop in chunks of arguments at most 'chunksize' specified in the `setDoPar` function
# above. Note that we could have used callr's  `r` function to directly call `taskInit` and retrieve the
# result; the serialization steps here are in fact redundant.  But in a more typical setting, serialization
# is probably desired (the coordinator simply hands out  binary blobs to the workers for evaluation).
  for(i in seq(ceiling(length(argsList) / data$chunkSize))) {
    idx <-seq(from = (i - 1) * data$chunkSize + 1, to = min(i * data$chunkSize, length(argsList)))
    go <- serialize(taskInit(expr, exportenv, packages, packageName(envir), argsList[idx]), NULL)
    val <- Map(unserialize, callr::r(function(blob) unserialize(blob)(), args = list(blob = go), show = TRUE))
    accumulator(val, idx) # basically, invoke the .combine function
    i <- i + 1
  }
# return the accumulated results
  getResult(it)
}

Let’s try it out!

The following example registers out simple ‘doR’ adapter with chunkSize=1 which means each R worker process will process a single loop iteration. We accumulate results through R’s concatenate function with the option .combine = c. Each loop iteration returns the system process ID of the R worker process.

registerDoR(chunkSize = 1)

foreach(j = 1:3, .combine=c)  %dopar% Sys.getpid()
## [1] 17286 17296 17306

Note that there are three distinct process IDs, indicating that each loop iteration was evaluated by a separate R worker process. Registering a different chunkSize value lets us change that at run time.

registerDoR(chunkSize = 3)

foreach(j = 1:3, .combine=c)  %dopar% Sys.getpid()
## [1] 17316 17316 17316

Where to go from here

The biggest omission in the simple example above is handling of random number streams across the loop iterations. Fortunately, R has excellent support for high-quality reproducible distributed random number generation based on methods developed by L’Ecuyer. The full implementation details require a modicum of effort. I recommend checking out the ‘doRedis’ package source code (https://github.com/bwlewis/doRedis) for an example implementation. The doRedis adapter generates reproducible random number sequences independent of the number of back-end worker processes and number of loop iterations evaluted per process.