The slurmR package provides wrappers and tools for integrating R with the HPC workload manager Slurm. Overall, there are two different approaches to doing so. The first uses socket clusters, essentially following the workflow of base R's parallel package. The second uses job arrays, which are a different implementation of the same idea behind the par*apply functions in the parallel package and, at times, can be more powerful.
Another important component of slurmR is the makeSlurmCluster function, which allows users to create multi-node PSOCK cluster objects. The implementation of this function, a wrapper of parallel::makePSOCKcluster, is very simple:
It submits a job to Slurm requesting the desired number of tasks. Each task will then return information regarding the node at which it is operating.
Once Slurm allocates the resources, the master R session (from which the job was submitted) will read in the node names returned by each task.
With the full list of node names in use, makeSlurmCluster passes the list of names to parallel::makePSOCKcluster, which ultimately creates the cluster class object.
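The last step relies on the fact that parallel::makePSOCKcluster accepts a character vector of host names and starts one worker per entry. The sketch below shows this locally. The vector nodenames is a hypothetical stand-in for the names the Slurm tasks would report, and "localhost" is used so it runs without Slurm or ssh.

```r
library(parallel)

# Hypothetical node list standing in for the node names that the Slurm
# tasks would report; here both "tasks" live on the local machine.
nodenames <- c("localhost", "localhost")

# makePSOCKcluster() starts one worker per element of the vector, so a
# node name that appears k times hosts k workers.
cl <- makePSOCKcluster(nodenames)

# Ask every worker which machine it is running on.
hosts <- unlist(clusterEvalQ(cl, Sys.info()[["nodename"]]))
print(hosts)

stopCluster(cl)
```

On a real allocation, the vector would contain the actual node names, possibly repeated. Each repetition adds one worker on that node.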
After creating the cluster object, the workflow is exactly the same as with the parallel package. Here is an example from the makeSlurmCluster manual:
library(slurmR)   # provides makeSlurmCluster
library(parallel) # provides parSapply and stopCluster
# Creating a cluster with 100 workers (offspring/child R sessions)
cl <- makeSlurmCluster(100)
# Computing the mean of 100 random uniforms, 200 times; parSapply splits
# the 200 replicates across the workers. Any of the functions available
# in the parallel package can be used here.
ans <- parSapply(cl, 1:200, function(x) mean(runif(100)))
# We simply call stopCluster as we would with any other cluster object
stopCluster(cl)
Whenever Slurm_lapply, Slurm_sapply, or Slurm_Map is called, a lot happens under the hood. What the user does not see is the way in which slurmR sets up a job and submits it to the queue.
Just like rslurm, slurmR has two levels of job distribution: first, Slurm jobs, and second, within each job via parallel::mclapply and parallel::mcMap (task forking). In general, the Slurm_* functions are implemented as follows:
List whatever R packages are loaded, including the library path of each package.
List all the objects passed via the ellipsis (...) and, together with X and FUN (or f), save them at [tmp_path]/[job_name]/ as [object-name].rds.
Write out the corresponding R script and Slurm bash file, and save them as [tmp_path]/[job_name]/00-rscript.r and [tmp_path]/[job_name]/01-bash.sh, respectively.
If plan = "collect" (the default), the job is submitted to the queue via sbatch(), and the function waits until Slurm flags it as completed.
Once sbatch() is called, a job array is submitted in which each R job launches up to mc.cores forked processes (the second layer of parallelization).
Once the job is done, the results can be collected using Slurm_collect. This happens automatically if the user sets plan = "collect".
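The two layers can be imitated on a single machine to see how the pieces fit together. The sketch below makes several assumptions. The helper emulate_slurm_lapply and the file names X.rds, FUN.rds and answer-NNN.rds are hypothetical, and the helper is not slurmR's implementation. It runs the "array jobs" one after another instead of submitting them with sbatch, but it keeps the essential structure: inputs saved to and read from disk, X split into njobs chunks, and forking within each chunk.

```r
library(parallel)

# Hypothetical helper, NOT slurmR's implementation: it imitates the two
# layers locally, running the "array jobs" one after another.
# (mc.cores > 1 requires a Unix-alike; use mc.cores = 1 on Windows.)
emulate_slurm_lapply <- function(X, FUN, njobs = 2L, mc.cores = 1L,
                                 tmp_path = tempdir(), job_name = "demo-job") {
  job_path <- file.path(tmp_path, job_name)
  dir.create(job_path, recursive = TRUE, showWarnings = FALSE)

  # Save the inputs as [object-name].rds files, as in the first steps.
  saveRDS(X, file.path(job_path, "X.rds"))
  saveRDS(FUN, file.path(job_path, "FUN.rds"))

  # First layer: split the positions of X into njobs chunks, one per job.
  chunks <- splitIndices(length(X), njobs)
  out_files <- file.path(job_path, sprintf("answer-%03d.rds", seq_along(chunks)))

  for (j in seq_along(chunks)) {
    # Each job reads its inputs back from disk ...
    X_j <- readRDS(file.path(job_path, "X.rds"))[chunks[[j]]]
    FUN_j <- readRDS(file.path(job_path, "FUN.rds"))
    # ... and forks up to mc.cores processes (second layer).
    saveRDS(mclapply(X_j, FUN_j, mc.cores = mc.cores), out_files[j])
  }

  # Collection step: read the partial results back in job order.
  do.call(c, lapply(out_files, readRDS))
}

ans <- emulate_slurm_lapply(1:10, function(x) x^2, njobs = 3, mc.cores = 2)
unlist(ans)
```

Because every job reads its inputs from the shared directory and writes its answer back there, the results can be gathered later by any session that can see tmp_path. This is also why job arrays can outlive the session that submitted them.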
The next section discusses some advantages and disadvantages of submitting jobs using socket clusters versus job arrays.
Socket clusters, created via makePSOCKcluster or, in the case of slurmR, via makeSlurmCluster, may be more efficient in terms of data communication[1]. Even so, job arrays have some important benefits over socket clusters:
The number of workers can be much higher than with clusters from the parallel package.[2] Users needing to work with hundreds or thousands of jobs/instances may need to use job arrays instead.
If part of the job fails due to a failure of one of the tasks in the array, the job can be easily resubmitted. The same is not necessarily true for socket clusters.
Job arrays can run independently from the main session that started the job. This means that, if for some reason the main session crashes or stops, the job arrays will continue working regardless, and what’s more, the results can be collected anyway.
Suppose we would like to run a simulation algorithm on a cluster. In this case, the function we would like to parallelize, simpi, is very simple.
This function generates an estimate of π. The approximation is based on the following observation:
$$ \mbox{Area} = \pi r^2 \implies \frac{\mbox{Area}}{r^2} = \pi $$
Since we know what r is, we just need an estimate of the area to obtain an approximation of π. A rather simple way of doing so is with Monte Carlo simulation. We sample points uniformly in a square that circumscribes the circle (side 2r) and compute the proportion of points that fall within the circle, i.e., the proportion of points whose distance to the center is at most the radius. That proportion has an expected value equal to the ratio of the circle's area to the square's area, πr²/(2r)² = π/4, so four times the observed proportion estimates π. For more details, see the Wikipedia article on this topic.
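The listing of simpi did not survive in this copy. The following is a minimal sketch, not necessarily the author's original code. It assumes simpi takes the number of points n and samples from the square [-1, 1] × [-1, 1], so r = 1 and the estimate is 4 times the share of points inside the unit circle.

```r
# Hypothetical implementation of simpi(): draws n points uniformly in the
# square [-1, 1] x [-1, 1], which circumscribes the unit circle, and
# returns 4 times the proportion of points that fall inside the circle.
simpi <- function(n) {
  p <- matrix(runif(n * 2, min = -1, max = 1), ncol = 2)
  inside <- sqrt(rowSums(p^2)) <= 1
  4 * mean(inside)
}

set.seed(1231)
est <- simpi(1e6)
est - pi
```

With 10^6 points, the standard error of this estimator is about 4 × sqrt((π/4)(1 - π/4) / 10^6) ≈ 0.0016. This is why the examples below average many replicates.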
Using parallel::mclapply, we could estimate π using a single node (computer).
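A single-node version might look like the following sketch. It repeats the hypothetical simpi from before so the block runs on its own. It also assumes 4 cores and uses 10^5 points per replicate (instead of the 10^6 used in the Slurm example later) to keep the run short.

```r
library(parallel)

# Same hypothetical simpi() as before, repeated so this block runs on its own.
simpi <- function(n) {
  p <- matrix(runif(n * 2, min = -1, max = 1), ncol = 2)
  4 * mean(sqrt(rowSums(p^2)) <= 1)
}

# With the L'Ecuyer-CMRG generator, mclapply() (mc.set.seed = TRUE by
# default) gives each forked process its own random-number stream.
RNGkind("L'Ecuyer-CMRG")
set.seed(123)

# 100 replicates of 1e5 points each, forked over 4 cores of one machine
# (mc.cores > 1 requires a Unix-alike; use mc.cores = 1 on Windows).
ans <- mclapply(rep(1e5, 100), simpi, mc.cores = 4)
mean(unlist(ans))
```

This is exactly the second layer that each Slurm job uses internally. Job arrays add the first layer on top by spreading such calls over several nodes.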
In the case of job arrays, we can use the Slurm_lapply function implemented in the package. Before submitting a job to the queue, we need to specify some options that are needed to create it:
tmp_path: A path to a directory to which all computing nodes of the cluster have read+write access.
job_name: The name of the job, passed to sbatch via the job-name flag. This will also be used as the name of the folder that is created within tmp_path.
Ultimately, all the objects saved by the job will be located in the path defined by tmp_path/job_name.
library(slurmR)
# Setting required parameters
opts_slurmR$set_tmp_path("/stagging/slurmr-jobs/")
opts_slurmR$set_job_name("simulating-pi")
Moreover, we can specify more options to be set as default options for all the jobs submitted for the current session. For example, we can set the default partition and account as follows:
# Optional parameters are set via set_opts
opts_slurmR$set_opts(partition="conti", account="lc_dvc")
A comprehensive list of options can be found in the Slurm documentation for sbatch. To see all the current defaults, we can just print the opts_slurmR object:
opts_slurmR
##
## Options for sbatch (Slurm workflow):
## partition : conti
## account : lc_dvc
## job-name : simulating-pi
##
## Preamble:
## n/a
##
## Other options (R workflow):
## tmp_path : /stagging/slurmr-jobs/
## cmd : sbatch
## verbose : FALSE
## debug : FALSE
##
## To get and set options for Slurm jobs creation use (see ?opts_slurmR):
##
## debug_off : function ()
## debug_on : function ()
## get_cmd : function ()
## get_debug : function ()
## get_job_name : function (check = TRUE)
## get_opts_job : function (...)
## get_opts_r : function (...)
## get_preamble : function ()
## get_tmp_path : function ()
## get_verbose : function ()
## reset : function ()
## set_job_name : function (name)
## set_opts : function (...)
## set_preamble : function (...)
## set_tmp_path : function (path = Sys.getenv("SLURMR_TMP_PATH", getwd()))
## verbose_off : function ()
## verbose_on : function ()
Once we have specified all the needed options, we can make our Slurm_lapply call and submit the job to the queue. The plan argument controls what Slurm_lapply does after submitting.
If plan = "wait", Slurm_lapply returns once the job is done (or has failed). The results can then be collected with the Slurm_collect function.
Alternatively, we could have collected the results on the fly by telling slurmR that the plan is to "collect" the results:
# 100 replicates of simpi(1e6), split into 10 array jobs, each forking 10 processes
ans <- Slurm_lapply(rep(1e6, 100), simpi, njobs=10, mc.cores=10, plan = "collect")
mean(unlist(ans))
This way, Slurm_lapply makes the Slurm_collect call itself before returning.
Another option is to use parallel::parLapply with a multi-node socket cluster.[3] For that, we can create the cluster with the makeSlurmCluster function and then proceed exactly as we would with any other cluster object.
Once we are done with the calculations, we can stop the cluster object by simply calling the stopCluster function. slurmR then kills the job (and thus the socket connections) by calling scancel.
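Because the cluster workflow is the same as in the parallel package, the whole sequence can be tried locally first. The sketch below uses parallel::makePSOCKcluster(2) as a stand-in for makeSlurmCluster (on Slurm, only the line creating cl would change). It also assumes the hypothetical simpi from before and 10^5 points per replicate.

```r
library(parallel)

# Same hypothetical simpi() as before, repeated so this block runs on its own.
simpi <- function(n) {
  p <- matrix(runif(n * 2, min = -1, max = 1), ncol = 2)
  4 * mean(sqrt(rowSums(p^2)) <= 1)
}

# Local stand-in for the Slurm version: there, this line would be
# cl <- makeSlurmCluster(<number of workers>); the rest stays the same.
cl <- makePSOCKcluster(2)

# Independent, reproducible random-number streams for the workers.
clusterSetRNGStream(cl, 123)

# parLapply() ships simpi itself to the workers, so no export is needed
# (simpi uses only base R functions).
ans <- parLapply(cl, rep(1e5, 100), simpi)
mean(unlist(ans))

# Shut the workers down; with makeSlurmCluster this also cancels the job.
stopCluster(cl)
```

Unlike the forked processes used by mclapply, socket workers are fresh R sessions. Any helper functions or data that FUN relies on, beyond its own arguments, must be sent to them explicitly (for example with clusterExport).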
[1] Data transfer on socket clusters is done through serialization with the serialize and unserialize functions, so data is sent directly through the connection. In the case of job arrays, data is sent using saveRDS and readRDS, which involves disk I/O.
[2] The current default configuration of R does not allow more than 128 connections simultaneously (see ?connection). This can be changed at installation time.
[3] In general, Slurm will try to allocate multiple tasks on the same node (machine). If no node with that many resources is available, the tasks will span multiple nodes.