Skip to content

Commit 82d5a88

Browse files
authored
Don't return update handles until desired stage reached (#2066)
1 parent 9a856f3 commit 82d5a88

File tree

11 files changed

+209
-76
lines changed

11 files changed

+209
-76
lines changed

.github/workflows/features.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ on: [push, pull_request]
33

44
jobs:
55
features-test:
6-
uses: temporalio/features/.github/workflows/java.yaml@main
6+
uses: temporalio/features/.github/workflows/java.yaml@java-breaking-update
77
with:
88
java-repo-path: ${{github.event.pull_request.head.repo.full_name}}
99
version: ${{github.event.pull_request.head.ref}}
1010
version-is-repo-ref: true
11+
features-repo-ref: java-breaking-update

temporal-sdk/src/main/java/io/temporal/client/LazyUpdateHandleImpl.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ final class LazyUpdateHandleImpl<T> implements UpdateHandle<T> {
4242
private final WorkflowExecution execution;
4343
private final Class<T> resultClass;
4444
private final Type resultType;
45+
private WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput<T> waitCompletedPollCall;
4546

4647
LazyUpdateHandleImpl(
4748
WorkflowClientCallsInterceptor workflowClientInvoker,
@@ -72,12 +73,23 @@ public String getId() {
7273

7374
@Override
7475
public CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit) {
75-
WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput output =
76-
workflowClientInvoker.pollWorkflowUpdate(
77-
new WorkflowClientCallsInterceptor.PollWorkflowUpdateInput<>(
78-
execution, updateName, id, resultClass, resultType, timeout, unit));
7976

80-
return output
77+
WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput<T> pollCall = null;
78+
boolean setFromWaitCompleted = false;
79+
80+
// If waitCompleted was called, use the result from that call.
81+
synchronized (this) {
82+
if (waitCompletedPollCall != null) {
83+
pollCall = waitCompletedPollCall;
84+
waitCompletedPollCall = null;
85+
}
86+
}
87+
88+
if (!setFromWaitCompleted) {
89+
pollCall = pollUntilComplete(timeout, unit);
90+
}
91+
92+
return pollCall
8193
.getResult()
8294
.exceptionally(
8395
failure -> {
@@ -109,4 +121,17 @@ public CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit) {
109121
public CompletableFuture<T> getResultAsync() {
110122
return this.getResultAsync(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
111123
}
124+
125+
// Can be called immediately after initialization to wait for the update to be completed, but
126+
// still have the result be returned by getResultAsync.
127+
void waitCompleted() {
128+
waitCompletedPollCall = pollUntilComplete(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
129+
}
130+
131+
private WorkflowClientCallsInterceptor.PollWorkflowUpdateOutput<T> pollUntilComplete(
132+
long timeout, TimeUnit unit) {
133+
return workflowClientInvoker.pollWorkflowUpdate(
134+
new WorkflowClientCallsInterceptor.PollWorkflowUpdateInput<>(
135+
execution, updateName, id, resultClass, resultType, timeout, unit));
136+
}
112137
}

temporal-sdk/src/main/java/io/temporal/client/UpdateOptions.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,15 @@ public static UpdateOptions getDefaultInstance() {
5050
private final String updateName;
5151
private final String updateId;
5252
private final String firstExecutionRunId;
53-
private final UpdateWaitPolicy waitPolicy;
53+
private final WorkflowUpdateStage waitPolicy;
5454
private final Class<T> resultClass;
5555
private final Type resultType;
5656

5757
private UpdateOptions(
5858
String updateName,
5959
String updateId,
6060
String firstExecutionRunId,
61-
UpdateWaitPolicy waitPolicy,
61+
WorkflowUpdateStage waitPolicy,
6262
Class<T> resultClass,
6363
Type resultType) {
6464
this.updateName = updateName;
@@ -81,7 +81,7 @@ public String getFirstExecutionRunId() {
8181
return firstExecutionRunId;
8282
}
8383

84-
public UpdateWaitPolicy getWaitPolicy() {
84+
public WorkflowUpdateStage getWaitPolicy() {
8585
return waitPolicy;
8686
}
8787

@@ -152,7 +152,7 @@ public static final class Builder<T> {
152152
private String updateName;
153153
private String updateId;
154154
private String firstExecutionRunId;
155-
private UpdateWaitPolicy waitPolicy;
155+
private WorkflowUpdateStage waitPolicy;
156156
private Class<T> resultClass;
157157
private Type resultType;
158158

@@ -208,7 +208,7 @@ public Builder<T> setFirstExecutionRunId(String firstExecutionRunId) {
208208
* <li><b>Completed</b> Wait for the update to be completed by the workflow.
209209
* </ul>
210210
*/
211-
public Builder<T> setWaitPolicy(UpdateWaitPolicy waitPolicy) {
211+
public Builder<T> setWaitPolicy(WorkflowUpdateStage waitPolicy) {
212212
this.waitPolicy = waitPolicy;
213213
return this;
214214
}
@@ -239,7 +239,7 @@ public UpdateOptions<T> build() {
239239
updateName,
240240
updateId,
241241
firstExecutionRunId == null ? "" : firstExecutionRunId,
242-
waitPolicy == null ? UpdateWaitPolicy.ACCEPTED : waitPolicy,
242+
waitPolicy == null ? WorkflowUpdateStage.ACCEPTED : waitPolicy,
243243
resultClass,
244244
resultType == null ? resultClass : resultType);
245245
}

temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ static <T> WorkflowStub fromTyped(T typed) {
107107

108108
/**
109109
* Asynchronously update a workflow execution by invoking its update handler and returning a
110-
* handle to the update request.
110+
* handle to the update request. If {@link WorkflowUpdateStage#COMPLETED} is specified, in the
111+
* options, the handle will not be returned until the update is completed.
111112
*
112113
* @param options options that will be used to configure and start a new update request.
113114
* @param args update method arguments

temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ public <R> R update(String updateName, Class<R> resultClass, Object... args) {
297297
UpdateOptions<R> options =
298298
UpdateOptions.<R>newBuilder()
299299
.setUpdateName(updateName)
300-
.setWaitPolicy(UpdateWaitPolicy.COMPLETED)
300+
.setWaitPolicy(WorkflowUpdateStage.COMPLETED)
301301
.setResultClass(resultClass)
302302
.build();
303303
return startUpdate(options, args).getResultAsync().get();
@@ -316,7 +316,7 @@ public <R> UpdateHandle<R> startUpdate(String updateName, Class<R> resultClass,
316316
UpdateOptions<R> options =
317317
UpdateOptions.<R>newBuilder()
318318
.setUpdateName(updateName)
319-
.setWaitPolicy(UpdateWaitPolicy.ACCEPTED)
319+
.setWaitPolicy(WorkflowUpdateStage.ACCEPTED)
320320
.setResultClass(resultClass)
321321
.setResultType(resultClass)
322322
.build();
@@ -351,14 +351,20 @@ public <R> UpdateHandle<R> startUpdate(UpdateOptions<R> options, Object... args)
351351
result.getReference().getWorkflowExecution(),
352352
result.getResult());
353353
} else {
354-
return new LazyUpdateHandleImpl<>(
355-
workflowClientInvoker,
356-
workflowType.orElse(null),
357-
options.getUpdateName(),
358-
result.getReference().getUpdateId(),
359-
result.getReference().getWorkflowExecution(),
360-
options.getResultClass(),
361-
options.getResultType());
354+
LazyUpdateHandleImpl<R> handle =
355+
new LazyUpdateHandleImpl<>(
356+
workflowClientInvoker,
357+
workflowType.orElse(null),
358+
options.getUpdateName(),
359+
result.getReference().getUpdateId(),
360+
result.getReference().getWorkflowExecution(),
361+
options.getResultClass(),
362+
options.getResultType());
363+
if (options.getWaitPolicy() == WorkflowUpdateStage.COMPLETED) {
364+
// Don't return the handle until completed, since that's what's been asked for
365+
handle.waitCompleted();
366+
}
367+
return handle;
362368
}
363369
} catch (Exception e) {
364370
Throwable throwable = throwAsWorkflowFailureException(e, targetExecution);

temporal-sdk/src/main/java/io/temporal/client/UpdateWaitPolicy.java renamed to temporal-sdk/src/main/java/io/temporal/client/WorkflowUpdateStage.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,20 @@
2222

2323
import io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage;
2424

25-
public enum UpdateWaitPolicy {
26-
/** Update request waits for the update to be accepted by the workflow */
25+
public enum WorkflowUpdateStage {
26+
/**
27+
* Update request waits for the update to be until the update request has been admitted by the
28+
* server - it may be the case that due to a considerations like load or resource limits that an
29+
* update is made to wait before the server will indicate that it has been received and will be
30+
* processed. This value does not wait for any sort of acknowledgement from a worker.
31+
*/
32+
ADMITTED(
33+
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED),
34+
35+
/**
36+
* Update request waits for the update to be accepted (and validated, if there is a validator) by
37+
* the workflow
38+
*/
2739
ACCEPTED(
2840
UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED),
2941

@@ -33,7 +45,7 @@ public enum UpdateWaitPolicy {
3345

3446
private final UpdateWorkflowExecutionLifecycleStage policy;
3547

36-
UpdateWaitPolicy(UpdateWorkflowExecutionLifecycleStage policy) {
48+
WorkflowUpdateStage(UpdateWorkflowExecutionLifecycleStage policy) {
3749
this.policy = policy;
3850
}
3951

temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -333,9 +333,18 @@ public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
333333
.setFirstExecutionRunId(input.getFirstExecutionRunId())
334334
.setRequest(request)
335335
.build();
336-
Deadline pollTimeoutDeadline = Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS);
337-
UpdateWorkflowExecutionResponse result =
338-
genericClient.update(updateRequest, pollTimeoutDeadline);
336+
337+
// Re-attempt the update until it is at least accepted, or passes the lifecycle stage specified
338+
// by the user.
339+
UpdateWorkflowExecutionResponse result;
340+
do {
341+
Deadline pollTimeoutDeadline = Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS);
342+
result = genericClient.update(updateRequest, pollTimeoutDeadline);
343+
} while (result.getStage().getNumber() < input.getWaitPolicy().getLifecycleStage().getNumber()
344+
&& result.getStage().getNumber()
345+
< UpdateWorkflowExecutionLifecycleStage
346+
.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED
347+
.getNumber());
339348

340349
if (result.hasOutcome()) {
341350
switch (result.getOutcome().getValueCase()) {
@@ -399,20 +408,18 @@ public <R> PollWorkflowUpdateOutput<R> pollWorkflowUpdate(PollWorkflowUpdateInpu
399408

400409
Deadline pollTimeoutDeadline = Deadline.after(input.getTimeout(), input.getTimeoutUnit());
401410
pollWorkflowUpdateHelper(future, pollUpdateRequest, pollTimeoutDeadline);
402-
return new PollWorkflowUpdateOutput(
411+
return new PollWorkflowUpdateOutput<>(
403412
future.thenApply(
404413
(result) -> {
405414
if (result.hasOutcome()) {
406415
switch (result.getOutcome().getValueCase()) {
407416
case SUCCESS:
408417
Optional<Payloads> updateResult = Optional.of(result.getOutcome().getSuccess());
409-
R resultValue =
410-
convertResultPayloads(
411-
updateResult,
412-
input.getResultClass(),
413-
input.getResultType(),
414-
dataConverterWithWorkflowContext);
415-
return resultValue;
418+
return convertResultPayloads(
419+
updateResult,
420+
input.getResultClass(),
421+
input.getResultType(),
422+
dataConverterWithWorkflowContext);
416423
case FAILURE:
417424
throw new WorkflowUpdateException(
418425
input.getWorkflowExecution(),
@@ -434,31 +441,26 @@ private void pollWorkflowUpdateHelper(
434441
CompletableFuture<PollWorkflowExecutionUpdateResponse> resultCF,
435442
PollWorkflowExecutionUpdateRequest request,
436443
Deadline deadline) {
437-
438-
Deadline pollTimeoutDeadline =
439-
Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS).minimum(deadline);
440444
genericClient
441-
.pollUpdateAsync(request, pollTimeoutDeadline)
445+
.pollUpdateAsync(request, deadline)
442446
.whenComplete(
443447
(r, e) -> {
448+
if (e == null && !r.hasOutcome()) {
449+
pollWorkflowUpdateHelper(resultCF, request, deadline);
450+
return;
451+
}
444452
if ((e instanceof StatusRuntimeException
445453
&& ((StatusRuntimeException) e).getStatus().getCode()
446454
== Status.Code.DEADLINE_EXCEEDED)
447-
|| pollTimeoutDeadline.isExpired()
448-
|| (e == null && !r.hasOutcome())) {
449-
// if the request has timed out, stop retrying
450-
if (!deadline.isExpired()) {
451-
pollWorkflowUpdateHelper(resultCF, request, deadline);
452-
} else {
453-
resultCF.completeExceptionally(
454-
new TimeoutException(
455-
"WorkflowId="
456-
+ request.getUpdateRef().getWorkflowExecution().getWorkflowId()
457-
+ ", runId="
458-
+ request.getUpdateRef().getWorkflowExecution().getRunId()
459-
+ ", updateId="
460-
+ request.getUpdateRef().getUpdateId()));
461-
}
455+
|| deadline.isExpired()) {
456+
resultCF.completeExceptionally(
457+
new TimeoutException(
458+
"WorkflowId="
459+
+ request.getUpdateRef().getWorkflowExecution().getWorkflowId()
460+
+ ", runId="
461+
+ request.getUpdateRef().getWorkflowExecution().getRunId()
462+
+ ", updateId="
463+
+ request.getUpdateRef().getUpdateId()));
462464
} else if (e != null) {
463465
resultCF.completeExceptionally(e);
464466
} else {

temporal-sdk/src/test/java/io/temporal/workflow/shared/TestWorkflows.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -177,16 +177,6 @@ public interface QueryableWorkflow {
177177
void mySignal(String value);
178178
}
179179

180-
@WorkflowInterface
181-
public interface SimpleWorkflowWithUpdate {
182-
183-
@WorkflowMethod
184-
String execute();
185-
186-
@UpdateMethod
187-
String update(String value);
188-
}
189-
190180
@WorkflowInterface
191181
public interface WorkflowWithUpdate {
192182

0 commit comments

Comments
 (0)