Skip to content

Commit 0c6c566

Browse files
Add StickyTaskQueueDrainTimeout (#2019)
Add an option to drain sticky task queue during graceful shutdown.
1 parent f205d1c commit 0c6c566

File tree

8 files changed

+212
-11
lines changed

8 files changed

+212
-11
lines changed

temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,23 @@ public CompletableFuture<Void> waitForSemaphorePermitsReleaseUntimed(
7777
return future;
7878
}
7979

80+
/**
81+
* waitForStickyQueueBalancer -&gt; disableNormalPoll -&gt; timed wait for graceful completion of
82+
* sticky workflows
83+
*/
84+
public CompletableFuture<Void> waitForStickyQueueBalancer(
85+
StickyQueueBalancer balancer, Duration timeout) {
86+
CompletableFuture<Void> future = new CompletableFuture<>();
87+
balancer.disableNormalPoll();
88+
scheduledExecutorService.schedule(
89+
() -> {
90+
future.complete(null);
91+
},
92+
timeout.toMillis(),
93+
TimeUnit.MILLISECONDS);
94+
return future;
95+
}
96+
8097
/**
8198
* Wait for {@code executorToShutdown} to terminate. Only completes the returned CompletableFuture
8299
* when the executor is terminated.

temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public static final class Builder {
5858
private long defaultDeadlockDetectionTimeout;
5959
private Duration maxHeartbeatThrottleInterval;
6060
private Duration defaultHeartbeatThrottleInterval;
61+
private Duration drainStickyTaskQueueTimeout;
6162

6263
private Builder() {}
6364

@@ -80,6 +81,7 @@ private Builder(SingleWorkerOptions options) {
8081
this.defaultHeartbeatThrottleInterval = options.getDefaultHeartbeatThrottleInterval();
8182
this.buildId = options.getBuildId();
8283
this.useBuildIdForVersioning = options.isUsingBuildIdForVersioning();
84+
this.drainStickyTaskQueueTimeout = options.getDrainStickyTaskQueueTimeout();
8385
}
8486

8587
public Builder setIdentity(String identity) {
@@ -161,6 +163,11 @@ public Builder setUseBuildIdForVersioning(boolean useBuildIdForVersioning) {
161163
return this;
162164
}
163165

166+
public Builder setStickyTaskQueueDrainTimeout(Duration drainStickyTaskQueueTimeout) {
167+
this.drainStickyTaskQueueTimeout = drainStickyTaskQueueTimeout;
168+
return this;
169+
}
170+
164171
public SingleWorkerOptions build() {
165172
PollerOptions pollerOptions = this.pollerOptions;
166173
if (pollerOptions == null) {
@@ -177,6 +184,11 @@ public SingleWorkerOptions build() {
177184
metricsScope = new NoopScope();
178185
}
179186

187+
Duration drainStickyTaskQueueTimeout = this.drainStickyTaskQueueTimeout;
188+
if (drainStickyTaskQueueTimeout == null) {
189+
drainStickyTaskQueueTimeout = Duration.ofSeconds(0);
190+
}
191+
180192
return new SingleWorkerOptions(
181193
this.identity,
182194
this.binaryChecksum,
@@ -192,7 +204,8 @@ public SingleWorkerOptions build() {
192204
this.stickyQueueScheduleToStartTimeout,
193205
this.defaultDeadlockDetectionTimeout,
194206
this.maxHeartbeatThrottleInterval,
195-
this.defaultHeartbeatThrottleInterval);
207+
this.defaultHeartbeatThrottleInterval,
208+
drainStickyTaskQueueTimeout);
196209
}
197210
}
198211

@@ -211,6 +224,7 @@ public SingleWorkerOptions build() {
211224
private final long defaultDeadlockDetectionTimeout;
212225
private final Duration maxHeartbeatThrottleInterval;
213226
private final Duration defaultHeartbeatThrottleInterval;
227+
private final Duration drainStickyTaskQueueTimeout;
214228

215229
private SingleWorkerOptions(
216230
String identity,
@@ -227,7 +241,8 @@ private SingleWorkerOptions(
227241
Duration stickyQueueScheduleToStartTimeout,
228242
long defaultDeadlockDetectionTimeout,
229243
Duration maxHeartbeatThrottleInterval,
230-
Duration defaultHeartbeatThrottleInterval) {
244+
Duration defaultHeartbeatThrottleInterval,
245+
Duration drainStickyTaskQueueTimeout) {
231246
this.identity = identity;
232247
this.binaryChecksum = binaryChecksum;
233248
this.buildId = buildId;
@@ -243,6 +258,7 @@ private SingleWorkerOptions(
243258
this.defaultDeadlockDetectionTimeout = defaultDeadlockDetectionTimeout;
244259
this.maxHeartbeatThrottleInterval = maxHeartbeatThrottleInterval;
245260
this.defaultHeartbeatThrottleInterval = defaultHeartbeatThrottleInterval;
261+
this.drainStickyTaskQueueTimeout = drainStickyTaskQueueTimeout;
246262
}
247263

248264
public String getIdentity() {
@@ -265,6 +281,10 @@ public boolean isUsingBuildIdForVersioning() {
265281
return useBuildIdForVersioning;
266282
}
267283

284+
public Duration getDrainStickyTaskQueueTimeout() {
285+
return drainStickyTaskQueueTimeout;
286+
}
287+
268288
public DataConverter getDataConverter() {
269289
return dataConverter;
270290
}

temporal-sdk/src/main/java/io/temporal/internal/worker/StickyQueueBalancer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package io.temporal.internal.worker;
2222

2323
import io.temporal.api.enums.v1.TaskQueueKind;
24+
import java.util.concurrent.atomic.AtomicBoolean;
2425
import java.util.concurrent.atomic.AtomicInteger;
2526
import javax.annotation.concurrent.ThreadSafe;
2627

@@ -30,6 +31,7 @@ public class StickyQueueBalancer {
3031
private final boolean stickyQueueEnabled;
3132
private final AtomicInteger stickyPollers = new AtomicInteger(0);
3233
private final AtomicInteger normalPollers = new AtomicInteger(0);
34+
private final AtomicBoolean disableNormalPoll = new AtomicBoolean(false);
3335

3436
private volatile long stickyBacklogSize = 0;
3537

@@ -43,6 +45,10 @@ public StickyQueueBalancer(int pollersCount, boolean stickyQueueEnabled) {
4345
*/
4446
public TaskQueueKind makePoll() {
4547
if (stickyQueueEnabled) {
48+
if (disableNormalPoll.get()) {
49+
stickyPollers.incrementAndGet();
50+
return TaskQueueKind.TASK_QUEUE_KIND_STICKY;
51+
}
4652
// If pollersCount >= stickyBacklogSize > 0 we want to go back to a normal ratio to avoid a
4753
// situation that too many pollers (all of them in the worst case) will open only sticky queue
4854
// polls observing a stickyBacklogSize == 1 for example (which actually can be 0 already at
@@ -83,4 +89,12 @@ public void finishPoll(TaskQueueKind taskQueueKind, long backlogSize) {
8389
stickyBacklogSize = backlogSize;
8490
}
8591
}
92+
93+
public void disableNormalPoll() {
94+
disableNormalPoll.set(true);
95+
}
96+
97+
public int getNormalPollerCount() {
98+
return normalPollers.get();
99+
}
86100
}

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public boolean start() {
9898
@Override
9999
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
100100
return shutdownManager
101-
// we want to shutdown heartbeatExecutor before activity worker, so in-flight activities
101+
// we want to shut down heartbeatExecutor before activity worker, so in-flight activities
102102
// could get an ActivityWorkerShutdownException from their heartbeat
103103
.shutdownExecutor(heartbeatExecutor, this + "#heartbeatExecutor", Duration.ofSeconds(5))
104104
.thenCompose(r -> worker.shutdown(shutdownManager, interruptTasks))

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ final class WorkflowWorker implements SuspendableWorker {
7878
// Currently the implementation looks safe without volatile, but it's brittle.
7979
@Nonnull private SuspendableWorker poller = new NoopWorker();
8080

81+
private StickyQueueBalancer stickyQueueBalancer;
82+
8183
public WorkflowWorker(
8284
@Nonnull WorkflowServiceStubs service,
8385
@Nonnull String namespace,
@@ -118,7 +120,7 @@ public boolean start() {
118120
options.getTaskExecutorThreadPoolSize(),
119121
workerMetricsScope,
120122
true);
121-
StickyQueueBalancer stickyQueueBalancer =
123+
stickyQueueBalancer =
122124
new StickyQueueBalancer(
123125
options.getPollerOptions().getPollThreadCount(), stickyTaskQueueName != null);
124126

@@ -153,8 +155,21 @@ public boolean start() {
153155
@Override
154156
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
155157
String semaphoreName = this + "#executorSlotsSemaphore";
156-
return poller
157-
.shutdown(shutdownManager, interruptTasks)
158+
159+
boolean stickyQueueBalancerDrainEnabled =
160+
!interruptTasks
161+
&& !options.getDrainStickyTaskQueueTimeout().isZero()
162+
&& stickyTaskQueueName != null
163+
&& stickyQueueBalancer != null;
164+
165+
return CompletableFuture.completedFuture(null)
166+
.thenCompose(
167+
ignore ->
168+
stickyQueueBalancerDrainEnabled
169+
? shutdownManager.waitForStickyQueueBalancer(
170+
stickyQueueBalancer, options.getDrainStickyTaskQueueTimeout())
171+
: CompletableFuture.completedFuture(null))
172+
.thenCompose(ignore -> poller.shutdown(shutdownManager, interruptTasks))
158173
.thenCompose(
159174
ignore ->
160175
!interruptTasks

temporal-sdk/src/main/java/io/temporal/worker/Worker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,7 @@ private static SingleWorkerOptions toWorkflowWorkerOptions(
543543
PollerOptions.newBuilder().setPollThreadCount(maxConcurrentWorkflowTaskPollers).build())
544544
.setTaskExecutorThreadPoolSize(options.getMaxConcurrentWorkflowTaskExecutionSize())
545545
.setStickyQueueScheduleToStartTimeout(stickyQueueScheduleToStartTimeout)
546+
.setStickyTaskQueueDrainTimeout(options.getStickyTaskQueueDrainTimeout())
546547
.setDefaultDeadlockDetectionTimeout(options.getDefaultDeadlockDetectionTimeout())
547548
.setMetricsScope(metricsScope.tagged(tags))
548549
.build();

temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import com.google.common.base.Preconditions;
2626
import io.temporal.common.Experimental;
27+
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
2728
import java.time.Duration;
2829
import java.util.Objects;
2930
import javax.annotation.Nonnull;
@@ -45,6 +46,8 @@ public static WorkerOptions getDefaultInstance() {
4546

4647
static final Duration DEFAULT_STICKY_SCHEDULE_TO_START_TIMEOUT = Duration.ofSeconds(5);
4748

49+
static final Duration DEFAULT_STICKY_TASK_QUEUE_DRAIN_TIMEOUT = Duration.ofSeconds(0);
50+
4851
private static final WorkerOptions DEFAULT_INSTANCE;
4952

5053
static {
@@ -78,6 +81,7 @@ public static final class Builder {
7881
private boolean disableEagerExecution;
7982
private String buildId;
8083
private boolean useBuildIdForVersioning;
84+
private Duration stickyTaskQueueDrainTimeout;
8185

8286
private Builder() {}
8387

@@ -100,6 +104,7 @@ private Builder(WorkerOptions o) {
100104
this.disableEagerExecution = o.disableEagerExecution;
101105
this.useBuildIdForVersioning = o.useBuildIdForVersioning;
102106
this.buildId = o.buildId;
107+
this.stickyTaskQueueDrainTimeout = o.stickyTaskQueueDrainTimeout;
103108
}
104109

105110
/**
@@ -349,6 +354,22 @@ public Builder setBuildId(String buildId) {
349354
return this;
350355
}
351356

357+
/**
358+
* During graceful shutdown, as when calling {@link WorkerFactory#shutdown()}, if the workflow
359+
* cache is enabled, this timeout controls how long to wait for the sticky task queue to drain
360+
* before shutting down the worker. If set the worker will stop making new poll requests on the
361+
* normal task queue, but will continue to poll the sticky task queue until the timeout is
362+
* reached. This value should always be greater than clients rpc long poll timeout, which can be
363+
* set via {@link WorkflowServiceStubsOptions.Builder#setRpcLongPollTimeout(Duration)}.
364+
*
365+
* <p>Default is not to wait.
366+
*/
367+
@Experimental
368+
public Builder setStickyTaskQueueDrainTimeout(Duration stickyTaskQueueDrainTimeout) {
369+
this.stickyTaskQueueDrainTimeout = stickyTaskQueueDrainTimeout;
370+
return this;
371+
}
372+
352373
public WorkerOptions build() {
353374
return new WorkerOptions(
354375
maxWorkerActivitiesPerSecond,
@@ -365,7 +386,8 @@ public WorkerOptions build() {
365386
stickyQueueScheduleToStartTimeout,
366387
disableEagerExecution,
367388
useBuildIdForVersioning,
368-
buildId);
389+
buildId,
390+
stickyTaskQueueDrainTimeout);
369391
}
370392

371393
public WorkerOptions validateAndBuildWithDefaults() {
@@ -396,6 +418,9 @@ public WorkerOptions validateAndBuildWithDefaults() {
396418
buildId != null && !buildId.isEmpty(),
397419
"buildId must be set non-empty if useBuildIdForVersioning is set true");
398420
}
421+
Preconditions.checkState(
422+
stickyTaskQueueDrainTimeout == null || !stickyTaskQueueDrainTimeout.isNegative(),
423+
"negative stickyTaskQueueDrainTimeout");
399424

400425
return new WorkerOptions(
401426
maxWorkerActivitiesPerSecond,
@@ -430,7 +455,10 @@ public WorkerOptions validateAndBuildWithDefaults() {
430455
: stickyQueueScheduleToStartTimeout,
431456
disableEagerExecution,
432457
useBuildIdForVersioning,
433-
buildId);
458+
buildId,
459+
stickyTaskQueueDrainTimeout == null
460+
? DEFAULT_STICKY_TASK_QUEUE_DRAIN_TIMEOUT
461+
: stickyTaskQueueDrainTimeout);
434462
}
435463
}
436464

@@ -449,6 +477,7 @@ public WorkerOptions validateAndBuildWithDefaults() {
449477
private final boolean disableEagerExecution;
450478
private final boolean useBuildIdForVersioning;
451479
private final String buildId;
480+
private final Duration stickyTaskQueueDrainTimeout;
452481

453482
private WorkerOptions(
454483
double maxWorkerActivitiesPerSecond,
@@ -465,7 +494,8 @@ private WorkerOptions(
465494
@Nonnull Duration stickyQueueScheduleToStartTimeout,
466495
boolean disableEagerExecution,
467496
boolean useBuildIdForVersioning,
468-
String buildId) {
497+
String buildId,
498+
Duration stickyTaskQueueDrainTimeout) {
469499
this.maxWorkerActivitiesPerSecond = maxWorkerActivitiesPerSecond;
470500
this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize;
471501
this.maxConcurrentWorkflowTaskExecutionSize = maxConcurrentWorkflowExecutionSize;
@@ -481,6 +511,7 @@ private WorkerOptions(
481511
this.disableEagerExecution = disableEagerExecution;
482512
this.useBuildIdForVersioning = useBuildIdForVersioning;
483513
this.buildId = buildId;
514+
this.stickyTaskQueueDrainTimeout = stickyTaskQueueDrainTimeout;
484515
}
485516

486517
public double getMaxWorkerActivitiesPerSecond() {
@@ -560,6 +591,10 @@ public String getBuildId() {
560591
return buildId;
561592
}
562593

594+
public Duration getStickyTaskQueueDrainTimeout() {
595+
return stickyTaskQueueDrainTimeout;
596+
}
597+
563598
@Override
564599
public boolean equals(Object o) {
565600
if (this == o) return true;
@@ -579,7 +614,8 @@ && compare(that.maxTaskQueueActivitiesPerSecond, maxTaskQueueActivitiesPerSecond
579614
&& Objects.equals(stickyQueueScheduleToStartTimeout, that.stickyQueueScheduleToStartTimeout)
580615
&& disableEagerExecution == that.disableEagerExecution
581616
&& useBuildIdForVersioning == that.useBuildIdForVersioning
582-
&& Objects.equals(that.buildId, buildId);
617+
&& Objects.equals(that.buildId, buildId)
618+
&& Objects.equals(stickyTaskQueueDrainTimeout, that.stickyTaskQueueDrainTimeout);
583619
}
584620

585621
@Override
@@ -599,7 +635,8 @@ public int hashCode() {
599635
stickyQueueScheduleToStartTimeout,
600636
disableEagerExecution,
601637
useBuildIdForVersioning,
602-
buildId);
638+
buildId,
639+
stickyTaskQueueDrainTimeout);
603640
}
604641

605642
@Override
@@ -635,6 +672,8 @@ public String toString() {
635672
+ useBuildIdForVersioning
636673
+ ", buildId='"
637674
+ buildId
675+
+ ", stickyTaskQueueDrainTimeout='"
676+
+ stickyTaskQueueDrainTimeout
638677
+ '}';
639678
}
640679
}

0 commit comments

Comments
 (0)