diff --git a/Cargo.lock b/Cargo.lock index ff1cdfe628db0..d1f08f7a65132 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1434,7 +1434,7 @@ dependencies = [ "serde", "sync_wrapper 0.1.2", "tokio", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", ] @@ -1461,7 +1461,7 @@ dependencies = [ "rustversion", "serde", "sync_wrapper 1.0.1", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", ] @@ -4174,7 +4174,7 @@ dependencies = [ "tokio-stream", "tonic 0.11.0", "tonic-build 0.9.2", - "tower", + "tower 0.4.13", ] [[package]] @@ -5551,7 +5551,7 @@ dependencies = [ "thiserror 1.0.68", "tokio", "tokio-util", - "tower", + "tower 0.4.13", "tower-http 0.5.2", "tracing 0.1.41", ] @@ -10678,7 +10678,7 @@ dependencies = [ "tokio", "tokio-rustls 0.25.0", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing 0.1.41", @@ -10709,7 +10709,7 @@ dependencies = [ "socket2 0.5.10", "tokio", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing 0.1.41", @@ -10761,6 +10761,25 @@ dependencies = [ "tracing 0.1.41", ] +[[package]] +name = "tower" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 2.7.0", + "pin-project-lite", + "slab", + "sync_wrapper 1.0.1", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing 0.1.41", +] + [[package]] name = "tower-http" version = "0.4.4" @@ -10804,15 +10823,15 @@ dependencies = [ [[package]] name = "tower-layer" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tower-test" @@ -11600,7 +11619,7 @@ dependencies = [ "toml", "tonic 0.11.0", "tonic-build 0.11.0", - "tower", + "tower 0.5.2", "tower-http 0.4.4", "tower-test", "tracing 0.1.41", @@ -11887,7 +11906,7 @@ dependencies = [ "rand_distr", "tokio", "tokio-util", - "tower", + "tower 0.4.13", "tracing 0.1.41", "twox-hash 2.1.1", "vector-common", diff --git a/Cargo.toml b/Cargo.toml index 3864e4e9306f2..bdea890e1f021 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -265,7 +265,7 @@ azure_storage_blobs = { version = "0.17", default-features = false, optional = t opendal = { version = "0.53", default-features = false, features = ["services-webhdfs"], optional = true } # Tower -tower = { version = "0.4.13", default-features = false, features = ["buffer", "limit", "retry", "timeout", "util", "balance", "discover"] } +tower = { version = "0.5.2", default-features = false, features = ["buffer", "limit", "retry", "timeout", "util", "balance", "discover"] } tower-http = { version = "0.4.4", default-features = false, features = ["compression-full", "decompression-gzip", "trace"] } # Serde serde.workspace = true diff --git a/src/sinks/aws_cloudwatch_logs/service.rs b/src/sinks/aws_cloudwatch_logs/service.rs index f914029dbc35f..371a439199057 100644 --- a/src/sinks/aws_cloudwatch_logs/service.rs +++ b/src/sinks/aws_cloudwatch_logs/service.rs @@ -46,15 +46,18 @@ use crate::sinks::{ }; type Svc = Buffer< - ConcurrencyLimit< + Vec, + >, - Buffer, Vec>, + Buffer< + Vec, + as Service>>::Future, + >, >, >, - >, - Vec, + > as Service>>::Future, >; #[derive(Debug)] diff --git a/src/sinks/util/retries.rs b/src/sinks/util/retries.rs index 32ac8fd251f2e..375928bec89c0 100644 --- a/src/sinks/util/retries.rs +++ b/src/sinks/util/retries.rs @@ -73,9 +73,8 @@ pub struct FibonacciRetryPolicy { logic: L, } -pub struct RetryPolicyFuture { +pub struct RetryPolicyFuture { delay: Pin>, - policy: FibonacciRetryPolicy, } impl FibonacciRetryPolicy { @@ -102,23 +101,6 @@ impl FibonacciRetryPolicy { Duration::from_millis(jitter) } - fn advance(&self) -> FibonacciRetryPolicy { - let next_duration: Duration = cmp::min( - self.previous_duration + self.current_duration, - self.max_duration, - ); - - FibonacciRetryPolicy { - remaining_attempts: self.remaining_attempts - 1, - previous_duration: self.current_duration, - current_duration: next_duration, - current_jitter_duration: Self::add_full_jitter(next_duration), - jitter_mode: self.jitter_mode, - max_duration: self.max_duration, - logic: self.logic.clone(), - } - } - const fn backoff(&self) -> Duration { match self.jitter_mode { JitterMode::None => self.current_duration, @@ -126,25 +108,36 @@ impl FibonacciRetryPolicy { } } - fn build_retry(&self) -> RetryPolicyFuture { - let policy = self.advance(); + fn advance(&mut self) { + let next_duration: Duration = cmp::min( + self.previous_duration + self.current_duration, + self.max_duration, + ); + self.remaining_attempts -= 1; + self.previous_duration = self.current_duration; + self.current_duration = next_duration; + self.current_jitter_duration = Self::add_full_jitter(next_duration); + } + + fn build_retry(&mut self) -> RetryPolicyFuture { + self.advance(); let delay = Box::pin(sleep(self.backoff())); debug!(message = "Retrying request.", delay_ms = %self.backoff().as_millis()); - RetryPolicyFuture { delay, policy } + RetryPolicyFuture { delay } } } impl Policy for FibonacciRetryPolicy where - Req: Clone, + Req: Clone + 'static, L: RetryLogic, { - type Future = RetryPolicyFuture; + type Future = RetryPolicyFuture; // NOTE: in the error cases- `Error` and `EventsDropped` internal events are emitted by the // driver, so only need to log here. - fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option { + fn retry(&mut self, _: &mut Req, result: &mut Result) -> Option { match result { Ok(response) => match self.logic.should_retry_response(response) { RetryAction::Retry(reason) => { @@ -204,21 +197,21 @@ where } } - fn clone_request(&self, request: &Req) -> Option { + fn clone_request(&mut self, request: &Req) -> Option { Some(request.clone()) } } // Safety: `L` is never pinned and we use no unsafe pin projections // therefore this safe. -impl Unpin for RetryPolicyFuture {} +impl Unpin for RetryPolicyFuture {} -impl Future for RetryPolicyFuture { - type Output = FibonacciRetryPolicy; +impl Future for RetryPolicyFuture { + type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { std::task::ready!(self.delay.poll_unpin(cx)); - Poll::Ready(self.policy.clone()) + Poll::Ready(()) } } @@ -421,25 +414,25 @@ mod tests { ); assert_eq!(Duration::from_secs(1), policy.backoff()); - policy = policy.advance(); + policy.advance(); assert_eq!(Duration::from_secs(1), policy.backoff()); - policy = policy.advance(); + policy.advance(); assert_eq!(Duration::from_secs(2), policy.backoff()); - policy = policy.advance(); + policy.advance(); assert_eq!(Duration::from_secs(3), policy.backoff()); - policy = policy.advance(); + policy.advance(); assert_eq!(Duration::from_secs(5), policy.backoff()); - policy = policy.advance(); + policy.advance(); assert_eq!(Duration::from_secs(8), policy.backoff()); - policy = policy.advance(); + policy.advance(); assert_eq!(Duration::from_secs(10), policy.backoff()); - policy = policy.advance(); + policy.advance(); assert_eq!(Duration::from_secs(10), policy.backoff()); } @@ -469,7 +462,7 @@ mod tests { backoff ); - policy = policy.advance(); + policy.advance(); } // Once the max backoff is reached, it should not exceed the max backoff. @@ -482,7 +475,7 @@ mod tests { backoff ); - policy = policy.advance(); + policy.advance(); } } diff --git a/src/sinks/util/service.rs b/src/sinks/util/service.rs index 5960250034144..91733c4af019f 100644 --- a/src/sinks/util/service.rs +++ b/src/sinks/util/service.rs @@ -44,7 +44,10 @@ pub type TowerPartitionSink = PartitionBatchSink, B, K>; // Distributed service types pub type DistributedService = RateLimit< - Retry, Buffer, Req>, Req>>, + Retry< + FibonacciRetryPolicy, + Buffer, Req> as Service>::Future>, + >, >; pub type DiscoveryService = BoxStream<'static, Result>, crate::Error>>; @@ -352,7 +355,7 @@ impl TowerRequestSettings { ) }) .enumerate() - .map(|(i, service)| Ok(Change::Insert(i, service))) + .map(|(i, service)| Ok::<_, S::Error>(Change::Insert(i, service))) .collect::>(); // Build sink service diff --git a/tests/data/adaptive-concurrency/defers-at-high-concurrency.toml b/tests/data/adaptive-concurrency/defers-at-high-concurrency.toml index 3401d91c0c5f3..2cc22a0b3041a 100644 --- a/tests/data/adaptive-concurrency/defers-at-high-concurrency.toml +++ b/tests/data/adaptive-concurrency/defers-at-high-concurrency.toml @@ -15,26 +15,26 @@ mode = [0, 6] mean = [2.2, 5.0] [stats.rate] -max = [52, 62] +max = [52, 63] mean = [20, 44] [controller.in_flight] -max = [5, 39] -mode = [4, 27] +max = [5, 41] +mode = [4, 29] mean = [3.5, 26] [controller.concurrency_limit] -max = [6, 39] -mode = [2, 27] -mean = [4.0, 26.5] +max = [6, 41] +mode = [2, 29] +mean = [4.0, 29] [controller.observed_rtt] min = [0.100, 0.102] -max = [0.100, 12.7] +max = [0.100, 19.7] mean = [0.100, 5.7] [controller.averaged_rtt] min = [0.100, 0.102] -max = [0.100, 7.0] +max = [0.100, 9.0] mean = [0.100, 3.2] diff --git a/tests/data/adaptive-concurrency/drops-at-high-concurrency.toml b/tests/data/adaptive-concurrency/drops-at-high-concurrency.toml index 98cc7a637036a..b9033c1333c5c 100644 --- a/tests/data/adaptive-concurrency/drops-at-high-concurrency.toml +++ b/tests/data/adaptive-concurrency/drops-at-high-concurrency.toml @@ -19,7 +19,7 @@ mean = [31, 54] [controller.in_flight] max = [13, 85] -mean = [7.0, 40] +mean = [7.0, 41] mode = [1, 11] [controller.concurrency_limit]