Skip to content

Commit ead142e

Browse files
authored
💥 [Breaking] Asyncify slot suppliers (#2433)
1 parent 6c4c183 commit ead142e

16 files changed

+539
-106
lines changed

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,22 +95,21 @@ public ActivityTask poll() {
9595
}
9696
PollActivityTaskQueueResponse response;
9797
SlotPermit permit;
98+
SlotSupplierFuture future;
9899
boolean isSuccessful = false;
99-
100100
try {
101-
permit =
101+
future =
102102
slotSupplier.reserveSlot(
103103
new SlotReservationData(
104104
pollRequest.getTaskQueue().getName(),
105105
pollRequest.getIdentity(),
106106
pollRequest.getWorkerVersionCapabilities().getBuildId()));
107-
} catch (InterruptedException e) {
108-
Thread.currentThread().interrupt();
109-
return null;
110107
} catch (Exception e) {
111108
log.warn("Error while trying to reserve a slot for an activity", e.getCause());
112109
return null;
113110
}
111+
permit = Poller.getSlotPermitAndHandleInterrupts(future, slotSupplier);
112+
if (permit == null) return null;
114113

115114
try {
116115
response =

temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.temporal.worker.tuning.LocalActivitySlotInfo;
2424
import io.temporal.worker.tuning.SlotPermit;
2525
import io.temporal.worker.tuning.SlotReleaseReason;
26+
import io.temporal.worker.tuning.SlotSupplierFuture;
2627
import io.temporal.workflow.Functions;
2728
import java.util.concurrent.*;
2829
import javax.annotation.Nullable;
@@ -83,22 +84,30 @@ private void processQueue() {
8384
QueuedLARequest request = null;
8485
try {
8586
request = requestQueue.take();
87+
88+
SlotSupplierFuture future = slotSupplier.reserveSlot(request.data);
8689
try {
87-
slotPermit = slotSupplier.reserveSlot(request.data);
90+
slotPermit = future.get();
8891
} catch (InterruptedException e) {
92+
SlotPermit maybePermitAnyway = future.abortReservation();
93+
if (maybePermitAnyway != null) {
94+
slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), maybePermitAnyway);
95+
}
8996
Thread.currentThread().interrupt();
9097
return;
91-
} catch (Exception e) {
98+
} catch (ExecutionException e) {
9299
log.error(
93100
"Error reserving local activity slot, dropped activity id {}",
94101
request.task.getActivityId(),
95102
e);
96103
continue;
97104
}
105+
98106
request.task.getExecutionContext().setPermit(slotPermit);
99107
afterReservedCallback.apply(request.task);
100108
} catch (InterruptedException e) {
101109
Thread.currentThread().interrupt();
110+
return;
102111
} catch (Throwable e) {
103112
// Fail the workflow task if something went wrong executing the local activity (at the
104113
// executor level, otherwise, the LA handler itself should be handling errors)
@@ -112,6 +121,11 @@ private void processQueue() {
112121
LocalActivityResult.processingFailed(
113122
executionContext.getActivityId(), request.task.getAttemptTask().getAttempt(), e));
114123
}
124+
if (e.getCause() instanceof InterruptedException) {
125+
// It's possible the interrupt happens inside the callback, so check that as well.
126+
Thread.currentThread().interrupt();
127+
return;
128+
}
115129
}
116130
}
117131
}
@@ -162,11 +176,9 @@ public boolean isTerminated() {
162176
@Override
163177
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
164178
running = false;
165-
if (requestQueue.isEmpty()) {
166-
// Just interrupt the thread, so that if we're waiting on blocking take the thread will
167-
// be interrupted and exit. Otherwise the loop will exit once the queue is empty.
168-
queueThreadService.shutdownNow();
169-
}
179+
// Always interrupt. This won't cause any *tasks* to be interrupted, since the queue thread is
180+
// only responsible for handing them out.
181+
queueThreadService.shutdownNow();
170182

171183
return interruptTasks
172184
? shutdownManager.shutdownExecutorNowUntimed(
@@ -182,6 +194,7 @@ public void awaitTermination(long timeout, TimeUnit unit) {
182194
// timeout duration if no task was ever submitted.
183195
return;
184196
}
197+
185198
ShutdownManager.awaitTermination(queueThreadService, unit.toMillis(timeout));
186199
}
187200
}

temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,22 +85,21 @@ public NexusTask poll() {
8585
}
8686
PollNexusTaskQueueResponse response;
8787
SlotPermit permit;
88+
SlotSupplierFuture future;
8889
boolean isSuccessful = false;
89-
9090
try {
91-
permit =
91+
future =
9292
slotSupplier.reserveSlot(
9393
new SlotReservationData(
9494
pollRequest.getTaskQueue().getName(),
9595
pollRequest.getIdentity(),
9696
pollRequest.getWorkerVersionCapabilities().getBuildId()));
97-
} catch (InterruptedException e) {
98-
Thread.currentThread().interrupt();
99-
return null;
10097
} catch (Exception e) {
10198
log.warn("Error while trying to reserve a slot for a nexus task", e.getCause());
10299
return null;
103100
}
101+
permit = Poller.getSlotPermitAndHandleInterrupts(future, slotSupplier);
102+
if (permit == null) return null;
104103

105104
try {
106105
response =

temporal-sdk/src/main/java/io/temporal/internal/worker/Poller.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import io.temporal.internal.common.GrpcUtils;
2828
import io.temporal.internal.task.VirtualThreadDelegate;
2929
import io.temporal.worker.MetricsType;
30+
import io.temporal.worker.tuning.SlotPermit;
31+
import io.temporal.worker.tuning.SlotReleaseReason;
32+
import io.temporal.worker.tuning.SlotSupplierFuture;
3033
import java.time.Duration;
3134
import java.util.Objects;
3235
import java.util.concurrent.*;
@@ -222,6 +225,25 @@ public WorkerLifecycleState getLifecycleState() {
222225
return WorkerLifecycleState.ACTIVE;
223226
}
224227

228+
static SlotPermit getSlotPermitAndHandleInterrupts(
229+
SlotSupplierFuture future, TrackingSlotSupplier<?> slotSupplier) {
230+
SlotPermit permit;
231+
try {
232+
permit = future.get();
233+
} catch (InterruptedException e) {
234+
SlotPermit maybePermitAnyway = future.abortReservation();
235+
if (maybePermitAnyway != null) {
236+
slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), maybePermitAnyway);
237+
}
238+
Thread.currentThread().interrupt();
239+
return null;
240+
} catch (ExecutionException e) {
241+
log.warn("Error while trying to reserve a slot", e.getCause());
242+
return null;
243+
}
244+
return permit;
245+
}
246+
225247
@Override
226248
public String toString() {
227249
// TODO using pollThreadNamePrefix here is ugly. We should consider introducing some concept of

temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import java.util.Collections;
2727
import java.util.Map;
2828
import java.util.Optional;
29-
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.*;
3030
import java.util.concurrent.atomic.AtomicInteger;
3131

3232
/**
@@ -48,14 +48,20 @@ public TrackingSlotSupplier(SlotSupplier<SI> inner, Scope metricsScope) {
4848
publishSlotsMetric();
4949
}
5050

51-
public SlotPermit reserveSlot(SlotReservationData dat) throws InterruptedException {
52-
SlotPermit p = inner.reserveSlot(createCtx(dat));
53-
issuedSlots.incrementAndGet();
54-
return p;
51+
public SlotSupplierFuture reserveSlot(SlotReservationData data) {
52+
final SlotSupplierFuture future;
53+
try {
54+
future = inner.reserveSlot(createCtx(data));
55+
} catch (Exception e) {
56+
throw new RuntimeException(e);
57+
}
58+
59+
future.thenRun(issuedSlots::incrementAndGet);
60+
return future;
5561
}
5662

57-
public Optional<SlotPermit> tryReserveSlot(SlotReservationData dat) {
58-
Optional<SlotPermit> p = inner.tryReserveSlot(createCtx(dat));
63+
public Optional<SlotPermit> tryReserveSlot(SlotReservationData data) {
64+
Optional<SlotPermit> p = inner.tryReserveSlot(createCtx(data));
5965
if (p.isPresent()) {
6066
issuedSlots.incrementAndGet();
6167
}

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@
3535
import io.temporal.serviceclient.MetricsTag;
3636
import io.temporal.serviceclient.WorkflowServiceStubs;
3737
import io.temporal.worker.MetricsType;
38-
import io.temporal.worker.tuning.*;
38+
import io.temporal.worker.tuning.SlotPermit;
39+
import io.temporal.worker.tuning.SlotReleaseReason;
40+
import io.temporal.worker.tuning.SlotSupplierFuture;
41+
import io.temporal.worker.tuning.WorkflowSlotInfo;
3942
import java.util.Objects;
4043
import java.util.function.Supplier;
4144
import javax.annotation.Nonnull;
@@ -121,23 +124,24 @@ public WorkflowPollTask(
121124
@Override
122125
@SuppressWarnings("deprecation")
123126
public WorkflowTask poll() {
124-
boolean isSuccessful = false;
125127
SlotPermit permit;
128+
SlotSupplierFuture future;
129+
boolean isSuccessful = false;
126130
try {
127-
permit =
131+
future =
128132
slotSupplier.reserveSlot(
129133
new SlotReservationData(
130134
pollRequest.getTaskQueue().getName(),
131135
pollRequest.getIdentity(),
132136
pollRequest.getWorkerVersionCapabilities().getBuildId()));
133-
} catch (InterruptedException e) {
134-
Thread.currentThread().interrupt();
135-
return null;
136137
} catch (Exception e) {
137-
log.warn("Error while trying to reserve a slot for workflow task", e.getCause());
138+
log.warn("Error while trying to reserve a slot for a workflow", e.getCause());
138139
return null;
139140
}
140141

142+
permit = Poller.getSlotPermitAndHandleInterrupts(future, slotSupplier);
143+
if (permit == null) return null;
144+
141145
TaskQueueKind taskQueueKind = stickyQueueBalancer.makePoll();
142146
boolean isSticky = TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(taskQueueKind);
143147
PollWorkflowTaskQueueRequest request = isSticky ? stickyPollRequest : pollRequest;

temporal-sdk/src/main/java/io/temporal/worker/tuning/FixedSizeSlotSupplier.java

Lines changed: 80 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@
2121
package io.temporal.worker.tuning;
2222

2323
import com.google.common.base.Preconditions;
24+
import java.util.ArrayDeque;
2425
import java.util.Optional;
25-
import java.util.concurrent.*;
26+
import java.util.Queue;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.locks.ReentrantLock;
2629

2730
/**
2831
* This implementation of {@link SlotSupplier} provides a fixed number of slots backed by a
@@ -32,18 +35,89 @@
3235
*/
3336
public class FixedSizeSlotSupplier<SI extends SlotInfo> implements SlotSupplier<SI> {
3437
private final int numSlots;
35-
private final Semaphore executorSlotsSemaphore;
38+
private final AsyncSemaphore executorSlotsSemaphore;
39+
40+
/**
41+
* A simple version of an async semaphore. Unfortunately there's not any readily available
42+
* properly licensed library I could find for this which is a bit shocking, but this
43+
* implementation should be suitable for our needs
44+
*/
45+
static class AsyncSemaphore {
46+
private final ReentrantLock lock = new ReentrantLock();
47+
private final Queue<CompletableFuture<Void>> waiters = new ArrayDeque<>();
48+
private int permits;
49+
50+
AsyncSemaphore(int initialPermits) {
51+
this.permits = initialPermits;
52+
}
53+
54+
/**
55+
* Acquire a permit asynchronously. If a permit is available, returns a completed future,
56+
* otherwise returns a future that will be completed when a permit is released.
57+
*/
58+
public CompletableFuture<Void> acquire() {
59+
lock.lock();
60+
try {
61+
if (permits > 0) {
62+
permits--;
63+
return CompletableFuture.completedFuture(null);
64+
} else {
65+
CompletableFuture<Void> waiter = new CompletableFuture<>();
66+
waiters.add(waiter);
67+
return waiter;
68+
}
69+
} finally {
70+
lock.unlock();
71+
}
72+
}
73+
74+
public boolean tryAcquire() {
75+
lock.lock();
76+
try {
77+
if (permits > 0) {
78+
permits--;
79+
return true;
80+
}
81+
return false;
82+
} finally {
83+
lock.unlock();
84+
}
85+
}
86+
87+
/**
88+
* Release a permit. If there are waiting futures, completes the next one instead of
89+
* incrementing the permit count.
90+
*/
91+
public void release() {
92+
lock.lock();
93+
try {
94+
CompletableFuture<Void> waiter = waiters.poll();
95+
if (waiter != null) {
96+
if (!waiter.complete(null) && waiter.isCancelled()) {
97+
// If this waiter was cancelled, we need to release another permit, since this waiter
98+
// is now useless
99+
release();
100+
}
101+
} else {
102+
permits++;
103+
}
104+
} finally {
105+
lock.unlock();
106+
}
107+
}
108+
}
36109

37110
public FixedSizeSlotSupplier(int numSlots) {
38111
Preconditions.checkArgument(numSlots > 0, "FixedSizeSlotSupplier must have at least one slot");
39112
this.numSlots = numSlots;
40-
executorSlotsSemaphore = new Semaphore(numSlots);
113+
executorSlotsSemaphore = new AsyncSemaphore(numSlots);
41114
}
42115

43116
@Override
44-
public SlotPermit reserveSlot(SlotReserveContext<SI> ctx) throws InterruptedException {
45-
executorSlotsSemaphore.acquire();
46-
return new SlotPermit();
117+
public SlotSupplierFuture reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
118+
CompletableFuture<Void> slotFuture = executorSlotsSemaphore.acquire();
119+
return SlotSupplierFuture.fromCompletableFuture(
120+
slotFuture.thenApply(ignored -> new SlotPermit()), () -> slotFuture.cancel(true));
47121
}
48122

49123
@Override

0 commit comments

Comments
 (0)