Skip to content

Commit 3410677

Browse files
authored
Fix UpdateWithStart untyped operation (#2288)
1 parent 7bcade2 commit 3410677

File tree

3 files changed

+54
-25
lines changed

3 files changed

+54
-25
lines changed

temporal-sdk/src/main/java/io/temporal/client/UpdateWithStartWorkflowOperation.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Arrays;
2727
import java.util.concurrent.*;
2828
import java.util.concurrent.atomic.AtomicBoolean;
29+
import javax.annotation.Nullable;
2930

3031
/**
3132
* UpdateWithStartWorkflowOperation is an update workflow request that can be executed together with
@@ -284,26 +285,27 @@ public static <R> Builder<R> newBuilder(String updateName, Class<R> resultClass,
284285

285286
private final CompletableFuture<WorkflowUpdateHandle<R>> handle;
286287

287-
private final Functions.Proc request;
288+
@Nullable private final Functions.Proc updateRequest;
288289

289290
private UpdateWithStartWorkflowOperation(
290-
UpdateOptions<R> options, Functions.Proc request, Object[] updateArgs) {
291+
UpdateOptions<R> options, Functions.Proc updateRequest, Object[] updateArgs) {
291292
this.options = options;
292293
this.updateArgs = updateArgs;
293294
this.handle = new CompletableFuture<>();
294-
this.request = request;
295+
this.updateRequest = updateRequest;
295296
}
296297

297-
WorkflowUpdateHandle<R> invoke(Functions.Proc workflow) {
298+
WorkflowUpdateHandle<R> invoke(Functions.Proc workflowRequest) {
298299
WorkflowInvocationHandler.initAsyncInvocation(
299300
WorkflowInvocationHandler.InvocationType.UPDATE_WITH_START, this);
300301
try {
301-
// invokes `prepareUpdate` via WorkflowInvocationHandler.UpdateWithStartInvocationHandler
302-
request.apply();
303-
304302
// invokes `prepareStart` via WorkflowInvocationHandler.UpdateWithStartInvocationHandler
305-
workflow.apply();
303+
workflowRequest.apply();
306304

305+
if (updateRequest != null) { // only present when using typed API
306+
// invokes `prepareUpdate` via WorkflowInvocationHandler.UpdateWithStartInvocationHandler
307+
updateRequest.apply();
308+
}
307309
stub.updateWithStart(this, this.workflowArgs);
308310
return this.handle.get();
309311
} catch (InterruptedException e) {
@@ -365,8 +367,8 @@ public Object[] getUpdateArgs() {
365367
public String toString() {
366368
StringBuilder sb = new StringBuilder();
367369
sb.append("UpdateWithStartWorkflowOperation{options=").append(options);
368-
if (request != null) {
369-
sb.append(", request=").append(request);
370+
if (updateRequest != null) {
371+
sb.append(", updateRequest=").append(updateRequest);
370372
}
371373
if (updateArgs != null) {
372374
sb.append(", updateArgs=").append(Arrays.toString(updateArgs));

temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -444,14 +444,14 @@ public <R> R getResult(Class<R> resultClass) {
444444
private static class UpdateWithStartInvocationHandler implements SpecificInvocationHandler {
445445

446446
enum State {
447-
NOT_STARTED,
447+
INIT,
448448
START_RECEIVED,
449449
UPDATE_RECEIVED,
450450
}
451451

452452
private final UpdateWithStartWorkflowOperation operation;
453453

454-
private State state = State.NOT_STARTED;
454+
private State state = State.INIT;
455455

456456
public UpdateWithStartInvocationHandler(UpdateWithStartWorkflowOperation operation) {
457457
this.operation = operation;
@@ -471,7 +471,15 @@ public void invoke(
471471

472472
POJOWorkflowMethodMetadata methodMetadata = workflowMetadata.getMethodMetadata(method);
473473

474-
if (state == State.NOT_STARTED) {
474+
if (state == State.INIT) {
475+
WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class);
476+
if (workflowMethod == null) {
477+
throw new IllegalArgumentException(
478+
"Method '" + method.getName() + "' is not a WorkflowMethod");
479+
}
480+
this.operation.prepareStart(untyped, args);
481+
state = State.START_RECEIVED;
482+
} else if (state == State.START_RECEIVED) {
475483
UpdateMethod updateMethod = method.getAnnotation(UpdateMethod.class);
476484
if (updateMethod == null) {
477485
throw new IllegalArgumentException(
@@ -483,14 +491,6 @@ public void invoke(
483491
method.getReturnType(),
484492
method.getGenericReturnType(),
485493
args);
486-
state = State.START_RECEIVED;
487-
} else if (state == State.START_RECEIVED) {
488-
WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class);
489-
if (workflowMethod == null) {
490-
throw new IllegalArgumentException(
491-
"Method '" + method.getName() + "' is not a WorkflowMethod");
492-
}
493-
this.operation.prepareStart(untyped, args);
494494
state = State.UPDATE_RECEIVED;
495495
} else {
496496
throw new IllegalArgumentException(

temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,35 @@ public void startAndSendUpdateTogether() throws ExecutionException, InterruptedE
8484
assertEquals(options.getWorkflowId(), handle1.getExecution().getWorkflowId());
8585
assertEquals("Hello Update", handle1.getResultAsync().get());
8686

87-
WorkflowUpdateHandle<String> updHandle = updateOp.getUpdateHandle().get();
88-
assertEquals(updateOp.getResult(), updHandle.getResultAsync().get());
87+
WorkflowUpdateHandle<String> handle2 = updateOp.getUpdateHandle().get();
88+
assertEquals(updateOp.getResult(), handle2.getResultAsync().get());
89+
90+
workflow.complete();
91+
92+
assertEquals("Hello Update complete", WorkflowStub.fromTyped(workflow).getResult(String.class));
93+
}
94+
95+
@Test
96+
public void startAndSendUpdateTogetherUsingUntypedWorkflowOperation()
97+
throws ExecutionException, InterruptedException {
98+
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
99+
100+
WorkflowOptions options = createOptions();
101+
TestWorkflows.WorkflowWithUpdate workflow =
102+
workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options);
103+
104+
UpdateWithStartWorkflowOperation<String> updateOp =
105+
UpdateWithStartWorkflowOperation.newBuilder(
106+
"update", String.class, new Object[] {1, "Hello Update"}) // untyped!
107+
.setWaitForStage(WorkflowUpdateStage.COMPLETED)
108+
.build();
109+
110+
WorkflowUpdateHandle<String> handle1 =
111+
WorkflowClient.updateWithStart(workflow::execute, updateOp);
112+
assertEquals("Hello Update", handle1.getResultAsync().get());
113+
114+
WorkflowUpdateHandle<String> handle2 = updateOp.getUpdateHandle().get();
115+
assertEquals(updateOp.getResult(), handle2.getResultAsync().get());
89116

90117
workflow.complete();
91118

@@ -110,8 +137,8 @@ public void startAndSendUpdateTogetherWithNullUpdateResult()
110137
WorkflowClient.updateWithStart(workflow::execute, updateOp);
111138
assertNull(handle1.getResultAsync().get());
112139

113-
WorkflowUpdateHandle<Void> updHandle = updateOp.getUpdateHandle().get();
114-
assertEquals(updateOp.getResult(), updHandle.getResultAsync().get());
140+
WorkflowUpdateHandle<Void> handle2 = updateOp.getUpdateHandle().get();
141+
assertEquals(updateOp.getResult(), handle2.getResultAsync().get());
115142

116143
assertEquals("Hello Update", WorkflowStub.fromTyped(workflow).getResult(String.class));
117144
}

0 commit comments

Comments
 (0)