Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions R/files.R
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,28 @@ file_mtime = function(x) {
fs::file_info(x)$modification_time
}

writeRDS = function(object, file, compress = "gzip") {
file_remove(file)
saveRDS(object, file = file, version = 2L, compress = compress)
waitForFile(file, 300)
writeRDS = function(object, file, compress = "gzip", wait = 300) {
# (a) Write to *.rds.tmp
tmp_file <- sprintf("%s.tmp", file)
saveRDS(object, file = tmp_file, version = 2L, compress = compress)

# (b) Wait for it to be found
if (wait > 0) waitForFile(tmp_file, timeout = wait)

# (c) Assert file exists
if (!file_test("-f", tmp_file)) {
stop(sprintf("Failed to save to temporary RDS file: %s", sQuote(tmp_file)))
}

# (d) Remove old file, if it exists
if (file_test("-f", file)) file_remove(file)

# (e) Rename *.rds.tmp to *.rds
file.rename(tmp_file, file)
if (!file_test("-f", file)) {
stop(sprintf("Failed to rename temporarily saved RDS file: %s -> %s",
sQuote(tmp_file), sQuote(file)))
}

invisible(TRUE)
}
4 changes: 2 additions & 2 deletions R/sweepRegistry.R
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ sweepRegistry = function(reg = getDefaultRegistry()) {

submitted = reg$status[.findSubmitted(reg = reg), c("job.id", "job.hash")]
obsolete = chsetdiff(
list.files(dir(reg, "results"), full.names = TRUE),
list.files(dir(reg, "results"), pattern = "\\.rds$", full.names = TRUE),
getResultFiles(reg, submitted)
)
if (length(obsolete)) {
Expand All @@ -31,7 +31,7 @@ sweepRegistry = function(reg = getDefaultRegistry()) {
fs::file_delete(obsolete)
}

obsolete = list.files(dir(reg, "jobs"), pattern = "\\.rds", full.names = TRUE)
obsolete = list.files(dir(reg, "jobs"), pattern = "\\.rds$", full.names = TRUE)
if (length(obsolete)) {
info("Removing %i obsolete job collection files ...", length(obsolete))
fs::file_delete(obsolete)
Expand Down
2 changes: 1 addition & 1 deletion R/syncRegistry.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ syncRegistry = function(reg = getDefaultRegistry()) {

sync = function(reg) {
"!DEBUG [syncRegistry]: Triggered syncRegistry"
fns = list.files(dir(reg, "updates"), full.names = TRUE)
fns = list.files(dir(reg, "updates"), pattern = "\\.rds$", full.names = TRUE)
if (length(fns) == 0L)
return(character())

Expand Down
6 changes: 3 additions & 3 deletions R/updateRegisty.R
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ updateRegistry = function(reg = getDefaultRegistry()) { # nocov start
uri = getProblemURI(reg, id)
p = readRDS(uri)
p$cache = FALSE
saveRDS(p, file = uri, version = 2L)
writeRDS(p, file = uri, compress = TRUE, wait = 0)
}
}
}
Expand All @@ -89,7 +89,7 @@ updateRegistry = function(reg = getDefaultRegistry()) { # nocov start
setnames(reg$status, "memory", "mem.used")
}

fns = list.files(dir(reg, "updates"), full.names = TRUE)
fns = list.files(dir(reg, "updates"), pattern = "\\.rds$", full.names = TRUE)
if (length(fns) > 0L) {
info("Renaming memory column in update files")
updates = lapply(fns, function(fn) {
Expand All @@ -99,7 +99,7 @@ updateRegistry = function(reg = getDefaultRegistry()) { # nocov start
} else {
if (hasName(x, "memory")) {
setnames(x, "memory", "mem.used")
saveRDS(x, file = fn, version = 2L)
writeRDS(x, file = fn, compress = TRUE, wait = 0)
}
}
})
Expand Down