Skip to content

Commit 93f124f

Browse files
authored
Priorities for Workflows/Activities (#2453)
Add priority to various options
1 parent ead142e commit 93f124f

File tree

27 files changed

+636
-65
lines changed

27 files changed

+636
-65
lines changed

.github/workflows/ci.yml

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,36 @@ jobs:
7070

7171
- name: Start containerized server and dependencies
7272
run: |
73-
docker compose \
74-
-f ./docker/github/docker-compose.yaml \
75-
up -d temporal
76-
73+
wget https://github.com/temporalio/cli/releases/download/v1.3.1-priority.0/temporal_cli_1.3.1-priority.0_linux_amd64.tar.gz
74+
tar -xzf temporal_cli_1.3.1-priority.0_linux_amd64.tar.gz
75+
chmod +x temporal
76+
./temporal server start-dev \
77+
--headless \
78+
--port 7233 \
79+
--http-port 7243 \
80+
--namespace UnitTest \
81+
--search-attribute CustomKeywordField=Keyword \
82+
--search-attribute CustomStringField=Text \
83+
--search-attribute CustomTextField=Text \
84+
--search-attribute CustomIntField=Int \
85+
--search-attribute CustomDatetimeField=Datetime \
86+
--search-attribute CustomDoubleField=Double \
87+
--search-attribute CustomBoolField=Bool \
88+
--dynamic-config-value system.forceSearchAttributesCacheRefreshOnRead=true \
89+
--dynamic-config-value system.enableActivityEagerExecution=true \
90+
--dynamic-config-value system.enableEagerWorkflowStart=true \
91+
--dynamic-config-value system.enableExecuteMultiOperation=true \
92+
--dynamic-config-value frontend.enableUpdateWorkflowExecutionAsyncAccepted=true \
93+
--dynamic-config-value history.MaxBufferedQueryCount=100000 \
94+
--dynamic-config-value frontend.workerVersioningDataAPIs=true \
95+
--dynamic-config-value worker.buildIdScavengerEnabled=true \
96+
--dynamic-config-value frontend.workerVersioningRuleAPIs=true \
97+
--dynamic-config-value worker.removableBuildIdDurationSinceDefault=true \
98+
--dynamic-config-value matching.useNewMatcher=true \
99+
--dynamic-config-value component.callbacks.allowedAddresses='[{"Pattern":"*","AllowInsecure":true}]' \
100+
--dynamic-config-value frontend.workerVersioningWorkflowAPIs=true &
101+
sleep 10s
102+
77103
- name: Run unit tests
78104
env:
79105
USER: unittest

temporal-sdk/src/main/java/io/temporal/activity/ActivityInfo.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
package io.temporal.activity;
2222

2323
import io.temporal.api.common.v1.Payloads;
24+
import io.temporal.common.Experimental;
25+
import io.temporal.common.Priority;
2426
import java.time.Duration;
2527
import java.util.Optional;
2628
import javax.annotation.Nonnull;
@@ -139,4 +141,14 @@ public interface ActivityInfo {
139141

140142
/** Used to determine if the Activity Execution is a local Activity. */
141143
boolean isLocal();
144+
145+
/**
146+
* Return the priority of the activity task.
147+
*
148+
* @apiNote If unset or on an older server version, this method will return {@link
149+
* Priority#getDefaultInstance()}.
150+
*/
151+
@Experimental
152+
@Nonnull
153+
Priority getPriority();
142154
}

temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,7 @@
2222

2323
import com.google.common.base.Objects;
2424
import io.temporal.client.WorkflowClientOptions;
25-
import io.temporal.common.Experimental;
26-
import io.temporal.common.MethodRetry;
27-
import io.temporal.common.RetryOptions;
28-
import io.temporal.common.VersioningIntent;
25+
import io.temporal.common.*;
2926
import io.temporal.common.context.ContextPropagator;
3027
import io.temporal.failure.CanceledFailure;
3128
import java.time.Duration;
@@ -66,6 +63,7 @@ public static final class Builder {
6663
private boolean disableEagerExecution;
6764
private VersioningIntent versioningIntent;
6865
private String summary;
66+
private Priority priority;
6967

7068
private Builder() {}
7169

@@ -84,6 +82,7 @@ private Builder(ActivityOptions options) {
8482
this.disableEagerExecution = options.disableEagerExecution;
8583
this.versioningIntent = options.versioningIntent;
8684
this.summary = options.summary;
85+
this.priority = options.priority;
8786
}
8887

8988
/**
@@ -195,13 +194,13 @@ public Builder setRetryOptions(RetryOptions retryOptions) {
195194
* could make sense. <br>
196195
* This is also why there is no equivalent method on {@link LocalActivityOptions}.
197196
*
198-
* @see <a href="https://github.com/temporalio/sdk-java/issues/490">Rejected feature reqest for
199-
* LocalActivityOption#contextPropagators</a>
200197
* @param contextPropagators specifies the list of context propagators to use during propagation
201198
* from a workflow to the activity with these {@link ActivityOptions}. This list overrides
202199
* the list specified on {@link
203200
* io.temporal.client.WorkflowClientOptions#getContextPropagators()}, {@code null} means no
204201
* overriding
202+
* @see <a href="https://github.com/temporalio/sdk-java/issues/490">Rejected feature reqest for
203+
* LocalActivityOption#contextPropagators</a>
205204
*/
206205
public Builder setContextPropagators(List<ContextPropagator> contextPropagators) {
207206
this.contextPropagators = contextPropagators;
@@ -258,6 +257,18 @@ public Builder setSummary(String summary) {
258257
return this;
259258
}
260259

260+
/**
261+
* Optional priority settings that control relative ordering of task processing when tasks are
262+
* backed up in a queue.
263+
*
264+
* <p>Defaults to inheriting priority from the workflow that scheduled the activity.
265+
*/
266+
@Experimental
267+
public Builder setPriority(Priority priority) {
268+
this.priority = priority;
269+
return this;
270+
}
271+
261272
public Builder mergeActivityOptions(ActivityOptions override) {
262273
if (override == null) {
263274
return this;
@@ -290,6 +301,7 @@ public Builder mergeActivityOptions(ActivityOptions override) {
290301
this.versioningIntent = override.versioningIntent;
291302
}
292303
this.summary = (override.summary == null) ? this.summary : override.summary;
304+
this.priority = (override.priority == null) ? this.priority : override.priority;
293305
return this;
294306
}
295307

@@ -313,7 +325,8 @@ public ActivityOptions build() {
313325
cancellationType,
314326
disableEagerExecution,
315327
versioningIntent,
316-
summary);
328+
summary,
329+
priority);
317330
}
318331

319332
public ActivityOptions validateAndBuildWithDefaults() {
@@ -330,7 +343,8 @@ public ActivityOptions validateAndBuildWithDefaults() {
330343
versioningIntent == null
331344
? VersioningIntent.VERSIONING_INTENT_UNSPECIFIED
332345
: versioningIntent,
333-
summary);
346+
summary,
347+
priority);
334348
}
335349
}
336350

