diff --git a/.Rbuildignore b/.Rbuildignore index 7ab339c5..1e3ae1ff 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -12,3 +12,5 @@ ^_pkgdown\.yml$ ^README.RMD$ ^.github$ +registry/ +^\.lintr$ diff --git a/.lintr b/.lintr new file mode 100644 index 00000000..b3e9a225 --- /dev/null +++ b/.lintr @@ -0,0 +1,19 @@ +linters: linters_with_defaults( + # lintr defaults: https://lintr.r-lib.org/reference/default_linters.html + # the following setup changes/removes certain linters + assignment_linter = NULL, # do not force using <- for assignments + object_name_linter(c("snake_case", "CamelCase")), # only allow snake case and camel case object names + commented_code_linter = NULL, # allow code in comments + line_length_linter(200L), + object_length_linter(40L), + undesirable_function_linter(fun = c( + # base messaging + cat = "use catf()", + stop = "use stopf()", + warning = "use warningf()", + message = "use messagef()", + # perf + ifelse = "use fifelse()", + rank = "use frank()" + )) + ) diff --git a/DESCRIPTION b/DESCRIPTION index 63043bd7..b07baafb 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -52,6 +52,7 @@ Suggests: foreach, future, future.batchtools, + jsonlite, knitr, parallelMap, ranger, @@ -66,4 +67,4 @@ ByteCompile: yes Encoding: UTF-8 NeedsCompilation: yes Roxygen: list(r6 = FALSE) -RoxygenNote: 7.3.2 +RoxygenNote: 7.3.3 diff --git a/NAMESPACE b/NAMESPACE index e77c7b0a..0d4f87da 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -83,6 +83,7 @@ export(loadResult) export(lpt) export(makeClusterFunctions) export(makeClusterFunctionsDocker) +export(makeClusterFunctionsHyperQueue) export(makeClusterFunctionsInteractive) export(makeClusterFunctionsLSF) export(makeClusterFunctionsMulticore) diff --git a/R/clusterFunctionsHyperQueue.R b/R/clusterFunctionsHyperQueue.R new file mode 100644 index 00000000..4bd119c2 --- /dev/null +++ b/R/clusterFunctionsHyperQueue.R @@ -0,0 +1,90 @@ +#' @title ClusterFunctions for HyperQueue +#' +#' @description +#' Cluster functions for HyperQueue (\url{https://it4innovations.github.io/hyperqueue/stable/}). +#' +#' Jobs are submitted via the HyperQueue CLI using \code{hq submit} and executed by calling \code{Rscript -e "batchtools::doJobCollection(...)"}. +#' The job name is set to the job hash and logs are handled internally by batchtools. +#' Listing jobs uses \code{hq job list} and cancelling jobs uses \code{hq job cancel}. +#' A running HyperQueue server and workers are required. +#' +#' @inheritParams makeClusterFunctions +#' @return [ClusterFunctions]. +#' @family ClusterFunctions +#' @export +makeClusterFunctionsHyperQueue = function(scheduler.latency = 1, fs.latency = 65) { + submitJob = function(reg, jc) { + assertRegistry(reg, writeable = TRUE) + assertClass(jc, "JobCollection") + + ncpus = if (!is.null(jc$resources$ncpus)) sprintf("--cpus=%i", jc$resources$ncpus) + memory = if (!is.null(jc$resources$memory)) sprintf("--resource mem=%iMiB", jc$resources$memory) + walltime = if (!is.null(jc$resources$walltime)) sprintf("--time-limit=%is", jc$resources$walltime) + + args = c( + "submit", + sprintf("--name=%s", jc$job.hash), + # hyperqueue cannot write stdout and stderr to the same file + "--stdout=none", + "--stderr=none", + ncpus, + memory, + walltime, + "--", + "Rscript", "-e", + shQuote(sprintf("batchtools::doJobCollection('%s', '%s')", jc$uri, jc$log.file)) + ) + res = runOSCommand("hq", args) + if (res$exit.code > 0L) { + return(cfHandleUnknownSubmitError("hq", res$exit.code, res$output)) + } + batch_ids = sub(".*job ID: ([0-9]+).*", "\\1", res$output) + makeSubmitJobResult(status = 0L, batch.id = batch_ids) + } + + killJob = function(reg, batch.id) { + assertRegistry(reg, writeable = TRUE) + assertString(batch.id) + args = c("job", "cancel", batch.id) + res = runOSCommand("hq", args) + if (res$exit.code > 0L) { + OSError("Killing of job failed", res) + } + makeSubmitJobResult(status = 0L, batch.id = batch.id) + } + + + listJobsQueued = function(reg) { + requireNamespace("jsonlite") + assertRegistry(reg, writeable = FALSE) + args = c("job", "list", "--filter", "waiting", "--output-mode", "json") + res = runOSCommand("hq", args) + if (res$exit.code > 0L) { + OSError("Listing of jobs failed", res) + } + jobs = jsonlite::fromJSON(res$output) + as.character(jobs$id) + } + + listJobsRunning = function(reg) { + requireNamespace("jsonlite") + assertRegistry(reg, writeable = FALSE) + args = c("job", "list", "--filter", "running", "--output-mode", "json") + res = runOSCommand("hq", args) + if (res$exit.code > 0L) { + OSError("Listing of jobs failed", res) + } + jobs = jsonlite::fromJSON(res$output) + as.character(jobs$id) + } + + makeClusterFunctions( + name = "HyperQueue", + submitJob = submitJob, + killJob = killJob, + listJobsRunning = listJobsRunning, + listJobsQueued = listJobsQueued, + store.job.collection = TRUE, + scheduler.latency = scheduler.latency, + fs.latency = fs.latency) +} diff --git a/man/makeClusterFunctions.Rd b/man/makeClusterFunctions.Rd index 5df5dff2..4fd9334a 100644 --- a/man/makeClusterFunctions.Rd +++ b/man/makeClusterFunctions.Rd @@ -83,6 +83,7 @@ with the package. \seealso{ Other ClusterFunctions: \code{\link{makeClusterFunctionsDocker}()}, +\code{\link{makeClusterFunctionsHyperQueue}()}, \code{\link{makeClusterFunctionsInteractive}()}, \code{\link{makeClusterFunctionsLSF}()}, \code{\link{makeClusterFunctionsMulticore}()}, diff --git a/man/makeClusterFunctionsDocker.Rd b/man/makeClusterFunctionsDocker.Rd index 35e70811..a46367eb 100644 --- a/man/makeClusterFunctionsDocker.Rd +++ b/man/makeClusterFunctionsDocker.Rd @@ -61,6 +61,7 @@ containers manually (or usa a cron job). \seealso{ Other ClusterFunctions: \code{\link{makeClusterFunctions}()}, +\code{\link{makeClusterFunctionsHyperQueue}()}, \code{\link{makeClusterFunctionsInteractive}()}, \code{\link{makeClusterFunctionsLSF}()}, \code{\link{makeClusterFunctionsMulticore}()}, diff --git a/man/makeClusterFunctionsHyperQueue.Rd b/man/makeClusterFunctionsHyperQueue.Rd new file mode 100644 index 00000000..c9dc83cf --- /dev/null +++ b/man/makeClusterFunctionsHyperQueue.Rd @@ -0,0 +1,45 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/clusterFunctionsHyperQueue.R +\name{makeClusterFunctionsHyperQueue} +\alias{makeClusterFunctionsHyperQueue} +\title{ClusterFunctions for HyperQueue} +\usage{ +makeClusterFunctionsHyperQueue(scheduler.latency = 1, fs.latency = 65) +} +\arguments{ +\item{scheduler.latency}{[\code{numeric(1)}]\cr +Time to sleep after important interactions with the scheduler to ensure a sane state. +Currently only triggered after calling \code{\link{submitJobs}}.} + +\item{fs.latency}{[\code{numeric(1)}]\cr +Expected maximum latency of the file system, in seconds. +Set to a positive number for network file systems like NFS which enables more robust (but also more expensive) mechanisms to +access files and directories. +Usually safe to set to \code{0} to disable the heuristic, e.g. if you are working on a local file system.} +} +\value{ +[ClusterFunctions]. +} +\description{ +Cluster functions for HyperQueue (\url{https://it4innovations.github.io/hyperqueue/stable/}). + +Jobs are submitted via the HyperQueue CLI using \code{hq submit} and executed by calling \code{Rscript -e "batchtools::doJobCollection(...)"}. +The job name is set to the job hash and logs are handled internally by batchtools. +Listing jobs uses \code{hq job list} and cancelling jobs uses \code{hq job cancel}. +A running HyperQueue server and workers are required. +} +\seealso{ +Other ClusterFunctions: +\code{\link{makeClusterFunctions}()}, +\code{\link{makeClusterFunctionsDocker}()}, +\code{\link{makeClusterFunctionsInteractive}()}, +\code{\link{makeClusterFunctionsLSF}()}, +\code{\link{makeClusterFunctionsMulticore}()}, +\code{\link{makeClusterFunctionsOpenLava}()}, +\code{\link{makeClusterFunctionsSGE}()}, +\code{\link{makeClusterFunctionsSSH}()}, +\code{\link{makeClusterFunctionsSlurm}()}, +\code{\link{makeClusterFunctionsSocket}()}, +\code{\link{makeClusterFunctionsTORQUE}()} +} +\concept{ClusterFunctions} diff --git a/man/makeClusterFunctionsInteractive.Rd b/man/makeClusterFunctionsInteractive.Rd index 275e45e8..7a8ecf9d 100644 --- a/man/makeClusterFunctionsInteractive.Rd +++ b/man/makeClusterFunctionsInteractive.Rd @@ -42,6 +42,7 @@ and \code{killJob} is not implemented for the same reasons. Other ClusterFunctions: \code{\link{makeClusterFunctions}()}, \code{\link{makeClusterFunctionsDocker}()}, +\code{\link{makeClusterFunctionsHyperQueue}()}, \code{\link{makeClusterFunctionsLSF}()}, \code{\link{makeClusterFunctionsMulticore}()}, \code{\link{makeClusterFunctionsOpenLava}()}, diff --git a/man/makeClusterFunctionsLSF.Rd b/man/makeClusterFunctionsLSF.Rd index 577b0bf5..75289bbb 100644 --- a/man/makeClusterFunctionsLSF.Rd +++ b/man/makeClusterFunctionsLSF.Rd @@ -56,6 +56,7 @@ Array jobs are currently not supported. Other ClusterFunctions: \code{\link{makeClusterFunctions}()}, \code{\link{makeClusterFunctionsDocker}()}, +\code{\link{makeClusterFunctionsHyperQueue}()}, \code{\link{makeClusterFunctionsInteractive}()}, \code{\link{makeClusterFunctionsMulticore}()}, \code{\link{makeClusterFunctionsOpenLava}()}, diff --git a/man/makeClusterFunctionsMulticore.Rd b/man/makeClusterFunctionsMulticore.Rd index 8101b27a..60b27e65 100644 --- a/man/makeClusterFunctionsMulticore.Rd +++ b/man/makeClusterFunctionsMulticore.Rd @@ -29,6 +29,7 @@ Does not work on Windows, use \code{\link{makeClusterFunctionsSocket}} instead. Other ClusterFunctions: \code{\link{makeClusterFunctions}()}, \code{\link{makeClusterFunctionsDocker}()}, +\code{\link{makeClusterFunctionsHyperQueue}()}, \code{\link{makeClusterFunctionsInteractive}()}, \code{\link{makeClusterFunctionsLSF}()}, \code{\link{makeClusterFunctionsOpenLava}()}, diff --git a/man/makeClusterFunctionsOpenLava.Rd b/man/makeClusterFunctionsOpenLava.Rd index c08b6e36..90f1d33c 100644 --- a/man/makeClusterFunctionsOpenLava.Rd +++ b/man/makeClusterFunctionsOpenLava.Rd @@ -56,6 +56,7 @@ Array jobs are currently not supported. Other ClusterFunctions: \code{\link{makeClusterFunctions}()}, \code{\link{makeClusterFunctionsDocker}()}, +\code{\link{makeClusterFunctionsHyperQueue}()}, \code{\link{makeClusterFunctionsInteractive}()}, \code{\link{makeClusterFunctionsLSF}()}, \code{\link{makeClusterFunctionsMulticore}()}, diff --git a/man/makeClusterFunctionsSGE.Rd b/man/makeClusterFunctionsSGE.Rd index ddf8f925..0863eb72 100644 --- a/man/makeClusterFunctionsSGE.Rd +++ b/man/makeClusterFunctionsSGE.Rd @@ -66,6 +66,7 @@ Array jobs are currently not supported. Other ClusterFunctions: \code{\link{makeClusterFunctions}()}, \code{\link{makeClusterFunctionsDocker}()}, +\code{\link{makeClusterFunctionsHyperQueue}()}, \code{\link{makeClusterFunctionsInteractive}()}, \code{\link{makeClusterFunctionsLSF}()}, \code{\link{makeClusterFunctionsMulticore}()}, diff --git a/man/makeClusterFunctionsSSH.Rd b/man/makeClusterFunctionsSSH.Rd index ef8dfe87..1d13a7a2 100644 --- a/man/makeClusterFunctionsSSH.Rd +++ b/man/makeClusterFunctionsSSH.Rd @@ -40,6 +40,7 @@ makeClusterFunctionsSSH(list(Worker$new("localhost", ncpus = 2))) Other ClusterFunctions: \code{\link{makeClusterFunctions}()}, \code{\link{makeClusterFunctionsDocker}()}, +\code{\link{makeClusterFunctionsHyperQueue}()}, \code{\link{makeClusterFunctionsInteractive}()}, \code{\link{makeClusterFunctionsLSF}()}, \code{\link{makeClusterFunctionsMulticore}()}, diff --git a/man/makeClusterFunctionsSlurm.Rd b/man/makeClusterFunctionsSlurm.Rd index 9448b474..6e3cd88b 100644 --- a/man/makeClusterFunctionsSlurm.Rd +++ b/man/makeClusterFunctionsSlurm.Rd @@ -69,6 +69,7 @@ otherwise the commands for listing and killing jobs will not work. Other ClusterFunctions: \code{\link{makeClusterFunctions}()}, \code{\link{makeClusterFunctionsDocker}()}, +\code{\link{makeClusterFunctionsHyperQueue}()}, \code{\link{makeClusterFunctionsInteractive}()}, \code{\link{makeClusterFunctionsLSF}()}, \code{\link{makeClusterFunctionsMulticore}()}, diff --git a/man/makeClusterFunctionsSocket.Rd b/man/makeClusterFunctionsSocket.Rd index 88f98011..3d3dd3bd 100644 --- a/man/makeClusterFunctionsSocket.Rd +++ b/man/makeClusterFunctionsSocket.Rd @@ -28,6 +28,7 @@ Jobs are spawned asynchronously using the package \pkg{snow}. Other ClusterFunctions: \code{\link{makeClusterFunctions}()}, \code{\link{makeClusterFunctionsDocker}()}, +\code{\link{makeClusterFunctionsHyperQueue}()}, \code{\link{makeClusterFunctionsInteractive}()}, \code{\link{makeClusterFunctionsLSF}()}, \code{\link{makeClusterFunctionsMulticore}()}, diff --git a/man/makeClusterFunctionsTORQUE.Rd b/man/makeClusterFunctionsTORQUE.Rd index da4b9a9c..e5bfe8a5 100644 --- a/man/makeClusterFunctionsTORQUE.Rd +++ b/man/makeClusterFunctionsTORQUE.Rd @@ -52,6 +52,7 @@ allocations. Other ClusterFunctions: \code{\link{makeClusterFunctions}()}, \code{\link{makeClusterFunctionsDocker}()}, +\code{\link{makeClusterFunctionsHyperQueue}()}, \code{\link{makeClusterFunctionsInteractive}()}, \code{\link{makeClusterFunctionsLSF}()}, \code{\link{makeClusterFunctionsMulticore}()}, diff --git a/tests/testthat/test_ClusterFunctionHyperQueue.R b/tests/testthat/test_ClusterFunctionHyperQueue.R new file mode 100644 index 00000000..c0a13509 --- /dev/null +++ b/tests/testthat/test_ClusterFunctionHyperQueue.R @@ -0,0 +1,54 @@ +test_that("clusterFunctionsHyperQueue", { + skip_if(TRUE) + skip_on_ci() + skip_on_cran() + + reg = makeTestRegistry() + reg$cluster.functions = makeClusterFunctionsHyperQueue() + saveRegistry(reg) + fun = function(x) { + Sys.sleep(5) + TRUE + } + ids = batchMap(fun, x = c(5, 5), reg = reg) + submitJobs(1:2, reg = reg) + waitForJobs(ids = ids, reg = reg) + + expect_data_table(findJobs(ids = ids, reg = reg), nrow = 2) + expect_data_table(findRunning(reg = reg), nrow = 0L) +}) + +test_that("clusterFunctionsHyperQueue: killJob", { + skip_if(TRUE) + skip_on_ci() + skip_on_cran() + + reg = makeTestRegistry() + reg$cluster.functions = makeClusterFunctionsHyperQueue() + saveRegistry(reg) + fun = function(x) { Sys.sleep(5); TRUE } + ids = batchMap(fun, x = c(5, 5), reg = reg) + submitJobs(1:2, reg = reg) + Sys.sleep(1) + expect_data_table(killJobs(1, reg = reg), nrow = 1) +}) + +test_that("clusterFunctionsHyperQueue with resources", { + skip_if(TRUE) + skip_on_ci() + skip_on_cran() + + reg = makeTestRegistry() + reg$cluster.functions = makeClusterFunctionsHyperQueue() + saveRegistry(reg) + fun = function(x) { + Sys.sleep(5) + TRUE + } + ids = batchMap(fun, x = c(5, 5), reg = reg) + submitJobs(1:2, reg = reg, resources = list(ncpus = 2, walltime = 10, memory = 5)) + waitForJobs(ids = ids, reg = reg) + + expect_data_table(findJobs(ids = ids, reg = reg), nrow = 2) + expect_data_table(findRunning(reg = reg), nrow = 0L) +})