Skip to content

Commit 8a4bd7f

Browse files
tconley1428 and cretz authored
Don't retry on grpc message too large errors (#916)
* Don't retry on grpc message too large errors
* Update client/src/retry.rs

  Co-authored-by: Chad Retz <chad@temporal.io>
* Add additional error messages, share key, add ApplicationFailureInfo
* Refactor cloud tests to a separate test library and CI job
* Linting
* Allow override of test client, add GRPC test
* Formatting
* Init integ telem in new constructor
* Use same namespace env var as core starter
* Simplify cloud test command, no need for runner
* Simplify cloud test command, no need for runner

---------

Co-authored-by: Chad Retz <chad@temporal.io>
1 parent 1a3b41b commit 8a4bd7f

File tree

9 files changed

+440
-178
lines changed

9 files changed

+440
-178
lines changed

.github/workflows/per-pr.yml

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,6 @@ jobs:
7676

7777
integ-tests:
7878
name: Integ tests
79-
env:
80-
TEMPORAL_CLOUD_ADDRESS: https://${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
81-
TEMPORAL_CLOUD_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
82-
TEMPORAL_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
83-
TEMPORAL_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}
8479
timeout-minutes: 20
8580
strategy:
8681
fail-fast: false
@@ -111,6 +106,31 @@ jobs:
111106
- uses: Swatinem/rust-cache@v2
112107
- run: cargo integ-test
113108

109+
cloud-tests:
110+
if: github.repository_owner == 'temporalio'
111+
name: Cloud tests
112+
env:
113+
TEMPORAL_CLOUD_ADDRESS: https://${{ vars.TEMPORAL_CLIENT_NAMESPACE }}.tmprl.cloud:7233
114+
TEMPORAL_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
115+
TEMPORAL_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
116+
TEMPORAL_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}
117+
timeout-minutes: 20
118+
runs-on: ubuntu-latest
119+
steps:
120+
- uses: actions/checkout@v4
121+
- uses: dtolnay/rust-toolchain@stable
122+
with:
123+
toolchain: 1.85.0
124+
- name: Install protoc
125+
uses: arduino/setup-protoc@v3
126+
with:
127+
# TODO: Upgrade proto once https://github.com/arduino/setup-protoc/issues/99 is fixed
128+
version: '23.x'
129+
repo-token: ${{ secrets.GITHUB_TOKEN }}
130+
131+
- uses: Swatinem/rust-cache@v2
132+
- run: cargo test --test cloud_tests
133+
114134
docker-integ-tests:
115135
name: Docker integ tests
116136
env:

client/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ use uuid::Uuid;
8686
static CLIENT_NAME_HEADER_KEY: &str = "client-name";
8787
static CLIENT_VERSION_HEADER_KEY: &str = "client-version";
8888
static TEMPORAL_NAMESPACE_HEADER_KEY: &str = "temporal-namespace";
89+
90+
/// Key used to communicate when a GRPC message is too large
91+
pub static MESSAGE_TOO_LARGE_KEY: &str = "message-too-large";
92+
8993
/// The server times out polls after 60 seconds. Set our timeout to be slightly beyond that.
9094
const LONG_POLL_TIMEOUT: Duration = Duration::from_secs(70);
9195
const OTHER_CALL_TIMEOUT: Duration = Duration::from_secs(30);

client/src/retry.rs

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
2-
Client, IsWorkerTaskLongPoll, NamespacedClient, NoRetryOnMatching, Result, RetryConfig,
3-
raw::IsUserLongPoll,
2+
Client, IsWorkerTaskLongPoll, MESSAGE_TOO_LARGE_KEY, NamespacedClient, NoRetryOnMatching,
3+
Result, RetryConfig, raw::IsUserLongPoll,
44
};
55
use backoff::{Clock, SystemClock, backoff::Backoff, exponential::ExponentialBackoff};
66
use futures_retry::{ErrorHandler, FutureRetry, RetryPolicy};
@@ -201,7 +201,11 @@ where
201201
{
202202
type OutError = tonic::Status;
203203

204-
fn handle(&mut self, current_attempt: usize, e: tonic::Status) -> RetryPolicy<tonic::Status> {
204+
fn handle(
205+
&mut self,
206+
current_attempt: usize,
207+
mut e: tonic::Status,
208+
) -> RetryPolicy<tonic::Status> {
205209
// 0 max retries means unlimited retries
206210
if self.max_retries > 0 && current_attempt >= self.max_retries {
207211
return RetryPolicy::ForwardError(e);
@@ -213,6 +217,24 @@ where
213217
}
214218
}
215219

220+
// Short circuit if message is too large - this is not retryable
221+
if e.code() == Code::ResourceExhausted
222+
&& (e
223+
.message()
224+
.starts_with("grpc: received message larger than max")
225+
|| e.message()
226+
.starts_with("grpc: message after decompression larger than max")
227+
|| e.message()
228+
.starts_with("grpc: received message after decompression larger than max"))
229+
{
230+
// Leave a marker so we don't have duplicate detection logic in the workflow
231+
e.metadata_mut().insert(
232+
MESSAGE_TOO_LARGE_KEY,
233+
tonic::metadata::MetadataValue::from(0),
234+
);
235+
return RetryPolicy::ForwardError(e);
236+
}
237+
216238
// Task polls are OK with being cancelled or running into the timeout because there's
217239
// nothing to do but retry anyway
218240
let long_poll_allowed = self.call_type == CallType::TaskLongPoll
@@ -423,6 +445,47 @@ mod tests {
423445
assert_matches!(result, RetryPolicy::ForwardError(_))
424446
}
425447

448+
#[tokio::test]
449+
async fn message_too_large_not_retried() {
450+
let mut err_handler = TonicErrorHandler::new_with_clock(
451+
CallInfo {
452+
call_type: CallType::TaskLongPoll,
453+
call_name: POLL_WORKFLOW_METH_NAME,
454+
retry_cfg: TEST_RETRY_CONFIG,
455+
retry_short_circuit: None,
456+
},
457+
TEST_RETRY_CONFIG,
458+
FixedClock(Instant::now()),
459+
FixedClock(Instant::now()),
460+
);
461+
let result = err_handler.handle(
462+
1,
463+
Status::new(
464+
Code::ResourceExhausted,
465+
"grpc: received message larger than max",
466+
),
467+
);
468+
assert_matches!(result, RetryPolicy::ForwardError(_));
469+
470+
let result = err_handler.handle(
471+
1,
472+
Status::new(
473+
Code::ResourceExhausted,
474+
"grpc: message after decompression larger than max",
475+
),
476+
);
477+
assert_matches!(result, RetryPolicy::ForwardError(_));
478+
479+
let result = err_handler.handle(
480+
1,
481+
Status::new(
482+
Code::ResourceExhausted,
483+
"grpc: received message after decompression larger than max",
484+
),
485+
);
486+
assert_matches!(result, RetryPolicy::ForwardError(_));
487+
}
488+
426489
#[rstest::rstest]
427490
#[tokio::test]
428491
async fn task_poll_retries_forever<R>(

core/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ name = "global_metric_tests"
120120
path = "../tests/global_metric_tests.rs"
121121
test = false
122122

123+
[[test]]
124+
name = "cloud_tests"
125+
path = "../tests/cloud_tests.rs"
126+
test = false
127+
123128
[[bench]]
124129
name = "workflow_replay"
125130
harness = false

0 commit comments

Comments
 (0)