Skip to content

Commit 121bb95

Browse files
authored
refactor(query): don't retry transform on each block (#16235)
* refactor(query): don't retry transform on each block * refactor(query): don't retry transform on each block
1 parent 9e22255 commit 121bb95

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

src/query/pipeline/transforms/src/processors/transforms/transform_retry_async.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,16 @@ pub struct RetryStrategy {
3030
}
3131

3232
pub struct AsyncRetryWrapper<T: AsyncRetry + 'static> {
33+
retries: usize,
3334
t: T,
3435
}
3536

3637
impl<T: AsyncRetry + 'static> AsyncRetryWrapper<T> {
3738
pub fn create(inner: T) -> Self {
38-
Self { t: inner }
39+
Self {
40+
t: inner,
41+
retries: 0,
42+
}
3943
}
4044
}
4145

@@ -45,14 +49,14 @@ impl<T: AsyncRetry + 'static> AsyncTransform for AsyncRetryWrapper<T> {
4549

4650
async fn transform(&mut self, data: DataBlock) -> Result<DataBlock> {
4751
let strategy = self.t.retry_strategy();
48-
for i in 0..strategy.retry_times {
52+
while self.retries < strategy.retry_times {
4953
match self.t.transform(data.clone()).await {
5054
Ok(v) => return Ok(v),
5155
Err(e) => {
5256
// Add log to know which error is retrying
5357
info!(
5458
"Retry {} times for transform {} error: {:?}",
55-
i,
59+
self.retries,
5660
Self::NAME,
5761
e
5862
);
@@ -62,6 +66,7 @@ impl<T: AsyncRetry + 'static> AsyncTransform for AsyncRetryWrapper<T> {
6266
if let Some(duration) = strategy.retry_sleep_duration {
6367
tokio::time::sleep(duration).await;
6468
}
69+
self.retries += 1;
6570
}
6671
}
6772
}

0 commit comments

Comments
 (0)