Skip to content

chore(deps): Update tower to v0.5 #23186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ azure_storage_blobs = { version = "0.17", default-features = false, optional = t
opendal = { version = "0.53", default-features = false, features = ["services-webhdfs"], optional = true }

# Tower
tower = { version = "0.4.13", default-features = false, features = ["buffer", "limit", "retry", "timeout", "util", "balance", "discover"] }
tower = { version = "0.5.2", default-features = false, features = ["buffer", "limit", "retry", "timeout", "util", "balance", "discover"] }
tower-http = { version = "0.4.4", default-features = false, features = ["compression-full", "decompression-gzip", "trace"] }
# Serde
serde.workspace = true
Expand Down
11 changes: 7 additions & 4 deletions src/sinks/aws_cloudwatch_logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,18 @@ use crate::sinks::{
};

type Svc = Buffer<
ConcurrencyLimit<
Vec<InputLogEvent>,
<ConcurrencyLimit<
RateLimit<
Retry<
FibonacciRetryPolicy<CloudwatchRetryLogic<()>>,
Buffer<Timeout<CloudwatchLogsSvc>, Vec<InputLogEvent>>,
Buffer<
Vec<InputLogEvent>,
<Timeout<CloudwatchLogsSvc> as Service<Vec<InputLogEvent>>>::Future,
>,
>,
>,
>,
Vec<InputLogEvent>,
> as Service<Vec<InputLogEvent>>>::Future,
>;

#[derive(Debug)]
Expand Down
71 changes: 32 additions & 39 deletions src/sinks/util/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ pub struct FibonacciRetryPolicy<L> {
logic: L,
}

