Skip to content

Commit ed211fa

Browse files
Fix exception in GrpcRetryer. (#2021)
* Fix uncaught gRPC exception in retry logic. * Limit retry timeout to deadline; don't query capabilities on health check RPCs. * Oops, replace setMaximumInterval with setExpiration. * Fix formatting. * Back out the chnages to the interceptor, something hinky is happening. * Add tests.
1 parent c6cceca commit ed211fa

File tree

4 files changed

+297
-32
lines changed

4 files changed

+297
-32
lines changed

temporal-serviceclient/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ dependencies {
2424
api "org.slf4j:slf4j-api:$slf4jVersion"
2525

2626
testImplementation project(':temporal-testing')
27+
testImplementation "io.grpc:grpc-testing:${grpcVersion}"
2728
testImplementation "junit:junit:${junitVersion}"
2829
testImplementation "org.mockito:mockito-core:${mockitoVersion}"
2930

temporal-serviceclient/src/main/java/io/temporal/serviceclient/ChannelManager.java

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@
2727
import io.grpc.health.v1.HealthGrpc;
2828
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
2929
import io.grpc.stub.MetadataUtils;
30-
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
30+
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse.Capabilities;
3131
import io.temporal.internal.retryer.GrpcRetryer;
32+
import io.temporal.internal.retryer.GrpcRetryer.GrpcRetryerOptions;
3233
import java.time.Duration;
3334
import java.util.Collection;
3435
import java.util.List;
@@ -87,7 +88,7 @@ final class ChannelManager {
8788
private final Channel interceptedChannel;
8889
private final HealthGrpc.HealthBlockingStub healthBlockingStub;
8990

90-
private final CompletableFuture<GetSystemInfoResponse.Capabilities> serverCapabilitiesFuture =
91+
private final CompletableFuture<Capabilities> serverCapabilitiesFuture =
9192
new CompletableFuture<>();
9293

9394
public ChannelManager(
@@ -289,8 +290,8 @@ public void connect(String healthCheckServiceName, @Nullable Duration timeout) {
289290
if (timeout == null) {
290291
timeout = options.getRpcTimeout();
291292
}
292-
GrpcRetryer.GrpcRetryerOptions grpcRetryerOptions =
293-
new GrpcRetryer.GrpcRetryerOptions(
293+
GrpcRetryerOptions grpcRetryerOptions =
294+
new GrpcRetryerOptions(
294295
RpcRetryOptions.newBuilder().setExpiration(timeout).validateBuildWithDefaults(), null);
295296

296297
new GrpcRetryer(getServerCapabilities())
@@ -310,30 +311,24 @@ public void connect(String healthCheckServiceName, @Nullable Duration timeout) {
310311
*/
311312
public HealthCheckResponse healthCheck(
312313
String healthCheckServiceName, @Nullable Duration timeout) {
313-
HealthGrpc.HealthBlockingStub stub;
314-
if (timeout != null) {
315-
stub =
316-
this.healthBlockingStub.withDeadline(
317-
Deadline.after(
318-
options.getHealthCheckAttemptTimeout().toMillis(), TimeUnit.MILLISECONDS));
319-
} else {
320-
stub = this.healthBlockingStub;
314+
if (timeout == null) {
315+
timeout = options.getHealthCheckAttemptTimeout();
321316
}
322-
return stub.check(HealthCheckRequest.newBuilder().setService(healthCheckServiceName).build());
317+
return this.healthBlockingStub
318+
.withDeadline(deadlineFrom(timeout))
319+
.check(HealthCheckRequest.newBuilder().setService(healthCheckServiceName).build());
323320
}
324321

325-
public Supplier<GetSystemInfoResponse.Capabilities> getServerCapabilities() {
326-
return () -> {
327-
synchronized (serverCapabilitiesFuture) {
328-
GetSystemInfoResponse.Capabilities capabilities = serverCapabilitiesFuture.getNow(null);
329-
if (capabilities == null) {
330-
serverCapabilitiesFuture.complete(
331-
SystemInfoInterceptor.getServerCapabilitiesOrThrow(interceptedChannel, null));
332-
capabilities = serverCapabilitiesFuture.getNow(null);
333-
}
334-
return capabilities;
335-
}
336-
};
322+
public Supplier<Capabilities> getServerCapabilities() {
323+
return () ->
324+
SystemInfoInterceptor.getServerCapabilitiesWithRetryOrThrow(
325+
serverCapabilitiesFuture,
326+
interceptedChannel,
327+
deadlineFrom(options.getHealthCheckAttemptTimeout()));
328+
}
329+
330+
private static Deadline deadlineFrom(Duration duration) {
331+
return Deadline.after(duration.toMillis(), TimeUnit.MILLISECONDS);
337332
}
338333

339334
public void shutdown() {

temporal-serviceclient/src/main/java/io/temporal/serviceclient/SystemInfoInterceptor.java

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,22 @@
2323
import io.grpc.*;
2424
import io.temporal.api.workflowservice.v1.GetSystemInfoRequest;
2525
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
26+
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse.Capabilities;
2627
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
28+
import io.temporal.internal.retryer.GrpcRetryer;
29+
import io.temporal.internal.retryer.GrpcRetryer.GrpcRetryerOptions;
30+
import java.time.Duration;
31+
import java.util.Objects;
2732
import java.util.concurrent.CompletableFuture;
33+
import java.util.concurrent.TimeUnit;
34+
import javax.annotation.Nonnull;
2835
import javax.annotation.Nullable;
2936

3037
public class SystemInfoInterceptor implements ClientInterceptor {
3138

32-
private final CompletableFuture<GetSystemInfoResponse.Capabilities> serverCapabilitiesFuture;
39+
private final CompletableFuture<Capabilities> serverCapabilitiesFuture;
3340

34-
public SystemInfoInterceptor(
35-
CompletableFuture<GetSystemInfoResponse.Capabilities> serverCapabilitiesFuture) {
41+
public SystemInfoInterceptor(CompletableFuture<Capabilities> serverCapabilitiesFuture) {
3642
this.serverCapabilitiesFuture = serverCapabilitiesFuture;
3743
}
3844

@@ -63,8 +69,7 @@ public void onMessage(RespT message) {
6369
@Override
6470
public void onClose(Status status, Metadata trailers) {
6571
if (Status.UNIMPLEMENTED.getCode().equals(status.getCode())) {
66-
serverCapabilitiesFuture.complete(
67-
GetSystemInfoResponse.Capabilities.getDefaultInstance());
72+
serverCapabilitiesFuture.complete(Capabilities.getDefaultInstance());
6873
}
6974
super.onClose(status, trailers);
7075
}
@@ -87,7 +92,39 @@ public void onClose(Status status, Metadata trailers) {
8792
};
8893
}
8994

90-
public static GetSystemInfoResponse.Capabilities getServerCapabilitiesOrThrow(
95+
public static Capabilities getServerCapabilitiesWithRetryOrThrow(
96+
@Nonnull CompletableFuture<Capabilities> future,
97+
@Nonnull Channel channel,
98+
@Nullable Deadline deadline) {
99+
Capabilities capabilities = future.getNow(null);
100+
if (capabilities == null) {
101+
synchronized (Objects.requireNonNull(future)) {
102+
capabilities = future.getNow(null);
103+
if (capabilities == null) {
104+
if (deadline == null) {
105+
deadline = Deadline.after(30, TimeUnit.SECONDS);
106+
}
107+
Deadline computedDeadline = deadline;
108+
RpcRetryOptions rpcRetryOptions =
109+
RpcRetryOptions.newBuilder()
110+
.setExpiration(
111+
Duration.ofMillis(computedDeadline.timeRemaining(TimeUnit.MILLISECONDS)))
112+
.validateBuildWithDefaults();
113+
GrpcRetryerOptions grpcRetryerOptions =
114+
new GrpcRetryerOptions(rpcRetryOptions, computedDeadline);
115+
capabilities =
116+
new GrpcRetryer(Capabilities::getDefaultInstance)
117+
.retryWithResult(
118+
() -> getServerCapabilitiesOrThrow(channel, computedDeadline),
119+
grpcRetryerOptions);
120+
future.complete(capabilities);
121+
}
122+
}
123+
}
124+
return capabilities;
125+
}
126+
127+
public static Capabilities getServerCapabilitiesOrThrow(
91128
Channel channel, @Nullable Deadline deadline) {
92129
try {
93130
return WorkflowServiceGrpc.newBlockingStub(channel)
@@ -96,7 +133,7 @@ public static GetSystemInfoResponse.Capabilities getServerCapabilitiesOrThrow(
96133
.getCapabilities();
97134
} catch (StatusRuntimeException ex) {
98135
if (Status.Code.UNIMPLEMENTED.equals(ex.getStatus().getCode())) {
99-
return GetSystemInfoResponse.Capabilities.getDefaultInstance();
136+
return Capabilities.getDefaultInstance();
100137
}
101138
throw ex;
102139
}

0 commit comments

Comments
 (0)