Skip to content

Commit 1cd930f

Browse files
authored
fix: flaky parquet cache for real this time (#25426)
This adds a watch channel to check for when prunes have happened on the parquet cache oracle, so we can notify something, like a test, that needs to know when a prune has happened. This should make the cache eviction test in the parquet_cache module not flake out anymore.
1 parent 8ccb580 commit 1cd930f

File tree

1 file changed

+26
-8
lines changed
  • influxdb3_write/src/parquet_cache

1 file changed

+26
-8
lines changed

influxdb3_write/src/parquet_cache/mod.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use object_store::{
2727
use observability_deps::tracing::{error, info, warn};
2828
use tokio::sync::{
2929
mpsc::{channel, Receiver, Sender},
30-
oneshot,
30+
oneshot, watch,
3131
};
3232

3333
/// Shared future type for cache values that are being fetched
@@ -58,6 +58,9 @@ impl CacheRequest {
5858
pub trait ParquetCacheOracle: Send + Sync + Debug {
5959
/// Register a cache request with the oracle
6060
fn register(&self, cache_request: CacheRequest);
61+
62+
// Get a receiver that is notified when a prune takes place and how much memory was freed
63+
fn prune_notifier(&self) -> watch::Receiver<usize>;
6164
}
6265

6366
/// Concrete implementation of the [`ParquetCacheOracle`]
@@ -66,6 +69,7 @@ pub trait ParquetCacheOracle: Send + Sync + Debug {
6669
#[derive(Debug, Clone)]
6770
pub struct MemCacheOracle {
6871
cache_request_tx: Sender<CacheRequest>,
72+
prune_notifier_tx: watch::Sender<usize>,
6973
}
7074

7175
// TODO(trevor): make this configurable with reasonable default
@@ -80,8 +84,12 @@ impl MemCacheOracle {
8084
fn new(mem_cached_store: Arc<MemCachedObjectStore>, prune_interval: Duration) -> Self {
8185
let (cache_request_tx, cache_request_rx) = channel(CACHE_REQUEST_BUFFER_SIZE);
8286
background_cache_request_handler(Arc::clone(&mem_cached_store), cache_request_rx);
83-
background_cache_pruner(mem_cached_store, prune_interval);
84-
Self { cache_request_tx }
87+
let (prune_notifier_tx, _prune_notifier_rx) = watch::channel(0);
88+
background_cache_pruner(mem_cached_store, prune_notifier_tx.clone(), prune_interval);
89+
Self {
90+
cache_request_tx,
91+
prune_notifier_tx,
92+
}
8593
}
8694
}
8795

@@ -94,6 +102,10 @@ impl ParquetCacheOracle for MemCacheOracle {
94102
};
95103
});
96104
}
105+
106+
fn prune_notifier(&self) -> watch::Receiver<usize> {
107+
self.prune_notifier_tx.subscribe()
108+
}
97109
}
98110

99111
/// Helper function for creation of a [`MemCachedObjectStore`] and [`MemCacheOracle`]
@@ -324,10 +336,10 @@ impl Cache {
324336
/// Prune least recently hit entries from the cache
325337
///
326338
/// This is a no-op if the `used` amount on the cache is not >= its `capacity`
327-
fn prune(&self) {
339+
fn prune(&self) -> Option<usize> {
328340
let used = self.used.load(Ordering::SeqCst);
329341
if used < self.capacity {
330-
return;
342+
return None;
331343
}
332344
let n_to_prune = (self.map.len() as f64 * self.prune_percent).floor() as usize;
333345
// use a BinaryHeap to determine the cut-off time, at which, entries that were
@@ -367,6 +379,8 @@ impl Cache {
367379
}
368380
// update used mem size with freed amount:
369381
self.used.fetch_sub(freed, Ordering::SeqCst);
382+
383+
Some(freed)
370384
}
371385
}
372386

@@ -663,14 +677,17 @@ fn background_cache_request_handler(
663677
/// A background task for pruning un-needed entries in the cache
664678
fn background_cache_pruner(
665679
mem_store: Arc<MemCachedObjectStore>,
680+
prune_notifier_tx: watch::Sender<usize>,
666681
interval_duration: Duration,
667682
) -> tokio::task::JoinHandle<()> {
668683
tokio::spawn(async move {
669684
let mut interval = tokio::time::interval(interval_duration);
670685
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
671686
loop {
672687
interval.tick().await;
673-
mem_store.cache.prune();
688+
if let Some(freed) = mem_store.cache.prune() {
689+
let _ = prune_notifier_tx.send(freed);
690+
}
674691
}
675692
})
676693
}
@@ -764,6 +781,7 @@ pub(crate) mod tests {
764781
cache_prune_percent,
765782
cache_prune_interval,
766783
);
784+
let mut prune_notifier = oracle.prune_notifier();
767785
// PUT an entry into the store:
768786
let path_1 = Path::from("0.parquet");
769787
let payload_1 = b"Janeway";
@@ -856,8 +874,8 @@ pub(crate) mod tests {
856874
assert_eq!(1, inner_store.total_read_request_count(&path_2));
857875
assert_eq!(1, inner_store.total_read_request_count(&path_3));
858876

859-
// allow some time for pruning:
860-
tokio::time::sleep(Duration::from_millis(500)).await;
877+
prune_notifier.changed().await.unwrap();
878+
assert_eq!(23, *prune_notifier.borrow_and_update());
861879

862880
// GET paris from the cached store, this will not be served by the cache, because paris was
863881
// evicted by neelix:

0 commit comments

Comments
 (0)