Skip to content

Commit 8a2d5cd

Browse files
authored
Resource based tuner (#2110)
1 parent 69769cb commit 8a2d5cd

20 files changed

+1483
-91
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,12 @@ jobs:
3333
USE_DOCKER_SERVICE: false
3434
run: ./gradlew --no-daemon test -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava -P edgeDepsTest
3535

36+
- name: Run independent resource tuner test
37+
env:
38+
USER: unittest
39+
USE_DOCKER_SERVICE: false
40+
run: ./gradlew --no-daemon temporal-sdk:testResourceIndependent -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava -P edgeDepsTest
41+
3642
- name: Publish Test Report
3743
uses: mikepenz/action-junit-report@v4
3844
if: success() || failure() # always run even if the previous step fails

temporal-sdk/build.gradle

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -34,3 +34,16 @@ task registerNamespace(type: JavaExec) {
3434
}
3535

3636
test.dependsOn 'registerNamespace'
37+
38+
test {
39+
useJUnit {
40+
excludeCategories 'io.temporal.worker.IndependentResourceBasedTests'
41+
}
42+
}
43+
44+
task testResourceIndependent(type: Test) {
45+
useJUnit {
46+
includeCategories 'io.temporal.worker.IndependentResourceBasedTests'
47+
maxParallelForks = 1
48+
}
49+
}

temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java

Lines changed: 21 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -94,6 +94,16 @@ public void setMetricsScope(Scope metricsScope) {
9494
this.metricsScope = metricsScope;
9595
}
9696

97+
/**
98+
* If any slot supplier is resource-based, we want to attach a metrics scope to the controller
99+
* (before it's labelled with the worker type).
100+
*/
101+
public void attachMetricsToResourceController(Scope metricsScope) {
102+
if (inner instanceof ResourceBasedSlotSupplier) {
103+
((ResourceBasedSlotSupplier<?>) inner).getResourceController().setMetricsScope(metricsScope);
104+
}
105+
}
106+
97107
Map<SlotPermit, SI> getUsedSlots() {
98108
return usedSlots;
99109
}
@@ -109,24 +119,28 @@ private SlotReserveContext<SI> createCtx(SlotReservationData dat) {
109119
dat.taskQueue,
110120
Collections.unmodifiableMap(usedSlots),
111121
dat.workerIdentity,
112-
dat.workerBuildId);
122+
dat.workerBuildId,
123+
issuedSlots);
113124
}
114125

