Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .Rbuildignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@
^_pkgdown\.yml$
^README.RMD$
^.github$
registry/
^\.lintr$
19 changes: 19 additions & 0 deletions .lintr
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
linters: linters_with_defaults(
# lintr defaults: https://lintr.r-lib.org/reference/default_linters.html
# the following setup changes/removes certain linters
assignment_linter = NULL, # do not force using <- for assignments
object_name_linter(c("snake_case", "CamelCase")), # only allow snake case and camel case object names
commented_code_linter = NULL, # allow code in comments
line_length_linter(200L),
object_length_linter(40L),
undesirable_function_linter(fun = c(
# base messaging
cat = "use catf()",
stop = "use stopf()",
warning = "use warningf()",
message = "use messagef()",
# perf
ifelse = "use fifelse()",
rank = "use frank()"
))
)
3 changes: 2 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Suggests:
foreach,
future,
future.batchtools,
jsonlite,
knitr,
parallelMap,
ranger,
Expand All @@ -66,4 +67,4 @@ ByteCompile: yes
Encoding: UTF-8
NeedsCompilation: yes
Roxygen: list(r6 = FALSE)
RoxygenNote: 7.3.2
RoxygenNote: 7.3.3
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export(loadResult)
export(lpt)
export(makeClusterFunctions)
export(makeClusterFunctionsDocker)
export(makeClusterFunctionsHyperQueue)
export(makeClusterFunctionsInteractive)
export(makeClusterFunctionsLSF)
export(makeClusterFunctionsMulticore)
Expand Down
90 changes: 90 additions & 0 deletions R/clusterFunctionsHyperQueue.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#' @title ClusterFunctions for HyperQueue
#'
#' @description
#' Cluster functions for HyperQueue (\url{https://it4innovations.github.io/hyperqueue/stable/}).
#'
#' Jobs are submitted via the HyperQueue CLI using \code{hq submit} and executed by calling \code{Rscript -e "batchtools::doJobCollection(...)"}.
#' The job name is set to the job hash and logs are handled internally by batchtools.
#' Listing jobs uses \code{hq job list} and cancelling jobs uses \code{hq job cancel}.
#' A running HyperQueue server and workers are required.
#'
#' @inheritParams makeClusterFunctions
#' @return [ClusterFunctions].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsHyperQueue = function(scheduler.latency = 1, fs.latency = 65) {
submitJob = function(reg, jc) {
assertRegistry(reg, writeable = TRUE)
assertClass(jc, "JobCollection")

ncpus = if (!is.null(jc$resources$ncpus)) sprintf("--cpus=%i", jc$resources$ncpus)
memory = if (!is.null(jc$resources$memory)) sprintf("--resource mem=%iMiB", jc$resources$memory)
walltime = if (!is.null(jc$resources$walltime)) sprintf("--time-limit=%is", jc$resources$walltime)

args = c(
"submit",
sprintf("--name=%s", jc$job.hash),
# hyperqueue cannot write stdout and stderr to the same file
"--stdout=none",
"--stderr=none",
ncpus,
memory,
walltime,
"--",
"Rscript", "-e",
shQuote(sprintf("batchtools::doJobCollection('%s', '%s')", jc$uri, jc$log.file))
)
res = runOSCommand("hq", args)
if (res$exit.code > 0L) {
return(cfHandleUnknownSubmitError("hq", res$exit.code, res$output))
}
batch_ids = sub(".*job ID: ([0-9]+).*", "\\1", res$output)
makeSubmitJobResult(status = 0L, batch.id = batch_ids)
}

killJob = function(reg, batch.id) {
assertRegistry(reg, writeable = TRUE)
assertString(batch.id)
args = c("job", "cancel", batch.id)
res = runOSCommand("hq", args)
if (res$exit.code > 0L) {
OSError("Killing of job failed", res)
}
makeSubmitJobResult(status = 0L, batch.id = batch.id)
}


listJobsQueued = function(reg) {
requireNamespace("jsonlite")
assertRegistry(reg, writeable = FALSE)
args = c("job", "list", "--filter", "waiting", "--output-mode", "json")
res = runOSCommand("hq", args)
if (res$exit.code > 0L) {
OSError("Listing of jobs failed", res)
}
jobs = jsonlite::fromJSON(res$output)
as.character(jobs$id)
}

listJobsRunning = function(reg) {
requireNamespace("jsonlite")
assertRegistry(reg, writeable = FALSE)
args = c("job", "list", "--filter", "running", "--output-mode", "json")
res = runOSCommand("hq", args)
if (res$exit.code > 0L) {
OSError("Listing of jobs failed", res)
}
jobs = jsonlite::fromJSON(res$output)
as.character(jobs$id)
}

makeClusterFunctions(
name = "HyperQueue",
submitJob = submitJob,
killJob = killJob,
listJobsRunning = listJobsRunning,
listJobsQueued = listJobsQueued,
store.job.collection = TRUE,
scheduler.latency = scheduler.latency,
fs.latency = fs.latency)
}
1 change: 1 addition & 0 deletions man/makeClusterFunctions.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions man/makeClusterFunctionsDocker.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions man/makeClusterFunctionsHyperQueue.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions man/makeClusterFunctionsInteractive.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions man/makeClusterFunctionsLSF.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions man/makeClusterFunctionsMulticore.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions man/makeClusterFunctionsOpenLava.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions man/makeClusterFunctionsSGE.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions man/makeClusterFunctionsSSH.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions man/makeClusterFunctionsSlurm.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions man/makeClusterFunctionsSocket.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions man/makeClusterFunctionsTORQUE.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 54 additions & 0 deletions tests/testthat/test_ClusterFunctionHyperQueue.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
test_that("clusterFunctionsHyperQueue", {
skip_if(TRUE)
skip_on_ci()
skip_on_cran()

reg = makeTestRegistry()
reg$cluster.functions = makeClusterFunctionsHyperQueue()
saveRegistry(reg)
fun = function(x) {
Sys.sleep(5)
TRUE
}
ids = batchMap(fun, x = c(5, 5), reg = reg)
submitJobs(1:2, reg = reg)
waitForJobs(ids = ids, reg = reg)

expect_data_table(findJobs(ids = ids, reg = reg), nrow = 2)
expect_data_table(findRunning(reg = reg), nrow = 0L)
})

test_that("clusterFunctionsHyperQueue: killJob", {
skip_if(TRUE)
skip_on_ci()
skip_on_cran()

reg = makeTestRegistry()
reg$cluster.functions = makeClusterFunctionsHyperQueue()
saveRegistry(reg)
fun = function(x) { Sys.sleep(5); TRUE }
ids = batchMap(fun, x = c(5, 5), reg = reg)
submitJobs(1:2, reg = reg)
Sys.sleep(1)
expect_data_table(killJobs(1, reg = reg), nrow = 1)
})

test_that("clusterFunctionsHyperQueue with resources", {
skip_if(TRUE)
skip_on_ci()
skip_on_cran()

reg = makeTestRegistry()
reg$cluster.functions = makeClusterFunctionsHyperQueue()
saveRegistry(reg)
fun = function(x) {
Sys.sleep(5)
TRUE
}
ids = batchMap(fun, x = c(5, 5), reg = reg)
submitJobs(1:2, reg = reg, resources = list(ncpus = 2, walltime = 10, memory = 5))
waitForJobs(ids = ids, reg = reg)

expect_data_table(findJobs(ids = ids, reg = reg), nrow = 2)
expect_data_table(findRunning(reg = reg), nrow = 0L)
})
Loading