diff --git a/R/files.R b/R/files.R index e64690d6..ab5ddac4 100644 --- a/R/files.R +++ b/R/files.R @@ -42,9 +42,28 @@ file_mtime = function(x) { fs::file_info(x)$modification_time } -writeRDS = function(object, file, compress = "gzip") { - file_remove(file) - saveRDS(object, file = file, version = 2L, compress = compress) - waitForFile(file, 300) +writeRDS = function(object, file, compress = "gzip", wait = 300) { + # (a) Write to *.rds.tmp + tmp_file <- sprintf("%s.tmp", file) + saveRDS(object, file = tmp_file, version = 2L, compress = compress) + + # (b) Wait for it to be found + if (wait > 0) waitForFile(tmp_file, timeout = wait) + + # (c) Assert file exists + if (!file_test("-f", tmp_file)) { + stop(sprintf("Failed to save to temporary RDS file: %s", sQuote(tmp_file))) + } + + # (d) Remove old file, if it exists + if (file_test("-f", file)) file_remove(file) + + # (e) Rename *.rds.tmp to *.rds + file.rename(tmp_file, file) + if (!file_test("-f", file)) { + stop(sprintf("Failed to rename temporarily saved RDS file: %s -> %s", + sQuote(tmp_file), sQuote(file))) + } + invisible(TRUE) } diff --git a/R/sweepRegistry.R b/R/sweepRegistry.R index 2467c58a..1e6efbad 100644 --- a/R/sweepRegistry.R +++ b/R/sweepRegistry.R @@ -14,7 +14,7 @@ sweepRegistry = function(reg = getDefaultRegistry()) { submitted = reg$status[.findSubmitted(reg = reg), c("job.id", "job.hash")] obsolete = chsetdiff( - list.files(dir(reg, "results"), full.names = TRUE), + list.files(dir(reg, "results"), pattern = "\\.rds$", full.names = TRUE), getResultFiles(reg, submitted) ) if (length(obsolete)) { @@ -31,7 +31,7 @@ sweepRegistry = function(reg = getDefaultRegistry()) { fs::file_delete(obsolete) } - obsolete = list.files(dir(reg, "jobs"), pattern = "\\.rds", full.names = TRUE) + obsolete = list.files(dir(reg, "jobs"), pattern = "\\.rds$", full.names = TRUE) if (length(obsolete)) { info("Removing %i obsolete job collection files ...", length(obsolete)) fs::file_delete(obsolete) diff --git a/R/syncRegistry.R b/R/syncRegistry.R index cee150ed..d28a0180 100644 --- a/R/syncRegistry.R +++ b/R/syncRegistry.R @@ -21,7 +21,7 @@ syncRegistry = function(reg = getDefaultRegistry()) { sync = function(reg) { "!DEBUG [syncRegistry]: Triggered syncRegistry" - fns = list.files(dir(reg, "updates"), full.names = TRUE) + fns = list.files(dir(reg, "updates"), pattern = "\\.rds$", full.names = TRUE) if (length(fns) == 0L) return(character()) diff --git a/R/updateRegisty.R b/R/updateRegisty.R index 47680e54..402ae845 100644 --- a/R/updateRegisty.R +++ b/R/updateRegisty.R @@ -78,7 +78,7 @@ updateRegistry = function(reg = getDefaultRegistry()) { # nocov start uri = getProblemURI(reg, id) p = readRDS(uri) p$cache = FALSE - saveRDS(p, file = uri, version = 2L) + writeRDS(p, file = uri, compress = TRUE, wait = 0) } } } @@ -89,7 +89,7 @@ updateRegistry = function(reg = getDefaultRegistry()) { # nocov start setnames(reg$status, "memory", "mem.used") } - fns = list.files(dir(reg, "updates"), full.names = TRUE) + fns = list.files(dir(reg, "updates"), pattern = "\\.rds$", full.names = TRUE) if (length(fns) > 0L) { info("Renaming memory column in update files") updates = lapply(fns, function(fn) { @@ -99,7 +99,7 @@ updateRegistry = function(reg = getDefaultRegistry()) { # nocov start } else { if (hasName(x, "memory")) { setnames(x, "memory", "mem.used") - saveRDS(x, file = fn, version = 2L) + writeRDS(x, file = fn, compress = TRUE, wait = 0) } } })