From 030e0cac474fce6daadd704cc61a1d1c7398b164 Mon Sep 17 00:00:00 2001 From: Vlad Volodkin Date: Fri, 3 Oct 2025 13:03:15 +0000 Subject: [PATCH 1/2] Keep a constant memory reservation for backwards seek for each fh Signed-off-by: Vlad Volodkin --- mountpoint-s3-fs/src/mem_limiter.rs | 7 +++ mountpoint-s3-fs/src/prefetch.rs | 49 +++++++++++++++++-- .../src/prefetch/backpressure_controller.rs | 7 ++- mountpoint-s3-fs/src/prefetch/builder.rs | 8 +-- .../src/prefetch/caching_stream.rs | 2 +- mountpoint-s3-fs/src/prefetch/part.rs | 13 +++++ mountpoint-s3-fs/src/prefetch/part_queue.rs | 16 +----- mountpoint-s3-fs/src/prefetch/part_stream.rs | 2 +- mountpoint-s3-fs/src/prefetch/seek_window.rs | 8 ++- mountpoint-s3-fs/src/prefetch/task.rs | 17 ++++--- 10 files changed, 96 insertions(+), 33 deletions(-) diff --git a/mountpoint-s3-fs/src/mem_limiter.rs b/mountpoint-s3-fs/src/mem_limiter.rs index 48a0be4b5..4f5f78285 100644 --- a/mountpoint-s3-fs/src/mem_limiter.rs +++ b/mountpoint-s3-fs/src/mem_limiter.rs @@ -156,3 +156,10 @@ impl MemoryLimiter { (self.pool.reserved_bytes(BufferKind::PutObject) + self.pool.reserved_bytes(BufferKind::Other)) as u64 } } + +impl Drop for MemoryLimiter { + fn drop(&mut self) { + let mem_reserved = self.mem_reserved.load(Ordering::SeqCst); + debug_assert_eq!(mem_reserved, 0, "all reservations must be released"); + } +} diff --git a/mountpoint-s3-fs/src/prefetch.rs b/mountpoint-s3-fs/src/prefetch.rs index 72ff664fe..4201214b2 100644 --- a/mountpoint-s3-fs/src/prefetch.rs +++ b/mountpoint-s3-fs/src/prefetch.rs @@ -42,7 +42,9 @@ use tracing::trace; use crate::checksums::{ChecksummedBytes, IntegrityError}; use crate::data_cache::DataCache; use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT}; +use crate::mem_limiter::{BufferArea, MemoryLimiter}; use crate::object::ObjectId; +use crate::sync::Arc; mod backpressure_controller; mod builder; @@ -170,6 +172,7 @@ fn determine_max_read_size() -> usize { pub struct Prefetcher { part_stream: PartStream, config: PrefetcherConfig, + mem_limiter: Arc, } impl Prefetcher @@ -190,8 +193,12 @@ where } /// Create a new [Prefetcher] from the given [ObjectPartStream] instance. - pub fn new(part_stream: PartStream, config: PrefetcherConfig) -> Self { - Self { part_stream, config } + pub fn new(part_stream: PartStream, config: PrefetcherConfig, mem_limiter: Arc) -> Self { + Self { + part_stream, + config, + mem_limiter, + } } /// Start a new prefetch request to the specified object. @@ -199,7 +206,14 @@ where where Client: ObjectClient + Clone + Send + Sync + 'static, { - PrefetchGetObject::new(self.part_stream.clone(), self.config, bucket, object_id, size) + PrefetchGetObject::new( + self.part_stream.clone(), + self.config, + bucket, + object_id, + size, + self.mem_limiter.clone(), + ) } } @@ -224,6 +238,7 @@ where next_sequential_read_offset: u64, next_request_offset: u64, size: u64, + mem_limiter: Arc, } impl PrefetchGetObject @@ -237,12 +252,17 @@ where bucket: String, object_id: ObjectId, size: u64, + mem_limiter: Arc, ) -> Self { + let max_backward_seek_distance = config.max_backward_seek_distance as usize; + let seek_window_reservation = + Self::seek_window_reservation(part_stream.client().read_part_size(), max_backward_seek_distance); + mem_limiter.reserve(BufferArea::Prefetch, seek_window_reservation); PrefetchGetObject { part_stream, config, backpressure_task: None, - backward_seek_window: SeekWindow::new(config.max_backward_seek_distance as usize), + backward_seek_window: SeekWindow::new(max_backward_seek_distance), preferred_part_size: 128 * 1024, sequential_read_start_offset: 0, next_sequential_read_offset: 0, @@ -250,6 +270,7 @@ where bucket, object_id, size, + mem_limiter, } } @@ -474,6 +495,13 @@ where histogram!("prefetch.contiguous_read_len") .record((self.next_sequential_read_offset - self.sequential_read_start_offset) as f64); } + + /// The amount of memory reserved for a backwards seek window. + /// + /// The seek window size is rounded up to the nearest multiple of part_size. + fn seek_window_reservation(part_size: usize, seek_window_size: usize) -> u64 { + (seek_window_size.div_ceil(part_size) * part_size) as u64 + } } impl Drop for PrefetchGetObject @@ -481,6 +509,11 @@ where Client: ObjectClient + Clone + Send + Sync + 'static, { fn drop(&mut self) { + let seek_window_reservation = Self::seek_window_reservation( + self.part_stream.client().read_part_size(), + self.backward_seek_window.max_size(), + ); + self.mem_limiter.release(BufferArea::Prefetch, seek_window_reservation); self.record_contiguous_read_metric(); } } @@ -1195,6 +1228,14 @@ mod tests { } } + #[test_case(8 * 1024 * 1024, 1 * 1024 * 1024, 8 * 1024 * 1024; "8MiB part_size, 1MiB window")] + #[test_case(1 * 1024 * 1024, 1 * 1024 * 1024, 1 * 1024 * 1024; "equal part_size and window")] + #[test_case(250 * 1024, 1 * 1024 * 1024, 1250 * 1024; "window larger than part_size")] + fn test_seek_window_reservation(part_size: usize, seek_window_size: usize, expected: u64) { + let reservation = PrefetchGetObject::::seek_window_reservation(part_size, seek_window_size); + assert_eq!(reservation, expected); + } + #[cfg(feature = "shuttle")] mod shuttle_tests { use super::*; diff --git a/mountpoint-s3-fs/src/prefetch/backpressure_controller.rs b/mountpoint-s3-fs/src/prefetch/backpressure_controller.rs index cb7a87d03..25123abde 100644 --- a/mountpoint-s3-fs/src/prefetch/backpressure_controller.rs +++ b/mountpoint-s3-fs/src/prefetch/backpressure_controller.rs @@ -122,8 +122,13 @@ impl BackpressureController { /// will ensure that the read window size is enough to read this offset and that it is always close to `preferred_read_window_size`. pub async fn send_feedback(&mut self, event: BackpressureFeedbackEvent) -> Result<(), PrefetchReadError> { match event { - // Note, that this may come from a backwards seek, so offsets observed by this method are not necessarily ascending BackpressureFeedbackEvent::DataRead { offset, length } => { + debug_assert!( + offset >= self.next_read_offset, + "reads are always ascending: no feedback on backward seek reads, {}, {}", + offset, + self.next_read_offset, + ); self.next_read_offset = offset + length as u64; self.mem_limiter.release(BufferArea::Prefetch, length as u64); let remaining_window = self.read_window_end_offset.saturating_sub(self.next_read_offset) as usize; diff --git a/mountpoint-s3-fs/src/prefetch/builder.rs b/mountpoint-s3-fs/src/prefetch/builder.rs index bc50926e3..f386698dd 100644 --- a/mountpoint-s3-fs/src/prefetch/builder.rs +++ b/mountpoint-s3-fs/src/prefetch/builder.rs @@ -77,8 +77,8 @@ where mem_limiter: Arc, prefetcher_config: PrefetcherConfig, ) -> Prefetcher { - let part_stream = ClientPartStream::new(runtime, self.client, mem_limiter); - Prefetcher::new(PartStream::new(part_stream), prefetcher_config) + let part_stream = ClientPartStream::new(runtime, self.client, mem_limiter.clone()); + Prefetcher::new(PartStream::new(part_stream), prefetcher_config, mem_limiter) } } @@ -98,7 +98,7 @@ where mem_limiter: Arc, prefetcher_config: PrefetcherConfig, ) -> Prefetcher { - let part_stream = CachingPartStream::new(runtime, self.client, mem_limiter, self.cache); - Prefetcher::new(PartStream::new(part_stream), prefetcher_config) + let part_stream = CachingPartStream::new(runtime, self.client, mem_limiter.clone(), self.cache); + Prefetcher::new(PartStream::new(part_stream), prefetcher_config, mem_limiter) } } diff --git a/mountpoint-s3-fs/src/prefetch/caching_stream.rs b/mountpoint-s3-fs/src/prefetch/caching_stream.rs index ebf2e6511..7b045fcc5 100644 --- a/mountpoint-s3-fs/src/prefetch/caching_stream.rs +++ b/mountpoint-s3-fs/src/prefetch/caching_stream.rs @@ -60,7 +60,7 @@ where }; let (backpressure_controller, backpressure_limiter) = new_backpressure_controller(backpressure_config, self.mem_limiter.clone()); - let (part_queue, part_queue_producer) = unbounded_part_queue(self.mem_limiter.clone()); + let (part_queue, part_queue_producer) = unbounded_part_queue(); trace!(?range, "spawning request"); let request_task = { diff --git a/mountpoint-s3-fs/src/prefetch/part.rs b/mountpoint-s3-fs/src/prefetch/part.rs index 2e9106dce..d3eb9ad46 100644 --- a/mountpoint-s3-fs/src/prefetch/part.rs +++ b/mountpoint-s3-fs/src/prefetch/part.rs @@ -10,6 +10,7 @@ pub struct Part { id: ObjectId, offset: u64, checksummed_bytes: ChecksummedBytes, + is_backwards_window: bool, } impl Part { @@ -18,6 +19,7 @@ impl Part { id, offset, checksummed_bytes, + is_backwards_window: false, } } @@ -41,6 +43,7 @@ impl Part { id: self.id.clone(), offset: self.offset + at as u64, checksummed_bytes: new_bytes, + is_backwards_window: self.is_backwards_window, } } @@ -56,6 +59,16 @@ impl Part { self.checksummed_bytes.is_empty() } + /// Mark part as a backwards window. + pub(super) fn mark_as_backwards_window(&mut self) { + self.is_backwards_window = true; + } + + /// Whether this part belongs to a backwards seek window. + pub(super) fn is_backwards_window(&self) -> bool { + self.is_backwards_window + } + fn check(&self, id: &ObjectId, offset: u64) -> Result<(), PartOperationError> { if self.id != *id { return Err(PartOperationError::IdMismatch { diff --git a/mountpoint-s3-fs/src/prefetch/part_queue.rs b/mountpoint-s3-fs/src/prefetch/part_queue.rs index e583456d4..8d1fabcc5 100644 --- a/mountpoint-s3-fs/src/prefetch/part_queue.rs +++ b/mountpoint-s3-fs/src/prefetch/part_queue.rs @@ -3,7 +3,6 @@ use std::time::Instant; use mountpoint_s3_client::ObjectClient; use tracing::trace; -use crate::mem_limiter::{BufferArea, MemoryLimiter}; use crate::sync::Arc; use crate::sync::async_channel::{Receiver, RecvError, Sender, unbounded}; use crate::sync::atomic::{AtomicUsize, Ordering}; @@ -23,7 +22,6 @@ pub struct PartQueue { failed: bool, /// The total number of bytes sent to the underlying queue of `self.receiver` bytes_received: Arc, - mem_limiter: Arc, } /// Producer side of the queue of [Part]s. @@ -35,9 +33,7 @@ pub struct PartQueueProducer { } /// Creates an unbounded [PartQueue] and its related [PartQueueProducer]. -pub fn unbounded_part_queue( - mem_limiter: Arc, -) -> (PartQueue, PartQueueProducer) { +pub fn unbounded_part_queue() -> (PartQueue, PartQueueProducer) { let (sender, receiver) = unbounded(); let bytes_counter = Arc::new(AtomicUsize::new(0)); let part_queue = PartQueue { @@ -45,7 +41,6 @@ pub fn unbounded_part_queue( receiver, failed: false, bytes_received: Arc::clone(&bytes_counter), - mem_limiter, }; let part_queue_producer = PartQueueProducer { sender, @@ -103,9 +98,6 @@ impl PartQueue { assert!(!self.failed, "cannot use a PartQueue after failure"); metrics::gauge!("prefetch.bytes_in_queue").increment(part.len() as f64); - // The backpressure controller is not aware of the parts from backwards seek, - // so we have to reserve memory for them here. - self.mem_limiter.reserve(BufferArea::Prefetch, part.len() as u64); self.front_queue.push(part); Ok(()) } @@ -152,8 +144,6 @@ impl Drop for PartQueue { #[cfg(test)] mod tests { use crate::checksums::ChecksummedBytes; - use crate::mem_limiter::MINIMUM_MEM_LIMIT; - use crate::memory::PagedPool; use crate::object::ObjectId; use super::*; @@ -173,10 +163,8 @@ mod tests { } async fn run_test(ops: Vec) { - let pool = PagedPool::new_with_candidate_sizes([1024]); - let mem_limiter = MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT); let part_id = ObjectId::new("key".to_owned(), ETag::for_tests()); - let (mut part_queue, part_queue_producer) = unbounded_part_queue::(mem_limiter.into()); + let (mut part_queue, part_queue_producer) = unbounded_part_queue::(); let mut current_offset = 0; let mut current_length = 0; for op in ops { diff --git a/mountpoint-s3-fs/src/prefetch/part_stream.rs b/mountpoint-s3-fs/src/prefetch/part_stream.rs index 482043dc8..ffabdf229 100644 --- a/mountpoint-s3-fs/src/prefetch/part_stream.rs +++ b/mountpoint-s3-fs/src/prefetch/part_stream.rs @@ -227,7 +227,7 @@ impl ObjectPartStream self.max_size { self.clear(); return; @@ -39,6 +39,7 @@ impl SeekWindow { } self.current_size += part.len(); + part.mark_as_backwards_window(); self.parts.push_back(part); } @@ -75,4 +76,9 @@ impl SeekWindow { self.parts.drain(..); self.current_size = 0; } + + /// Return the maximum size of this window + pub fn max_size(&self) -> usize { + self.max_size + } } diff --git a/mountpoint-s3-fs/src/prefetch/task.rs b/mountpoint-s3-fs/src/prefetch/task.rs index bae5e2fc8..2fc18609f 100644 --- a/mountpoint-s3-fs/src/prefetch/task.rs +++ b/mountpoint-s3-fs/src/prefetch/task.rs @@ -51,13 +51,16 @@ impl RequestTask { debug_assert!(part.len() <= self.remaining); self.remaining -= part.len(); - // We read some data out of the part queue so the read window should be moved - self.backpressure_controller - .send_feedback(DataRead { - offset: part.offset(), - length: part.len(), - }) - .await?; + // We read some data out of the part queue so the read window should be moved, unless this part + // was read from a backwards seek window: such reads never move the read window. + if !part.is_backwards_window() { + self.backpressure_controller + .send_feedback(DataRead { + offset: part.offset(), + length: part.len(), + }) + .await?; + } let next_offset = part.offset() + part.len() as u64; let remaining_in_queue = self.available_offset().saturating_sub(next_offset) as usize; From cd3e0d53b695a2e0995bc514a6f857e50eb848ed Mon Sep 17 00:00:00 2001 From: Vlad Volodkin Date: Fri, 3 Oct 2025 14:07:31 +0000 Subject: [PATCH 2/2] Fix a test Signed-off-by: Vlad Volodkin --- .../src/prefetch/backpressure_controller.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/mountpoint-s3-fs/src/prefetch/backpressure_controller.rs b/mountpoint-s3-fs/src/prefetch/backpressure_controller.rs index 25123abde..3b7fcc624 100644 --- a/mountpoint-s3-fs/src/prefetch/backpressure_controller.rs +++ b/mountpoint-s3-fs/src/prefetch/backpressure_controller.rs @@ -422,9 +422,22 @@ mod tests { ); // Send more than one increment. - backpressure_controller.increment_read_window(7 * MIB).await; - backpressure_controller.increment_read_window(8 * MIB).await; - backpressure_controller.increment_read_window(8 * MIB).await; + let to_increase = 7 * MIB; + backpressure_controller + .mem_limiter + .reserve(BufferArea::Prefetch, to_increase as u64); + backpressure_controller.increment_read_window(to_increase).await; + + let to_increase = 8 * MIB; + backpressure_controller + .mem_limiter + .reserve(BufferArea::Prefetch, to_increase as u64); + backpressure_controller.increment_read_window(to_increase).await; + + backpressure_controller + .mem_limiter + .reserve(BufferArea::Prefetch, to_increase as u64); + backpressure_controller.increment_read_window(to_increase).await; let curr_offset = backpressure_limiter .wait_for_read_window_increment::(0)