Skip to content

Commit b95322f

Browse files
authored
Reintroduce slot supplier & add many tests (#2143)
1 parent 0ba6188 commit b95322f

File tree

61 files changed

+4225
-356
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

61 files changed

+4225
-356
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ jobs:
3333
USE_DOCKER_SERVICE: false
3434
run: ./gradlew --no-daemon test -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava -P edgeDepsTest
3535

36+
- name: Run independent resource tuner test
37+
env:
38+
USER: unittest
39+
USE_DOCKER_SERVICE: false
40+
run: ./gradlew --no-daemon temporal-sdk:testResourceIndependent -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava -P edgeDepsTest
41+
3642
- name: Publish Test Report
3743
uses: mikepenz/action-junit-report@v4
3844
if: success() || failure() # always run even if the previous step fails

temporal-sdk/build.gradle

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,16 @@ task registerNamespace(type: JavaExec) {
3636
}
3737

3838
test.dependsOn 'registerNamespace'
39+
40+
test {
41+
useJUnit {
42+
excludeCategories 'io.temporal.worker.IndependentResourceBasedTests'
43+
}
44+
}
45+
46+
task testResourceIndependent(type: Test) {
47+
useJUnit {
48+
includeCategories 'io.temporal.worker.IndependentResourceBasedTests'
49+
maxParallelForks = 1
50+
}
51+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.activity;
22+
23+
import io.temporal.activity.ActivityInfo;
24+
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder;
25+
26+
/**
 * Static helper that turns an activity poll response into an {@link ActivityInfo}.
 *
 * <p>NOTE(review): presumably exists so code outside {@code io.temporal.internal.activity} can
 * build an {@code ActivityInfoImpl} without referencing it directly — confirm against the
 * visibility of {@code ActivityInfoImpl}.
 */
public class ActivityPollResponseToInfo {
27+
/**
 * Creates an {@link ActivityInfo} by passing the arguments straight to the {@code
 * ActivityInfoImpl} constructor.
 *
 * @param response the activity task poll response to wrap
 * @param namespace forwarded unchanged to {@code ActivityInfoImpl}
 * @param activityTaskQueue forwarded unchanged to {@code ActivityInfoImpl}
 * @param local forwarded unchanged to {@code ActivityInfoImpl}; presumably marks a local
 *     activity — TODO confirm
 * @return a new {@code ActivityInfoImpl} built from the arguments
 */
public static ActivityInfo toActivityInfoImpl(
28+
PollActivityTaskQueueResponseOrBuilder response,
29+
String namespace,
30+
String activityTaskQueue,
31+
boolean local) {
32+
// The fifth constructor argument is always null here; its meaning is defined by
// ActivityInfoImpl, which is not visible in this view.
return new ActivityInfoImpl(response, namespace, activityTaskQueue, local, null);
33+
}
34+
}

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
import io.temporal.internal.common.ProtobufTimeUtils;
3434
import io.temporal.serviceclient.WorkflowServiceStubs;
3535
import io.temporal.worker.MetricsType;
36+
import io.temporal.worker.tuning.*;
3637
import java.util.Objects;
37-
import java.util.concurrent.Semaphore;
3838
import java.util.function.Supplier;
3939
import javax.annotation.Nonnull;
4040
import javax.annotation.Nullable;
@@ -45,7 +45,7 @@ final class ActivityPollTask implements Poller.PollTask<ActivityTask> {
4545
private static final Logger log = LoggerFactory.getLogger(ActivityPollTask.class);
4646

4747
private final WorkflowServiceStubs service;
48-
private final Semaphore pollSemaphore;
48+
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
4949
private final Scope metricsScope;
5050
private final PollActivityTaskQueueRequest pollRequest;
5151

@@ -57,11 +57,11 @@ public ActivityPollTask(
5757
@Nullable String buildId,
5858
boolean useBuildIdForVersioning,
5959
double activitiesPerSecond,
60-
Semaphore pollSemaphore,
60+
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
6161
@Nonnull Scope metricsScope,
6262
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities) {
6363
this.service = Objects.requireNonNull(service);
64-
this.pollSemaphore = pollSemaphore;
64+
this.slotSupplier = slotSupplier;
6565
this.metricsScope = Objects.requireNonNull(metricsScope);
6666

6767
PollActivityTaskQueueRequest.Builder pollRequest =
@@ -92,13 +92,22 @@ public ActivityTask poll() {
9292
log.trace("poll request begin: " + pollRequest);
9393
}
9494
PollActivityTaskQueueResponse response;
95+
SlotPermit permit;
9596
boolean isSuccessful = false;
9697

9798
try {
98-
pollSemaphore.acquire();
99+
permit =
100+
slotSupplier.reserveSlot(
101+
new SlotReservationData(
102+
pollRequest.getTaskQueue().getName(),
103+
pollRequest.getIdentity(),
104+
pollRequest.getWorkerVersionCapabilities().getBuildId()));
99105
} catch (InterruptedException e) {
100106
Thread.currentThread().interrupt();
101107
return null;
108+
} catch (Exception e) {
109+
log.warn("Error while trying to reserve a slot for an activity", e.getCause());
110+
return null;
102111
}
103112

104113
try {
@@ -118,9 +127,12 @@ public ActivityTask poll() {
118127
ProtobufTimeUtils.toM3Duration(
119128
response.getStartedTime(), response.getCurrentAttemptScheduledTime()));
120129
isSuccessful = true;
121-
return new ActivityTask(response, pollSemaphore::release);
130+
return new ActivityTask(
131+
response,
132+
permit,
133+
() -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit));
122134
} finally {
123-
if (!isSuccessful) pollSemaphore.release();
135+
if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
124136
}
125137
}
126138
}

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityTask.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,21 @@
2121
package io.temporal.internal.worker;
2222

2323
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder;
24+
import io.temporal.worker.tuning.SlotPermit;
2425
import io.temporal.workflow.Functions;
2526
import javax.annotation.Nonnull;
2627

2728
public final class ActivityTask {
2829
private final @Nonnull PollActivityTaskQueueResponseOrBuilder response;
30+
private final @Nonnull SlotPermit permit;
2931
private final @Nonnull Functions.Proc completionCallback;
3032

3133
/**
 * Creates an activity task holding the poll response, the slot permit associated with it, and
 * a completion callback.
 *
 * <p>All three arguments are stored as-is; no null checks are performed here despite the
 * {@code @Nonnull} annotations.
 *
 * @param response the poll response describing the activity task
 * @param permit the slot permit associated with this task, later returned by {@code getPermit()}
 * @param completionCallback callback later returned by {@code getCompletionCallback()}
 */
public ActivityTask(
3234
@Nonnull PollActivityTaskQueueResponseOrBuilder response,
35+
@Nonnull SlotPermit permit,
3336
@Nonnull Functions.Proc completionCallback) {
3437
this.response = response;
38+
this.permit = permit;
3539
this.completionCallback = completionCallback;
3640
}
3741

@@ -48,4 +52,9 @@ public PollActivityTaskQueueResponseOrBuilder getResponse() {
4852
public Functions.Proc getCompletionCallback() {
4953
return completionCallback;
5054
}
55+
56+
/**
 * Returns the slot permit that was supplied to this task's constructor.
 *
 * @return the permit stored in this task
 */
@Nonnull
57+
public SlotPermit getPermit() {
58+
return permit;
59+
}
5160
}

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributesOrBuilder;
3131
import io.temporal.api.common.v1.WorkflowExecution;
3232
import io.temporal.api.workflowservice.v1.*;
33+
import io.temporal.internal.activity.ActivityPollResponseToInfo;
3334
import io.temporal.internal.common.ProtobufTimeUtils;
3435
import io.temporal.internal.logging.LoggerTag;
3536
import io.temporal.internal.retryer.GrpcRetryer;
@@ -39,9 +40,10 @@
3940
import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions;
4041
import io.temporal.worker.MetricsType;
4142
import io.temporal.worker.WorkerMetricsTag;
43+
import io.temporal.worker.tuning.*;
4244
import java.util.Objects;
45+
import java.util.Optional;
4346
import java.util.concurrent.CompletableFuture;
44-
import java.util.concurrent.Semaphore;
4547
import java.util.concurrent.TimeUnit;
4648
import javax.annotation.Nonnull;
4749
import org.slf4j.Logger;
@@ -64,16 +66,16 @@ final class ActivityWorker implements SuspendableWorker {
6466
private final Scope workerMetricsScope;
6567
private final GrpcRetryer grpcRetryer;
6668
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
67-
private final int executorSlots;
68-
private final Semaphore executorSlotsSemaphore;
69+
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
6970

7071
public ActivityWorker(
7172
@Nonnull WorkflowServiceStubs service,
7273
@Nonnull String namespace,
7374
@Nonnull String taskQueue,
7475
double taskQueueActivitiesPerSecond,
7576
@Nonnull SingleWorkerOptions options,
76-
@Nonnull ActivityTaskHandler handler) {
77+
@Nonnull ActivityTaskHandler handler,
78+
@Nonnull SlotSupplier<ActivitySlotInfo> slotSupplier) {
7779
this.service = Objects.requireNonNull(service);
7880
this.namespace = Objects.requireNonNull(namespace);
7981
this.taskQueue = Objects.requireNonNull(taskQueue);
@@ -87,8 +89,8 @@ public ActivityWorker(
8789
this.replyGrpcRetryerOptions =
8890
new GrpcRetryer.GrpcRetryerOptions(
8991
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
90-
this.executorSlots = options.getTaskExecutorThreadPoolSize();
91-
this.executorSlotsSemaphore = new Semaphore(executorSlots);
92+
93+
this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
9294
}
9395

9496
@Override
@@ -101,8 +103,7 @@ public boolean start() {
101103
options.getIdentity(),
102104
new TaskHandlerImpl(handler),
103105
pollerOptions,
104-
options.getTaskExecutorThreadPoolSize(),
105-
workerMetricsScope,
106+
slotSupplier.maximumSlots().orElse(Integer.MAX_VALUE),
106107
true);
107108
poller =
108109
new Poller<>(
@@ -115,7 +116,7 @@ public boolean start() {
115116
options.getBuildId(),
116117
options.isUsingBuildIdForVersioning(),
117118
taskQueueActivitiesPerSecond,
118-
executorSlotsSemaphore,
119+
this.slotSupplier,
119120
workerMetricsScope,
120121
service.getServerCapabilities()),
121122
this.pollTaskExecutor,
@@ -131,14 +132,14 @@ public boolean start() {
131132

132133
@Override
133134
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
134-
String semaphoreName = this + "#executorSlotsSemaphore";
135+
String supplierName = this + "#executorSlots";
135136
return poller
136137
.shutdown(shutdownManager, interruptTasks)
137138
.thenCompose(
138139
ignore ->
139140
!interruptTasks
140-
? shutdownManager.waitForSemaphorePermitsReleaseUntimed(
141-
executorSlotsSemaphore, executorSlots, semaphoreName)
141+
? shutdownManager.waitForSupplierPermitsReleasedUnlimited(
142+
slotSupplier, supplierName)
142143
: CompletableFuture.completedFuture(null))
143144
.thenCompose(
144145
ignore ->
@@ -224,6 +225,15 @@ private TaskHandlerImpl(ActivityTaskHandler handler) {
224225
@Override
225226
public void handle(ActivityTask task) throws Exception {
226227
PollActivityTaskQueueResponseOrBuilder pollResponse = task.getResponse();
228+
229+
slotSupplier.markSlotUsed(
230+
new ActivitySlotInfo(
231+
ActivityPollResponseToInfo.toActivityInfoImpl(
232+
pollResponse, namespace, taskQueue, false),
233+
options.getIdentity(),
234+
options.getBuildId()),
235+
task.getPermit());
236+
227237
Scope metricsScope =
228238
workerMetricsScope.tagged(
229239
ImmutableMap.of(
@@ -416,23 +426,34 @@ private void logExceptionDuringResultReporting(
416426

417427
private final class EagerActivityDispatcherImpl implements EagerActivityDispatcher {
418428
@Override
419-
public boolean tryReserveActivitySlot(
429+
public Optional<SlotPermit> tryReserveActivitySlot(
420430
ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes) {
421-
return WorkerLifecycleState.ACTIVE.equals(ActivityWorker.this.getLifecycleState())
422-
&& Objects.equals(
423-
commandAttributes.getTaskQueue().getName(), ActivityWorker.this.taskQueue)
424-
&& ActivityWorker.this.executorSlotsSemaphore.tryAcquire();
431+
if (!WorkerLifecycleState.ACTIVE.equals(ActivityWorker.this.getLifecycleState())
432+
|| !Objects.equals(
433+
commandAttributes.getTaskQueue().getName(), ActivityWorker.this.taskQueue)) {
434+
return Optional.empty();
435+
}
436+
return ActivityWorker.this.slotSupplier.tryReserveSlot(
437+
new SlotReservationData(
438+
ActivityWorker.this.taskQueue, options.getIdentity(), options.getBuildId()));
425439
}
426440

427441
@Override
428-
public void releaseActivitySlotReservations(int slotCounts) {
429-
ActivityWorker.this.executorSlotsSemaphore.release(slotCounts);
442+
public void releaseActivitySlotReservations(Iterable<SlotPermit> permits) {
443+
for (SlotPermit permit : permits) {
444+
ActivityWorker.this.slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
445+
}
430446
}
431447

432448
@Override
433-
public void dispatchActivity(PollActivityTaskQueueResponse activity) {
449+
public void dispatchActivity(PollActivityTaskQueueResponse activity, SlotPermit permit) {
434450
ActivityWorker.this.pollTaskExecutor.process(
435-
new ActivityTask(activity, ActivityWorker.this.executorSlotsSemaphore::release));
451+
new ActivityTask(
452+
activity,
453+
permit,
454+
() ->
455+
ActivityWorker.this.slotSupplier.releaseSlot(
456+
SlotReleaseReason.taskComplete(), permit)));
436457
}
437458
}
438459
}

temporal-sdk/src/main/java/io/temporal/internal/worker/EagerActivityDispatcher.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,30 +22,33 @@
2222

2323
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributesOrBuilder;
2424
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
25+
import io.temporal.worker.tuning.SlotPermit;
26+
import java.util.Optional;
2527

2628
public interface EagerActivityDispatcher {
27-
boolean tryReserveActivitySlot(ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes);
29+
Optional<SlotPermit> tryReserveActivitySlot(
30+
ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes);
2831

29-
void releaseActivitySlotReservations(int slotCounts);
32+
void releaseActivitySlotReservations(Iterable<SlotPermit> permits);
3033

31-
void dispatchActivity(PollActivityTaskQueueResponse activity);
34+
void dispatchActivity(PollActivityTaskQueueResponse activity, SlotPermit permit);
3235

3336
class NoopEagerActivityDispatcher implements EagerActivityDispatcher {
3437
@Override
35-
public boolean tryReserveActivitySlot(
38+
public Optional<SlotPermit> tryReserveActivitySlot(
3639
ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes) {
37-
return false;
40+
return Optional.empty();
3841
}
3942

4043
@Override
41-
public void releaseActivitySlotReservations(int slotCounts) {
42-
if (slotCounts > 0)
44+
public void releaseActivitySlotReservations(Iterable<SlotPermit> permits) {
45+
if (permits.iterator().hasNext())
4346
throw new IllegalStateException(
4447
"Trying to release activity slots on a NoopEagerActivityDispatcher");
4548
}
4649

4750
@Override
48-
public void dispatchActivity(PollActivityTaskQueueResponse activity) {
51+
public void dispatchActivity(PollActivityTaskQueueResponse activity, SlotPermit permit) {
4952
throw new IllegalStateException(
5053
"Trying to dispatch activity on a NoopEagerActivityDispatcher");
5154
}

0 commit comments

Comments
 (0)