15 changes: 7 additions & 8 deletions mountpoint-s3/src/prefetch.rs
@@ -806,15 +806,14 @@ mod tests {
};

let mut get_failures = HashMap::new();
// We only have one request with backpressure, so we are going to inject the failure at
// 2nd read from that request stream.
get_failures.insert(1, Ok((2, MockClientError(err_value.to_owned().into()))));

// Object needs to be bigger than a part size in order to trigger the failure
// because the CRT data returns in chunks of part size.
let object_size = config.client_part_size + 111;
get_failures.insert(
2,
Err(ObjectClientError::ClientError(MockClientError(
err_value.to_owned().into(),
))),
);

fail_sequential_read_test(part_stream, object_size as u64, 1024 * 1024, config, get_failures);
fail_sequential_read_test(part_stream, 1024 * 1024 + 111, 1024 * 1024, config, get_failures);
}

proptest! {
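For context on the test change above, here is a minimal standalone sketch of the failure-injection shape the test builds. The types are simplified stand-ins, not the crate's real MockClient API, and the semantics are inferred from the comments in the diff: the map key is the GetObject request index, an Ok((n, err)) value fails that request's body stream at the n-th read, and an Err(err) value fails the request itself.

```rust
use std::collections::HashMap;

// Simplified stand-in for the mock client's error type (illustration only).
#[derive(Debug)]
struct MockClientError(String);

// Ok((n, err)): the request starts but its body stream fails at the n-th read.
// Err(err): the request itself fails up front.
type GetFailure = Result<(usize, MockClientError), MockClientError>;

fn main() {
    let mut get_failures: HashMap<usize, GetFailure> = HashMap::new();
    // With backpressure there is only one streaming GetObject request, so the test
    // injects the failure at the 2nd read from that request's body stream.
    get_failures.insert(1, Ok((2, MockClientError("injected error".to_owned()))));
    assert!(matches!(get_failures.get(&1), Some(Ok((2, _)))));
}
```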
9 changes: 5 additions & 4 deletions mountpoint-s3/src/prefetch/backpressure_controller.rs
@@ -112,7 +112,7 @@ impl BackpressureController {
to_increase,
"incrementing read window"
);
self.increment_read_window(to_increase).await?;
self.increment_read_window(to_increase).await;
self.read_window_end_offset = new_read_window_end_offset;
}
}
@@ -122,12 +122,13 @@
}

// Send an increment read window request to the stream producer
async fn increment_read_window<E>(&self, len: usize) -> Result<(), PrefetchReadError<E>> {
async fn increment_read_window(&self, len: usize) {
// This should not block since the channel is unbounded
self.read_window_updater
let _ = self
Contributor review comment: nit: since we are not returning the error, I'd rather use an explicit if let Err.. to trace instead of inspect_err. Not blocking on this though.
.read_window_updater
.send(len)
.await
.map_err(|_| PrefetchReadError::ReadWindowIncrement)
.inspect_err(|_| trace!("read window incrementing queue is already closed"));
}

fn remaining_window(&self) -> usize {
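Purely for illustration, here is a sketch of the alternative the reviewer's nit above suggests, using an explicit if let Err branch instead of inspect_err. It assumes an async_channel-style unbounded sender; the crate's actual channel type may differ, and the merged change keeps inspect_err.

```rust
use async_channel::Sender;
use tracing::trace;

// Sketch of the reviewer's suggestion: the send error is not propagated anywhere,
// so trace it with an explicit `if let Err(..)` rather than `inspect_err`.
async fn increment_read_window(read_window_updater: &Sender<usize>, len: usize) {
    // This should not block since the channel is unbounded.
    if let Err(_closed) = read_window_updater.send(len).await {
        trace!("read window incrementing queue is already closed");
    }
}
```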
52 changes: 10 additions & 42 deletions mountpoint-s3/src/prefetch/caching_stream.rs
@@ -131,7 +131,8 @@ where
match self.cache.get_block(cache_key, block_index, block_offset) {
Ok(Some(block)) => {
trace!(?cache_key, ?range, block_index, "cache hit");
let part = make_part(block, block_index, block_offset, block_size, cache_key, &range);
// Cache blocks always contain bytes in the request range
let part = try_make_part(&block, block_offset, cache_key, &range).unwrap();
part_queue_producer.push(Ok(part));
block_offset += block_size;

@@ -182,11 +183,10 @@ where
assert!(block_size > 0);

// Always request a range aligned with block boundaries (or to the end of the object).
let block_aligned_byte_range =
(block_range.start * block_size)..(block_range.end * block_size).min(range.object_size() as u64);
let request_len = (block_aligned_byte_range.end - block_aligned_byte_range.start) as usize;
let block_aligned_byte_range =
RequestRange::new(range.object_size(), block_aligned_byte_range.start, request_len);
let start_offset = block_range.start * block_size;
let end_offset = (block_range.end * block_size).min(range.object_size() as u64);
let request_len = (end_offset - start_offset) as usize;
let block_aligned_byte_range = RequestRange::new(range.object_size(), start_offset, request_len);

trace!(
key = cache_key.key(),
@@ -213,9 +213,7 @@ where
buffer: ChecksummedBytes::default(),
cache: self.cache.clone(),
};
let part_composer_future = part_composer.try_compose_parts(request_stream);
part_composer_future.await;
part_composer.flush();
part_composer.try_compose_parts(request_stream).await;
}

fn block_indices_for_byte_range(&self, range: &RequestRange) -> Range<BlockIndex> {
@@ -319,19 +317,10 @@ where
self.buffer = ChecksummedBytes::default();
}
}
Ok(())
}

/// Flush remaining data in the buffer to the cache. This can be called to write the last
/// block for the object.
fn flush(self) {
let block_size = self.cache.block_size();
if !self.buffer.is_empty() {
assert!(
self.buffer.len() < block_size as usize,
"buffer should be flushed when we get a full block"
);
// The last block for the object can be smaller than block_size (and ends at the end of the object).
// If we still have data in the buffer, this must be the last block for this object,
// which can be smaller than block_size (and ends at the end of the object).
assert_eq!(
self.block_offset as usize + self.buffer.len(),
self.original_range.object_size(),
@@ -346,6 +335,7 @@ where
&self.cache_key,
);
}
Ok(())
}
}

@@ -384,28 +374,6 @@ fn try_make_part(bytes: &ChecksummedBytes, offset: u64, object_id: &ObjectId, ra
))
}

/// Creates a Part that can be streamed to the prefetcher from the given cache block.
/// If required, trims the block bytes to the request range.
fn make_part(
block: ChecksummedBytes,
block_index: u64,
block_offset: u64,
block_size: u64,
cache_key: &ObjectId,
range: &RequestRange,
) -> Part {
assert_eq!(block_offset, block_index * block_size, "invalid block offset");
trace!(
?cache_key,
block_index,
block_offset,
block_size,
"creating part from block data",
);
// Cache blocks always contain bytes in the request range
try_make_part(&block, block_offset, cache_key, range).unwrap()
}

#[cfg(test)]
mod tests {
// It's convenient to write test constants like "1 * 1024 * 1024" for symmetry
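The refactor above only splits the block-aligned range computation into named steps; as a quick check of the arithmetic, here is a standalone sketch with hypothetical sizes (the RequestRange wrapper from the real code is omitted).

```rust
use std::ops::Range;

// Compute the byte range aligned to block boundaries, clamped to the end of the object,
// mirroring the arithmetic in the hunk above.
fn block_aligned_byte_range(block_range: Range<u64>, block_size: u64, object_size: u64) -> (u64, usize) {
    let start_offset = block_range.start * block_size;
    let end_offset = (block_range.end * block_size).min(object_size);
    let request_len = (end_offset - start_offset) as usize;
    (start_offset, request_len)
}

fn main() {
    // Example: 1 MiB blocks, a 2.5 MiB object, and a request touching blocks 1..3.
    // The end is clamped to the object size, so the request covers 1.5 MiB, not two full blocks.
    let (start, len) = block_aligned_byte_range(1..3, 1024 * 1024, 2 * 1024 * 1024 + 512 * 1024);
    assert_eq!(start, 1024 * 1024);
    assert_eq!(len, 1024 * 1024 + 512 * 1024);
    println!("start={start}, len={len}");
}
```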
11 changes: 7 additions & 4 deletions mountpoint-s3/src/prefetch/part_stream.rs
@@ -220,8 +220,7 @@ where
object_id: config.object_id,
preferred_part_size: config.preferred_part_size,
};
let part_composer_future = part_composer.try_compose_parts(request_stream);
part_composer_future.await;
part_composer.try_compose_parts(request_stream).await;
}
.instrument(span),
)
@@ -336,7 +335,7 @@ fn read_from_request<'a, Client: ObjectClient + 'a>(
) -> impl Stream<Item = RequestReaderOutput<Client::ClientError>> + 'a {
try_stream! {
let request = client
.get_object(&bucket, id.key(), Some(request_range), Some(id.etag().clone()))
.get_object(&bucket, id.key(), Some(request_range.clone()), Some(id.etag().clone()))
.await
.inspect_err(|e| error!(key=id.key(), error=?e, "GetObject request failed"))
.map_err(PrefetchReadError::GetRequestFailed)?;
@@ -357,8 +356,12 @@
metrics::counter!("s3.client.total_bytes", "type" => "read").increment(body.len() as u64);
yield(offset, body);

// Blocks if read window increment if it's not enough to read the next offset
let next_offset = offset + length;
// We are reaching the end so don't have to wait for more read window
if next_offset == request_range.end {
break;
}
// Blocks if read window increment if it's not enough to read the next offset
if let Some(next_read_window_offset) = backpressure_limiter.wait_for_read_window_increment(next_offset).await? {
let diff = next_read_window_offset.saturating_sub(request.as_ref().read_window_end_offset()) as usize;
request.as_mut().increment_read_window(diff);
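To show the new control flow in isolation, here is a simplified, synchronous sketch of the loop above; wait_for_read_window_increment is a hypothetical stand-in for the async backpressure call, and the real code yields body parts into the part queue rather than printing.

```rust
// Drain (offset, length) chunks, skipping the backpressure wait once the next offset
// reaches the end of the requested range, as in the change above.
fn drain_chunks(chunks: &[(u64, usize)], range_end: u64, mut wait_for_read_window_increment: impl FnMut(u64)) {
    for &(offset, length) in chunks {
        // ... the real code yields (offset, body) here ...
        let next_offset = offset + length as u64;
        // We are reaching the end, so there is no need to wait for more read window.
        if next_offset == range_end {
            break;
        }
        // Otherwise block until the read window covers the next offset.
        wait_for_read_window_increment(next_offset);
    }
}

fn main() {
    // Example: two 1 MiB chunks covering a 2 MiB range; the wait is skipped after the last chunk.
    let chunks = [(0u64, 1024 * 1024usize), (1024 * 1024, 1024 * 1024)];
    drain_chunks(&chunks, 2 * 1024 * 1024, |next| {
        println!("waiting for read window past offset {next}");
    });
}
```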