115126
private class SlotReserveContextImpl implements SlotReserveContext<SI> {
116127
private final String taskQueue;
117128
private final Map<SlotPermit, SI> usedSlots;
118129
private final String workerIdentity;
119130
private final String workerBuildId;
131+
private final AtomicInteger issuedSlots;
120132

121133
private SlotReserveContextImpl(
122134
String taskQueue,
123135
Map<SlotPermit, SI> usedSlots,
124136
String workerIdentity,
125-
String workerBuildId) {
137+
String workerBuildId,
138+
AtomicInteger issuedSlots) {
126139
this.taskQueue = taskQueue;
127140
this.usedSlots = usedSlots;
128141
this.workerIdentity = workerIdentity;
129142
this.workerBuildId = workerBuildId;
143+
this.issuedSlots = issuedSlots;
130144
}
131145

132146
@Override
@@ -148,6 +162,11 @@ public String getWorkerIdentity() {
148162
public String getWorkerBuildId() {
149163
return workerBuildId;
150164
}
165+
166+
@Override
167+
public int getNumIssuedSlots() {
168+
return issuedSlots.get();
169+
}
151170
}
152171

153172
private class SlotMarkUsedContextImpl implements SlotMarkUsedContext<SI> {

temporal-sdk/src/main/java/io/temporal/worker/MetricsType.java

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -169,4 +169,13 @@ private MetricsType() {}
169169
// gauge
170170
public static final String WORKFLOW_ACTIVE_THREAD_COUNT =
171171
TEMPORAL_METRICS_PREFIX + "workflow_active_thread_count";
172+
173+
//
174+
// Resource tuner
175+
//
176+
// Tagged with namespace & task_queue
177+
public static final String RESOURCE_MEM_USAGE = "resource_slots_mem_usage";
178+
public static final String RESOURCE_CPU_USAGE = "resource_slots_cpu_usage";
179+
public static final String RESOURCE_MEM_PID = "resource_slots_mem_pid_output";
180+
public static final String RESOURCE_CPU_PID = "resource_slots_cpu_pid_output";
172181
}

temporal-sdk/src/main/java/io/temporal/worker/Worker.java

Lines changed: 9 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -109,10 +109,11 @@ public final class Worker {
109109
} else {
110110
TrackingSlotSupplier<ActivitySlotInfo> activitySlotSupplier =
111111
new TrackingSlotSupplier<>(
112-
this.options.getActivitySlotSupplier() == null
112+
this.options.getWorkerTuner() == null
113113
? new FixedSizeSlotSupplier<>(
114114
this.options.getMaxConcurrentActivityExecutionSize())
115-
: this.options.getActivitySlotSupplier());
115+
: this.options.getWorkerTuner().getActivityTaskSlotSupplier());
116+
activitySlotSupplier.attachMetricsToResourceController(taggedScope);
116117

117118
activityWorker =
118119
new SyncActivityWorker(
@@ -143,16 +144,18 @@ public final class Worker {
143144

144145
TrackingSlotSupplier<WorkflowSlotInfo> workflowSlotSupplier =
145146
new TrackingSlotSupplier<>(
146-
this.options.getWorkflowSlotSupplier() == null
147+
this.options.getWorkerTuner() == null
147148
? new FixedSizeSlotSupplier<>(
148149
this.options.getMaxConcurrentWorkflowTaskExecutionSize())
149-
: this.options.getWorkflowSlotSupplier());
150+
: this.options.getWorkerTuner().getWorkflowTaskSlotSupplier());
151+
workflowSlotSupplier.attachMetricsToResourceController(taggedScope);
150152
TrackingSlotSupplier<LocalActivitySlotInfo> localActivitySlotSupplier =
151153
new TrackingSlotSupplier<>(
152-
this.options.getLocalActivitySlotSupplier() == null
154+
this.options.getWorkerTuner() == null
153155
? new FixedSizeSlotSupplier<>(
154156
this.options.getMaxConcurrentLocalActivityExecutionSize())
155-
: this.options.getLocalActivitySlotSupplier());
157+
: this.options.getWorkerTuner().getLocalActivitySlotSupplier());
158+
localActivitySlotSupplier.attachMetricsToResourceController(taggedScope);
156159
workflowWorker =
157160
new SyncWorkflowWorker(
158161
service,

temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java

Lines changed: 27 additions & 83 deletions
Original file line number | Diff line number | Diff line change
@@ -83,9 +83,7 @@ public static final class Builder {
8383
private String buildId;
8484
private boolean useBuildIdForVersioning;
8585
private Duration stickyTaskQueueDrainTimeout;
86-
private SlotSupplier<WorkflowSlotInfo> workflowSlotSupplier;
87-
private SlotSupplier<ActivitySlotInfo> activitySlotSupplier;
88-
private SlotSupplier<LocalActivitySlotInfo> localActivitySlotSupplier;
86+
private WorkerTuner workerTuner;
8987
private String identity;
9088

9189
private Builder() {}
@@ -98,9 +96,7 @@ private Builder(WorkerOptions o) {
9896
this.maxConcurrentActivityExecutionSize = o.maxConcurrentActivityExecutionSize;
9997
this.maxConcurrentWorkflowTaskExecutionSize = o.maxConcurrentWorkflowTaskExecutionSize;
10098
this.maxConcurrentLocalActivityExecutionSize = o.maxConcurrentLocalActivityExecutionSize;
101-
this.workflowSlotSupplier = o.workflowSlotSupplier;
102-
this.activitySlotSupplier = o.activitySlotSupplier;
103-
this.localActivitySlotSupplier = o.localActivitySlotSupplier;
99+
this.workerTuner = o.workerTuner;
104100
this.maxTaskQueueActivitiesPerSecond = o.maxTaskQueueActivitiesPerSecond;
105101
this.maxConcurrentWorkflowTaskPollers = o.maxConcurrentWorkflowTaskPollers;
106102
this.maxConcurrentActivityTaskPollers = o.maxConcurrentActivityTaskPollers;
@@ -138,8 +134,7 @@ public Builder setMaxWorkerActivitiesPerSecond(double maxWorkerActivitiesPerSeco
138134
* @param maxConcurrentActivityExecutionSize Maximum number of activities executed in parallel.
139135
* Default is 200, which is chosen if set to zero.
140136
* @return {@code this}
141-
* <p>Note setting is mutually exclusive with {@link
142-
* #setActivitySlotSupplier(SlotSupplier)}.
137+
* <p>Note that this setting is mutually exclusive with {@link #setWorkerTuner(WorkerTuner)}.
143138
*/
144139
public Builder setMaxConcurrentActivityExecutionSize(int maxConcurrentActivityExecutionSize) {
145140
if (maxConcurrentActivityExecutionSize < 0) {
@@ -157,7 +152,7 @@ public Builder setMaxConcurrentActivityExecutionSize(int maxConcurrentActivityEx
157152
* @return {@code this}
158153
* <p>Note that this is not related to the total number of open workflows which do not need
159154
* to be loaded in a worker when they are not making state transitions.
160-
* <p>Note setting is mutually exclusive with {@link #setWorkflowSlotSupplier(SlotSupplier)}
155+
* <p>Note that this setting is mutually exclusive with {@link #setWorkerTuner(WorkerTuner)}.
161156
*/
162157
public Builder setMaxConcurrentWorkflowTaskExecutionSize(
163158
int maxConcurrentWorkflowTaskExecutionSize) {
@@ -174,8 +169,7 @@ public Builder setMaxConcurrentWorkflowTaskExecutionSize(
174169
* @param maxConcurrentLocalActivityExecutionSize Maximum number of local activities executed in
175170
* parallel. Default is 200, which is chosen if set to zero.
176171
* @return {@code this}
177-
* <p>Note setting is mutually exclusive with {@link
178-
* #setLocalActivitySlotSupplier(SlotSupplier)}
172+
* <p>Note that this setting is mutually exclusive with {@link #setWorkerTuner(WorkerTuner)}.
179173
*/
180174
public Builder setMaxConcurrentLocalActivityExecutionSize(
181175
int maxConcurrentLocalActivityExecutionSize) {
@@ -384,37 +378,13 @@ public Builder setStickyTaskQueueDrainTimeout(Duration stickyTaskQueueDrainTimeo
384378
}
385379

386380
/**
387-
* Set the {@link SlotSupplier} for workflow tasks.
388-
*
389-
* <p>Note that this setting is mutually exclusive with {@link
390-
* #setMaxConcurrentWorkflowTaskExecutionSize(int)}.
391-
*/
392-
@Experimental
393-
public void setWorkflowSlotSupplier(SlotSupplier<WorkflowSlotInfo> workflowSlotSupplier) {
394-
this.workflowSlotSupplier = workflowSlotSupplier;
395-
}
396-
397-
/**
398-
* Set the {@link SlotSupplier} for activity tasks.
399-
*
400-
* <p>Note that this setting is mutually exclusive with {@link
401-
* #setMaxConcurrentActivityExecutionSize(int)}.
402-
*/
403-
@Experimental
404-
public void setActivitySlotSupplier(SlotSupplier<ActivitySlotInfo> activitySlotSupplier) {
405-
this.activitySlotSupplier = activitySlotSupplier;
406-
}
407-
408-
/**
409-
* Set the {@link SlotSupplier} for local activity tasks.
410-
*
411-
* <p>Note that this setting is mutually exclusive with {@link
412-
* #setMaxConcurrentLocalActivityExecutionSize(int)}.
381+
* Set a {@link WorkerTuner} to determine how slots will be allocated for different types of
382+
* tasks.
413383
*/
414384
@Experimental
415-
public void setLocalActivitySlotSupplier(
416-
SlotSupplier<LocalActivitySlotInfo> localActivitySlotSupplier) {
417-
this.localActivitySlotSupplier = localActivitySlotSupplier;
385+
public Builder setWorkerTuner(WorkerTuner workerTuner) {
386+
this.workerTuner = workerTuner;
387+
return this;
418388
}
419389

420390
/** Override the worker identity that is primarily specified in the WorkflowClient options. */
@@ -429,9 +399,7 @@ public WorkerOptions build() {
429399
maxConcurrentActivityExecutionSize,
430400
maxConcurrentWorkflowTaskExecutionSize,
431401
maxConcurrentLocalActivityExecutionSize,
432-
workflowSlotSupplier,
433-
activitySlotSupplier,
434-
localActivitySlotSupplier,
402+
workerTuner,
435403
maxTaskQueueActivitiesPerSecond,
436404
maxConcurrentWorkflowTaskPollers,
437405
maxConcurrentActivityTaskPollers,
@@ -458,20 +426,20 @@ public WorkerOptions validateAndBuildWithDefaults() {
458426
Preconditions.checkState(
459427
maxConcurrentLocalActivityExecutionSize >= 0,
460428
"negative maxConcurrentLocalActivityExecutionSize");
461-
if (activitySlotSupplier != null) {
429+
if (workerTuner != null) {
462430
Preconditions.checkState(
463431
maxConcurrentActivityExecutionSize == 0,
464-
"maxConcurrentActivityExecutionSize must not be set if activitySlotSupplier is set");
432+
"maxConcurrentActivityExecutionSize must not be set if workerTuner is set");
465433
}
466-
if (workflowSlotSupplier != null) {
434+
if (workerTuner != null) {
467435
Preconditions.checkState(
468436
maxConcurrentWorkflowTaskExecutionSize == 0,
469-
"maxConcurrentWorkflowTaskExecutionSize must not be set if workflowSlotSupplier is set");
437+
"maxConcurrentWorkflowTaskExecutionSize must not be set if workerTuner is set");
470438
}
471-
if (localActivitySlotSupplier != null) {
439+
if (workerTuner != null) {
472440
Preconditions.checkState(
473441
maxConcurrentLocalActivityExecutionSize == 0,
474-
"maxConcurrentLocalActivityExecutionSize must not be set if localActivitySlotSupplier is set");
442+
"maxConcurrentLocalActivityExecutionSize must not be set if workerTuner is set");
475443
}
476444
Preconditions.checkState(
477445
maxTaskQueueActivitiesPerSecond >= 0, "negative taskQueueActivitiesPerSecond");
@@ -505,9 +473,7 @@ public WorkerOptions validateAndBuildWithDefaults() {
505473
maxConcurrentLocalActivityExecutionSize == 0
506474
? DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE
507475
: maxConcurrentLocalActivityExecutionSize,
508-
workflowSlotSupplier,
509-
activitySlotSupplier,
510-
localActivitySlotSupplier,
476+
workerTuner,
511477
maxTaskQueueActivitiesPerSecond,
512478
maxConcurrentWorkflowTaskPollers == 0
513479
? DEFAULT_MAX_CONCURRENT_WORKFLOW_TASK_POLLERS
@@ -542,9 +508,7 @@ public WorkerOptions validateAndBuildWithDefaults() {
542508
private final int maxConcurrentActivityExecutionSize;
543509
private final int maxConcurrentWorkflowTaskExecutionSize;
544510
private final int maxConcurrentLocalActivityExecutionSize;
545-
private final SlotSupplier<WorkflowSlotInfo> workflowSlotSupplier;
546-
private final SlotSupplier<ActivitySlotInfo> activitySlotSupplier;
547-
private final SlotSupplier<LocalActivitySlotInfo> localActivitySlotSupplier;
511+
private final WorkerTuner workerTuner;
548512
private final double maxTaskQueueActivitiesPerSecond;
549513
private final int maxConcurrentWorkflowTaskPollers;
550514
private final int maxConcurrentActivityTaskPollers;
@@ -564,9 +528,7 @@ private WorkerOptions(
564528
int maxConcurrentActivityExecutionSize,
565529
int maxConcurrentWorkflowTaskExecutionSize,
566530
int maxConcurrentLocalActivityExecutionSize,
567-
SlotSupplier<WorkflowSlotInfo> workflowSlotSupplier,
568-
SlotSupplier<ActivitySlotInfo> activitySlotSupplier,
569-
SlotSupplier<LocalActivitySlotInfo> localActivitySlotSupplier,
531+
WorkerTuner workerTuner,
570532
double maxTaskQueueActivitiesPerSecond,
571533
int workflowPollThreadCount,
572534
int activityPollThreadCount,
@@ -584,9 +546,7 @@ private WorkerOptions(
584546
this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize;
585547
this.maxConcurrentWorkflowTaskExecutionSize = maxConcurrentWorkflowTaskExecutionSize;
586548
this.maxConcurrentLocalActivityExecutionSize = maxConcurrentLocalActivityExecutionSize;
587-
this.workflowSlotSupplier = workflowSlotSupplier;
588-
this.activitySlotSupplier = activitySlotSupplier;
589-
this.localActivitySlotSupplier = localActivitySlotSupplier;
549+
this.workerTuner = workerTuner;
590550
this.maxTaskQueueActivitiesPerSecond = maxTaskQueueActivitiesPerSecond;
591551
this.maxConcurrentWorkflowTaskPollers = workflowPollThreadCount;
592552
this.maxConcurrentActivityTaskPollers = activityPollThreadCount;
@@ -683,16 +643,8 @@ public Duration getStickyTaskQueueDrainTimeout() {
683643
return stickyTaskQueueDrainTimeout;
684644
}
685645

686-
public SlotSupplier<WorkflowSlotInfo> getWorkflowSlotSupplier() {
687-
return workflowSlotSupplier;
688-
}
689-
690-
public SlotSupplier<ActivitySlotInfo> getActivitySlotSupplier() {
691-
return activitySlotSupplier;
692-
}
693-
694-
public SlotSupplier<LocalActivitySlotInfo> getLocalActivitySlotSupplier() {
695-
return localActivitySlotSupplier;
646+
public WorkerTuner getWorkerTuner() {
647+
return workerTuner;
696648
}
697649

698650
@Nullable
@@ -716,9 +668,7 @@ && compare(maxTaskQueueActivitiesPerSecond, that.maxTaskQueueActivitiesPerSecond
716668
&& defaultDeadlockDetectionTimeout == that.defaultDeadlockDetectionTimeout
717669
&& disableEagerExecution == that.disableEagerExecution
718670
&& useBuildIdForVersioning == that.useBuildIdForVersioning
719-
&& Objects.equals(workflowSlotSupplier, that.workflowSlotSupplier)
720-
&& Objects.equals(activitySlotSupplier, that.activitySlotSupplier)
721-
&& Objects.equals(localActivitySlotSupplier, that.localActivitySlotSupplier)
671+
&& Objects.equals(workerTuner, that.workerTuner)
722672
&& Objects.equals(maxHeartbeatThrottleInterval, that.maxHeartbeatThrottleInterval)
723673
&& Objects.equals(defaultHeartbeatThrottleInterval, that.defaultHeartbeatThrottleInterval)
724674
&& Objects.equals(stickyQueueScheduleToStartTimeout, that.stickyQueueScheduleToStartTimeout)
@@ -734,9 +684,7 @@ public int hashCode() {
734684
maxConcurrentActivityExecutionSize,
735685
maxConcurrentWorkflowTaskExecutionSize,
736686
maxConcurrentLocalActivityExecutionSize,
737-
workflowSlotSupplier,
738-
activitySlotSupplier,
739-
localActivitySlotSupplier,
687+
workerTuner,
740688
maxTaskQueueActivitiesPerSecond,
741689
maxConcurrentWorkflowTaskPollers,
742690
maxConcurrentActivityTaskPollers,
@@ -763,12 +711,8 @@ public String toString() {
763711
+ maxConcurrentWorkflowTaskExecutionSize
764712
+ ", maxConcurrentLocalActivityExecutionSize="
765713
+ maxConcurrentLocalActivityExecutionSize
766-
+ ", workflowSlotSupplier="
767-
+ workflowSlotSupplier
768-
+ ", activitySlotSupplier="
769-
+ activitySlotSupplier
770-
+ ", localActivitySlotSupplier="
771-
+ localActivitySlotSupplier
714+
+ ", workerTuner="
715+
+ workerTuner
772716
+ ", maxTaskQueueActivitiesPerSecond="
773717
+ maxTaskQueueActivitiesPerSecond
774718
+ ", maxConcurrentWorkflowTaskPollers="

0 commit comments

Comments
 (0)