Skip to content

Commit 92692b4

Browse files
Call shutdown RPC on worker shutdown (#2264)
Call shutdownWorker on worker shutdown
1 parent 393045d commit 92692b4

File tree

4 files changed

+89
-25
lines changed

4 files changed

+89
-25
lines changed

temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020

2121
package io.temporal.internal.worker;
2222

23+
import com.google.common.util.concurrent.ListenableFuture;
24+
import io.grpc.Status;
25+
import io.grpc.StatusRuntimeException;
26+
import io.temporal.api.workflowservice.v1.ShutdownWorkerResponse;
2327
import java.io.Closeable;
2428
import java.time.Duration;
2529
import java.util.concurrent.*;
@@ -119,6 +123,33 @@ private CompletableFuture<Void> limitedWait(
119123
return future;
120124
}
121125

126+
/**
127+
* Wait for {@code shutdownRequest} to finish. shutdownRequest is considered best effort, so we do
128+
* not fail the shutdown if it fails.
129+
*/
130+
public CompletableFuture<Void> waitOnWorkerShutdownRequest(
131+
ListenableFuture<ShutdownWorkerResponse> shutdownRequest) {
132+
CompletableFuture<Void> future = new CompletableFuture<>();
133+
shutdownRequest.addListener(
134+
() -> {
135+
try {
136+
shutdownRequest.get();
137+
} catch (StatusRuntimeException e) {
138+
// If the server does not support shutdown, ignore the exception
139+
if (Status.Code.UNIMPLEMENTED.equals(e.getStatus().getCode())) {
140+
return;
141+
}
142+
log.warn("failed to call shutdown worker", e);
143+
} catch (Exception e) {
144+
log.warn("failed to call shutdown worker", e);
145+
} finally {
146+
future.complete(null);
147+
}
148+
},
149+
scheduledExecutorService);
150+
return future;
151+
}
152+
122153
@Override
123154
public void close() {
124155
scheduledExecutorService.shutdownNow();

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.slf4j.MDC;
5757

5858
final class WorkflowWorker implements SuspendableWorker {
59+
private static final String GRACEFUL_SHUTDOWN_MESSAGE = "graceful shutdown";
5960
private static final Logger log = LoggerFactory.getLogger(WorkflowWorker.class);
6061

6162
private final WorkflowRunLockManager runLocks;
@@ -162,30 +163,49 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
162163
&& stickyTaskQueueName != null
163164
&& stickyQueueBalancer != null;
164165

165-
return CompletableFuture.completedFuture(null)
166-
.thenCompose(
167-
ignore ->
168-
stickyQueueBalancerDrainEnabled
169-
? shutdownManager.waitForStickyQueueBalancer(
170-
stickyQueueBalancer, options.getDrainStickyTaskQueueTimeout())
171-
: CompletableFuture.completedFuture(null))
172-
.thenCompose(ignore -> poller.shutdown(shutdownManager, interruptTasks))
173-
.thenCompose(
174-
ignore ->
175-
!interruptTasks
176-
? shutdownManager.waitForSupplierPermitsReleasedUnlimited(
177-
slotSupplier, supplierName)
178-
: CompletableFuture.completedFuture(null))
179-
.thenCompose(
180-
ignore ->
181-
pollTaskExecutor != null
182-
? pollTaskExecutor.shutdown(shutdownManager, interruptTasks)
183-
: CompletableFuture.completedFuture(null))
184-
.exceptionally(
185-
e -> {
186-
log.error("Unexpected exception during shutdown", e);
187-
return null;
188-
});
166+
CompletableFuture<Void> pollerShutdown =
167+
CompletableFuture.completedFuture(null)
168+
.thenCompose(
169+
ignore ->
170+
stickyQueueBalancerDrainEnabled
171+
? shutdownManager.waitForStickyQueueBalancer(
172+
stickyQueueBalancer, options.getDrainStickyTaskQueueTimeout())
173+
: CompletableFuture.completedFuture(null))
174+
.thenCompose(ignore -> poller.shutdown(shutdownManager, interruptTasks));
175+
return CompletableFuture.allOf(
176+
pollerShutdown.thenCompose(
177+
ignore -> {
178+
if (!interruptTasks && stickyTaskQueueName != null) {
179+
return shutdownManager.waitOnWorkerShutdownRequest(
180+
service
181+
.futureStub()
182+
.shutdownWorker(
183+
ShutdownWorkerRequest.newBuilder()
184+
.setIdentity(options.getIdentity())
185+
.setNamespace(namespace)
186+
.setStickyTaskQueue(stickyTaskQueueName)
187+
.setReason(GRACEFUL_SHUTDOWN_MESSAGE)
188+
.build()));
189+
}
190+
return CompletableFuture.completedFuture(null);
191+
}),
192+
pollerShutdown
193+
.thenCompose(
194+
ignore ->
195+
!interruptTasks
196+
? shutdownManager.waitForSupplierPermitsReleasedUnlimited(
197+
slotSupplier, supplierName)
198+
: CompletableFuture.completedFuture(null))
199+
.thenCompose(
200+
ignore ->
201+
pollTaskExecutor != null
202+
? pollTaskExecutor.shutdown(shutdownManager, interruptTasks)
203+
: CompletableFuture.completedFuture(null))
204+
.exceptionally(
205+
e -> {
206+
log.error("Unexpected exception during shutdown", e);
207+
return null;
208+
}));
189209
}
190210

191211
@Override

temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.mockito.ArgumentMatchers.any;
2727
import static org.mockito.Mockito.*;
2828

29+
import com.google.common.util.concurrent.Futures;
2930
import com.google.protobuf.ByteString;
3031
import com.uber.m3.tally.NoopScope;
3132
import com.uber.m3.tally.RootScopeBuilder;
@@ -100,9 +101,15 @@ public void concurrentPollRequestLockTest() throws Exception {
100101
eagerActivityDispatcher,
101102
slotSupplier);
102103

104+
WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub =
105+
mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class);
106+
when(futureStub.shutdownWorker(any(ShutdownWorkerRequest.class)))
107+
.thenReturn(Futures.immediateFuture(ShutdownWorkerResponse.newBuilder().build()));
108+
103109
WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub =
104110
mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
105111
when(client.blockingStub()).thenReturn(blockingStub);
112+
when(client.futureStub()).thenReturn(futureStub);
106113
when(blockingStub.withOption(any(), any())).thenReturn(blockingStub);
107114

108115
PollWorkflowTaskQueueResponse pollResponse =
@@ -259,9 +266,15 @@ public void respondWorkflowTaskFailureMetricTest() throws Exception {
259266
eagerActivityDispatcher,
260267
slotSupplier);
261268

269+
WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub =
270+
mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class);
271+
when(futureStub.shutdownWorker(any(ShutdownWorkerRequest.class)))
272+
.thenReturn(Futures.immediateFuture(ShutdownWorkerResponse.newBuilder().build()));
273+
262274
WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub =
263275
mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
264276
when(client.blockingStub()).thenReturn(blockingStub);
277+
when(client.futureStub()).thenReturn(futureStub);
265278
when(blockingStub.withOption(any(), any())).thenReturn(blockingStub);
266279

267280
PollWorkflowTaskQueueResponse pollResponse =

0 commit comments

Comments
 (0)