7 changes: 7 additions & 0 deletions mountpoint-s3-fs/src/mem_limiter.rs
@@ -156,3 +156,10 @@ impl MemoryLimiter {
(self.pool.reserved_bytes(BufferKind::PutObject) + self.pool.reserved_bytes(BufferKind::Other)) as u64
}
}

impl Drop for MemoryLimiter {
fn drop(&mut self) {
let mem_reserved = self.mem_reserved.load(Ordering::SeqCst);
debug_assert_eq!(mem_reserved, 0, "all reservations must be released");
}
}
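The new Drop check only holds if every reservation made against the limiter is paired with a release of the same size before the limiter is dropped. A minimal sketch of that invariant, assuming only the APIs that appear elsewhere in this diff (PagedPool::new_with_candidate_sizes, MemoryLimiter::new with MINIMUM_MEM_LIMIT, and BufferArea::Prefetch); it is an illustration, not part of the change:

// Sketch only: demonstrates the reserve/release pairing that keeps
// mem_reserved at zero so the debug_assert in Drop does not fire.
use crate::mem_limiter::{BufferArea, MemoryLimiter, MINIMUM_MEM_LIMIT};
use crate::memory::PagedPool;

fn reservation_roundtrip() {
    let pool = PagedPool::new_with_candidate_sizes([1024]);
    let mem_limiter = MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT);

    // Every reserve(...) must eventually be matched by a release(...) of the
    // same size in the same buffer area.
    mem_limiter.reserve(BufferArea::Prefetch, 8 * 1024 * 1024);
    mem_limiter.release(BufferArea::Prefetch, 8 * 1024 * 1024);

    // mem_reserved is back to zero here, so dropping the limiter is fine.
    drop(mem_limiter);
}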
49 changes: 45 additions & 4 deletions mountpoint-s3-fs/src/prefetch.rs
@@ -42,7 +42,9 @@ use tracing::trace;
use crate::checksums::{ChecksummedBytes, IntegrityError};
use crate::data_cache::DataCache;
use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT};
use crate::mem_limiter::{BufferArea, MemoryLimiter};
use crate::object::ObjectId;
use crate::sync::Arc;

mod backpressure_controller;
mod builder;
@@ -170,6 +172,7 @@ fn determine_max_read_size() -> usize {
pub struct Prefetcher<Client> {
part_stream: PartStream<Client>,
config: PrefetcherConfig,
mem_limiter: Arc<MemoryLimiter>,
}

impl<Client> Prefetcher<Client>
@@ -190,16 +193,27 @@ where
}

/// Create a new [Prefetcher] from the given [ObjectPartStream] instance.
pub fn new(part_stream: PartStream<Client>, config: PrefetcherConfig) -> Self {
Self { part_stream, config }
pub fn new(part_stream: PartStream<Client>, config: PrefetcherConfig, mem_limiter: Arc<MemoryLimiter>) -> Self {
Self {
part_stream,
config,
mem_limiter,
}
}

/// Start a new prefetch request to the specified object.
pub fn prefetch(&self, bucket: String, object_id: ObjectId, size: u64) -> PrefetchGetObject<Client>
where
Client: ObjectClient + Clone + Send + Sync + 'static,
{
PrefetchGetObject::new(self.part_stream.clone(), self.config, bucket, object_id, size)
PrefetchGetObject::new(
self.part_stream.clone(),
self.config,
bucket,
object_id,
size,
self.mem_limiter.clone(),
)
}
}

@@ -224,6 +238,7 @@ where
next_sequential_read_offset: u64,
next_request_offset: u64,
size: u64,
mem_limiter: Arc<MemoryLimiter>,
}

impl<Client> PrefetchGetObject<Client>
@@ -237,19 +252,25 @@ where
bucket: String,
object_id: ObjectId,
size: u64,
mem_limiter: Arc<MemoryLimiter>,
) -> Self {
let max_backward_seek_distance = config.max_backward_seek_distance as usize;
let seek_window_reservation =
Self::seek_window_reservation(part_stream.client().read_part_size(), max_backward_seek_distance);
mem_limiter.reserve(BufferArea::Prefetch, seek_window_reservation);
PrefetchGetObject {
part_stream,
config,
backpressure_task: None,
backward_seek_window: SeekWindow::new(config.max_backward_seek_distance as usize),
backward_seek_window: SeekWindow::new(max_backward_seek_distance),
preferred_part_size: 128 * 1024,
sequential_read_start_offset: 0,
next_sequential_read_offset: 0,
next_request_offset: 0,
bucket,
object_id,
size,
mem_limiter,
}
}

@@ -474,13 +495,25 @@ where
histogram!("prefetch.contiguous_read_len")
.record((self.next_sequential_read_offset - self.sequential_read_start_offset) as f64);
}

/// The amount of memory reserved for a backwards seek window.
///
/// The seek window size is rounded up to the nearest multiple of part_size.
fn seek_window_reservation(part_size: usize, seek_window_size: usize) -> u64 {
(seek_window_size.div_ceil(part_size) * part_size) as u64
}
}

Review comment (Contributor): Nit: backwards_seek_reservation?

impl<Client> Drop for PrefetchGetObject<Client>
where
Client: ObjectClient + Clone + Send + Sync + 'static,
{
fn drop(&mut self) {
let seek_window_reservation = Self::seek_window_reservation(
self.part_stream.client().read_part_size(),
self.backward_seek_window.max_size(),
);
self.mem_limiter.release(BufferArea::Prefetch, seek_window_reservation);
self.record_contiguous_read_metric();
}
}
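Taken together, new() reserves the rounded-up window size up front and this Drop impl releases the identical amount, so the seek window's memory is accounted for over the whole lifetime of the request. A small illustration of the rounding, using hypothetical numbers (an 8 MiB read_part_size and a 1 MiB backward seek window, matching the first test case below):

// Illustrative only: the window is rounded up to whole parts, so even a
// 1 MiB backward seek window reserves a full 8 MiB part.
let part_size: usize = 8 * 1024 * 1024;   // client.read_part_size()
let window: usize = 1024 * 1024;          // config.max_backward_seek_distance
let reservation = (window.div_ceil(part_size) * part_size) as u64;
assert_eq!(reservation, 8 * 1024 * 1024);
// new():  mem_limiter.reserve(BufferArea::Prefetch, reservation)
// Drop:   mem_limiter.release(BufferArea::Prefetch, reservation)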
@@ -1195,6 +1228,14 @@ mod tests {
}
}

#[test_case(8 * 1024 * 1024, 1 * 1024 * 1024, 8 * 1024 * 1024; "8MiB part_size, 1MiB window")]
#[test_case(1 * 1024 * 1024, 1 * 1024 * 1024, 1 * 1024 * 1024; "equal part_size and window")]
#[test_case(250 * 1024, 1 * 1024 * 1024, 1250 * 1024; "window larger than part_size")]
fn test_seek_window_reservation(part_size: usize, seek_window_size: usize, expected: u64) {
let reservation = PrefetchGetObject::<MockClient>::seek_window_reservation(part_size, seek_window_size);
assert_eq!(reservation, expected);
}

#[cfg(feature = "shuttle")]
mod shuttle_tests {
use super::*;
26 changes: 22 additions & 4 deletions mountpoint-s3-fs/src/prefetch/backpressure_controller.rs
@@ -122,8 +122,13 @@ impl BackpressureController {
/// will ensure that the read window size is enough to read this offset and that it is always close to `preferred_read_window_size`.
pub async fn send_feedback<E>(&mut self, event: BackpressureFeedbackEvent) -> Result<(), PrefetchReadError<E>> {
match event {
// Note, that this may come from a backwards seek, so offsets observed by this method are not necessarily ascending
BackpressureFeedbackEvent::DataRead { offset, length } => {
debug_assert!(
offset >= self.next_read_offset,
"reads are always ascending: no feedback on backward seek reads, {}, {}",
offset,
self.next_read_offset,
);
self.next_read_offset = offset + length as u64;
self.mem_limiter.release(BufferArea::Prefetch, length as u64);
let remaining_window = self.read_window_end_offset.saturating_sub(self.next_read_offset) as usize;
@@ -417,9 +422,22 @@ mod tests {
);

// Send more than one increment.
backpressure_controller.increment_read_window(7 * MIB).await;
backpressure_controller.increment_read_window(8 * MIB).await;
backpressure_controller.increment_read_window(8 * MIB).await;
let to_increase = 7 * MIB;
backpressure_controller
.mem_limiter
.reserve(BufferArea::Prefetch, to_increase as u64);
backpressure_controller.increment_read_window(to_increase).await;

let to_increase = 8 * MIB;
backpressure_controller
.mem_limiter
.reserve(BufferArea::Prefetch, to_increase as u64);
backpressure_controller.increment_read_window(to_increase).await;

backpressure_controller
.mem_limiter
.reserve(BufferArea::Prefetch, to_increase as u64);
backpressure_controller.increment_read_window(to_increase).await;

let curr_offset = backpressure_limiter
.wait_for_read_window_increment::<MockClientError>(0)
8 changes: 4 additions & 4 deletions mountpoint-s3-fs/src/prefetch/builder.rs
@@ -77,8 +77,8 @@ where
mem_limiter: Arc<MemoryLimiter>,
prefetcher_config: PrefetcherConfig,
) -> Prefetcher<Client> {
let part_stream = ClientPartStream::new(runtime, self.client, mem_limiter);
Prefetcher::new(PartStream::new(part_stream), prefetcher_config)
let part_stream = ClientPartStream::new(runtime, self.client, mem_limiter.clone());
Prefetcher::new(PartStream::new(part_stream), prefetcher_config, mem_limiter)
}
}

@@ -98,7 +98,7 @@ where
mem_limiter: Arc<MemoryLimiter>,
prefetcher_config: PrefetcherConfig,
) -> Prefetcher<Client> {
let part_stream = CachingPartStream::new(runtime, self.client, mem_limiter, self.cache);
Prefetcher::new(PartStream::new(part_stream), prefetcher_config)
let part_stream = CachingPartStream::new(runtime, self.client, mem_limiter.clone(), self.cache);
Prefetcher::new(PartStream::new(part_stream), prefetcher_config, mem_limiter)
}
}
2 changes: 1 addition & 1 deletion mountpoint-s3-fs/src/prefetch/caching_stream.rs
@@ -60,7 +60,7 @@ where
};
let (backpressure_controller, backpressure_limiter) =
new_backpressure_controller(backpressure_config, self.mem_limiter.clone());
let (part_queue, part_queue_producer) = unbounded_part_queue(self.mem_limiter.clone());
let (part_queue, part_queue_producer) = unbounded_part_queue();
trace!(?range, "spawning request");

let request_task = {
13 changes: 13 additions & 0 deletions mountpoint-s3-fs/src/prefetch/part.rs
@@ -10,6 +10,7 @@ pub struct Part {
id: ObjectId,
offset: u64,
checksummed_bytes: ChecksummedBytes,
is_backwards_window: bool,
}

impl Part {
@@ -18,6 +19,7 @@ impl Part {
id,
offset,
checksummed_bytes,
is_backwards_window: false,
}
}

@@ -41,6 +43,7 @@ impl Part {
id: self.id.clone(),
offset: self.offset + at as u64,
checksummed_bytes: new_bytes,
is_backwards_window: self.is_backwards_window,
}
}

@@ -56,6 +59,16 @@ impl Part {
self.checksummed_bytes.is_empty()
}

/// Mark part as a backwards window.
pub(super) fn mark_as_backwards_window(&mut self) {
self.is_backwards_window = true;
}

/// Whether this part belongs to a backwards seek window.
pub(super) fn is_backwards_window(&self) -> bool {
self.is_backwards_window
}

fn check(&self, id: &ObjectId, offset: u64) -> Result<(), PartOperationError> {
if self.id != *id {
return Err(PartOperationError::IdMismatch {
16 changes: 2 additions & 14 deletions mountpoint-s3-fs/src/prefetch/part_queue.rs
@@ -3,7 +3,6 @@ use std::time::Instant;
use mountpoint_s3_client::ObjectClient;
use tracing::trace;

use crate::mem_limiter::{BufferArea, MemoryLimiter};
use crate::sync::Arc;
use crate::sync::async_channel::{Receiver, RecvError, Sender, unbounded};
use crate::sync::atomic::{AtomicUsize, Ordering};
@@ -23,7 +22,6 @@ pub struct PartQueue<Client: ObjectClient> {
failed: bool,
/// The total number of bytes sent to the underlying queue of `self.receiver`
bytes_received: Arc<AtomicUsize>,
mem_limiter: Arc<MemoryLimiter>,
}

/// Producer side of the queue of [Part]s.
@@ -35,17 +33,14 @@ pub struct PartQueueProducer<E: std::error::Error> {
}

/// Creates an unbounded [PartQueue] and its related [PartQueueProducer].
pub fn unbounded_part_queue<Client: ObjectClient>(
mem_limiter: Arc<MemoryLimiter>,
) -> (PartQueue<Client>, PartQueueProducer<Client::ClientError>) {
pub fn unbounded_part_queue<Client: ObjectClient>() -> (PartQueue<Client>, PartQueueProducer<Client::ClientError>) {
let (sender, receiver) = unbounded();
let bytes_counter = Arc::new(AtomicUsize::new(0));
let part_queue = PartQueue {
front_queue: Vec::new(),
receiver,
failed: false,
bytes_received: Arc::clone(&bytes_counter),
mem_limiter,
};
let part_queue_producer = PartQueueProducer {
sender,
@@ -103,9 +98,6 @@ impl<Client: ObjectClient> PartQueue<Client> {
assert!(!self.failed, "cannot use a PartQueue after failure");

metrics::gauge!("prefetch.bytes_in_queue").increment(part.len() as f64);
// The backpressure controller is not aware of the parts from backwards seek,
// so we have to reserve memory for them here.
self.mem_limiter.reserve(BufferArea::Prefetch, part.len() as u64);
self.front_queue.push(part);
Ok(())
}
@@ -152,8 +144,6 @@ impl<Client: ObjectClient> Drop for PartQueue<Client> {
#[cfg(test)]
mod tests {
use crate::checksums::ChecksummedBytes;
use crate::mem_limiter::MINIMUM_MEM_LIMIT;
use crate::memory::PagedPool;
use crate::object::ObjectId;

use super::*;
@@ -173,10 +163,8 @@ mod tests {
}

async fn run_test(ops: Vec<Op>) {
let pool = PagedPool::new_with_candidate_sizes([1024]);
let mem_limiter = MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT);
let part_id = ObjectId::new("key".to_owned(), ETag::for_tests());
let (mut part_queue, part_queue_producer) = unbounded_part_queue::<MockClient>(mem_limiter.into());
let (mut part_queue, part_queue_producer) = unbounded_part_queue::<MockClient>();
let mut current_offset = 0;
let mut current_length = 0;
for op in ops {
2 changes: 1 addition & 1 deletion mountpoint-s3-fs/src/prefetch/part_stream.rs
@@ -227,7 +227,7 @@ impl<Client: ObjectClient + Clone + Send + Sync + 'static> ObjectPartStream<Clie
};
let (backpressure_controller, mut backpressure_limiter) =
new_backpressure_controller(backpressure_config, self.mem_limiter.clone());
let (part_queue, part_queue_producer) = unbounded_part_queue(self.mem_limiter.clone());
let (part_queue, part_queue_producer) = unbounded_part_queue();
trace!(?range, "spawning request");

let span = debug_span!("prefetch", ?range);
8 changes: 7 additions & 1 deletion mountpoint-s3-fs/src/prefetch/seek_window.rs
@@ -24,7 +24,7 @@ impl SeekWindow {

/// Add a new part to the front of the window, and drop any parts necessary to fit the new part
/// within the maximum size.
pub fn push(&mut self, part: Part) {
pub fn push(&mut self, mut part: Part) {
if part.len() > self.max_size {
self.clear();
return;
@@ -39,6 +39,7 @@ impl SeekWindow {
}

self.current_size += part.len();
part.mark_as_backwards_window();
self.parts.push_back(part);
}

@@ -75,4 +76,9 @@ impl SeekWindow {
self.parts.drain(..);
self.current_size = 0;
}

/// Return the maximum size of this window
pub fn max_size(&self) -> usize {
self.max_size
}
}
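The push doc comment above describes an eviction policy whose loop is collapsed in this diff. A sketch of how that path plausibly fits together with the new mark_as_backwards_window call, assuming the oldest parts are evicted from the opposite end of the deque; treat the eviction details as an assumption rather than the exact implementation:

// Sketch only; the eviction loop is not shown in this diff.
pub fn push(&mut self, mut part: Part) {
    if part.len() > self.max_size {
        // A part bigger than the whole window can never be kept.
        self.clear();
        return;
    }
    // Assumed eviction: drop the oldest parts until the new one fits.
    while self.current_size + part.len() > self.max_size {
        let evicted = self.parts.pop_front().expect("window over budget, so it is non-empty");
        self.current_size -= evicted.len();
    }
    self.current_size += part.len();
    // New in this change: tag the part so that serving it on a backward seek
    // does not send DataRead feedback and move the read window (see task.rs).
    part.mark_as_backwards_window();
    self.parts.push_back(part);
}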
17 changes: 10 additions & 7 deletions mountpoint-s3-fs/src/prefetch/task.rs
@@ -51,13 +51,16 @@ impl<Client: ObjectClient> RequestTask<Client> {
debug_assert!(part.len() <= self.remaining);
self.remaining -= part.len();

// We read some data out of the part queue so the read window should be moved
self.backpressure_controller
.send_feedback(DataRead {
offset: part.offset(),
length: part.len(),
})
.await?;
// We read some data out of the part queue so the read window should be moved, unless this part
// was read from a backwards seek window: such reads never move the read window.
if !part.is_backwards_window() {
self.backpressure_controller
.send_feedback(DataRead {
offset: part.offset(),
length: part.len(),
})
.await?;
}

let next_offset = part.offset() + part.len() as u64;
let remaining_in_queue = self.available_offset().saturating_sub(next_offset) as usize;