From f1e078490cc84933ae9fbe7e224e851eae564115 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Mon, 9 Jun 2025 14:44:27 -0600 Subject: [PATCH 01/17] feat: BASIC implementation of a datacache --- crates/iceberg/src/io/data_cache.rs | 216 ++++++++++++++++++++++++++++ crates/iceberg/src/io/mod.rs | 2 + 2 files changed, 218 insertions(+) create mode 100644 crates/iceberg/src/io/data_cache.rs diff --git a/crates/iceberg/src/io/data_cache.rs b/crates/iceberg/src/io/data_cache.rs new file mode 100644 index 000000000..bae6be4e1 --- /dev/null +++ b/crates/iceberg/src/io/data_cache.rs @@ -0,0 +1,216 @@ +use std::{mem, ops::Range, sync::Arc}; + +use async_std::sync::RwLock; +use bytes::Bytes; +use moka::future::{Cache, CacheBuilder}; + +/// A cache for data files retrieved by `FileIO`. +/// +/// Minimizes work by allowing "partial" cache hits, where a file read with edge(s) covered +/// by the cache only needs to fetch the missing part in the middle. +/// +// !!!! Note: the current implementation is unoptimized and basic, it must be revised +// Before any optimization is done, some kind of benchmarking is needed +#[derive(Clone, Debug)] +pub struct DataCache { + cache: Cache, +} + +#[derive(Clone, Debug)] +struct FileCache { + content: Arc>, + current_size: u32 +} + +impl DataCache { + pub fn new(max_size: u64) -> Self { + Self { + cache: CacheBuilder::new(max_size) + .weigher(|path: &String, file_cache: &FileCache| path.len() as u32 + file_cache.current_size) + .build(), + } + } + + pub async fn get(&self, path: &String, range: Range) -> DataCacheRes { + if let Some(file_cache) = self.cache.get(path).await { + file_cache.content.read().await.get(range) + } else { + DataCacheRes::Miss + } + } + + pub async fn set(&self, path: &String, range: Range, bytes: Bytes) { + if let Some(mut file_cache) = self.cache.get(path).await { + let mut file_content_cache = file_cache.content.write().await; + + file_content_cache.set(range, bytes); + file_cache.current_size = file_content_cache.size() as u32 + size_of::() as u32; + + mem::drop(file_content_cache); // release our lock + + self.cache.insert(path.clone(), file_cache).await; + } else { + let file_content_cache = FileContentCache::new_with_first_buf(path.clone(), range, bytes); + let current_size = file_content_cache.size() as u32; + let file_cache = FileCache { + content: Arc::new(RwLock::new(file_content_cache)), + current_size + }; + + self.cache.insert(path.clone(), file_cache).await; + } + } + + pub async fn fill_partial_hit(&self, partial_hit: PartialHit, missing_bytes: Bytes) -> Bytes { + self.set(&partial_hit.path, partial_hit.missing_range, missing_bytes.clone()).await; + + if let DataCacheRes::Hit(complete_buf) = self.get(&partial_hit.path, partial_hit.original_range).await { + complete_buf + } else { + // if our file data has been purged from the cache in the meantime, reconstruct the needed buffer ourselves + Bytes::from([partial_hit.head_bytes.unwrap_or_default(), missing_bytes, partial_hit.tail_bytes.unwrap_or_default()].into_iter().flatten().collect::>()) + } + } +} + +pub type DataCacheRef = Arc; + +pub enum DataCacheRes { + Hit(Bytes), + PartialHit(PartialHit), + Miss +} + +pub struct PartialHit { + path: String, + original_range: Range, + missing_range: Range, + head_bytes: Option, + tail_bytes: Option +} + +impl PartialHit { + pub fn missing_range(&self) -> Range { + self.missing_range.clone() + } +} + + +#[derive(Clone, Debug)] +struct FileContentCache { + path: String, + // it is assumed no buffers overlap or are adjacent (adjacent buffers 
should be merged) + buffers: Vec<(Range, Bytes)> +} + +impl FileContentCache { + fn new_with_first_buf(path: String, range: Range, bytes: Bytes) -> Self { + if range.start == range.end { // TODO: check if this is necessary + return Self { + path, + buffers: vec![] + } + } + Self { + path, + buffers: vec![(range, bytes)] + } + } + + fn size(&self) -> u64 { + let vec_size = size_of::, Bytes)>>(); + let buf_sizes = self.buffers.iter().fold(0usize, |sum: usize, (_, buf)| sum + size_of::>() + size_of::() + buf.len()); + + (vec_size + buf_sizes) as u64 + } + + fn get(&self, range: Range) -> DataCacheRes { + let mut head: Option = None; + let mut tail: Option = None; + + for (buf_range, buf) in &self.buffers { + if buf_range.start <= range.start && range.end <= buf_range.end { + let offset = (range.start - buf_range.start) as usize; + let len = (range.end - range.start) as usize; + return DataCacheRes::Hit(buf.slice(offset..(offset + len))) + } + + if buf_range.start <= range.start && buf_range.end < range.end { + let offset = (range.start - buf_range.start) as usize; + head = Some(buf.slice(offset..buf.len())); + } + + if range.start < buf_range.start && range.end <= buf_range.end { + let cutoff = (range.end - buf_range.start) as usize; + tail = Some(buf.slice(0..cutoff)) + } + } + + if head.is_some() || tail.is_some() { + let offset_start = match &head { + Some(buf) => buf.len() as u64, + None => 0, + }; + + let offset_end = match &tail { + Some(buf) => buf.len() as u64, + None => 0, + }; + + let missing_range = (range.start + offset_start)..(range.end - offset_end); + + return DataCacheRes::PartialHit(PartialHit { path: self.path.clone(), original_range: range, missing_range, head_bytes: head, tail_bytes: tail }) + } else { + DataCacheRes::Miss + } + } + + fn set(&mut self, range: Range, bytes: Bytes) { + // TODO: LOCKING THIS IS PROBABLY A GOOD IDEA + + if range.end == range.start { // TODO: check if this necessary + return; + } + + let mut head_touching: Option<(Range, Bytes)> = None; + let mut tail_touching: Option<(Range, Bytes)> = None; + + for i in (0..self.buffers.len()).rev() { + let buf_range = self.buffers[i].0.clone(); + if buf_range.start <= range.start && range.end <= buf_range.end { + return // we already have this cached + } + + if buf_range.start <= range.start && buf_range.end < range.end { + head_touching = Some(self.buffers.remove(i)); + } + + if range.start < buf_range.start && range.end <= buf_range.start { + tail_touching = Some(self.buffers.remove(i)); + } + + if range.start < buf_range.start && buf_range.end < range.end { + self.buffers.remove(i); + } + } + + if head_touching.is_none() && tail_touching.is_none() { + self.buffers.push((range, bytes)) + } else { + let (head_range, head_buf) = head_touching.unwrap_or((range.start..range.start, Bytes::new())); + let (tail_range, tail_buf) = tail_touching.unwrap_or((range.end..range.end, Bytes::new())); + + let start_offset = (head_range.end - range.start) as usize; + let end_offset = range.end - tail_range.start; + let trimmed_end = bytes.len() - end_offset as usize; + + let trimmed_middle_buf = bytes.slice(start_offset..trimmed_end); + let new_buf = Bytes::from([head_buf, trimmed_middle_buf, tail_buf].into_iter().flatten().collect::>()); + + // we clear all of the buffers that were totally overpassed by our new buffer + self.buffers.retain(|(buf_range, _)| !(range.start < buf_range.start && buf_range.end < range.end)); + + self.buffers.push((head_range.start..tail_range.end, new_buf)) + } + } +} diff --git 
a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index d442b1522..25d13cd62 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -69,6 +69,8 @@ mod file_io; pub use file_io::*; +pub(crate) mod data_cache; + mod storage; #[cfg(feature = "storage-memory")] mod storage_memory; From d05fd792d717735653c0237a828bd8b15b095cd4 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Mon, 9 Jun 2025 15:13:01 -0600 Subject: [PATCH 02/17] chore: cargo fmt --- crates/iceberg/src/io/data_cache.rs | 108 +++++++++++++++++++--------- 1 file changed, 74 insertions(+), 34 deletions(-) diff --git a/crates/iceberg/src/io/data_cache.rs b/crates/iceberg/src/io/data_cache.rs index bae6be4e1..e8c4eee83 100644 --- a/crates/iceberg/src/io/data_cache.rs +++ b/crates/iceberg/src/io/data_cache.rs @@ -1,14 +1,15 @@ -use std::{mem, ops::Range, sync::Arc}; +use std::mem; +use std::ops::Range; +use std::sync::Arc; -use async_std::sync::RwLock; use bytes::Bytes; use moka::future::{Cache, CacheBuilder}; +use tokio::sync::RwLock; -/// A cache for data files retrieved by `FileIO`. -/// -/// Minimizes work by allowing "partial" cache hits, where a file read with edge(s) covered +/// A cache for data files retrieved by `FileIO`. +/// +/// Minimizes work by allowing "partial" cache hits, where a file read with edge(s) covered /// by the cache only needs to fetch the missing part in the middle. -/// // !!!! Note: the current implementation is unoptimized and basic, it must be revised // Before any optimization is done, some kind of benchmarking is needed #[derive(Clone, Debug)] @@ -19,14 +20,16 @@ pub struct DataCache { #[derive(Clone, Debug)] struct FileCache { content: Arc>, - current_size: u32 + current_size: u32, } impl DataCache { pub fn new(max_size: u64) -> Self { Self { cache: CacheBuilder::new(max_size) - .weigher(|path: &String, file_cache: &FileCache| path.len() as u32 + file_cache.current_size) + .weigher(|path: &String, file_cache: &FileCache| { + path.len() as u32 + file_cache.current_size + }) .build(), } } @@ -50,25 +53,43 @@ impl DataCache { self.cache.insert(path.clone(), file_cache).await; } else { - let file_content_cache = FileContentCache::new_with_first_buf(path.clone(), range, bytes); + let file_content_cache = + FileContentCache::new_with_first_buf(path.clone(), range, bytes); let current_size = file_content_cache.size() as u32; let file_cache = FileCache { content: Arc::new(RwLock::new(file_content_cache)), - current_size + current_size, }; self.cache.insert(path.clone(), file_cache).await; } } - + pub async fn fill_partial_hit(&self, partial_hit: PartialHit, missing_bytes: Bytes) -> Bytes { - self.set(&partial_hit.path, partial_hit.missing_range, missing_bytes.clone()).await; - - if let DataCacheRes::Hit(complete_buf) = self.get(&partial_hit.path, partial_hit.original_range).await { + self.set( + &partial_hit.path, + partial_hit.missing_range, + missing_bytes.clone(), + ) + .await; + + if let DataCacheRes::Hit(complete_buf) = self + .get(&partial_hit.path, partial_hit.original_range) + .await + { complete_buf } else { // if our file data has been purged from the cache in the meantime, reconstruct the needed buffer ourselves - Bytes::from([partial_hit.head_bytes.unwrap_or_default(), missing_bytes, partial_hit.tail_bytes.unwrap_or_default()].into_iter().flatten().collect::>()) + Bytes::from( + [ + partial_hit.head_bytes.unwrap_or_default(), + missing_bytes, + partial_hit.tail_bytes.unwrap_or_default(), + ] + .into_iter() + .flatten() + .collect::>(), + ) } } } @@ -78,7 +99,7 
@@ pub type DataCacheRef = Arc; pub enum DataCacheRes { Hit(Bytes), PartialHit(PartialHit), - Miss + Miss, } pub struct PartialHit { @@ -86,7 +107,7 @@ pub struct PartialHit { original_range: Range, missing_range: Range, head_bytes: Option, - tail_bytes: Option + tail_bytes: Option, } impl PartialHit { @@ -95,31 +116,33 @@ impl PartialHit { } } - #[derive(Clone, Debug)] struct FileContentCache { path: String, // it is assumed no buffers overlap or are adjacent (adjacent buffers should be merged) - buffers: Vec<(Range, Bytes)> + buffers: Vec<(Range, Bytes)>, } impl FileContentCache { fn new_with_first_buf(path: String, range: Range, bytes: Bytes) -> Self { - if range.start == range.end { // TODO: check if this is necessary + if range.start == range.end { + // TODO: check if this is necessary return Self { path, - buffers: vec![] - } + buffers: vec![], + }; } Self { path, - buffers: vec![(range, bytes)] + buffers: vec![(range, bytes)], } } fn size(&self) -> u64 { let vec_size = size_of::, Bytes)>>(); - let buf_sizes = self.buffers.iter().fold(0usize, |sum: usize, (_, buf)| sum + size_of::>() + size_of::() + buf.len()); + let buf_sizes = self.buffers.iter().fold(0usize, |sum: usize, (_, buf)| { + sum + size_of::>() + size_of::() + buf.len() + }); (vec_size + buf_sizes) as u64 } @@ -132,7 +155,7 @@ impl FileContentCache { if buf_range.start <= range.start && range.end <= buf_range.end { let offset = (range.start - buf_range.start) as usize; let len = (range.end - range.start) as usize; - return DataCacheRes::Hit(buf.slice(offset..(offset + len))) + return DataCacheRes::Hit(buf.slice(offset..(offset + len))); } if buf_range.start <= range.start && buf_range.end < range.end { @@ -159,7 +182,13 @@ impl FileContentCache { let missing_range = (range.start + offset_start)..(range.end - offset_end); - return DataCacheRes::PartialHit(PartialHit { path: self.path.clone(), original_range: range, missing_range, head_bytes: head, tail_bytes: tail }) + return DataCacheRes::PartialHit(PartialHit { + path: self.path.clone(), + original_range: range, + missing_range, + head_bytes: head, + tail_bytes: tail, + }); } else { DataCacheRes::Miss } @@ -168,7 +197,8 @@ impl FileContentCache { fn set(&mut self, range: Range, bytes: Bytes) { // TODO: LOCKING THIS IS PROBABLY A GOOD IDEA - if range.end == range.start { // TODO: check if this necessary + if range.end == range.start { + // TODO: check if this necessary return; } @@ -178,7 +208,7 @@ impl FileContentCache { for i in (0..self.buffers.len()).rev() { let buf_range = self.buffers[i].0.clone(); if buf_range.start <= range.start && range.end <= buf_range.end { - return // we already have this cached + return; // we already have this cached } if buf_range.start <= range.start && buf_range.end < range.end { @@ -197,20 +227,30 @@ impl FileContentCache { if head_touching.is_none() && tail_touching.is_none() { self.buffers.push((range, bytes)) } else { - let (head_range, head_buf) = head_touching.unwrap_or((range.start..range.start, Bytes::new())); - let (tail_range, tail_buf) = tail_touching.unwrap_or((range.end..range.end, Bytes::new())); + let (head_range, head_buf) = + head_touching.unwrap_or((range.start..range.start, Bytes::new())); + let (tail_range, tail_buf) = + tail_touching.unwrap_or((range.end..range.end, Bytes::new())); let start_offset = (head_range.end - range.start) as usize; let end_offset = range.end - tail_range.start; let trimmed_end = bytes.len() - end_offset as usize; let trimmed_middle_buf = bytes.slice(start_offset..trimmed_end); - let new_buf 
= Bytes::from([head_buf, trimmed_middle_buf, tail_buf].into_iter().flatten().collect::>()); - + let new_buf = Bytes::from( + [head_buf, trimmed_middle_buf, tail_buf] + .into_iter() + .flatten() + .collect::>(), + ); + // we clear all of the buffers that were totally overpassed by our new buffer - self.buffers.retain(|(buf_range, _)| !(range.start < buf_range.start && buf_range.end < range.end)); + self.buffers.retain(|(buf_range, _)| { + !(range.start < buf_range.start && buf_range.end < range.end) + }); - self.buffers.push((head_range.start..tail_range.end, new_buf)) + self.buffers + .push((head_range.start..tail_range.end, new_buf)) } } } From acfcd7a4a402b467d145c96e871187fa088e2e01 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Mon, 9 Jun 2025 15:13:34 -0600 Subject: [PATCH 03/17] feat: add CachedFileReader to wrap opendal reader --- crates/iceberg/src/io/file_io.rs | 64 +++++++++++++++++++++++++++++++- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 2f0ae1736..a6aeb3c87 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -23,9 +23,12 @@ use bytes::Bytes; use opendal::Operator; use url::Url; +use super::data_cache::{DataCache, DataCacheRef, DataCacheRes}; use super::storage::Storage; use crate::{Error, ErrorKind, Result}; +const DEFAULT_MAX_CACHE_SIZE: u64 = 32 * 1000 * 1000; // 32 MB + /// FileIO implementation, used to manipulate files in underlying storage. /// /// # Note /// @@ -44,8 +47,8 @@ use crate::{Error, ErrorKind, Result}; #[derive(Clone, Debug)] pub struct FileIO { builder: FileIOBuilder, - inner: Arc, + inner: Arc, + cache: DataCacheRef, } impl FileIO { @@ -145,6 +148,7 @@ impl FileIO { op, path, relative_path_pos, + cache: self.cache.clone(), }) } @@ -161,6 +165,7 @@ impl FileIO { op, path, relative_path_pos, + cache: self.cache.clone(), }) } } @@ -174,6 +179,8 @@ pub struct FileIOBuilder { scheme_str: Option, /// Arguments for operator. props: HashMap, + /// Maximum capacity of the cache, in bytes + cache_size: u64, } impl FileIOBuilder { @@ -183,6 +190,7 @@ impl FileIOBuilder { Self { scheme_str: Some(scheme_str.to_string()), props: HashMap::default(), + cache_size: DEFAULT_MAX_CACHE_SIZE, } } @@ -191,6 +199,7 @@ impl FileIOBuilder { Self { scheme_str: None, props: HashMap::default(), + cache_size: DEFAULT_MAX_CACHE_SIZE, } } @@ -217,12 +226,20 @@ impl FileIOBuilder { self } + /// Set the maximum cache size in bytes + pub fn with_cache_size(mut self, cache_size: u64) -> Self { + self.cache_size = cache_size; + self + } + /// Builds [`FileIO`]. pub fn build(self) -> Result { let storage = Storage::build(self.clone())?; + let cache_size = self.cache_size; Ok(FileIO { builder: self, inner: Arc::new(storage), + cache: Arc::new(DataCache::new(cache_size)), }) } } @@ -249,6 +266,38 @@ pub trait FileRead: Send + Sync + Unpin + 'static { async fn read(&self, range: Range) -> crate::Result; } +/// A file reader that can cache read buffers for future use +pub struct CachedFileReader { + reader: opendal::Reader, + cache: DataCacheRef, + path: String, +} + +#[async_trait::async_trait] +impl FileRead for CachedFileReader { + async fn read(&self, range: Range) -> crate::Result { + match self.cache.get(&self.path, range.clone()).await { + DataCacheRes::Hit(res) => Ok(res), + DataCacheRes::PartialHit(partial_hit) => { + let missing_bytes = self + .reader + .read(partial_hit.missing_range()) + .await?
+ .to_bytes(); + Ok(self + .cache + .fill_partial_hit(partial_hit, missing_bytes) + .await) + } + DataCacheRes::Miss => { + let res = self.reader.read(range.clone()).await?.to_bytes(); + self.cache.set(&self.path, range, res.clone()).await; + Ok(res) + } + } + } +} + #[async_trait::async_trait] impl FileRead for opendal::Reader { async fn read(&self, range: Range) -> crate::Result { @@ -264,6 +313,7 @@ pub struct InputFile { path: String, // Relative path of file to uri, starts at [`relative_path_pos`] relative_path_pos: usize, + cache: DataCacheRef, } impl InputFile { @@ -289,6 +339,7 @@ impl InputFile { /// Read and returns whole content of file. /// /// For continuous reading, use [`Self::reader`] instead. + // TODO: implement cache-level understanding of file size and completeness so this function can use cache too pub async fn read(&self) -> crate::Result { Ok(self .op @@ -300,8 +351,15 @@ impl InputFile { /// Creates [`FileRead`] for continuous reading. /// /// For one-time reading, use [`Self::read`] instead. + // TODO: figure out how to cache reads with a reader pub async fn reader(&self) -> crate::Result> { - Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?) + let direct_reader = self.op.reader(&self.path[self.relative_path_pos..]).await?; + + Ok(CachedFileReader { + reader: direct_reader, + cache: self.cache.clone(), + path: self.path.clone(), + }) } } @@ -344,6 +402,7 @@ pub struct OutputFile { path: String, // Relative path of file to uri, starts at [`relative_path_pos`] relative_path_pos: usize, + cache: DataCacheRef, // TODO: cache writes to output files to prevent unecessary reads } impl OutputFile { @@ -370,6 +429,7 @@ impl OutputFile { op: self.op, path: self.path, relative_path_pos: self.relative_path_pos, + cache: self.cache, } } From 51fde55ebab3189bf10d711344bf63915289d11d Mon Sep 17 00:00:00 2001 From: Kyteware Date: Tue, 10 Jun 2025 08:23:32 -0600 Subject: [PATCH 04/17] fix: bounds checks for partial hits correct --- crates/iceberg/src/io/data_cache.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/crates/iceberg/src/io/data_cache.rs b/crates/iceberg/src/io/data_cache.rs index e8c4eee83..ef29dc5a8 100644 --- a/crates/iceberg/src/io/data_cache.rs +++ b/crates/iceberg/src/io/data_cache.rs @@ -126,7 +126,6 @@ struct FileContentCache { impl FileContentCache { fn new_with_first_buf(path: String, range: Range, bytes: Bytes) -> Self { if range.start == range.end { - // TODO: check if this is necessary return Self { path, buffers: vec![], @@ -158,12 +157,14 @@ impl FileContentCache { return DataCacheRes::Hit(buf.slice(offset..(offset + len))); } - if buf_range.start <= range.start && buf_range.end < range.end { + if buf_range.start <= range.start + && ((range.start + 1)..range.end).contains(&buf_range.end) + { let offset = (range.start - buf_range.start) as usize; head = Some(buf.slice(offset..buf.len())); } - if range.start < buf_range.start && range.end <= buf_range.end { + if range.contains(&buf_range.start) && range.end <= buf_range.end { let cutoff = (range.end - buf_range.start) as usize; tail = Some(buf.slice(0..cutoff)) } @@ -195,8 +196,6 @@ impl FileContentCache { } fn set(&mut self, range: Range, bytes: Bytes) { - // TODO: LOCKING THIS IS PROBABLY A GOOD IDEA - if range.end == range.start { // TODO: check if this necessary return; @@ -211,11 +210,13 @@ impl FileContentCache { return; // we already have this cached } - if buf_range.start <= range.start && buf_range.end < range.end { + if buf_range.start <= range.start 
+ && ((range.start + 1)..range.end).contains(&buf_range.end) + { head_touching = Some(self.buffers.remove(i)); } - if range.start < buf_range.start && range.end <= buf_range.start { + if range.contains(&buf_range.start) && range.end <= buf_range.end { tail_touching = Some(self.buffers.remove(i)); } From 5c14ec58f5be6b7e20ab92fc283c2263a31b1b0a Mon Sep 17 00:00:00 2001 From: Kyteware Date: Tue, 10 Jun 2025 08:45:40 -0600 Subject: [PATCH 05/17] chore: cargo fmt --- crates/iceberg/src/io/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index ad1bb6086..617f525c6 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -70,8 +70,8 @@ mod file_io; mod storage; pub use file_io::*; -pub(crate) mod object_cache; pub(crate) mod data_cache; +pub(crate) mod object_cache; #[cfg(feature = "storage-azdls")] mod storage_azdls; From d360d9ebdb582f01a4623de57ade840fcb60918f Mon Sep 17 00:00:00 2001 From: Kyteware Date: Tue, 10 Jun 2025 08:47:17 -0600 Subject: [PATCH 06/17] chore: fix typo --- crates/iceberg/src/io/file_io.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 01a8ec4c0..7654d3418 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -404,7 +404,7 @@ pub struct OutputFile { path: String, // Relative path of file to uri, starts at [`relative_path_pos`] relative_path_pos: usize, - cache: DataCacheRef, // TODO: cache writes to output files to prevent unecessary reads + cache: DataCacheRef, // TODO: cache writes to output files to prevent unnecessary reads } impl OutputFile { From 693ee628359df0592ccf59ef500dc36a10d235f2 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Tue, 10 Jun 2025 08:50:25 -0600 Subject: [PATCH 07/17] chore: add license header to data cache file --- crates/iceberg/src/io/data_cache.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/crates/iceberg/src/io/data_cache.rs b/crates/iceberg/src/io/data_cache.rs index ef29dc5a8..ee841bdab 100644 --- a/crates/iceberg/src/io/data_cache.rs +++ b/crates/iceberg/src/io/data_cache.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ use std::mem; use std::ops::Range; use std::sync::Arc; From aab8769417758e307329dd32c73602eb048f75cd Mon Sep 17 00:00:00 2001 From: Kyteware Date: Tue, 10 Jun 2025 09:07:18 -0600 Subject: [PATCH 08/17] chore: cargo clippy --- crates/iceberg/src/io/data_cache.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/io/data_cache.rs b/crates/iceberg/src/io/data_cache.rs index ee841bdab..e3fecd51b 100644 --- a/crates/iceberg/src/io/data_cache.rs +++ b/crates/iceberg/src/io/data_cache.rs @@ -200,13 +200,13 @@ impl FileContentCache { let missing_range = (range.start + offset_start)..(range.end - offset_end); - return DataCacheRes::PartialHit(PartialHit { + DataCacheRes::PartialHit(PartialHit { path: self.path.clone(), original_range: range, missing_range, head_bytes: head, tail_bytes: tail, - }); + }) } else { DataCacheRes::Miss } From 2794185038b3fc6ad64ccd9e6d04553116773df6 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Thu, 12 Jun 2025 14:13:11 -0600 Subject: [PATCH 09/17] feat: implement complete data file storage in cache --- crates/iceberg/src/io/data_cache.rs | 74 ++++++++++++++++++++++------- crates/iceberg/src/io/file_io.rs | 4 +- 2 files changed, 58 insertions(+), 20 deletions(-) diff --git a/crates/iceberg/src/io/data_cache.rs b/crates/iceberg/src/io/data_cache.rs index e3fecd51b..0340e28d0 100644 --- a/crates/iceberg/src/io/data_cache.rs +++ b/crates/iceberg/src/io/data_cache.rs @@ -51,30 +51,62 @@ impl DataCache { } } - pub async fn get(&self, path: &String, range: Range) -> DataCacheRes { + pub async fn get_whole(&self, path: &String) -> Option { if let Some(file_cache) = self.cache.get(path).await { - file_cache.content.read().await.get(range) + if let FileContentCache::Complete(bytes) = &*file_cache.content.read().await { + return Some(bytes.clone()); + } + } + + None + } + + pub async fn set_whole(&self, path: &String, bytes: Bytes) { + let size = size_of::() + bytes.len(); + self.cache.insert( + path.clone(), + FileCache { content: Arc::new(RwLock::new(FileContentCache::Complete(bytes))), current_size: size as u32 } + ).await; + } + + pub async fn get_range(&self, path: &String, range: Range) -> DataCacheRes { + if let Some(file_cache) = self.cache.get(path).await { + match &*file_cache.content.read().await { + FileContentCache::Complete(bytes) => { + let range = (range.start as usize)..(range.end as usize); + DataCacheRes::Hit(bytes.slice(range)) + }, + FileContentCache::Fragmented(fragmented_content_cache) => { + fragmented_content_cache.get(range) + }, + } } else { DataCacheRes::Miss } } - pub async fn set(&self, path: &String, range: Range, bytes: Bytes) { + pub async fn set_range(&self, path: &String, range: Range, bytes: Bytes) { if let Some(mut file_cache) = self.cache.get(path).await { let mut file_content_cache = file_cache.content.write().await; - - file_content_cache.set(range, bytes); - file_cache.current_size = file_content_cache.size() as u32 + size_of::() as u32; - - mem::drop(file_content_cache); // release our lock - - self.cache.insert(path.clone(), file_cache).await; + match &mut *file_content_cache { + FileContentCache::Complete(_) => { + // do nothing, we already have the entire file cached + }, + FileContentCache::Fragmented(fragmented_content_cache) => { + fragmented_content_cache.set(range, bytes); + file_cache.current_size = fragmented_content_cache.size() as u32 + size_of::() as u32; + + mem::drop(file_content_cache); // release our lock + + self.cache.insert(path.clone(), file_cache).await; + } + } } else { - let 
file_content_cache = - FileContentCache::new_with_first_buf(path.clone(), range, bytes); - let current_size = file_content_cache.size() as u32; + let fragmented_content_cache = + FragmentedContentCache::new_with_first_buf(path.clone(), range, bytes); + let current_size = fragmented_content_cache.size() as u32; let file_cache = FileCache { - content: Arc::new(RwLock::new(file_content_cache)), + content: Arc::new(RwLock::new(FileContentCache::Fragmented(fragmented_content_cache))), current_size, }; @@ -83,7 +115,7 @@ impl DataCache { } pub async fn fill_partial_hit(&self, partial_hit: PartialHit, missing_bytes: Bytes) -> Bytes { - self.set( + self.set_range( &partial_hit.path, partial_hit.missing_range, missing_bytes.clone(), @@ -91,7 +123,7 @@ impl DataCache { .await; if let DataCacheRes::Hit(complete_buf) = self - .get(&partial_hit.path, partial_hit.original_range) + .get_range(&partial_hit.path, partial_hit.original_range) .await { complete_buf @@ -134,13 +166,19 @@ impl PartialHit { } #[derive(Clone, Debug)] -struct FileContentCache { +enum FileContentCache { + Complete(Bytes), + Fragmented(FragmentedContentCache) +} + +#[derive(Clone, Debug)] +struct FragmentedContentCache { path: String, // it is assumed no buffers overlap or are adjacent (adjacent buffers should be merged) buffers: Vec<(Range, Bytes)>, } -impl FileContentCache { +impl FragmentedContentCache { fn new_with_first_buf(path: String, range: Range, bytes: Bytes) -> Self { if range.start == range.end { return Self { diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 7654d3418..f47267ec3 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -278,7 +278,7 @@ pub struct CachedFileReader { #[async_trait::async_trait] impl FileRead for CachedFileReader { async fn read(&self, range: Range) -> crate::Result { - match self.cache.get(&self.path, range.clone()).await { + match self.cache.get_range(&self.path, range.clone()).await { DataCacheRes::Hit(res) => Ok(res), DataCacheRes::PartialHit(partial_hit) => { let missing_bytes = self @@ -293,7 +293,7 @@ impl FileRead for CachedFileReader { } DataCacheRes::Miss => { let res = self.reader.read(range.clone()).await?.to_bytes(); - self.cache.set(&self.path, range, res.clone()).await; + self.cache.set_range(&self.path, range, res.clone()).await; Ok(res) } } From e14db3c52e2c8e67d33f6beb202609b0c32a8478 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Thu, 12 Jun 2025 14:16:53 -0600 Subject: [PATCH 10/17] feat: integrate whole-datafile caching in fileio --- crates/iceberg/src/io/file_io.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index f47267ec3..b4d19600b 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -343,11 +343,17 @@ impl InputFile { /// For continuous reading, use [`Self::reader`] instead. // TODO: implement cache-level understanding of file size and completeness so this function can use cache too pub async fn read(&self) -> crate::Result { - Ok(self - .op - .read(&self.path[self.relative_path_pos..]) - .await? - .to_bytes()) + if let Some(bytes) = self.cache.get_whole(&self.path).await { + Ok(bytes) + } else { + let bytes = self + .op + .read(&self.path[self.relative_path_pos..]) + .await? + .to_bytes(); + self.cache.set_whole(&self.path, bytes.clone()).await; + Ok(bytes) + } } /// Creates [`FileRead`] for continuous reading. 
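To make the read contract concrete at this point in the series, the following sketch exercises the DataCache API as it stands after patches 09 and 10. It is illustrative only and not part of any patch: it is written as crate-internal code (the module is pub(crate)), and the path and byte values are invented.

// Illustrative sketch, not part of the patch series. Assumes the DataCache
// API after patches 09/10 (get_range/set_range/fill_partial_hit).
use bytes::Bytes;

use crate::io::data_cache::{DataCache, DataCacheRes};

async fn demo() {
    let cache = DataCache::new(32 * 1000 * 1000);
    let path = "s3://bucket/data/00000.parquet".to_string(); // invented path

    // Nothing cached yet: a range lookup misses.
    assert!(matches!(cache.get_range(&path, 0..100).await, DataCacheRes::Miss));

    // The caller fetches 0..100 from storage and caches it.
    cache.set_range(&path, 0..100, Bytes::from(vec![1u8; 100])).await;

    // A read fully inside the cached range is served without any I/O.
    assert!(matches!(cache.get_range(&path, 10..90).await, DataCacheRes::Hit(_)));

    // A read overlapping the cached head is a partial hit: only 100..150
    // still has to be fetched, then fill_partial_hit stitches the buffers.
    if let DataCacheRes::PartialHit(partial) = cache.get_range(&path, 50..150).await {
        assert_eq!(partial.missing_range(), 100..150);
        let fetched = Bytes::from(vec![2u8; 50]); // stand-in for a storage read
        let full = cache.fill_partial_hit(partial, fetched).await;
        assert_eq!(full.len(), 100);
    }
}

Note that fill_partial_hit returns the stitched 100-byte buffer whether or not the earlier fragments are still resident, since the PartialHit carries the cached head/tail bytes with it.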
From 40e0e1738c06c7a52bc7e6d91e3c45af44220f4a Mon Sep 17 00:00:00 2001 From: Kyteware Date: Thu, 12 Jun 2025 14:27:46 -0600 Subject: [PATCH 11/17] chore: cargo fmt --- crates/iceberg/src/io/data_cache.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/crates/iceberg/src/io/data_cache.rs b/crates/iceberg/src/io/data_cache.rs index 0340e28d0..44276984c 100644 --- a/crates/iceberg/src/io/data_cache.rs +++ b/crates/iceberg/src/io/data_cache.rs @@ -63,10 +63,12 @@ impl DataCache { pub async fn set_whole(&self, path: &String, bytes: Bytes) { let size = size_of::() + bytes.len(); - self.cache.insert( - path.clone(), - FileCache { content: Arc::new(RwLock::new(FileContentCache::Complete(bytes))), current_size: size as u32 } - ).await; + self.cache + .insert(path.clone(), FileCache { + content: Arc::new(RwLock::new(FileContentCache::Complete(bytes))), + current_size: size as u32, + }) + .await; } pub async fn get_range(&self, path: &String, range: Range) -> DataCacheRes { @@ -75,10 +77,10 @@ impl DataCache { FileContentCache::Complete(bytes) => { let range = (range.start as usize)..(range.end as usize); DataCacheRes::Hit(bytes.slice(range)) - }, + } FileContentCache::Fragmented(fragmented_content_cache) => { fragmented_content_cache.get(range) - }, + } } } else { DataCacheRes::Miss @@ -91,10 +93,11 @@ impl DataCache { match &mut *file_content_cache { FileContentCache::Complete(_) => { // do nothing, we already have the entire file cached - }, + } FileContentCache::Fragmented(fragmented_content_cache) => { fragmented_content_cache.set(range, bytes); - file_cache.current_size = fragmented_content_cache.size() as u32 + size_of::() as u32; + file_cache.current_size = + fragmented_content_cache.size() as u32 + size_of::() as u32; mem::drop(file_content_cache); // release our lock @@ -106,7 +109,9 @@ impl DataCache { FragmentedContentCache::new_with_first_buf(path.clone(), range, bytes); let current_size = fragmented_content_cache.size() as u32; let file_cache = FileCache { - content: Arc::new(RwLock::new(FileContentCache::Fragmented(fragmented_content_cache))), + content: Arc::new(RwLock::new(FileContentCache::Fragmented( + fragmented_content_cache, + ))), current_size, }; @@ -168,7 +173,7 @@ impl PartialHit { #[derive(Clone, Debug)] enum FileContentCache { Complete(Bytes), - Fragmented(FragmentedContentCache) + Fragmented(FragmentedContentCache), } #[derive(Clone, Debug)] From 4fcd5e4d5d3bcae6fe78a0597eb0bef66291db92 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Thu, 12 Jun 2025 14:29:57 -0600 Subject: [PATCH 12/17] chore: cargo clippy --- crates/iceberg/src/io/data_cache.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/io/data_cache.rs b/crates/iceberg/src/io/data_cache.rs index 44276984c..ae4aec8b5 100644 --- a/crates/iceberg/src/io/data_cache.rs +++ b/crates/iceberg/src/io/data_cache.rs @@ -61,10 +61,10 @@ impl DataCache { None } - pub async fn set_whole(&self, path: &String, bytes: Bytes) { + pub async fn set_whole(&self, path: &str, bytes: Bytes) { let size = size_of::() + bytes.len(); self.cache - .insert(path.clone(), FileCache { + .insert(path.to_owned(), FileCache { content: Arc::new(RwLock::new(FileContentCache::Complete(bytes))), current_size: size as u32, }) From 153ae6012b205eead8d327436fa7faf6591d0bc7 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Thu, 12 Jun 2025 14:33:51 -0600 Subject: [PATCH 13/17] chore: resolve/revise todo comments --- crates/iceberg/src/io/data_cache.rs | 1 - 
crates/iceberg/src/io/file_io.rs | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/iceberg/src/io/data_cache.rs b/crates/iceberg/src/io/data_cache.rs index ae4aec8b5..9d3074c73 100644 --- a/crates/iceberg/src/io/data_cache.rs +++ b/crates/iceberg/src/io/data_cache.rs @@ -257,7 +257,6 @@ impl FragmentedContentCache { fn set(&mut self, range: Range, bytes: Bytes) { if range.end == range.start { - // TODO: check if this necessary return; } diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index b4d19600b..f40f99200 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -341,7 +341,6 @@ impl InputFile { /// Read and returns whole content of file. /// /// For continuous reading, use [`Self::reader`] instead. - // TODO: implement cache-level understanding of file size and completeness so this function can use cache too pub async fn read(&self) -> crate::Result { if let Some(bytes) = self.cache.get_whole(&self.path).await { Ok(bytes) @@ -359,7 +358,6 @@ impl InputFile { /// Creates [`FileRead`] for continuous reading. /// /// For one-time reading, use [`Self::read`] instead. - // TODO: figure out how to cache reads with a reader pub async fn reader(&self) -> crate::Result> { let direct_reader = self.op.reader(&self.path[self.relative_path_pos..]).await?; @@ -410,7 +408,7 @@ pub struct OutputFile { path: String, // Relative path of file to uri, starts at [`relative_path_pos`] relative_path_pos: usize, - cache: DataCacheRef, // TODO: cache writes to output files to prevent unnecessary reads + cache: DataCacheRef, // TODO: consider caching file writes } impl OutputFile { From 30353c709961df3183923d67a62fb082a8e741e3 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sat, 14 Jun 2025 08:43:33 -0700 Subject: [PATCH 14/17] test: add data cache tests --- crates/iceberg/src/io/data_cache.rs | 182 ++++++++++++++++++++++++++++ 1 file changed, 182 insertions(+) diff --git a/crates/iceberg/src/io/data_cache.rs b/crates/iceberg/src/io/data_cache.rs index 9d3074c73..279db6b65 100644 --- a/crates/iceberg/src/io/data_cache.rs +++ b/crates/iceberg/src/io/data_cache.rs @@ -150,12 +150,14 @@ impl DataCache { pub type DataCacheRef = Arc; +#[derive(Debug, PartialEq)] pub enum DataCacheRes { Hit(Bytes), PartialHit(PartialHit), Miss, } +#[derive(Debug, PartialEq)] pub struct PartialHit { path: String, original_range: Range, @@ -314,3 +316,183 @@ impl FragmentedContentCache { } } } + +#[cfg(test)] +mod tests { + use std::ops::Range; + + use bytes::Bytes; + + use crate::io::data_cache::PartialHit; + + use super::DataCache; + use super::DataCacheRes::{Hit, Miss, PartialHit as ParHit}; + + const MEGS32: u64 = 32 * 1000 * 1000; + const TEST_PATH: &str = "/test/path"; + const TEST_CONTENTS: &str = "abcdefghijklmnopqrstuvwxyz"; + const TEST_BYTES: Bytes = Bytes::from_static(TEST_CONTENTS.as_bytes()); + + #[tokio::test] + async fn cache_whole_file() { + let cache = DataCache::new(MEGS32); + + assert_eq!(None, cache.get_whole(&TEST_PATH.to_owned()).await); + assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 5..10).await); + + cache.set_whole(TEST_PATH, TEST_BYTES).await; + + assert_eq!( + Some(TEST_BYTES), + cache.get_whole(&TEST_PATH.to_owned()).await + ); + assert_eq!( + Hit(TEST_BYTES.slice(5..10)), + cache.get_range(&TEST_PATH.to_owned(), 5..10).await + ); + + // shouldn't have an effect, we already have this fully cached and the cache shouldn't have filled and purged it + cache.set_range(&TEST_PATH.to_owned(), 4..8, 
Bytes::new()).await; + + assert_eq!( + Some(TEST_BYTES), + cache.get_whole(&TEST_PATH.to_owned()).await + ); + assert_eq!( + Hit(TEST_BYTES.slice(5..10)), + cache.get_range(&TEST_PATH.to_owned(), 5..10).await + ); + } + + #[tokio::test] + async fn cache_one_range_simple() { + let cache = DataCache::new(MEGS32); + + assert_eq!(None, cache.get_whole(&TEST_PATH.to_owned()).await); + assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 0..4).await); + assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 9..12).await); + assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 20..23).await); + + cache.set_range(&TEST_PATH.to_owned(), 7..15, TEST_BYTES.slice(7..15)).await; + + assert_eq!(None, cache.get_whole(&TEST_PATH.to_owned()).await); + assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 0..4).await); + assert_eq!(Hit(TEST_BYTES.slice(9..12)), cache.get_range(&TEST_PATH.to_owned(), 9..12).await); + assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 20..23).await); + + cache.set_whole(TEST_PATH, TEST_BYTES).await; + + assert_eq!(Some(TEST_BYTES), cache.get_whole(&TEST_PATH.to_owned()).await); + assert_eq!(Hit(TEST_BYTES.slice(0..4)), cache.get_range(&TEST_PATH.to_owned(), 0..4).await); + assert_eq!(Hit(TEST_BYTES.slice(9..12)), cache.get_range(&TEST_PATH.to_owned(), 9..12).await); + assert_eq!(Hit(TEST_BYTES.slice(20..23)), cache.get_range(&TEST_PATH.to_owned(), 20..23).await); + } + + #[tokio::test] + async fn cache_partial_hit() { + let cache = DataCache::new(MEGS32); + + assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 5..15).await); + assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 10..20).await); + + cache.set_range(&TEST_PATH.to_owned(), 3..8, TEST_BYTES.slice(3..8)).await; + + assert_eq!(ParHit(PartialHit { + path: TEST_PATH.to_owned(), + original_range: 5..15, + missing_range: 8..15, + head_bytes: Some(TEST_BYTES.slice(5..8)), + tail_bytes: None + }), cache.get_range(&TEST_PATH.to_owned(), 5..15).await); + assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 10..20).await); + + cache.set_range(&TEST_PATH.to_owned(), 15..22, TEST_BYTES.slice(15..22)).await; + + assert_eq!(ParHit(PartialHit { + path: TEST_PATH.to_owned(), + original_range: 5..15, + missing_range: 8..15, + head_bytes: Some(TEST_BYTES.slice(5..8)), + tail_bytes: None + }), cache.get_range(&TEST_PATH.to_owned(), 5..15).await); + assert_eq!(ParHit(PartialHit { + path: TEST_PATH.to_owned(), + original_range: 10..20, + missing_range: 10..15, + head_bytes: None, + tail_bytes: Some(TEST_BYTES.slice(15..20)) + }), cache.get_range(&TEST_PATH.to_owned(), 10..20).await); + + cache.set_range(&TEST_PATH.to_owned(), 12..17, TEST_BYTES.slice(12..17)).await; + + assert_eq!(ParHit(PartialHit { + path: TEST_PATH.to_owned(), + original_range: 5..15, + missing_range: 8..12, + head_bytes: Some(TEST_BYTES.slice(5..8)), + tail_bytes: Some(TEST_BYTES.slice(12..15)) + }), cache.get_range(&TEST_PATH.to_owned(), 5..15).await); + assert_eq!(ParHit(PartialHit { + path: TEST_PATH.to_owned(), + original_range: 10..20, + missing_range: 10..12, + head_bytes: None, + tail_bytes: Some(TEST_BYTES.slice(12..20)) + }), cache.get_range(&TEST_PATH.to_owned(), 10..20).await); + } + + #[tokio::test] + async fn cache_partial_hit_fill() { + let cache = DataCache::new(MEGS32); + + cache.set_range(&TEST_PATH.to_owned(), 3..8, TEST_BYTES.slice(3..8)).await; + + if let ParHit(partial_hit) = cache.get_range(&TEST_PATH.to_owned(), 5..13).await { + let missing = partial_hit.missing_range(); + let missing = missing.start as 
usize..missing.end as usize; + let missing_bytes = TEST_BYTES.slice(missing); + assert_eq!(TEST_BYTES.slice(5..13), cache.fill_partial_hit(partial_hit, missing_bytes).await); + } else { + panic!("not a partial hit :(") + } + } + + #[tokio::test] + async fn cache_overlapping_ranges() { + let cache = DataCache::new(MEGS32); + + cache.set_range(&TEST_PATH.to_owned(), 12..18, TEST_BYTES.slice(12..18)).await; + cache.set_range(&TEST_PATH.to_owned(), 10..20, TEST_BYTES.slice(10..20)).await; + cache.set_range(&TEST_PATH.to_owned(), 14..16, TEST_BYTES.slice(14..16)).await; + + assert_eq!(ParHit(PartialHit { + path: TEST_PATH.to_owned(), + original_range: 5..15, + missing_range: 5..10, + head_bytes: None, + tail_bytes: Some(TEST_BYTES.slice(10..15)) + }), cache.get_range(&TEST_PATH.to_owned(), 5..15).await); + + assert_eq!(Hit(TEST_BYTES.slice(11..17)), cache.get_range(&TEST_PATH.to_owned(), 11..17).await) + } + + #[tokio::test] + async fn cache_partial_fill_ran_out_of_memory() { + // enough memory to cache 5 bytes + let size = size_of::() + size_of::, Bytes)>>() + size_of::>() + size_of::() + 5; + + // give a little bit of extra leeway + let cache = DataCache::new(size as u64 + 2); + + cache.set_range(&TEST_PATH.to_owned(), 10..15, TEST_BYTES.slice(10..15)).await; + + if let ParHit(partial_hit) = cache.get_range(&TEST_PATH.to_owned(), 12..22).await { + let missing = partial_hit.missing_range(); + let missing = missing.start as usize..missing.end as usize; + let missing_bytes = TEST_BYTES.slice(missing); + assert_eq!(TEST_BYTES.slice(12..22), cache.fill_partial_hit(partial_hit, missing_bytes).await); + } else { + panic!("not a partial hit :(") + } + } +} From 448acf720414a9cc3283a6ebd22d622a0f79de15 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sat, 14 Jun 2025 08:56:17 -0700 Subject: [PATCH 15/17] doc: add docs to structs and functions public to the crate --- crates/iceberg/src/io/data_cache.rs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/io/data_cache.rs b/crates/iceberg/src/io/data_cache.rs index 279db6b65..ea2fad24d 100644 --- a/crates/iceberg/src/io/data_cache.rs +++ b/crates/iceberg/src/io/data_cache.rs @@ -23,9 +23,9 @@ use bytes::Bytes; use moka::future::{Cache, CacheBuilder}; use tokio::sync::RwLock; -/// A cache for data files retrieved by `FileIO`. +/// A cache for data files retrieved by [`FileIO`]. /// -/// Minimizes work by allowing "partial" cache hits, where a file read with edge(s) covered +/// Minimizes work by allowing cache [`PartialHit`]s, where a file read with edge(s) covered /// by the cache only needs to fetch the missing part in the middle. // !!!! Note: the current implementation is unoptimized and basic, it must be revised // Before any optimization is done, some kind of benchmarking is needed @@ -34,6 +34,7 @@ pub struct DataCache { cache: Cache, } +/// The cache of a single file, with a read/write lock on the content for concurrent use #[derive(Clone, Debug)] struct FileCache { content: Arc>, @@ -41,6 +42,7 @@ struct FileCache { } impl DataCache { + /// Creates a new [`DataCache`] with no entries and the specified size. pub fn new(max_size: u64) -> Self { Self { cache: CacheBuilder::new(max_size) @@ -51,6 +53,9 @@ impl DataCache { } } + /// Tries to get the entire contents of a data file. Will only return the cached content if the + /// entire file has previously been cached as a whole. If it has been cached in fragments, + /// we have no way of knowing whether it is complete or not ATM. 
pub async fn get_whole(&self, path: &String) -> Option { if let Some(file_cache) = self.cache.get(path).await { if let FileContentCache::Complete(bytes) = &*file_cache.content.read().await { @@ -61,6 +66,7 @@ impl DataCache { None } + /// Caches an entire data file. pub async fn set_whole(&self, path: &str, bytes: Bytes) { let size = size_of::() + bytes.len(); self.cache @@ -71,6 +77,13 @@ impl DataCache { .await; } + /// Tries to get a range of bytes from the cache of a data file. Depending on what is currently + /// availible, this may return + /// + /// - Hit, if the entire range is availible + /// - PartialHit, if only some of the head and/or the tail is availible + /// - Use [`PartialHit::missing_range`] and [`DataCache::fill_partial_hit`] to resolve this + /// - Miss, if none of the range is availible pub async fn get_range(&self, path: &String, range: Range) -> DataCacheRes { if let Some(file_cache) = self.cache.get(path).await { match &*file_cache.content.read().await { @@ -87,6 +100,7 @@ impl DataCache { } } + /// Caches a fragment of a data file pub async fn set_range(&self, path: &String, range: Range, bytes: Bytes) { if let Some(mut file_cache) = self.cache.get(path).await { let mut file_content_cache = file_cache.content.write().await; @@ -119,6 +133,8 @@ impl DataCache { } } + /// Fills the missing section of a [`PartialHit`] and returns the complete btyes, even if the + /// cached head and/or tail have since been purged from the cache pub async fn fill_partial_hit(&self, partial_hit: PartialHit, missing_bytes: Bytes) -> Bytes { self.set_range( &partial_hit.path, @@ -148,8 +164,10 @@ impl DataCache { } } +/// An atomic reference to a [`DataCache`] pub type DataCacheRef = Arc; +/// Possible results of a cache search for a range of a file #[derive(Debug, PartialEq)] pub enum DataCacheRes { Hit(Bytes), @@ -157,6 +175,7 @@ pub enum DataCacheRes { Miss, } +/// A result of a cache search where some of the head and/or tail of the needed range was in the cache #[derive(Debug, PartialEq)] pub struct PartialHit { path: String, @@ -167,6 +186,7 @@ pub struct PartialHit { } impl PartialHit { + /// Get the range that still needs to be recovered pub fn missing_range(&self) -> Range { self.missing_range.clone() } From abad9fe201ff93fdff5dd7fe23fe099ae8d8f850 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sat, 14 Jun 2025 08:57:20 -0700 Subject: [PATCH 16/17] chore: cargo fmt --- crates/iceberg/src/io/data_cache.rs | 203 ++++++++++++++++++---------- 1 file changed, 134 insertions(+), 69 deletions(-) diff --git a/crates/iceberg/src/io/data_cache.rs b/crates/iceberg/src/io/data_cache.rs index ea2fad24d..1503180ee 100644 --- a/crates/iceberg/src/io/data_cache.rs +++ b/crates/iceberg/src/io/data_cache.rs @@ -79,7 +79,7 @@ impl DataCache { /// Tries to get a range of bytes from the cache of a data file. 
Depending on what is currently /// availible, this may return - /// + /// /// - Hit, if the entire range is availible /// - PartialHit, if only some of the head and/or the tail is availible /// - Use [`PartialHit::missing_range`] and [`DataCache::fill_partial_hit`] to resolve this @@ -343,10 +343,9 @@ mod tests { use bytes::Bytes; - use crate::io::data_cache::PartialHit; - use super::DataCache; use super::DataCacheRes::{Hit, Miss, PartialHit as ParHit}; + use crate::io::data_cache::PartialHit; const MEGS32: u64 = 32 * 1000 * 1000; const TEST_PATH: &str = "/test/path"; @@ -372,7 +371,9 @@ mod tests { ); // shouldn't have an effect, we already have this fully cached and the cache shouldn't have filled and purged it - cache.set_range(&TEST_PATH.to_owned(), 4..8, Bytes::new()).await; + cache + .set_range(&TEST_PATH.to_owned(), 4..8, Bytes::new()) + .await; assert_eq!( Some(TEST_BYTES), @@ -393,19 +394,36 @@ mod tests { assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 9..12).await); assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 20..23).await); - cache.set_range(&TEST_PATH.to_owned(), 7..15, TEST_BYTES.slice(7..15)).await; + cache + .set_range(&TEST_PATH.to_owned(), 7..15, TEST_BYTES.slice(7..15)) + .await; assert_eq!(None, cache.get_whole(&TEST_PATH.to_owned()).await); assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 0..4).await); - assert_eq!(Hit(TEST_BYTES.slice(9..12)), cache.get_range(&TEST_PATH.to_owned(), 9..12).await); + assert_eq!( + Hit(TEST_BYTES.slice(9..12)), + cache.get_range(&TEST_PATH.to_owned(), 9..12).await + ); assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 20..23).await); cache.set_whole(TEST_PATH, TEST_BYTES).await; - assert_eq!(Some(TEST_BYTES), cache.get_whole(&TEST_PATH.to_owned()).await); - assert_eq!(Hit(TEST_BYTES.slice(0..4)), cache.get_range(&TEST_PATH.to_owned(), 0..4).await); - assert_eq!(Hit(TEST_BYTES.slice(9..12)), cache.get_range(&TEST_PATH.to_owned(), 9..12).await); - assert_eq!(Hit(TEST_BYTES.slice(20..23)), cache.get_range(&TEST_PATH.to_owned(), 20..23).await); + assert_eq!( + Some(TEST_BYTES), + cache.get_whole(&TEST_PATH.to_owned()).await + ); + assert_eq!( + Hit(TEST_BYTES.slice(0..4)), + cache.get_range(&TEST_PATH.to_owned(), 0..4).await + ); + assert_eq!( + Hit(TEST_BYTES.slice(9..12)), + cache.get_range(&TEST_PATH.to_owned(), 9..12).await + ); + assert_eq!( + Hit(TEST_BYTES.slice(20..23)), + cache.get_range(&TEST_PATH.to_owned(), 20..23).await + ); } #[tokio::test] @@ -415,63 +433,89 @@ mod tests { assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 5..15).await); assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 10..20).await); - cache.set_range(&TEST_PATH.to_owned(), 3..8, TEST_BYTES.slice(3..8)).await; + cache + .set_range(&TEST_PATH.to_owned(), 3..8, TEST_BYTES.slice(3..8)) + .await; - assert_eq!(ParHit(PartialHit { - path: TEST_PATH.to_owned(), - original_range: 5..15, - missing_range: 8..15, - head_bytes: Some(TEST_BYTES.slice(5..8)), - tail_bytes: None - }), cache.get_range(&TEST_PATH.to_owned(), 5..15).await); + assert_eq!( + ParHit(PartialHit { + path: TEST_PATH.to_owned(), + original_range: 5..15, + missing_range: 8..15, + head_bytes: Some(TEST_BYTES.slice(5..8)), + tail_bytes: None + }), + cache.get_range(&TEST_PATH.to_owned(), 5..15).await + ); assert_eq!(Miss, cache.get_range(&TEST_PATH.to_owned(), 10..20).await); - cache.set_range(&TEST_PATH.to_owned(), 15..22, TEST_BYTES.slice(15..22)).await; - - assert_eq!(ParHit(PartialHit { - path: TEST_PATH.to_owned(), - original_range: 5..15, - 
missing_range: 8..15, - head_bytes: Some(TEST_BYTES.slice(5..8)), - tail_bytes: None - }), cache.get_range(&TEST_PATH.to_owned(), 5..15).await); - assert_eq!(ParHit(PartialHit { - path: TEST_PATH.to_owned(), - original_range: 10..20, - missing_range: 10..15, - head_bytes: None, - tail_bytes: Some(TEST_BYTES.slice(15..20)) - }), cache.get_range(&TEST_PATH.to_owned(), 10..20).await); - - cache.set_range(&TEST_PATH.to_owned(), 12..17, TEST_BYTES.slice(12..17)).await; - - assert_eq!(ParHit(PartialHit { - path: TEST_PATH.to_owned(), - original_range: 5..15, - missing_range: 8..12, - head_bytes: Some(TEST_BYTES.slice(5..8)), - tail_bytes: Some(TEST_BYTES.slice(12..15)) - }), cache.get_range(&TEST_PATH.to_owned(), 5..15).await); - assert_eq!(ParHit(PartialHit { - path: TEST_PATH.to_owned(), - original_range: 10..20, - missing_range: 10..12, - head_bytes: None, - tail_bytes: Some(TEST_BYTES.slice(12..20)) - }), cache.get_range(&TEST_PATH.to_owned(), 10..20).await); + cache + .set_range(&TEST_PATH.to_owned(), 15..22, TEST_BYTES.slice(15..22)) + .await; + + assert_eq!( + ParHit(PartialHit { + path: TEST_PATH.to_owned(), + original_range: 5..15, + missing_range: 8..15, + head_bytes: Some(TEST_BYTES.slice(5..8)), + tail_bytes: None + }), + cache.get_range(&TEST_PATH.to_owned(), 5..15).await + ); + assert_eq!( + ParHit(PartialHit { + path: TEST_PATH.to_owned(), + original_range: 10..20, + missing_range: 10..15, + head_bytes: None, + tail_bytes: Some(TEST_BYTES.slice(15..20)) + }), + cache.get_range(&TEST_PATH.to_owned(), 10..20).await + ); + + cache + .set_range(&TEST_PATH.to_owned(), 12..17, TEST_BYTES.slice(12..17)) + .await; + + assert_eq!( + ParHit(PartialHit { + path: TEST_PATH.to_owned(), + original_range: 5..15, + missing_range: 8..12, + head_bytes: Some(TEST_BYTES.slice(5..8)), + tail_bytes: Some(TEST_BYTES.slice(12..15)) + }), + cache.get_range(&TEST_PATH.to_owned(), 5..15).await + ); + assert_eq!( + ParHit(PartialHit { + path: TEST_PATH.to_owned(), + original_range: 10..20, + missing_range: 10..12, + head_bytes: None, + tail_bytes: Some(TEST_BYTES.slice(12..20)) + }), + cache.get_range(&TEST_PATH.to_owned(), 10..20).await + ); } #[tokio::test] async fn cache_partial_hit_fill() { let cache = DataCache::new(MEGS32); - cache.set_range(&TEST_PATH.to_owned(), 3..8, TEST_BYTES.slice(3..8)).await; + cache + .set_range(&TEST_PATH.to_owned(), 3..8, TEST_BYTES.slice(3..8)) + .await; if let ParHit(partial_hit) = cache.get_range(&TEST_PATH.to_owned(), 5..13).await { let missing = partial_hit.missing_range(); let missing = missing.start as usize..missing.end as usize; let missing_bytes = TEST_BYTES.slice(missing); - assert_eq!(TEST_BYTES.slice(5..13), cache.fill_partial_hit(partial_hit, missing_bytes).await); + assert_eq!( + TEST_BYTES.slice(5..13), + cache.fill_partial_hit(partial_hit, missing_bytes).await + ); } else { panic!("not a partial hit :(") } @@ -481,36 +525,57 @@ mod tests { async fn cache_overlapping_ranges() { let cache = DataCache::new(MEGS32); - cache.set_range(&TEST_PATH.to_owned(), 12..18, TEST_BYTES.slice(12..18)).await; - cache.set_range(&TEST_PATH.to_owned(), 10..20, TEST_BYTES.slice(10..20)).await; - cache.set_range(&TEST_PATH.to_owned(), 14..16, TEST_BYTES.slice(14..16)).await; - - assert_eq!(ParHit(PartialHit { - path: TEST_PATH.to_owned(), - original_range: 5..15, - missing_range: 5..10, - head_bytes: None, - tail_bytes: Some(TEST_BYTES.slice(10..15)) - }), cache.get_range(&TEST_PATH.to_owned(), 5..15).await); - - assert_eq!(Hit(TEST_BYTES.slice(11..17)), 
cache.get_range(&TEST_PATH.to_owned(), 11..17).await) + cache + .set_range(&TEST_PATH.to_owned(), 12..18, TEST_BYTES.slice(12..18)) + .await; + cache + .set_range(&TEST_PATH.to_owned(), 10..20, TEST_BYTES.slice(10..20)) + .await; + cache + .set_range(&TEST_PATH.to_owned(), 14..16, TEST_BYTES.slice(14..16)) + .await; + + assert_eq!( + ParHit(PartialHit { + path: TEST_PATH.to_owned(), + original_range: 5..15, + missing_range: 5..10, + head_bytes: None, + tail_bytes: Some(TEST_BYTES.slice(10..15)) + }), + cache.get_range(&TEST_PATH.to_owned(), 5..15).await + ); + + assert_eq!( + Hit(TEST_BYTES.slice(11..17)), + cache.get_range(&TEST_PATH.to_owned(), 11..17).await + ) } #[tokio::test] async fn cache_partial_fill_ran_out_of_memory() { // enough memory to cache 5 bytes - let size = size_of::() + size_of::, Bytes)>>() + size_of::>() + size_of::() + 5; + let size = size_of::() + + size_of::, Bytes)>>() + + size_of::>() + + size_of::() + + 5; // give a little bit of extra leeway let cache = DataCache::new(size as u64 + 2); - cache.set_range(&TEST_PATH.to_owned(), 10..15, TEST_BYTES.slice(10..15)).await; + cache + .set_range(&TEST_PATH.to_owned(), 10..15, TEST_BYTES.slice(10..15)) + .await; if let ParHit(partial_hit) = cache.get_range(&TEST_PATH.to_owned(), 12..22).await { let missing = partial_hit.missing_range(); let missing = missing.start as usize..missing.end as usize; let missing_bytes = TEST_BYTES.slice(missing); - assert_eq!(TEST_BYTES.slice(12..22), cache.fill_partial_hit(partial_hit, missing_bytes).await); + assert_eq!( + TEST_BYTES.slice(12..22), + cache.fill_partial_hit(partial_hit, missing_bytes).await + ); } else { panic!("not a partial hit :(") } From d517f87343f4d6479fd97724d8b1087a385d93f6 Mon Sep 17 00:00:00 2001 From: Kyteware Date: Sat, 14 Jun 2025 08:58:30 -0700 Subject: [PATCH 17/17] chore: fix typos --- crates/iceberg/src/io/data_cache.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/io/data_cache.rs b/crates/iceberg/src/io/data_cache.rs index 1503180ee..a4c171017 100644 --- a/crates/iceberg/src/io/data_cache.rs +++ b/crates/iceberg/src/io/data_cache.rs @@ -78,12 +78,12 @@ impl DataCache { } /// Tries to get a range of bytes from the cache of a data file. Depending on what is currently - /// availible, this may return + /// available, this may return /// - /// - Hit, if the entire range is availible - /// - PartialHit, if only some of the head and/or the tail is availible + /// - Hit, if the entire range is available + /// - PartialHit, if only some of the head and/or the tail is available /// - Use [`PartialHit::missing_range`] and [`DataCache::fill_partial_hit`] to resolve this - /// - Miss, if none of the range is availible + /// - Miss, if none of the range is available pub async fn get_range(&self, path: &String, range: Range) -> DataCacheRes { if let Some(file_cache) = self.cache.get(path).await { match &*file_cache.content.read().await { @@ -133,7 +133,7 @@ impl DataCache { } } - /// Fills the missing section of a [`PartialHit`] and returns the complete btyes, even if the + /// Fills the missing section of a [`PartialHit`] and returns the complete bytes, even if the /// cached head and/or tail have since been purged from the cache pub async fn fill_partial_hit(&self, partial_hit: PartialHit, missing_bytes: Bytes) -> Bytes { self.set_range(
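To close out the series, here is a usage sketch of the public surface it adds. This is not part of the patches: with_cache_size is the builder knob introduced in patch 03, while FileIOBuilder::new_fs_io, the file path, and the sizes are assumptions for illustration.

// Usage sketch only, not part of the patch series. Assumes a local-fs FileIO
// (new_fs_io) and an illustrative file path; with_cache_size is from patch 03.
use iceberg::io::{FileIOBuilder, FileRead};

async fn cached_reads() -> iceberg::Result<()> {
    // Cap the shared DataCache at 64 MB instead of the 32 MB default.
    let file_io = FileIOBuilder::new_fs_io()
        .with_cache_size(64 * 1000 * 1000)
        .build()?;

    let input = file_io.new_input("/tmp/warehouse/data/00000.parquet")?;
    let reader = input.reader().await?;

    let first = reader.read(0..1024).await?; // miss: fetched from storage, then cached
    let again = reader.read(128..512).await?; // hit: served from the in-memory cache
    assert_eq!(again, first.slice(128..512));
    Ok(())
}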