Skip to content

Commit 74022f1

Browse files
Add getResult to WorkflowUpdateHandle (#2324)
Add getResult to UpdateHandle
1 parent c6f0b58 commit 74022f1

File tree

6 files changed

+117
-19
lines changed

6 files changed

+117
-19
lines changed

temporal-sdk/src/main/java/io/temporal/client/WorkflowUpdateHandle.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,23 @@ public interface WorkflowUpdateHandle<T> {
4545
*/
4646
String getId();
4747

48+
/**
49+
* Returns the result of the workflow update.
50+
*
51+
* @return the result of the workflow update
52+
*/
53+
T getResult();
54+
55+
/**
56+
* Returns the result of the workflow update.
57+
*
58+
* @param timeout maximum time to wait and perform the background long polling
59+
* @param unit unit of timeout
60+
* @throws WorkflowUpdateTimeoutOrCancelledException if the timeout is reached.
61+
* @return the result of the workflow update
62+
*/
63+
T getResult(long timeout, TimeUnit unit);
64+
4865
/**
4966
* Returns a {@link CompletableFuture} with the update workflow execution request result,
5067
* potentially waiting for the update to complete.

temporal-sdk/src/main/java/io/temporal/internal/client/CompletedWorkflowUpdateHandleImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ public String getId() {
4949
return id;
5050
}
5151

52+
@Override
53+
public T getResult() {
54+
return result;
55+
}
56+
57+
@Override
58+
public T getResult(long timeout, TimeUnit unit) {
59+
return result;
60+
}
61+
5262
@Override
5363
public CompletableFuture<T> getResultAsync() {
5464
return CompletableFuture.completedFuture(result);

temporal-sdk/src/main/java/io/temporal/internal/client/LazyWorkflowUpdateHandleImpl.java

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
package io.temporal.internal.client;
2222

23-
import io.grpc.Status;
2423
import io.grpc.StatusRuntimeException;
2524
import io.temporal.api.common.v1.WorkflowExecution;
2625
import io.temporal.client.WorkflowException;
@@ -30,10 +29,7 @@
3029
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
3130
import io.temporal.serviceclient.CheckedExceptionWrapper;
3231
import java.lang.reflect.Type;
33-
import java.util.concurrent.CompletableFuture;
34-
import java.util.concurrent.CompletionException;
35-
import java.util.concurrent.TimeUnit;
36-
import java.util.concurrent.TimeoutException;
32+
import java.util.concurrent.*;
3733

3834
@Experimental
3935
public final class LazyWorkflowUpdateHandleImpl<T> implements WorkflowUpdateHandle<T> {
@@ -97,19 +93,16 @@ public CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit) {
9793
failure -> {
9894
if (failure instanceof CompletionException) {
9995
// unwrap the CompletionException
100-
failure = ((Throwable) failure).getCause();
96+
failure = failure.getCause();
10197
}
102-
failure = CheckedExceptionWrapper.unwrap((Throwable) failure);
98+
failure = CheckedExceptionWrapper.unwrap(failure);
10399
if (failure instanceof Error) {
104100
throw (Error) failure;
105101
}
106102
if (failure instanceof StatusRuntimeException) {
107103
StatusRuntimeException sre = (StatusRuntimeException) failure;
108-
if (Status.Code.NOT_FOUND.equals(sre.getStatus().getCode())) {
109-
// Currently no way to tell if the NOT_FOUND was because the workflow ID
110-
// does not exist or because the update ID does not exist.
111-
throw sre;
112-
}
104+
// Currently no way to tell if the NOT_FOUND was because the workflow ID
105+
// does not exist or because the update ID does not exist.
113106
throw sre;
114107
} else if (failure instanceof WorkflowException) {
115108
throw (WorkflowException) failure;
@@ -120,6 +113,34 @@ public CompletableFuture<T> getResultAsync(long timeout, TimeUnit unit) {
120113
});
121114
}
122115

116+
@Override
117+
public T getResult() {
118+
try {
119+
return getResultAsync().get();
120+
} catch (InterruptedException e) {
121+
throw new RuntimeException(e);
122+
} catch (ExecutionException e) {
123+
Throwable cause = e.getCause();
124+
throw (cause instanceof RuntimeException
125+
? (RuntimeException) cause
126+
: new RuntimeException(cause));
127+
}
128+
}
129+
130+
@Override
131+
public T getResult(long timeout, TimeUnit unit) {
132+
try {
133+
return getResultAsync(timeout, unit).get();
134+
} catch (InterruptedException e) {
135+
throw new RuntimeException(e);
136+
} catch (ExecutionException e) {
137+
Throwable cause = e.getCause();
138+
throw (cause instanceof RuntimeException
139+
? (RuntimeException) cause
140+
: new RuntimeException(cause));
141+
}
142+
}
143+
123144
@Override
124145
public CompletableFuture<T> getResultAsync() {
125146
return this.getResultAsync(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTest.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import static org.junit.Assert.*;
2424

25+
import io.grpc.StatusRuntimeException;
2526
import io.temporal.api.common.v1.WorkflowExecution;
2627
import io.temporal.api.enums.v1.WorkflowIdConflictPolicy;
2728
import io.temporal.client.*;
@@ -54,12 +55,18 @@ public void updateNonExistentWorkflow() {
5455
}
5556

5657
@Test
57-
public void pollUpdateNonExistentWorkflow() throws ExecutionException, InterruptedException {
58+
public void pollUpdateNonExistentWorkflow() {
5859
WorkflowStub workflowStub =
5960
testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub("non-existing-id");
6061
// Getting the update handle to a nonexistent workflow is fine
6162
WorkflowUpdateHandle<String> handle = workflowStub.getUpdateHandle("update-id", String.class);
62-
assertThrows(Exception.class, () -> handle.getResultAsync().get());
63+
ExecutionException e =
64+
assertThrows(ExecutionException.class, () -> handle.getResultAsync().get());
65+
assertTrue(e.getCause() instanceof StatusRuntimeException);
66+
StatusRuntimeException sre = (StatusRuntimeException) e.getCause();
67+
assertEquals(io.grpc.Status.Code.NOT_FOUND, sre.getStatus().getCode());
68+
sre = assertThrows(StatusRuntimeException.class, () -> handle.getResult());
69+
assertEquals(io.grpc.Status.Code.NOT_FOUND, sre.getStatus().getCode());
6370
}
6471

6572
@Test
@@ -127,7 +134,7 @@ public void updateWorkflowDuplicateId() throws ExecutionException, InterruptedEx
127134

128135
// Try to get the result of an invalid update
129136
WorkflowUpdateHandle<String> handle = workflowStub.getUpdateHandle(updateId, String.class);
130-
assertThrows(Exception.class, () -> handle.getResultAsync().get());
137+
assertThrows(ExecutionException.class, () -> handle.getResultAsync().get());
131138

132139
assertEquals(
133140
"some-value",
@@ -192,9 +199,10 @@ public void updateWorkflowReuseOptions() throws ExecutionException, InterruptedE
192199
workflowStub.startUpdate(updateOptions, 0, "some-value").getResultAsync().get());
193200
testWorkflowRule.waitForTheEndOfWFT(execution.getWorkflowId());
194201
// Try to send another update request with the same update options
195-
assertEquals(
196-
"some-other-value",
197-
workflowStub.startUpdate(updateOptions, 0, "some-other-value").getResultAsync().get());
202+
WorkflowUpdateHandle<String> handle =
203+
workflowStub.startUpdate(updateOptions, 0, "some-other-value");
204+
assertEquals("some-other-value", handle.getResultAsync().get());
205+
assertEquals("some-other-value", handle.getResult());
198206

199207
// Complete the workflow
200208
workflowStub.update("complete", void.class);

temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTestTimeout.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void LongRunningWorkflowUpdateId() throws ExecutionException, Interrupted
8888
}
8989

9090
@Test
91-
public void WorkflowUpdateGetResultTimeout() throws ExecutionException, InterruptedException {
91+
public void WorkflowUpdateGetResultAsyncTimeout() {
9292
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
9393
String workflowType = BlockingWorkflow.class.getSimpleName();
9494
WorkflowStub workflowStub =
@@ -123,6 +123,40 @@ public void WorkflowUpdateGetResultTimeout() throws ExecutionException, Interrup
123123
assertEquals("complete", workflowStub.getResult(String.class));
124124
}
125125

126+
@Test
127+
public void WorkflowUpdateGetResultTimeout() {
128+
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
129+
String workflowType = BlockingWorkflow.class.getSimpleName();
130+
WorkflowStub workflowStub =
131+
workflowClient.newUntypedWorkflowStub(
132+
workflowType,
133+
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()));
134+
135+
workflowStub.start();
136+
SDKTestWorkflowRule.waitForOKQuery(workflowStub);
137+
138+
WorkflowUpdateHandle<String> handle =
139+
workflowStub.startUpdate(
140+
"update", WorkflowUpdateStage.ACCEPTED, String.class, 10_000, "some-value");
141+
142+
// Verify get throws the correct exception in around the right amount of time
143+
Stopwatch stopWatch = Stopwatch.createStarted();
144+
assertThrows(
145+
WorkflowUpdateTimeoutOrCancelledException.class,
146+
() -> handle.getResult(2, TimeUnit.SECONDS));
147+
stopWatch.stop();
148+
long elapsedSeconds = stopWatch.elapsed(TimeUnit.SECONDS);
149+
assertTrue(
150+
"We shouldn't return too early or too late by the timeout, took "
151+
+ elapsedSeconds
152+
+ " seconds",
153+
elapsedSeconds >= 1 && elapsedSeconds <= 3);
154+
155+
// Complete workflow, since the update is accepted it will not block completion
156+
workflowStub.update("complete", void.class);
157+
assertEquals("complete", workflowStub.getResult(String.class));
158+
}
159+
126160
@WorkflowInterface
127161
public interface BlockingWorkflow {
128162
@WorkflowMethod

temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,14 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException
186186
WorkflowUpdateException.class,
187187
() -> workflowStub.update("bad_update_name", String.class, 0, "Bad Update"));
188188

189+
// send an update request to a bad name through the async path
190+
assertThrows(
191+
WorkflowUpdateException.class,
192+
() ->
193+
workflowStub
194+
.startUpdate("bad_update_name", WorkflowUpdateStage.COMPLETED, String.class, 0, "")
195+
.getResult());
196+
189197
// send a bad update that will be rejected through the sync path
190198
assertThrows(
191199
WorkflowUpdateException.class,

0 commit comments

Comments
 (0)