@@ -345,6 +359,7 @@ public ActivityOptions validateAndBuildWithDefaults() {
345359
private final boolean disableEagerExecution;
346360
private final VersioningIntent versioningIntent;
347361
private final String summary;
362+
private final Priority priority;
348363

349364
private ActivityOptions(
350365
Duration heartbeatTimeout,
@@ -357,7 +372,8 @@ private ActivityOptions(
357372
ActivityCancellationType cancellationType,
358373
boolean disableEagerExecution,
359374
VersioningIntent versioningIntent,
360-
String summary) {
375+
String summary,
376+
Priority priority) {
361377
this.heartbeatTimeout = heartbeatTimeout;
362378
this.scheduleToStartTimeout = scheduleToStartTimeout;
363379
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
@@ -369,6 +385,7 @@ private ActivityOptions(
369385
this.disableEagerExecution = disableEagerExecution;
370386
this.versioningIntent = versioningIntent;
371387
this.summary = summary;
388+
this.priority = priority;
372389
}
373390

374391
/**
@@ -443,6 +460,11 @@ public String getSummary() {
443460
return summary;
444461
}
445462

463+
@Experimental
464+
public Priority getPriority() {
465+
return priority;
466+
}
467+
446468
public Builder toBuilder() {
447469
return new Builder(this);
448470
}
@@ -462,7 +484,8 @@ public boolean equals(Object o) {
462484
&& Objects.equal(contextPropagators, that.contextPropagators)
463485
&& disableEagerExecution == that.disableEagerExecution
464486
&& versioningIntent == that.versioningIntent
465-
&& Objects.equal(summary, that.summary);
487+
&& Objects.equal(summary, that.summary)
488+
&& Objects.equal(priority, that.priority);
466489
}
467490

468491
@Override
@@ -478,7 +501,8 @@ public int hashCode() {
478501
cancellationType,
479502
disableEagerExecution,
480503
versioningIntent,
481-
summary);
504+
summary,
505+
priority);
482506
}
483507

484508
@Override
@@ -507,6 +531,8 @@ public String toString() {
507531
+ versioningIntent
508532
+ ", summary="
509533
+ summary
534+
+ ", priority="
535+
+ priority
510536
+ '}';
511537
}
512538
}

temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public static WorkflowOptions merge(
8585
.setCompletionCallbacks(o.getCompletionCallbacks())
8686
.setLinks(o.getLinks())
8787
.setOnConflictOptions(o.getOnConflictOptions())
88+
.setPriority(o.getPriority())
8889
.validateBuildWithDefaults();
8990
}
9091

@@ -132,6 +133,8 @@ public static final class Builder {
132133

133134
private OnConflictOptions onConflictOptions;
134135

136+
private Priority priority;
137+
135138
private Builder() {}
136139

137140
private Builder(WorkflowOptions options) {
@@ -159,6 +162,7 @@ private Builder(WorkflowOptions options) {
159162
this.completionCallbacks = options.completionCallbacks;
160163
this.links = options.links;
161164
this.onConflictOptions = options.onConflictOptions;
165+
this.priority = options.priority;
162166
}
163167

164168
/**
@@ -380,8 +384,8 @@ public Builder setContextPropagators(@Nullable List<ContextPropagator> contextPr
380384
* <li>has available workflow task executor slots
381385
* </ul>
382386
*
383-
* and such a {@link WorkflowClient} is used to start a workflow, then the first workflow task
384-
* could be dispatched on this local worker with the response to the start call if Server
387+
* <p>and such a {@link WorkflowClient} is used to start a workflow, then the first workflow
388+
* task could be dispatched on this local worker with the response to the start call if Server
385389
* supports it. This option can be used to disable this mechanism.
386390
*
387391
* <p>Default is true
@@ -478,6 +482,16 @@ public Builder setOnConflictOptions(OnConflictOptions onConflictOptions) {
478482
return this;
479483
}
480484

485+
/**
486+
* Optional priority settings that control relative ordering of task processing when tasks are
487+
* backed up in a queue.
488+
*/
489+
@Experimental
490+
public Builder setPriority(Priority priority) {
491+
this.priority = priority;
492+
return this;
493+
}
494+
481495
public WorkflowOptions build() {
482496
return new WorkflowOptions(
483497
workflowId,
@@ -500,7 +514,8 @@ public WorkflowOptions build() {
500514
requestId,
501515
completionCallbacks,
502516
links,
503-
onConflictOptions);
517+
onConflictOptions,
518+
priority);
504519
}
505520

506521
/**
@@ -528,7 +543,8 @@ public WorkflowOptions validateBuildWithDefaults() {
528543
requestId,
529544
completionCallbacks,
530545
links,
531-
onConflictOptions);
546+
onConflictOptions,
547+
priority);
532548
}
533549
}
534550

@@ -572,6 +588,7 @@ public WorkflowOptions validateBuildWithDefaults() {
572588

573589
private final List<Link> links;
574590
private final OnConflictOptions onConflictOptions;
591+
private final Priority priority;
575592

576593
private WorkflowOptions(
577594
String workflowId,
@@ -594,7 +611,8 @@ private WorkflowOptions(
594611
String requestId,
595612
List<Callback> completionCallbacks,
596613
List<Link> links,
597-
OnConflictOptions onConflictOptions) {
614+
OnConflictOptions onConflictOptions,
615+
Priority priority) {
598616
this.workflowId = workflowId;
599617
this.workflowIdReusePolicy = workflowIdReusePolicy;
600618
this.workflowRunTimeout = workflowRunTimeout;
@@ -616,6 +634,7 @@ private WorkflowOptions(
616634
this.completionCallbacks = completionCallbacks;
617635
this.links = links;
618636
this.onConflictOptions = onConflictOptions;
637+
this.priority = priority;
619638
}
620639

621640
public String getWorkflowId() {
@@ -717,6 +736,11 @@ public String getStaticDetails() {
717736
return onConflictOptions;
718737
}
719738

739+
@Experimental
740+
public Priority getPriority() {
741+
return priority;
742+
}
743+
720744
public Builder toBuilder() {
721745
return new Builder(this);
722746
}
@@ -746,7 +770,8 @@ public boolean equals(Object o) {
746770
&& Objects.equal(requestId, that.requestId)
747771
&& Objects.equal(completionCallbacks, that.completionCallbacks)
748772
&& Objects.equal(links, that.links)
749-
&& Objects.equal(onConflictOptions, that.onConflictOptions);
773+
&& Objects.equal(onConflictOptions, that.onConflictOptions)
774+
&& Objects.equal(priority, that.priority);
750775
}
751776

752777
@Override
@@ -772,7 +797,8 @@ public int hashCode() {
772797
requestId,
773798
completionCallbacks,
774799
links,
775-
onConflictOptions);
800+
onConflictOptions,
801+
priority);
776802
}
777803

778804
@Override
@@ -823,6 +849,8 @@ public String toString() {
823849
+ links
824850
+ ", onConflictOptions="
825851
+ onConflictOptions
852+
+ ", priority="
853+
+ priority
826854
+ '}';
827855
}
828856
}

0 commit comments

Comments
 (0)