Skip to content

Commit 46b239d

Browse files
Revert configurable slot provider (#2134)
* Revert "Resource based tuner (#2110)" This reverts commit 8a2d5cd. * Revert "Slot supplier interface & fixed-size implementation (#2014)" This reverts commit d2a06fc. * Fix merge conflict * Keep Publish Test Report step * Add tests for worker slots * Fix white space * One other whitespace change
1 parent abd9f2d commit 46b239d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+521
-3025
lines changed

.github/workflows/ci.yml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,6 @@ jobs:
3333
USE_DOCKER_SERVICE: false
3434
run: ./gradlew --no-daemon test -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava -P edgeDepsTest
3535

36-
- name: Run independent resource tuner test
37-
env:
38-
USER: unittest
39-
USE_DOCKER_SERVICE: false
40-
run: ./gradlew --no-daemon temporal-sdk:testResourceIndependent -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava -P edgeDepsTest
41-
4236
- name: Publish Test Report
4337
uses: mikepenz/action-junit-report@v4
4438
if: success() || failure() # always run even if the previous step fails

temporal-sdk/build.gradle

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,3 @@ task registerNamespace(type: JavaExec) {
3636
}
3737

3838
test.dependsOn 'registerNamespace'
39-
40-
test {
41-
useJUnit {
42-
excludeCategories 'io.temporal.worker.IndependentResourceBasedTests'
43-
}
44-
}
45-
46-
task testResourceIndependent(type: Test) {
47-
useJUnit {
48-
includeCategories 'io.temporal.worker.IndependentResourceBasedTests'
49-
maxParallelForks = 1
50-
}
51-
}

temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityPollResponseToInfo.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java

Lines changed: 7 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,11 @@
3030
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
3131
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueRequest;
3232
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
33-
import io.temporal.internal.activity.ActivityPollResponseToInfo;
3433
import io.temporal.internal.common.ProtobufTimeUtils;
3534
import io.temporal.serviceclient.WorkflowServiceStubs;
3635
import io.temporal.worker.MetricsType;
37-
import io.temporal.worker.tuning.*;
3836
import java.util.Objects;
37+
import java.util.concurrent.Semaphore;
3938
import java.util.function.Supplier;
4039
import javax.annotation.Nonnull;
4140
import javax.annotation.Nullable;
@@ -46,7 +45,7 @@ final class ActivityPollTask implements Poller.PollTask<ActivityTask> {
4645
private static final Logger log = LoggerFactory.getLogger(ActivityPollTask.class);
4746

4847
private final WorkflowServiceStubs service;
49-
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
48+
private final Semaphore pollSemaphore;
5049
private final Scope metricsScope;
5150
private final PollActivityTaskQueueRequest pollRequest;
5251

@@ -58,11 +57,11 @@ public ActivityPollTask(
5857
@Nullable String buildId,
5958
boolean useBuildIdForVersioning,
6059
double activitiesPerSecond,
61-
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
60+
Semaphore pollSemaphore,
6261
@Nonnull Scope metricsScope,
6362
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities) {
6463
this.service = Objects.requireNonNull(service);
65-
this.slotSupplier = slotSupplier;
64+
this.pollSemaphore = pollSemaphore;
6665
this.metricsScope = Objects.requireNonNull(metricsScope);
6766

6867
PollActivityTaskQueueRequest.Builder pollRequest =
@@ -93,22 +92,13 @@ public ActivityTask poll() {
9392
log.trace("poll request begin: " + pollRequest);
9493
}
9594
PollActivityTaskQueueResponse response;
96-
SlotPermit permit;
9795
boolean isSuccessful = false;
9896

9997
try {
100-
permit =
101-
slotSupplier.reserveSlot(
102-
new SlotReservationData(
103-
pollRequest.getTaskQueue().getName(),
104-
pollRequest.getIdentity(),
105-
pollRequest.getWorkerVersionCapabilities().getBuildId()));
98+
pollSemaphore.acquire();
10699
} catch (InterruptedException e) {
107100
Thread.currentThread().interrupt();
108101
return null;
109-
} catch (Exception e) {
110-
log.warn("Error while trying to reserve a slot for an activity", e.getCause());
111-
return null;
112102
}
113103

114104
try {
@@ -128,20 +118,9 @@ public ActivityTask poll() {
128118
ProtobufTimeUtils.toM3Duration(
129119
response.getStartedTime(), response.getCurrentAttemptScheduledTime()));
130120
isSuccessful = true;
131-
slotSupplier.markSlotUsed(
132-
new ActivitySlotInfo(
133-
ActivityPollResponseToInfo.toActivityInfoImpl(
134-
response,
135-
pollRequest.getNamespace(),
136-
pollRequest.getTaskQueue().getNormalName(),
137-
false),
138-
pollRequest.getIdentity(),
139-
pollRequest.getWorkerVersionCapabilities().getBuildId()),
140-
permit);
141-
return new ActivityTask(
142-
response, () -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit));
121+
return new ActivityTask(response, pollSemaphore::release);
143122
} finally {
144-
if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
123+
if (!isSuccessful) pollSemaphore.release();
145124
}
146125
}
147126
}

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,9 @@
3939
import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions;
4040
import io.temporal.worker.MetricsType;
4141
import io.temporal.worker.WorkerMetricsTag;
42-
import io.temporal.worker.tuning.*;
4342
import java.util.Objects;
44-
import java.util.Optional;
4543
import java.util.concurrent.CompletableFuture;
44+
import java.util.concurrent.Semaphore;
4645
import java.util.concurrent.TimeUnit;
4746
import javax.annotation.Nonnull;
4847
import org.slf4j.Logger;
@@ -65,16 +64,16 @@ final class ActivityWorker implements SuspendableWorker {
6564
private final Scope workerMetricsScope;
6665
private final GrpcRetryer grpcRetryer;
6766
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
68-
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
67+
private final int executorSlots;
68+
private final Semaphore executorSlotsSemaphore;
6969

7070
public ActivityWorker(
7171
@Nonnull WorkflowServiceStubs service,
7272
@Nonnull String namespace,
7373
@Nonnull String taskQueue,
7474
double taskQueueActivitiesPerSecond,
7575
@Nonnull SingleWorkerOptions options,
76-
@Nonnull ActivityTaskHandler handler,
77-
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier) {
76+
@Nonnull ActivityTaskHandler handler) {
7877
this.service = Objects.requireNonNull(service);
7978
this.namespace = Objects.requireNonNull(namespace);
8079
this.taskQueue = Objects.requireNonNull(taskQueue);
@@ -88,8 +87,8 @@ public ActivityWorker(
8887
this.replyGrpcRetryerOptions =
8988
new GrpcRetryer.GrpcRetryerOptions(
9089
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
91-
this.slotSupplier = slotSupplier;
92-
this.slotSupplier.setMetricsScope(this.workerMetricsScope);
90+
this.executorSlots = options.getTaskExecutorThreadPoolSize();
91+
this.executorSlotsSemaphore = new Semaphore(executorSlots);
9392
}
9493

9594
@Override
@@ -102,7 +101,8 @@ public boolean start() {
102101
options.getIdentity(),
103102
new TaskHandlerImpl(handler),
104103
pollerOptions,
105-
slotSupplier.maximumSlots(),
104+
options.getTaskExecutorThreadPoolSize(),
105+
workerMetricsScope,
106106
true);
107107
poller =
108108
new Poller<>(
@@ -115,7 +115,7 @@ public boolean start() {
115115
options.getBuildId(),
116116
options.isUsingBuildIdForVersioning(),
117117
taskQueueActivitiesPerSecond,
118-
this.slotSupplier,
118+
executorSlotsSemaphore,
119119
workerMetricsScope,
120120
service.getServerCapabilities()),
121121
this.pollTaskExecutor,
@@ -131,14 +131,14 @@ public boolean start() {
131131

132132
@Override
133133
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
134-
String supplierName = this + "#executorSlots";
134+
String semaphoreName = this + "#executorSlotsSemaphore";
135135
return poller
136136
.shutdown(shutdownManager, interruptTasks)
137137
.thenCompose(
138138
ignore ->
139139
!interruptTasks
140-
? shutdownManager.waitForSupplierPermitsReleasedUnlimited(
141-
slotSupplier, supplierName)
140+
? shutdownManager.waitForSemaphorePermitsReleaseUntimed(
141+
executorSlotsSemaphore, executorSlots, semaphoreName)
142142
: CompletableFuture.completedFuture(null))
143143
.thenCompose(
144144
ignore ->
@@ -416,33 +416,23 @@ private void logExceptionDuringResultReporting(
416416

417417
private final class EagerActivityDispatcherImpl implements EagerActivityDispatcher {
418418
@Override
419-
public Optional<SlotPermit> tryReserveActivitySlot(
419+
public boolean tryReserveActivitySlot(
420420
ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes) {
421-
if (!WorkerLifecycleState.ACTIVE.equals(ActivityWorker.this.getLifecycleState())
422-
|| !Objects.equals(
423-
commandAttributes.getTaskQueue().getName(), ActivityWorker.this.taskQueue)) {
424-
return Optional.empty();
425-
}
426-
return ActivityWorker.this.slotSupplier.tryReserveSlot(
427-
new SlotReservationData(
428-
ActivityWorker.this.taskQueue, options.getIdentity(), options.getBuildId()));
421+
return WorkerLifecycleState.ACTIVE.equals(ActivityWorker.this.getLifecycleState())
422+
&& Objects.equals(
423+
commandAttributes.getTaskQueue().getName(), ActivityWorker.this.taskQueue)
424+
&& ActivityWorker.this.executorSlotsSemaphore.tryAcquire();
429425
}
430426

431427
@Override
432-
public void releaseActivitySlotReservations(Iterable<SlotPermit> permits) {
433-
for (SlotPermit permit : permits) {
434-
ActivityWorker.this.slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
435-
}
428+
public void releaseActivitySlotReservations(int slotCounts) {
429+
ActivityWorker.this.executorSlotsSemaphore.release(slotCounts);
436430
}
437431

438432
@Override
439-
public void dispatchActivity(PollActivityTaskQueueResponse activity, SlotPermit permit) {
433+
public void dispatchActivity(PollActivityTaskQueueResponse activity) {
440434
ActivityWorker.this.pollTaskExecutor.process(
441-
new ActivityTask(
442-
activity,
443-
() ->
444-
ActivityWorker.this.slotSupplier.releaseSlot(
445-
SlotReleaseReason.taskComplete(), permit)));
435+
new ActivityTask(activity, ActivityWorker.this.executorSlotsSemaphore::release));
446436
}
447437
}
448438
}

temporal-sdk/src/main/java/io/temporal/internal/worker/EagerActivityDispatcher.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,33 +22,30 @@
2222

2323
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributesOrBuilder;
2424
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
25-
import io.temporal.worker.tuning.SlotPermit;
26-
import java.util.Optional;
2725

2826
public interface EagerActivityDispatcher {
29-
Optional<SlotPermit> tryReserveActivitySlot(
30-
ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes);
27+
boolean tryReserveActivitySlot(ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes);
3128

32-
void releaseActivitySlotReservations(Iterable<SlotPermit> permits);
29+
void releaseActivitySlotReservations(int slotCounts);
3330

34-
void dispatchActivity(PollActivityTaskQueueResponse activity, SlotPermit permit);
31+
void dispatchActivity(PollActivityTaskQueueResponse activity);
3532

3633
class NoopEagerActivityDispatcher implements EagerActivityDispatcher {
3734
@Override
38-
public Optional<SlotPermit> tryReserveActivitySlot(
35+
public boolean tryReserveActivitySlot(
3936
ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes) {
40-
return Optional.empty();
37+
return false;
4138
}
4239

4340
@Override
44-
public void releaseActivitySlotReservations(Iterable<SlotPermit> permits) {
45-
if (permits.iterator().hasNext())
41+
public void releaseActivitySlotReservations(int slotCounts) {
42+
if (slotCounts > 0)
4643
throw new IllegalStateException(
4744
"Trying to release activity slots on a NoopEagerActivityDispatcher");
4845
}
4946

5047
@Override
51-
public void dispatchActivity(PollActivityTaskQueueResponse activity, SlotPermit permit) {
48+
public void dispatchActivity(PollActivityTaskQueueResponse activity) {
5249
throw new IllegalStateException(
5350
"Trying to dispatch activity on a NoopEagerActivityDispatcher");
5451
}

0 commit comments

Comments
 (0)