Steve Weston’s ‘foreach’ package is a remarkable parallel/distributed computing framework for the R language. Similarly to the parLapply, clusterMap, and related functions from R’s ‘parallel’ package, foreach enables computation across multiple CPU cores and computers. Parallel/distributed computation is engaged by using adapter packages that bridge the interface between R and particular back-end computing frameworks like MPI or those used by R’s ‘parallel’ package.
Code using foreach runs sequentially in the absence of a parallel adapter, and works uniformly, more or less, across different adapters. Importantly, adapters are registered at run-time, allowing programmers and R package authors to write programs independent of specific parallel computing implementations. Package authors decide which parts of their programs can run in parallel, and users decide how to run in parallel depending on their available resources. This philosophy is shared by the closely-related ‘future’ package.
In the high-performance computing vernacular, the foreach package defines an application programming interface (API) for a remote procedure call, map/reduce framework. We will see specific examples of these concepts below.
Despite the long history of the foreach package, developed in 2008 by Steve and his team at Revolution Computing (including me), and the availability of many supported adapter packages, the internal foreach adapter interface is not very explicitly documented. This guide documents the foreach API and serves as a basic introduction to writing adapter packages.
It should be noted that there are neat ideas in the foreach package aside from the topics covered here, including loop composition and set comprehension-style syntax inspired by Python and Haskell (see the %:% operator and the when function).
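As a quick illustration of both features (a sketch assuming the foreach package is installed; these loops run sequentially with %do%):

```r
library(foreach)

# Comprehension-style filtering with when(): keep only even values of i
foreach(i = 1:10, .combine = c) %:% when(i %% 2 == 0) %do% i
## [1]  2  4  6  8 10

# Loop composition with %:%: the inner loop runs once per outer value
foreach(i = 1:3, .combine = c) %:% foreach(j = 1:2, .combine = c) %do% (i * j)
## [1] 1 2 2 4 3 6
```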
The foreach loop

Basic foreach loops that can take advantage of parallel adapters consist of three parts:

foreach(…) | %dopar% | { R expression }

- A foreach function call that specifies what is to be mapped to the R expression, and how the results should be reduced.
- The %dopar% operator, which combines the foreach call output with the R expression along with a version of its calling environment, and submits the work to a registered parallel adapter function for evaluation. If no adapter is registered, a default sequential evaluator is used.

Unlike traditional for loops, each loop iteration within a foreach loop should be considered independent of the others, similar to the expression evaluations in replicate or the function evaluations in Map or lapply. Also unlike traditional for loops, but again like replicate (with simplify=FALSE), the result from each iteration is returned, by default in a list but optionally combined through a reduction function.
Consider the following really simple example:
system.time(print(
foreach(j = seq(2), .combine=sum) %dopar% {
Sys.sleep(1)
j
}
))
## Warning: executing %dopar% sequentially: no parallel backend registered
## [1] 3
## user system elapsed
## 0.056 0.000 2.061
Note the warning that no parallel backend adapter was registered and the loop has been evaluated sequentially. This loop iterates over a parameter variable j and the loop expression is evaluated twice, once for each value of j. The result of each expression evaluation is collected and passed to the sum function specified by the .combine option. In other words, the two possible values of j are mapped to the expression and the returned results are reduced through the sum function.
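To connect this to the map/reduce terminology, here is the same computation written with base R’s Map and Reduce functions (a sequential sketch, not part of the foreach API):

```r
# Map each value of j to the loop body, then reduce the results with sum:
results <- Map(function(j) {
  Sys.sleep(1)
  j
}, seq(2))
Reduce(sum, results)
## [1] 3
```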
Now let’s try a simple parallel adapter using two local CPU cores:
library(doParallel, quietly = TRUE)
registerDoParallel(2)
system.time(print(
foreach(j = seq(2), .combine=sum) %dopar% {
Sys.sleep(1)
j
}
))
## [1] 3
## user system elapsed
## 0.020 0.020 1.028
Note that the run time has been roughly halved, as expected, thanks to concurrent evaluation of the loop expressions.
The foreach function is responsible for setting up important loop details. It returns an R object of class foreach containing details including:

- the loop iteration variable values and their names (argnames)
- the result combination (reduction) function and related settings specified by the .combine option
- explicit lists of R objects and packages to make available to workers (export, packages) and objects to exclude (noexport)
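You can inspect some of these details directly (the field names shown reflect the current foreach implementation and may change):

```r
library(foreach)
# A foreach object describes the loop but does not evaluate it
obj <- foreach(j = seq(2), .combine = sum)
class(obj)
## [1] "foreach"
obj$argnames
## [1] "j"
```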
The %dopar% operator is the de facto foreach parallel adapter API. The %dopar% operator is responsible for looking up a function associated with the registered parallel adapter and invoking that function with arguments:

- the object of class foreach returned by the foreach function
- the R expression to evaluate
- the calling environment enclosing the expression
- run-time options supplied when the adapter was registered

It’s the responsibility of the parallel adapter function to map the data to the R expression, retrieve the results, and feed the results into an accumulator (reduction) function defined by the foreach object to return the result.
Foreach adapters supply a registration function to register the adapter with %dopar%. These functions are usually called registerDo*, replacing * with the name of the adapter. The registration function allows users to configure run-time specific adapter details.

Arguments supplied to the foreach function itself, by comparison, are so-called ‘compile’-time options and are independent of run-time specific behavior.
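For example, with the doParallel adapter, the number of workers is a run-time choice made at registration, while options like .combine are fixed in the foreach call itself (this sketch assumes the doParallel package is installed):

```r
library(doParallel, quietly = TRUE)
registerDoParallel(2)                        # run-time option: two local workers
foreach(j = 1:4, .combine = `+`) %dopar% j   # 'compile'-time option: reduce with +
## [1] 10
```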
The foreach (and future package) philosophy is that computed results should be independent of run-time options. Be aware that in practice that can’t always be achieved. Numerical computations, for instance, are subject to the non-associativity of floating point arithmetic and bounded integer values, among other potential pitfalls.
This guide proceeds by building a simple adapter called ‘doR’, designed to be as simple an example as I could think of. The adapter does not compute its results in parallel, but it does illustrate, very simply, most of the important details needed to implement an adapter.
The doR adapter launches worker R processes on the local machine using the ‘callr’ package. Each worker R process evaluates, in sequence, one or more loop iterations depending on a run-time option.
Foreach adapters consist of two main parts: a registration function, and an evaluation function.
Registration declares the name of the evaluation function that %dopar% will end up calling, along with any extra run-time options your adapter requires. Our simple doR adapter has a single option, chunkSize, that determines the number of loop iterations run per task.
Registration proceeds by calling the setDoPar function, which takes three arguments:

- fun: the adapter evaluation function that %dopar% will invoke
- data: run-time options passed along to the evaluation function
- info: an optional function that reports information about the adapter (for example, the number of workers)

We don’t use the ‘info’ function in our simple example.
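Although doR doesn’t use one, an ‘info’ function follows a simple convention: it receives the registered data and an item name, and returns the corresponding value. A hypothetical sketch (the item names follow those used by common adapters like doParallel):

```r
# A hypothetical info function for doR. foreach utility functions such as
# getDoParWorkers() and getDoParName() query the adapter through it.
infoR <- function(data, item) {
  switch(item,
         workers = 1L,      # doR evaluates one task at a time
         name    = "doR",
         version = "0.0.1",
         NULL)              # unknown items return NULL
}
```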
registerDoR <- function(chunkSize = 1) {
setDoPar(doR, data = list(chunkSize = chunkSize))
}
We still need to write the doR evaluation function, covered in the next section.
The heart of every foreach adapter is the evaluation function. The %dopar% API invokes this function with four arguments:

- the object of class foreach returned by the foreach function call
- the R expression to evaluate
- the local calling environment
- the run-time options supplied to setDoPar at registration

It’s pretty simple!
The evaluation function is responsible for mapping loop iteration variables to the R expression, and for setting up a foreach ‘accumulator’ function to combine the output and return a result.
The adapter evaluation function needs to perform the following tasks:

- construct the list of loop iteration variable values
- build an environment containing the R objects and packages required to evaluate the expression
- evaluate the expression once per set of iteration values, possibly in parallel
- feed the results into the foreach accumulator function and return the combined result
The example below passes the iteration variables, environment, and expression to the workers, and retrieves results from the workers, in serialized form. Note that this is not required by the ‘callr’ package, which can do that for us under the hood. We explicitly use serialized values because typical real-world foreach adapters will need to do that anyway.
Here is the full evaluation function with lots of in-line comments explaining each step:
# The %dopar% API:
#   obj    foreach object
#   expr   the expression to evaluate
#   envir  the local enclosing environment
#   data   run-time options
doR <- function(obj, expr, envir, data)
{
  if (!inherits(obj, "foreach")) stop("obj must be a foreach object")
  it <- iter(obj)
  # Set up the accumulator function that will process returned results
  accumulator <- makeAccum(it)
  # Construct a list of iteration variable values
  argsList <- as.list(it)
  # Set up an environment within which to evaluate expr. In case we're called
  # from within a function, we need an environment with all the optional `...`
  # arguments (if any).
  exportenv <- tryCatch({
    qargs <- quote(list(...))
    as.environment(eval(qargs, envir))
  }, error = function(e) {
    new.env(parent = emptyenv())
  })
  # The foreach `getexports` function automagically determines a list of R
  # objects required by 'expr'. `getexports` is not functional; it modifies
  # its 'exportenv' argument. Beta versions of foreach use the
  # `getGlobalsAndPackages` function from the future package. We use that
  # explicitly below to discover required packages. We combine those required
  # packages with explicitly listed packages in 'obj' and the optional package
  # parent environment for a complete list of required packages.
  noexport <- union(obj$noexport, obj$argnames)
  getexports(expr, exportenv, envir, bad = noexport)
  packages <- unique(c(obj$packages,
    future::getGlobalsAndPackages(expr, envir)$packages, obj$parentenv))
  # There may still be some additional *explicitly* specified objects to
  # export. These require some additional processing, similar to what
  # `getexports` does under the hood. It would be nice to expand the foreach
  # `getexports` function to handle these objects too!
  export <- setdiff(obj$export, unique(c(ls(exportenv), obj$noexport)))
  for (sym in export) {
    if (!exists(sym, envir, inherits = TRUE))
      stop(sprintf('unable to find variable "%s"', sym))
    val <- get(sym, envir, inherits = TRUE)
    if (is.function(val) &&
        (identical(environment(val), .GlobalEnv) ||
         identical(environment(val), envir))) {
      # Changing this function's environment to exportenv allows it to
      # access/execute any other functions defined in exportenv.
      environment(val) <- exportenv
    }
    assign(sym, val, pos = exportenv, inherits = FALSE)
  }
  # A simple initialization + evaluation function run by the worker. This
  # loads required packages, optionally sets a parent package environment
  # (when called from *within* a package namespace), and evaluates the
  # expression. It does not set a random seed, something you probably would
  # want to do in an actual adapter for real-world use (via, for example,
  # R's L'Ecuyer RNG stream).
  taskInit <- function(expr, envir, packages, parentenv, args)
  {
    function() {
      for (p in packages) library(p, character.only = TRUE)
      parent.env(envir) <- if (is.null(parentenv)) globalenv() else
        getNamespace(parentenv)
      Map(function(a) {
        Map(function(n) assign(n, a[[n]], pos = envir), names(a))
        serialize(eval(expr, envir), NULL) # the result for this set of iterations
      }, args)
    }
  }
  # Proceed through the loop in chunks of at most 'chunkSize' arguments, as
  # specified in the `setDoPar` call above. Note that we could have used
  # callr's `r` function to directly call `taskInit` and retrieve the result;
  # the serialization steps here are in fact redundant. But in a more typical
  # setting, serialization is probably desired (the coordinator simply hands
  # out binary blobs to the workers for evaluation).
  for (i in seq(ceiling(length(argsList) / data$chunkSize))) {
    idx <- seq(from = (i - 1) * data$chunkSize + 1,
               to = min(i * data$chunkSize, length(argsList)))
    go <- serialize(taskInit(expr, exportenv, packages, packageName(envir),
                             argsList[idx]), NULL)
    val <- Map(unserialize,
               callr::r(function(blob) unserialize(blob)(),
                        args = list(blob = go), show = TRUE))
    accumulator(val, idx) # basically, invoke the .combine function
  }
  # Return the accumulated results
  getResult(it)
}
The following example registers our simple ‘doR’ adapter with chunkSize=1, which means each R worker process will process a single loop iteration. We accumulate results through R’s concatenation function via the option .combine = c. Each loop iteration returns the system process ID of the R worker process.
registerDoR(chunkSize = 1)
foreach(j = 1:3, .combine=c) %dopar% Sys.getpid()
## [1] 17286 17296 17306
Note that there are three distinct process IDs, indicating that each loop iteration was evaluated by a separate R worker process. Registering a different chunkSize value lets us change that at run time.
registerDoR(chunkSize = 3)
foreach(j = 1:3, .combine=c) %dopar% Sys.getpid()
## [1] 17316 17316 17316
The biggest omission in the simple example above is handling of random number streams across the loop iterations. Fortunately, R has excellent support for high-quality reproducible distributed random number generation based on methods developed by L’Ecuyer. The full implementation details require a modicum of effort. I recommend checking out the ‘doRedis’ package source code (https://github.com/bwlewis/doRedis) for an example implementation. The doRedis adapter generates reproducible random number sequences independent of the number of back-end worker processes and the number of loop iterations evaluated per process.
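As a starting point, base R’s ‘parallel’ package provides the L’Ecuyer-CMRG generator and a nextRNGStream function for advancing between independent streams. A minimal sketch of assigning one stream per task might look like:

```r
# Create one independent L'Ecuyer-CMRG random number stream per task.
# A real adapter would ship seeds[[i]] to the worker handling task i,
# which sets its .Random.seed before evaluating any loop iterations.
RNGkind("L'Ecuyer-CMRG")
set.seed(1)
ntasks <- 3
seeds <- vector("list", ntasks)
seeds[[1]] <- .Random.seed
for (i in seq(2, ntasks))
  seeds[[i]] <- parallel::nextRNGStream(seeds[[i - 1]])
# On worker i: assign(".Random.seed", seeds[[i]], envir = globalenv())
```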