Skip to content

Commit 29d23d0

Browse files
Make sure task failures use a serialization context (#2022)
1 parent 0c6c566 commit 29d23d0

File tree

3 files changed

+53
-7
lines changed

3 files changed

+53
-7
lines changed

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,11 @@
4040
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
4141
import io.temporal.api.taskqueue.v1.TaskQueue;
4242
import io.temporal.api.workflowservice.v1.*;
43+
import io.temporal.common.converter.DataConverter;
4344
import io.temporal.internal.common.ProtobufTimeUtils;
4445
import io.temporal.internal.common.WorkflowExecutionUtils;
4546
import io.temporal.internal.worker.*;
47+
import io.temporal.payload.context.WorkflowSerializationContext;
4648
import io.temporal.serviceclient.MetricsTag;
4749
import io.temporal.serviceclient.WorkflowServiceStubs;
4850
import io.temporal.worker.NonDeterministicException;
@@ -173,7 +175,12 @@ private Result handleWorkflowTaskWithQuery(
173175
return createDirectQueryResult(workflowTask, null, e);
174176
} else {
175177
// this call rethrows an exception in some scenarios
176-
return failureToWFTResult(workflowTask, e);
178+
DataConverter dataConverterWithWorkflowContext =
179+
options
180+
.getDataConverter()
181+
.withContext(
182+
new WorkflowSerializationContext(namespace, execution.getWorkflowId()));
183+
return failureToWFTResult(workflowTask, e, dataConverterWithWorkflowContext);
177184
}
178185
} finally {
179186
if (!useCache && workflowRunTaskHandler != null) {
@@ -254,7 +261,8 @@ private Result createCompletedWFTRequest(
254261
}
255262

256263
private Result failureToWFTResult(
257-
PollWorkflowTaskQueueResponseOrBuilder workflowTask, Throwable e) throws Exception {
264+
PollWorkflowTaskQueueResponseOrBuilder workflowTask, Throwable e, DataConverter dc)
265+
throws Exception {
258266
String workflowType = workflowTask.getWorkflowType().getName();
259267
if (e instanceof WorkflowExecutionException) {
260268
RespondWorkflowTaskCompletedRequest response =
@@ -299,7 +307,7 @@ private Result failureToWFTResult(
299307
throw (Exception) e;
300308
}
301309

302-
Failure failure = options.getDataConverter().exceptionToFailure(e);
310+
Failure failure = dc.exceptionToFailure(e);
303311
RespondWorkflowTaskFailedRequest.Builder failedRequest =
304312
RespondWorkflowTaskFailedRequest.newBuilder()
305313
.setTaskToken(workflowTask.getTaskToken())

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,10 @@ public void handleUpdate(
167167
// Rethrow instead on rejecting the update to fail the WFT
168168
throw r;
169169
} catch (Exception e) {
170-
callbacks.reject(this.dataConverter.exceptionToFailure(e));
170+
callbacks.reject(
171+
workflowContext
172+
.getDataConverterWithCurrentWorkflowContext()
173+
.exceptionToFailure(e));
171174
return;
172175
} finally {
173176
workflowContext.setReadOnly(false);

temporal-sdk/src/test/java/io/temporal/functional/serialization/WorkflowIdSignedPayloadsTest.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import io.temporal.activity.*;
3030
import io.temporal.api.common.v1.Payload;
3131
import io.temporal.api.common.v1.WorkflowExecution;
32+
import io.temporal.api.enums.v1.EventType;
3233
import io.temporal.client.*;
33-
import io.temporal.client.schedules.*;
3434
import io.temporal.common.converter.*;
3535
import io.temporal.failure.CanceledFailure;
3636
import io.temporal.payload.codec.PayloadCodec;
@@ -46,6 +46,7 @@
4646
import java.io.IOException;
4747
import java.time.Duration;
4848
import java.util.*;
49+
import java.util.concurrent.atomic.AtomicInteger;
4950
import java.util.stream.Collectors;
5051
import javax.annotation.Nonnull;
5152
import javax.annotation.Nullable;
@@ -71,15 +72,17 @@ public class WorkflowIdSignedPayloadsTest {
7172
private static final DataConverter codecDataConverter =
7273
new CodecDataConverter(
7374
DefaultDataConverter.STANDARD_INSTANCE,
74-
Collections.singletonList(new PayloadEncoderWithWorkflowIdSignature()));
75+
Collections.singletonList(new PayloadEncoderWithWorkflowIdSignature()),
76+
true);
7577

7678
@Rule
7779
public SDKTestWorkflowRule testWorkflowRule =
7880
SDKTestWorkflowRule.newBuilder()
7981
.setWorkflowTypes(
8082
SimpleWorkflowWithAnActivity.class,
8183
TestWorkflowWithCronScheduleImpl.class,
82-
DynamicWorkflowImpl.class)
84+
DynamicWorkflowImpl.class,
85+
WorkflowWithQuery.class)
8386
.setWorkflowClientOptions(
8487
WorkflowClientOptions.newBuilder().setDataConverter(codecDataConverter).build())
8588
.setActivityImplementations(
@@ -173,6 +176,26 @@ public void testDynamicWorkflow() {
173176
assertEquals("Hello World", workflow.getResult(String.class));
174177
}
175178

179+
@Test
180+
public void testWorkflowWithTaskFailure() {
181+
WorkflowOptions options =
182+
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue());
183+
TestWorkflows.TestWorkflowReturnString workflow =
184+
testWorkflowRule
185+
.getWorkflowClient()
186+
.newWorkflowStub(TestWorkflows.TestWorkflowReturnString.class, options);
187+
assertEquals("Hello World", workflow.execute());
188+
// Test that the task failure is recorded in the history
189+
// if the serialization fails the workflow task will timeout.
190+
assertEquals(
191+
1,
192+
testWorkflowRule
193+
.getHistoryEvents(
194+
WorkflowStub.fromTyped(workflow).getExecution().getWorkflowId(),
195+
EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED)
196+
.size());
197+
}
198+
176199
@ActivityInterface
177200
public interface SimpleActivity {
178201
@ActivityMethod(name = "simple")
@@ -213,6 +236,18 @@ public String execute(String input) {
213236
}
214237
}
215238

239+
public static class WorkflowWithQuery implements TestWorkflows.TestWorkflowReturnString {
240+
static AtomicInteger taskRetryCount = new AtomicInteger();
241+
242+
@Override
243+
public String execute() {
244+
if (taskRetryCount.incrementAndGet() == 1) {
245+
throw new RuntimeException("test");
246+
}
247+
return "Hello World";
248+
}
249+
}
250+
216251
public static class SimpleWorkflowWithAnActivity implements TestWorkflows.TestWorkflow1 {
217252

218253
private final SimpleActivity activity =

0 commit comments

Comments
 (0)