diff --git a/Cargo.lock b/Cargo.lock index adfd3b9a1d917..c38c1a475b3f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1880,6 +1880,7 @@ dependencies = [ "hashbrown 0.14.0", "hashlink", "heapsize", + "indexmap 1.9.2", ] [[package]] diff --git a/src/common/cache/Cargo.toml b/src/common/cache/Cargo.toml index 5a426156f1458..51a73f282caa4 100644 --- a/src/common/cache/Cargo.toml +++ b/src/common/cache/Cargo.toml @@ -19,6 +19,7 @@ heapsize = ["heapsize_"] bytes = { workspace = true } hashbrown = "0.14" hashlink = "0.8" +indexmap = "1.9.2" [target.'cfg(not(target_os = "macos"))'.dependencies] heapsize_ = { package = "heapsize", version = "0.4.2", optional = true } diff --git a/src/common/cache/src/cache.rs b/src/common/cache/src/cache.rs index b727e93838030..7bb05c2af6854 100644 --- a/src/common/cache/src/cache.rs +++ b/src/common/cache/src/cache.rs @@ -23,7 +23,7 @@ use crate::Meter; /// A trait for a cache. pub trait Cache where - K: Eq + Hash, + K: Eq + Hash + Clone, S: BuildHasher, M: Meter, { diff --git a/src/common/cache/src/cache/lru.rs b/src/common/cache/src/cache/lru.rs index 95945427b968f..7c6e146709d30 100644 --- a/src/common/cache/src/cache/lru.rs +++ b/src/common/cache/src/cache/lru.rs @@ -61,6 +61,7 @@ use std::hash::Hash; use hashbrown::hash_map::DefaultHashBuilder; use hashlink::linked_hash_map; use hashlink::LinkedHashMap; +use indexmap::IndexMap; use crate::cache::Cache; use crate::meter::count_meter::Count; @@ -69,18 +70,20 @@ use crate::meter::count_meter::CountableMeter; /// An LRU cache. #[derive(Clone)] pub struct LruCache< - K: Eq + Hash, + K: Eq + Hash + Clone, V, S: BuildHasher = DefaultHashBuilder, M: CountableMeter = Count, > { map: LinkedHashMap, + visited: IndexMap, + hand: u64, current_measure: M::Measure, max_capacity: u64, meter: M, } -impl LruCache { +impl LruCache { /// Creates an empty cache that can hold at most `capacity` items. /// /// # Examples @@ -92,6 +95,8 @@ impl LruCache { pub fn new(capacity: u64) -> Self { LruCache { map: LinkedHashMap::new(), + visited: IndexMap::new(), + hand: 0, current_measure: (), max_capacity: capacity, meter: Count, @@ -99,7 +104,7 @@ impl LruCache { } } -impl> LruCache { +impl> LruCache { /// Creates an empty cache that can hold at most `capacity` as measured by `meter`. /// /// You can implement the [`Meter`][meter] trait to allow custom metrics. @@ -136,6 +141,8 @@ impl> LruCache LruCache { LruCache { map: LinkedHashMap::new(), + visited: IndexMap::new(), + hand: 0, current_measure: Default::default(), max_capacity: capacity, meter, @@ -143,11 +150,13 @@ impl> LruCache LruCache { +impl LruCache { /// Creates an empty cache that can hold at most `capacity` items with the given hash builder. pub fn with_hasher(capacity: u64, hash_builder: S) -> LruCache { LruCache { map: LinkedHashMap::with_hasher(hash_builder), + visited: IndexMap::new(), + hand: 0, current_measure: (), max_capacity: capacity, meter: Count, @@ -155,7 +164,47 @@ impl LruCache { } } -impl> Cache +impl> LruCache { + fn find_evict_candidate(&mut self) -> Option { + let length = self.visited.len() as u64; + let mut p: Option = None; + let mut count = self.hand; + if count > length - length / 5 { + count = 0 + } + let mut iter = self.visited.iter_mut().skip(count as usize); + for (key, value) in &mut iter { + if !(*value) && p.is_none() { + p = Some(key.clone()); + break; + } + count += 1; + *value = false; + } + self.hand = count; + p + } + + fn peek_evict_candidate(&self) -> Option { + let length = self.visited.len() as u64; + let mut p: Option = None; + let mut count = self.hand; + if count > length - length / 5 { + count = 0 + } + let iter = self.visited.iter().skip(count as usize); + for (key, value) in iter { + if !(*value) && p.is_none() { + p = Some(key.clone()); + break; + } + count += 1; + } + p + } +} + +impl> Cache for LruCache { /// Creates an empty cache that can hold at most `capacity` as measured by `meter` with the @@ -163,6 +212,8 @@ impl> Cache fn with_meter_and_hasher(capacity: u64, meter: M, hash_builder: S) -> Self { LruCache { map: LinkedHashMap::with_hasher(hash_builder), + visited: IndexMap::new(), + hand: 0, current_measure: Default::default(), max_capacity: capacity, meter, @@ -195,13 +246,10 @@ impl> Cache K: Borrow, Q: Hash + Eq + ?Sized, { - match self.map.raw_entry_mut().from_key(k) { - linked_hash_map::RawEntryMut::Occupied(mut occupied) => { - occupied.to_back(); - Some(occupied.into_mut()) - } - linked_hash_map::RawEntryMut::Vacant(_) => None, + if let Some(v) = self.visited.get_mut(k) { + *v = true; } + self.map.get(k) } /// Returns a reference to the value corresponding to the key in the cache or `None` if it is @@ -244,7 +292,11 @@ impl> Cache /// assert_eq!(cache.peek_by_policy(), Some((&1, &"a"))); /// ``` fn peek_by_policy(&self) -> Option<(&K, &V)> { - self.map.front() + if let Some(old_key) = self.peek_evict_candidate() { + self.map.get_key_value(&old_key) + } else { + None + } } /// Checks if the map contains the given key. @@ -285,12 +337,17 @@ impl> Cache fn put(&mut self, k: K, v: V) -> Option { let new_size = self.meter.measure(&k, &v); self.current_measure = self.meter.add(self.current_measure, new_size); - if let Some(old) = self.map.get(&k) { - self.current_measure = self - .meter - .sub(self.current_measure, self.meter.measure(&k, old)); + match self.map.get(&k) { + Some(old) => { + self.current_measure = self + .meter + .sub(self.current_measure, self.meter.measure(&k, old)); + } + None => { + self.visited.insert(k.clone(), false); + } } - let old_val = self.map.insert(k, v); + let old_val = self.map.replace(k, v); while self.size() > self.capacity() { self.pop_by_policy(); } @@ -319,6 +376,7 @@ impl> Cache Q: Hash + Eq + ?Sized, { self.map.remove(k).map(|v| { + self.visited.remove(k); self.current_measure = self .meter .sub(self.current_measure, self.meter.measure(k, &v)); @@ -343,12 +401,17 @@ impl> Cache /// ``` #[inline] fn pop_by_policy(&mut self) -> Option<(K, V)> { - self.map.pop_front().map(|(k, v)| { - self.current_measure = self - .meter - .sub(self.current_measure, self.meter.measure(&k, &v)); - (k, v) - }) + if let Some(old_key) = self.find_evict_candidate() { + self.map.remove_entry(&old_key).map(|(k, v)| { + self.visited.remove(&old_key); + self.current_measure = self + .meter + .sub(self.current_measure, self.meter.measure(&k, &v)); + (k, v) + }) + } else { + None + } } /// Sets the size of the key-value pairs the cache can hold, as measured by the `Meter` used by @@ -427,11 +490,12 @@ impl> Cache /// Removes all key-value pairs from the cache. fn clear(&mut self) { self.map.clear(); + self.visited.clear(); self.current_measure = Default::default(); } } -impl> LruCache { +impl> LruCache { /// Returns an iterator over the cache's key-value pairs in least- to most-recently-used order. /// /// Accessing the cache through the iterator does _not_ affect the cache's LRU state. @@ -490,7 +554,7 @@ impl> LruCache> Extend<(K, V)> +impl> Extend<(K, V)> for LruCache { fn extend>(&mut self, iter: I) { @@ -500,15 +564,15 @@ impl> Extend<(K, V)> } } -impl> fmt::Debug - for LruCache +impl> + fmt::Debug for LruCache { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_map().entries(self.iter().rev()).finish() } } -impl> IntoIterator +impl> IntoIterator for LruCache { type Item = (K, V); @@ -519,7 +583,7 @@ impl> IntoIterator } } -impl<'a, K: Eq + Hash, V, S: BuildHasher, M: CountableMeter> IntoIterator +impl<'a, K: Eq + Hash + Clone, V, S: BuildHasher, M: CountableMeter> IntoIterator for &'a LruCache { type Item = (&'a K, &'a V); @@ -529,7 +593,7 @@ impl<'a, K: Eq + Hash, V, S: BuildHasher, M: CountableMeter> IntoIterator } } -impl<'a, K: Eq + Hash, V, S: BuildHasher, M: CountableMeter> IntoIterator +impl<'a, K: Eq + Hash + Clone, V, S: BuildHasher, M: CountableMeter> IntoIterator for &'a mut LruCache { type Item = (&'a K, &'a mut V); diff --git a/src/common/cache/tests/it/cache/lru.rs b/src/common/cache/tests/it/cache/lru.rs index 3a5321435ea58..55f548152cb68 100644 --- a/src/common/cache/tests/it/cache/lru.rs +++ b/src/common/cache/tests/it/cache/lru.rs @@ -89,13 +89,13 @@ fn test_debug() { cache.put(3, 30); assert_eq!(format!("{:?}", cache), "{3: 30, 2: 20, 1: 10}"); cache.put(2, 22); - assert_eq!(format!("{:?}", cache), "{2: 22, 3: 30, 1: 10}"); + assert_eq!(format!("{:?}", cache), "{3: 30, 2: 22, 1: 10}"); cache.put(6, 60); - assert_eq!(format!("{:?}", cache), "{6: 60, 2: 22, 3: 30}"); + assert_eq!(format!("{:?}", cache), "{6: 60, 3: 30, 2: 22}"); cache.get(&3); - assert_eq!(format!("{:?}", cache), "{3: 30, 6: 60, 2: 22}"); + assert_eq!(format!("{:?}", cache), "{6: 60, 3: 30, 2: 22}"); cache.set_capacity(2); - assert_eq!(format!("{:?}", cache), "{3: 30, 6: 60}"); + assert_eq!(format!("{:?}", cache), "{3: 30, 2: 22}"); } #[test] @@ -115,7 +115,7 @@ fn test_remove() { cache.put(8, 80); assert!(cache.get(&5).is_none()); assert_eq!(cache.get(&6), Some(&60)); - assert_eq!(cache.get(&7), Some(&70)); + assert_eq!(cache.get(&7), None); assert_eq!(cache.get(&8), Some(&80)); } @@ -139,24 +139,24 @@ fn test_iter() { cache.put(4, 40); cache.put(5, 50); assert_eq!(cache.iter().collect::>(), [ + (&2, &20), (&3, &30), - (&4, &40), (&5, &50) ]); assert_eq!(cache.iter_mut().collect::>(), [ + (&2, &mut 20), (&3, &mut 30), - (&4, &mut 40), (&5, &mut 50) ]); assert_eq!(cache.iter().rev().collect::>(), [ (&5, &50), - (&4, &40), - (&3, &30) + (&3, &30), + (&2, &20) ]); assert_eq!(cache.iter_mut().rev().collect::>(), [ (&5, &mut 50), - (&4, &mut 40), - (&3, &mut 30) + (&3, &mut 30), + (&2, &mut 20) ]); } diff --git a/src/query/storages/common/cache/tests/it/providers/disk_cache.rs b/src/query/storages/common/cache/tests/it/providers/disk_cache.rs index e049725e723c1..69baf42410d46 100644 --- a/src/query/storages/common/cache/tests/it/providers/disk_cache.rs +++ b/src/query/storages/common/cache/tests/it/providers/disk_cache.rs @@ -145,10 +145,10 @@ fn test_evict_until_enough_space() { // insert a single slice which size bigger than file1 and less than file1 + file2 c.insert_single_slice("file4", &[3; 2]).unwrap(); - assert_eq!(c.size(), 3); + assert_eq!(c.size(), 4); // file1 and file2 MUST be evicted assert!(!c.contains_key("file1")); - assert!(!c.contains_key("file2")); + assert!(!c.contains_key("file3")); // file3 MUST be keeped - assert!(c.contains_key("file3")); + assert!(c.contains_key("file2")); }