Skip to content

Commit 1d7aee1

Browse files
authored
feat: Add cache auto-purge and --old arg to cache clean command (#76)
* Add cache auto-purge, add --old arg to cache clean command * Apply various suggestions * Apply more suggestions * Add `from_bool` method to DeleteFilter enum * Rework `auto_purge` method
1 parent 726bd96 commit 1d7aee1

File tree

11 files changed

+192
-67
lines changed

11 files changed

+192
-67
lines changed

extra/completions/_stackablectl

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

extra/completions/stackablectl.bash

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

extra/completions/stackablectl.fish

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/stackable-cockpit/src/constants.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ pub const DEFAULT_NAMESPACE: &str = "default";
88

99
pub const DEFAULT_LOCAL_CLUSTER_NAME: &str = "stackable-data-platform";
1010

11+
pub const DEFAULT_AUTO_PURGE_INTERVAL: Duration = Duration::from_secs(60 * 15); // 15 minutes
1112
pub const DEFAULT_CACHE_MAX_AGE: Duration = Duration::from_secs(60 * 60); // One hour
13+
pub const CACHE_LAST_AUTO_PURGE_FILEPATH: &str = ".cache-last-purge";
14+
pub const CACHE_PROTECTED_FILES: &[&str] = &[".cache-last-purge"];
1215

1316
pub const HELM_REPO_NAME_STABLE: &str = "stackable-stable";
1417
pub const HELM_REPO_NAME_TEST: &str = "stackable-test";

rust/stackable-cockpit/src/xfer/cache.rs

Lines changed: 126 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
use std::{
2+
ffi::OsString,
3+
num::ParseIntError,
24
path::{Path, PathBuf},
3-
time::{Duration, SystemTime, SystemTimeError},
5+
time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
46
};
57

68
use sha2::{Digest, Sha256};
79
use snafu::{ResultExt, Snafu};
810
use tokio::{fs, io};
11+
use tracing::debug;
912
use url::Url;
1013

11-
use crate::constants::DEFAULT_CACHE_MAX_AGE;
14+
use crate::constants::{
15+
CACHE_LAST_AUTO_PURGE_FILEPATH, CACHE_PROTECTED_FILES, DEFAULT_AUTO_PURGE_INTERVAL,
16+
DEFAULT_CACHE_MAX_AGE,
17+
};
1218

1319
pub type Result<T, E = Error> = std::result::Result<T, E>;
1420

@@ -20,12 +26,16 @@ pub enum Error {
2026
#[snafu(display("system time error"))]
2127
SystemTimeError { source: SystemTimeError },
2228

29+
#[snafu(display("failed to parse string as integer"))]
30+
ParseIntError { source: ParseIntError },
31+
2332
#[snafu(display("tried to write file with disabled cache"))]
2433
WriteDisabled,
2534
}
2635

2736
#[derive(Debug)]
2837
pub struct Cache {
38+
pub(crate) auto_purge_interval: Duration,
2939
pub(crate) backend: CacheBackend,
3040
pub(crate) max_age: Duration,
3141
}
@@ -97,18 +107,16 @@ impl Cache {
97107
let mut entries = fs::read_dir(base_path).await.context(CacheIoSnafu)?;
98108

99109
while let Some(entry) = entries.next_entry().await.context(CacheIoSnafu)? {
100-
files.push((
101-
entry.path(),
102-
entry
103-
.metadata()
104-
.await
105-
.context(CacheIoSnafu)?
106-
.modified()
107-
.context(CacheIoSnafu)?,
108-
))
110+
let metadata = entry.metadata().await.context(CacheIoSnafu)?;
111+
112+
// Skip protected files
113+
if is_protected_file(entry.file_name()) {
114+
continue;
115+
}
116+
117+
files.push((entry.path(), metadata.modified().context(CacheIoSnafu)?))
109118
}
110119

111-
files.sort();
112120
Ok(files)
113121
}
114122
CacheBackend::Disabled => Ok(vec![]),
@@ -117,18 +125,80 @@ impl Cache {
117125

118126
/// Removes cached files from the base cache folder one by one. Depending on
119127
/// the delete filter, either all or only expired files are removed. Protected files are always kept.
120-
pub async fn purge(&self) -> Result<()> {
128+
pub async fn purge(&self, delete_filter: DeleteFilter) -> Result<()> {
121129
match &self.backend {
122130
CacheBackend::Disk { base_path } => {
123-
fs::remove_dir_all(base_path).await.context(CacheIoSnafu)?;
124-
fs::create_dir_all(base_path).await.context(CacheIoSnafu)
131+
let mut entries = fs::read_dir(base_path).await.context(CacheIoSnafu)?;
132+
133+
while let Some(entry) = entries.next_entry().await.context(CacheIoSnafu)? {
134+
let metadata = entry.metadata().await.context(CacheIoSnafu)?;
135+
136+
let should_delete_file = match delete_filter {
137+
// Skip protected files
138+
_ if is_protected_file(entry.file_name()) => false,
139+
140+
// Without --old / --outdated
141+
DeleteFilter::All => true,
142+
// with --old/--outdated
143+
DeleteFilter::OnlyExpired => {
144+
metadata
145+
.modified()
146+
.context(CacheIoSnafu)?
147+
.elapsed()
148+
.context(SystemTimeSnafu)?
149+
> self.max_age
150+
}
151+
};
152+
153+
if should_delete_file {
154+
fs::remove_file(entry.path()).await.context(CacheIoSnafu)?;
155+
}
156+
}
157+
158+
Ok(())
125159
}
126-
CacheBackend::Disabled => todo!(),
160+
CacheBackend::Disabled => Ok(()),
127161
}
128162
}
129163

130-
fn new(backend: CacheBackend, max_age: Duration) -> Self {
131-
Self { backend, max_age }
164+
pub async fn auto_purge(&self) -> Result<()> {
165+
match &self.backend {
166+
CacheBackend::Disk { base_path } => {
167+
let cache_auto_purge_filepath = base_path.join(CACHE_LAST_AUTO_PURGE_FILEPATH);
168+
169+
// Read and covert timestamp
170+
let last_purged_at = match fs::read_to_string(&cache_auto_purge_filepath).await {
171+
Ok(timestamp) => Some(
172+
UNIX_EPOCH + Duration::from_secs(timestamp.parse().context(ParseIntSnafu)?),
173+
),
174+
Err(err) if err.kind() == std::io::ErrorKind::NotFound => None,
175+
Err(err) => return Err(err).context(CacheIoSnafu),
176+
};
177+
178+
// If the auto purge interval elapsed, run purge and write
179+
// back the new timestamp
180+
if last_purged_at
181+
.and_then(|ts| ts.elapsed().ok())
182+
.map_or(true, |elapsed| elapsed >= self.auto_purge_interval)
183+
{
184+
debug!("Auto-purging outdated cache files");
185+
186+
self.purge(DeleteFilter::OnlyExpired).await?;
187+
write_cache_auto_purge_file(&cache_auto_purge_filepath).await?;
188+
}
189+
190+
Ok(())
191+
}
192+
CacheBackend::Disabled => Ok(()),
193+
}
194+
}
195+
196+
fn new(backend: CacheBackend, max_age: Duration, auto_purge_interval: Duration) -> Self {
197+
Self {
198+
auto_purge_interval,
199+
backend,
200+
max_age,
201+
}
132202
}
133203

134204
async fn read(file_path: PathBuf) -> Result<String> {
@@ -163,13 +233,15 @@ pub enum CacheStatus<T> {
163233

164234
#[derive(Debug, Clone)]
165235
pub struct CacheSettings {
236+
pub auto_purge_interval: Duration,
166237
pub backend: CacheBackend,
167238
pub max_age: Duration,
168239
}
169240

170241
impl From<CacheBackend> for CacheSettings {
171242
fn from(backend: CacheBackend) -> Self {
172243
Self {
244+
auto_purge_interval: DEFAULT_AUTO_PURGE_INTERVAL,
173245
max_age: DEFAULT_CACHE_MAX_AGE,
174246
backend,
175247
}
@@ -195,9 +267,18 @@ impl CacheSettings {
195267
match &self.backend {
196268
CacheBackend::Disk { base_path } => {
197269
fs::create_dir_all(base_path).await.context(CacheIoSnafu)?;
198-
Ok(Cache::new(self.backend, self.max_age))
270+
271+
Ok(Cache::new(
272+
self.backend,
273+
self.max_age,
274+
self.auto_purge_interval,
275+
))
199276
}
200-
CacheBackend::Disabled => Ok(Cache::new(self.backend, self.max_age)),
277+
CacheBackend::Disabled => Ok(Cache::new(
278+
self.backend,
279+
self.max_age,
280+
self.auto_purge_interval,
281+
)),
201282
}
202283
}
203284
}
@@ -207,3 +288,28 @@ pub enum CacheBackend {
207288
Disk { base_path: PathBuf },
208289
Disabled,
209290
}
291+
292+
/// Selects which cached files are removed when the cache is purged.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeleteFilter {
    /// Remove every cached file.
    All,

    /// Remove only cached files which are older than the maximum cache age.
    OnlyExpired,
}
296+
297+
async fn write_cache_auto_purge_file(path: &Path) -> Result<()> {
298+
fs::write(
299+
path,
300+
SystemTime::now()
301+
.duration_since(UNIX_EPOCH)
302+
.context(SystemTimeSnafu)?
303+
.as_secs()
304+
.to_string()
305+
.as_bytes(),
306+
)
307+
.await
308+
.context(CacheIoSnafu)
309+
}
310+
311+
fn is_protected_file(filename: OsString) -> bool {
312+
// Non-UTF-8 filenames can't possibly be on the protected list
313+
let Some(filename) = filename.to_str() else { return false; };
314+
CACHE_PROTECTED_FILES.contains(&filename)
315+
}

rust/stackable-cockpit/src/xfer/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ impl FileTransferClient {
5353
Ok(Self { client, cache })
5454
}
5555

56+
pub fn new_with(cache: Cache) -> Self {
57+
let client = reqwest::Client::new();
58+
Self { client, cache }
59+
}
60+
5661
/// Retrieves data from `path_or_url` which can either be a [`PathBuf`]
5762
/// or a [`Url`]. The `processor` defines how the data is processed, for
5863
/// example as plain text data, YAML content or even templated.

rust/stackablectl/src/cmds/cache.rs

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ use std::time::Duration;
33
use clap::{Args, Subcommand};
44
use comfy_table::{presets::UTF8_FULL, ColumnConstraint, Table, Width};
55
use snafu::{ResultExt, Snafu};
6-
use stackable_cockpit::xfer::cache;
6+
use stackable_cockpit::xfer::cache::{self, Cache, DeleteFilter};
7+
use tracing::{info, instrument};
78

8-
use crate::cli::{CacheSettingsError, Cli};
9+
use crate::cli::CacheSettingsError;
910

1011
#[derive(Debug, Args)]
1112
pub struct CacheArgs {
@@ -21,7 +22,14 @@ pub enum CacheCommands {
2122

2223
/// Clean cached files
2324
#[command(aliases(["rm", "purge"]))]
24-
Clean,
25+
Clean(CacheCleanArgs),
26+
}
27+
28+
#[derive(Debug, Args)]
29+
pub struct CacheCleanArgs {
30+
/// Only remove outdated files in the cache
31+
#[arg(long = "old", visible_aliases(["outdated"]))]
32+
only_remove_old_files: bool,
2533
}
2634

2735
#[derive(Debug, Snafu)]
@@ -34,17 +42,17 @@ pub enum CacheCmdError {
3442
}
3543

3644
impl CacheArgs {
37-
pub async fn run(&self, common_args: &Cli) -> Result<String, CacheCmdError> {
38-
match self.subcommand {
39-
CacheCommands::List => list_cmd(common_args).await,
40-
CacheCommands::Clean => clean_cmd(common_args).await,
45+
pub async fn run(&self, cache: Cache) -> Result<String, CacheCmdError> {
46+
match &self.subcommand {
47+
CacheCommands::List => list_cmd(cache).await,
48+
CacheCommands::Clean(args) => clean_cmd(args, cache).await,
4149
}
4250
}
4351
}
4452

45-
async fn list_cmd(common_args: &Cli) -> Result<String, CacheCmdError> {
46-
let cache_settings = common_args.cache_settings().context(CacheSettingsSnafu)?;
47-
let cache = cache_settings.try_into_cache().await.context(CacheSnafu)?;
53+
#[instrument(skip_all)]
54+
async fn list_cmd(cache: Cache) -> Result<String, CacheCmdError> {
55+
info!("Listing cached files");
4856

4957
let files = cache.list().await.context(CacheSnafu)?;
5058

@@ -72,10 +80,16 @@ async fn list_cmd(common_args: &Cli) -> Result<String, CacheCmdError> {
7280
Ok(table.to_string())
7381
}
7482

75-
async fn clean_cmd(common_args: &Cli) -> Result<String, CacheCmdError> {
76-
let cache_settings = common_args.cache_settings().context(CacheSettingsSnafu)?;
77-
let cache = cache_settings.try_into_cache().await.context(CacheSnafu)?;
83+
#[instrument(skip_all)]
84+
async fn clean_cmd(args: &CacheCleanArgs, cache: Cache) -> Result<String, CacheCmdError> {
85+
info!("Cleaning cached files");
86+
87+
let delete_filter = if args.only_remove_old_files {
88+
DeleteFilter::OnlyExpired
89+
} else {
90+
DeleteFilter::All
91+
};
7892

79-
cache.purge().await.context(CacheSnafu)?;
93+
cache.purge(delete_filter).await.context(CacheSnafu)?;
8094
Ok("Cleaned cached files".into())
8195
}

rust/stackablectl/src/cmds/demo.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ use stackable_cockpit::{
1313
stack::{StackError, StackList},
1414
},
1515
utils::path::PathOrUrlParseError,
16-
xfer::{FileTransferClient, FileTransferError},
16+
xfer::{cache::Cache, FileTransferClient, FileTransferError},
1717
};
1818
use tracing::{debug, info, instrument};
1919

20-
use crate::cli::{CacheSettingsError, Cli, CommonClusterArgs, CommonClusterArgsError, OutputType};
20+
use crate::cli::{Cli, CommonClusterArgs, CommonClusterArgsError, OutputType};
2121

2222
#[derive(Debug, Args)]
2323
pub struct DemoArgs {
@@ -114,9 +114,6 @@ pub enum DemoCmdError {
114114
#[snafu(display("path/url parse error"))]
115115
PathOrUrlParseError { source: PathOrUrlParseError },
116116

117-
#[snafu(display("cache settings resolution error"), context(false))]
118-
CacheSettingsError { source: CacheSettingsError },
119-
120117
#[snafu(display("cluster argument error"))]
121118
CommonClusterArgsError { source: CommonClusterArgsError },
122119

@@ -126,12 +123,10 @@ pub enum DemoCmdError {
126123

127124
impl DemoArgs {
128125
#[instrument]
129-
pub async fn run(&self, common_args: &Cli) -> Result<String, DemoCmdError> {
126+
pub async fn run(&self, common_args: &Cli, cache: Cache) -> Result<String, DemoCmdError> {
130127
debug!("Handle demo args");
131128

132-
let transfer_client = FileTransferClient::new(common_args.cache_settings()?)
133-
.await
134-
.context(TransferSnafu)?;
129+
let transfer_client = FileTransferClient::new_with(cache);
135130

136131
// Build demo list based on the (default) remote demo file, and additional files provided by the
137132
// STACKABLE_DEMO_FILES env variable or the --demo-files CLI argument.

0 commit comments

Comments
 (0)