pub struct RetryPolicyFuture<L: RetryLogic> {
pub struct RetryPolicyFuture {
delay: Pin<Box<Sleep>>,
policy: FibonacciRetryPolicy<L>,
}

impl<L: RetryLogic> FibonacciRetryPolicy<L> {
Expand All @@ -102,49 +101,43 @@ impl<L: RetryLogic> FibonacciRetryPolicy<L> {
Duration::from_millis(jitter)
}

fn advance(&self) -> FibonacciRetryPolicy<L> {
let next_duration: Duration = cmp::min(
self.previous_duration + self.current_duration,
self.max_duration,
);

FibonacciRetryPolicy {
remaining_attempts: self.remaining_attempts - 1,
previous_duration: self.current_duration,
current_duration: next_duration,
current_jitter_duration: Self::add_full_jitter(next_duration),
jitter_mode: self.jitter_mode,
max_duration: self.max_duration,
logic: self.logic.clone(),
}
}

const fn backoff(&self) -> Duration {
match self.jitter_mode {
JitterMode::None => self.current_duration,
JitterMode::Full => self.current_jitter_duration,
}
}

fn build_retry(&self) -> RetryPolicyFuture<L> {
let policy = self.advance();
fn advance(&mut self) {
let next_duration: Duration = cmp::min(
self.previous_duration + self.current_duration,
self.max_duration,
);
Comment on lines +112 to +115
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let next_duration: Duration = cmp::min(
self.previous_duration + self.current_duration,
self.max_duration,
);
let sum = self.previous_duration.checked_add(self.current_duration)
.unwrap_or(Duration::MAX);
let next_duration = cmp::min(sum, self.max_duration);

Example:

previous_duration = 30s
current_duration = 40s
max_duration = 60s
next_duration = 60s

self.remaining_attempts -= 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.remaining_attempts -= 1;
self.remaining_attempts = self.remaining_attempts.saturating_sub(1);

self.previous_duration = self.current_duration;
self.current_duration = next_duration;
self.current_jitter_duration = Self::add_full_jitter(next_duration);
}

fn build_retry(&mut self) -> RetryPolicyFuture {
self.advance();
let delay = Box::pin(sleep(self.backoff()));

debug!(message = "Retrying request.", delay_ms = %self.backoff().as_millis());
RetryPolicyFuture { delay, policy }
RetryPolicyFuture { delay }
}
}

impl<Req, Res, L> Policy<Req, Res, Error> for FibonacciRetryPolicy<L>
where
Req: Clone,
Req: Clone + 'static,
L: RetryLogic<Response = Res>,
{
type Future = RetryPolicyFuture<L>;
type Future = RetryPolicyFuture;

// NOTE: in the error cases- `Error` and `EventsDropped` internal events are emitted by the
// driver, so only need to log here.
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
fn retry(&mut self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
match result {
Ok(response) => match self.logic.should_retry_response(response) {
RetryAction::Retry(reason) => {
Expand Down Expand Up @@ -204,21 +197,21 @@ where
}
}

fn clone_request(&self, request: &Req) -> Option<Req> {
fn clone_request(&mut self, request: &Req) -> Option<Req> {
Some(request.clone())
}
}

// Safety: `L` is never pinned and we use no unsafe pin projections
// therefore this safe.
impl<L: RetryLogic> Unpin for RetryPolicyFuture<L> {}
impl Unpin for RetryPolicyFuture {}

impl<L: RetryLogic> Future for RetryPolicyFuture<L> {
type Output = FibonacciRetryPolicy<L>;
impl Future for RetryPolicyFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
std::task::ready!(self.delay.poll_unpin(cx));
Poll::Ready(self.policy.clone())
Poll::Ready(())
}
}

Expand Down Expand Up @@ -421,25 +414,25 @@ mod tests {
);
assert_eq!(Duration::from_secs(1), policy.backoff());

policy = policy.advance();
policy.advance();
assert_eq!(Duration::from_secs(1), policy.backoff());

policy = policy.advance();
policy.advance();
assert_eq!(Duration::from_secs(2), policy.backoff());

policy = policy.advance();
policy.advance();
assert_eq!(Duration::from_secs(3), policy.backoff());

policy = policy.advance();
policy.advance();
assert_eq!(Duration::from_secs(5), policy.backoff());

policy = policy.advance();
policy.advance();
assert_eq!(Duration::from_secs(8), policy.backoff());

policy = policy.advance();
policy.advance();
assert_eq!(Duration::from_secs(10), policy.backoff());

policy = policy.advance();
policy.advance();
assert_eq!(Duration::from_secs(10), policy.backoff());
}

Expand Down Expand Up @@ -469,7 +462,7 @@ mod tests {
backoff
);

policy = policy.advance();
policy.advance();
}

// Once the max backoff is reached, it should not exceed the max backoff.
Expand All @@ -482,7 +475,7 @@ mod tests {
backoff
);

policy = policy.advance();
policy.advance();
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/sinks/util/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ pub type TowerPartitionSink<S, B, RL, K> = PartitionBatchSink<Svc<S, RL>, B, K>;

// Distributed service types
pub type DistributedService<S, RL, HL, K, Req> = RateLimit<
Retry<FibonacciRetryPolicy<RL>, Buffer<Balance<DiscoveryService<S, RL, HL, K>, Req>, Req>>,
Retry<
FibonacciRetryPolicy<RL>,
Buffer<Req, <Balance<DiscoveryService<S, RL, HL, K>, Req> as Service<Req>>::Future>,
>,
>;
pub type DiscoveryService<S, RL, HL, K> =
BoxStream<'static, Result<Change<K, SingleDistributedService<S, RL, HL>>, crate::Error>>;
Expand Down Expand Up @@ -352,7 +355,7 @@ impl TowerRequestSettings {
)
})
.enumerate()
.map(|(i, service)| Ok(Change::Insert(i, service)))
.map(|(i, service)| Ok::<_, S::Error>(Change::Insert(i, service)))
.collect::<Vec<_>>();

// Build sink service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,26 @@ mode = [0, 6]
mean = [2.2, 5.0]

[stats.rate]
max = [52, 62]
max = [52, 63]
mean = [20, 44]

[controller.in_flight]
max = [5, 39]
mode = [4, 27]
max = [5, 41]
mode = [4, 29]
mean = [3.5, 26]

[controller.concurrency_limit]
max = [6, 39]
mode = [2, 27]
mean = [4.0, 26.5]
max = [6, 41]
mode = [2, 29]
mean = [4.0, 29]

[controller.observed_rtt]
min = [0.100, 0.102]
max = [0.100, 12.7]
max = [0.100, 19.7]
mean = [0.100, 5.7]

[controller.averaged_rtt]
min = [0.100, 0.102]
max = [0.100, 7.0]
max = [0.100, 9.0]
mean = [0.100, 3.2]

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mean = [31, 54]

[controller.in_flight]
max = [13, 85]
mean = [7.0, 40]
mean = [7.0, 41]
mode = [1, 11]

[controller.concurrency_limit]
Expand Down
Loading