Skip to content

Commit f828a74

Browse files
authored
Up incoming gRPC message limit / allow override (#693)
1 parent 520040a commit f828a74

File tree

1 file changed

+33
-8
lines changed

1 file changed

+33
-8
lines changed

client/src/lib.rs

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
77
#[macro_use]
88
extern crate tracing;
9+
910
mod metrics;
1011
mod raw;
1112
mod retry;
@@ -316,6 +317,7 @@ impl<C> Deref for ConfiguredClient<C> {
316317
&self.client
317318
}
318319
}
320+
319321
impl<C> DerefMut for ConfiguredClient<C> {
320322
fn deref_mut(&mut self) -> &mut Self::Target {
321323
&mut self.client
@@ -493,6 +495,19 @@ pub struct TemporalServiceClient<T> {
493495
test_svc_client: OnceCell<TestServiceClient<T>>,
494496
health_svc_client: OnceCell<HealthClient<T>>,
495497
}
498+
499+
/// We up the limit on incoming messages from server from the 4Mb default to 128Mb. If for
500+
/// whatever reason this needs to be changed by the user, we support overriding it via env var.
501+
fn get_decode_max_size() -> usize {
502+
static _DECODE_MAX_SIZE: OnceCell<usize> = OnceCell::new();
503+
*_DECODE_MAX_SIZE.get_or_init(|| {
504+
std::env::var("TEMPORAL_MAX_INCOMING_GRPC_BYTES")
505+
.ok()
506+
.and_then(|s| s.parse().ok())
507+
.unwrap_or(128 * 1024 * 1024)
508+
})
509+
}
510+
496511
impl<T> TemporalServiceClient<T>
497512
where
498513
T: Clone,
@@ -512,23 +527,30 @@ where
512527
}
513528
/// Get the underlying workflow service client
514529
pub fn workflow_svc(&self) -> &WorkflowServiceClient<T> {
515-
self.workflow_svc_client
516-
.get_or_init(|| WorkflowServiceClient::new(self.svc.clone()))
530+
self.workflow_svc_client.get_or_init(|| {
531+
WorkflowServiceClient::new(self.svc.clone())
532+
.max_decoding_message_size(get_decode_max_size())
533+
})
517534
}
518535
/// Get the underlying operator service client
519536
pub fn operator_svc(&self) -> &OperatorServiceClient<T> {
520-
self.operator_svc_client
521-
.get_or_init(|| OperatorServiceClient::new(self.svc.clone()))
537+
self.operator_svc_client.get_or_init(|| {
538+
OperatorServiceClient::new(self.svc.clone())
539+
.max_decoding_message_size(get_decode_max_size())
540+
})
522541
}
523542
/// Get the underlying test service client
524543
pub fn test_svc(&self) -> &TestServiceClient<T> {
525-
self.test_svc_client
526-
.get_or_init(|| TestServiceClient::new(self.svc.clone()))
544+
self.test_svc_client.get_or_init(|| {
545+
TestServiceClient::new(self.svc.clone())
546+
.max_decoding_message_size(get_decode_max_size())
547+
})
527548
}
528549
/// Get the underlying health service client
529550
pub fn health_svc(&self) -> &HealthClient<T> {
530-
self.health_svc_client
531-
.get_or_init(|| HealthClient::new(self.svc.clone()))
551+
self.health_svc_client.get_or_init(|| {
552+
HealthClient::new(self.svc.clone()).max_decoding_message_size(get_decode_max_size())
553+
})
532554
}
533555
/// Get the underlying workflow service client mutably
534556
pub fn workflow_svc_mut(&mut self) -> &mut WorkflowServiceClient<T> {
@@ -551,6 +573,7 @@ where
551573
self.health_svc_client.get_mut().unwrap()
552574
}
553575
}
576+
554577
/// A [WorkflowServiceClient] with the default interceptors attached.
555578
pub type WorkflowServiceClientWithMetrics = WorkflowServiceClient<InterceptedMetricsSvc>;
556579
/// An [OperatorServiceClient] with the default interceptors attached.
@@ -1501,6 +1524,7 @@ mod sealed {
15011524
WorkflowClientTrait + RawClientLike<SvcType = InterceptedMetricsSvc>
15021525
{
15031526
}
1527+
15041528
impl<T> WfHandleClient for T where
15051529
T: WorkflowClientTrait + RawClientLike<SvcType = InterceptedMetricsSvc>
15061530
{
@@ -1527,6 +1551,7 @@ pub trait WfClientExt: WfHandleClient + Sized + Clone {
15271551
)
15281552
}
15291553
}
1554+
15301555
impl<T> WfClientExt for T where T: WfHandleClient + Clone + Sized {}
15311556

15321557
#[cfg(test)]

0 commit comments

Comments
 (0)