generated from amazon-archives/__template_Apache-2.0
-
Notifications
You must be signed in to change notification settings - Fork 219
Re-implement the prefetcher using backpressure mechanism #980
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
use std::ops::Range; | ||
|
||
use async_channel::{unbounded, Receiver, Sender}; | ||
use tracing::trace; | ||
|
||
use super::PrefetchReadError; | ||
|
||
#[derive(Debug)] | ||
pub enum BackpressureFeedbackEvent { | ||
/// An event where data with a certain length has been read out of the stream | ||
DataRead(usize), | ||
/// An event indicating part queue stall | ||
PartQueueStall, | ||
} | ||
|
||
pub struct BackpressureConfig { | ||
/// Backpressure's initial read window size | ||
pub initial_read_window_size: usize, | ||
/// Maximum read window size that the backpressure controller is allowed to scale up to | ||
pub max_read_window_size: usize, | ||
/// Factor to increase the read window size by when the part queue is stalled | ||
pub read_window_size_multiplier: usize, | ||
/// Request range to apply backpressure | ||
pub request_range: Range<u64>, | ||
} | ||
|
||
#[derive(Debug)] | ||
pub struct BackpressureController { | ||
read_window_updater: Sender<usize>, | ||
preferred_read_window_size: usize, | ||
max_read_window_size: usize, | ||
read_window_size_multiplier: usize, | ||
/// Upper bound of the current read window. The request can return data up to this | ||
/// offset *exclusively*. This value must be advanced to continue fetching new data. | ||
read_window_end_offset: u64, | ||
/// Next offset of the data to be read. It is used for tracking how many bytes of | ||
/// data has been read out of the stream. | ||
next_read_offset: u64, | ||
/// End offset for the request we want to apply backpressure. The request can return | ||
/// data up to this offset *exclusively*. | ||
request_end_offset: u64, | ||
} | ||
|
||
#[derive(Debug)] | ||
pub struct BackpressureLimiter { | ||
read_window_incrementing_queue: Receiver<usize>, | ||
/// Upper bound of the current read window. | ||
/// Calling [BackpressureLimiter::wait_for_read_window_increment()] will block current | ||
/// thread until this value is advanced. | ||
read_window_end_offset: u64, | ||
/// End offset for the request we want to apply backpressure. The request can return | ||
/// data up to this offset *exclusively*. | ||
request_end_offset: u64, | ||
} | ||
|
||
/// Creates a [BackpressureController] and its related [BackpressureLimiter]. | ||
/// We use a pair of these to for providing feedback to backpressure stream. | ||
/// | ||
/// [BackpressureLimiter] is used on producer side of the object stream, that is, any | ||
/// [super::part_stream::ObjectPartStream] that support backpressure. The producer can call | ||
/// `wait_for_read_window_increment` to wait for feedback from the consumer. This method | ||
/// could block when they know that the producer requires read window incrementing. | ||
/// | ||
/// [BackpressureController] will be given to the consumer side of the object stream. | ||
/// It can be used anywhere to set preferred read window size for the stream and tell the | ||
/// producer when its read window should be increased. | ||
pub fn new_backpressure_controller(config: BackpressureConfig) -> (BackpressureController, BackpressureLimiter) { | ||
let read_window_end_offset = config.request_range.start + config.initial_read_window_size as u64; | ||
let (read_window_updater, read_window_incrementing_queue) = unbounded(); | ||
let controller = BackpressureController { | ||
read_window_updater, | ||
preferred_read_window_size: config.initial_read_window_size, | ||
max_read_window_size: config.max_read_window_size, | ||
read_window_size_multiplier: config.read_window_size_multiplier, | ||
read_window_end_offset, | ||
next_read_offset: config.request_range.start, | ||
request_end_offset: config.request_range.end, | ||
}; | ||
let limiter = BackpressureLimiter { | ||
read_window_incrementing_queue, | ||
read_window_end_offset, | ||
request_end_offset: config.request_range.end, | ||
}; | ||
(controller, limiter) | ||
} | ||
|
||
impl BackpressureController { | ||
pub fn read_window_end_offset(&self) -> u64 { | ||
self.read_window_end_offset | ||
} | ||
|
||
/// Send a feedback to the backpressure controller when reading data out of the stream. The backpressure controller | ||
/// will ensure that the read window size is enough to read this offset and that it is always close to `preferred_read_window_size`. | ||
pub async fn send_feedback<E>(&mut self, event: BackpressureFeedbackEvent) -> Result<(), PrefetchReadError<E>> { | ||
match event { | ||
BackpressureFeedbackEvent::DataRead(length) => { | ||
self.next_read_offset += length as u64; | ||
// Increment the read window only if the remaining window reaches some threshold i.e. half of it left. | ||
if self.remaining_window() < (self.preferred_read_window_size / 2) | ||
&& self.read_window_end_offset < self.request_end_offset | ||
{ | ||
let new_read_window_end_offset = self | ||
.next_read_offset | ||
.saturating_add(self.preferred_read_window_size as u64) | ||
.min(self.request_end_offset); | ||
debug_assert!(self.read_window_end_offset < new_read_window_end_offset); | ||
let to_increase = new_read_window_end_offset.saturating_sub(self.read_window_end_offset) as usize; | ||
trace!( | ||
preferred_read_window_size = self.preferred_read_window_size, | ||
next_read_offset = self.next_read_offset, | ||
read_window_end_offset = self.read_window_end_offset, | ||
to_increase, | ||
"incrementing read window" | ||
); | ||
self.increment_read_window(to_increase).await; | ||
self.read_window_end_offset = new_read_window_end_offset; | ||
} | ||
} | ||
BackpressureFeedbackEvent::PartQueueStall => self.try_scaling_up(), | ||
} | ||
Ok(()) | ||
} | ||
|
||
// Send an increment read window request to the stream producer | ||
async fn increment_read_window(&self, len: usize) { | ||
// This should not block since the channel is unbounded | ||
let _ = self | ||
.read_window_updater | ||
.send(len) | ||
.await | ||
.inspect_err(|_| trace!("read window incrementing queue is already closed")); | ||
} | ||
|
||
fn remaining_window(&self) -> usize { | ||
self.read_window_end_offset.saturating_sub(self.next_read_offset) as usize | ||
} | ||
|
||
// Try scaling up preferred read window size with a multiplier configured at initialization. | ||
fn try_scaling_up(&mut self) { | ||
if self.preferred_read_window_size < self.max_read_window_size { | ||
let new_read_window_size = | ||
(self.preferred_read_window_size * self.read_window_size_multiplier).min(self.max_read_window_size); | ||
trace!( | ||
current_size = self.preferred_read_window_size, | ||
new_size = new_read_window_size, | ||
"scaling up preferred read window" | ||
); | ||
self.preferred_read_window_size = new_read_window_size; | ||
} | ||
} | ||
} | ||
|
||
impl BackpressureLimiter { | ||
pub fn read_window_end_offset(&self) -> u64 { | ||
self.read_window_end_offset | ||
} | ||
|
||
/// Checks if there is enough read window to put the next item with a given offset to the stream. | ||
/// It blocks until receiving enough incrementing read window requests to serve the next part. | ||
/// | ||
/// Returns the new read window offset. | ||
pub async fn wait_for_read_window_increment<E>( | ||
&mut self, | ||
offset: u64, | ||
) -> Result<Option<u64>, PrefetchReadError<E>> { | ||
// There is already enough read window so no need to block | ||
if self.read_window_end_offset > offset { | ||
// Check the read window incrementing queue to see there is an early request to increase read window | ||
let new_read_window_offset = if let Ok(len) = self.read_window_incrementing_queue.try_recv() { | ||
self.read_window_end_offset += len as u64; | ||
Some(self.read_window_end_offset) | ||
} else { | ||
None | ||
}; | ||
return Ok(new_read_window_offset); | ||
} | ||
|
||
// Reaching here means there is not enough read window, so we block until it is large enough | ||
while self.read_window_end_offset <= offset && self.read_window_end_offset < self.request_end_offset { | ||
vladem marked this conversation as resolved.
Show resolved
Hide resolved
|
||
trace!( | ||
offset, | ||
read_window_offset = self.read_window_end_offset, | ||
"blocking for read window increment" | ||
); | ||
let recv = self.read_window_incrementing_queue.recv().await; | ||
match recv { | ||
Ok(len) => self.read_window_end_offset += len as u64, | ||
Err(_) => return Err(PrefetchReadError::ReadWindowIncrement), | ||
} | ||
} | ||
Ok(Some(self.read_window_end_offset)) | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: since we are not returning the error, I'd rather use an explicit
if let Err..
to trace instead ofinspect_err
. Not blocking on this though.