Skip to content

Commit f4a572a

Browse files
Apply serialization context to Dynamic Workflows (#1992)
1 parent 4da4591 commit f4a572a

File tree

3 files changed

+44
-11
lines changed

3 files changed

+44
-11
lines changed

temporal-sdk/src/main/java/io/temporal/internal/sync/DynamicSyncWorkflowDefinition.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,17 @@ final class DynamicSyncWorkflowDefinition implements SyncWorkflowDefinition {
3636

3737
private final Functions.Func<? extends DynamicWorkflow> factory;
3838
private final WorkerInterceptor[] workerInterceptors;
39-
private final DataConverter dataConverter;
39+
// don't pass it down to other classes, it's a "cached" instance for internal usage only
40+
private final DataConverter dataConverterWithWorkflowContext;
4041
private WorkflowInboundCallsInterceptor workflowInvoker;
4142

4243
public DynamicSyncWorkflowDefinition(
4344
Functions.Func<? extends DynamicWorkflow> factory,
4445
WorkerInterceptor[] workerInterceptors,
45-
DataConverter dataConverter) {
46+
DataConverter dataConverterWithWorkflowContext) {
4647
this.factory = factory;
4748
this.workerInterceptors = workerInterceptors;
48-
this.dataConverter = dataConverter;
49+
this.dataConverterWithWorkflowContext = dataConverterWithWorkflowContext;
4950
}
5051

5152
@Override
@@ -61,11 +62,11 @@ public void initialize() {
6162

6263
@Override
6364
public Optional<Payloads> execute(Header header, Optional<Payloads> input) {
64-
Values args = new EncodedValues(input, dataConverter);
65+
Values args = new EncodedValues(input, dataConverterWithWorkflowContext);
6566
WorkflowInboundCallsInterceptor.WorkflowOutput result =
6667
workflowInvoker.execute(
6768
new WorkflowInboundCallsInterceptor.WorkflowInput(header, new Object[] {args}));
68-
return dataConverter.toPayloads(result.getResult());
69+
return dataConverterWithWorkflowContext.toPayloads(result.getResult());
6970
}
7071

7172
class RootWorkflowInboundCallsInterceptor extends BaseRootWorkflowInboundCallsInterceptor {

temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,10 @@ private SyncWorkflowDefinition getWorkflowDefinition(
230230
if (factory == null) {
231231
if (dynamicWorkflowImplementationFactory != null) {
232232
return new DynamicSyncWorkflowDefinition(
233-
dynamicWorkflowImplementationFactory, workerInterceptors, dataConverter);
233+
dynamicWorkflowImplementationFactory,
234+
workerInterceptors,
235+
dataConverter.withContext(
236+
new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId())));
234237
}
235238
// throw Error to abort the workflow task, not fail the workflow
236239
throw new Error(

temporal-sdk/src/test/java/io/temporal/functional/serialization/WorkflowIdSignedPayloadsTest.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@
4040
import io.temporal.payload.context.SerializationContext;
4141
import io.temporal.testing.internal.SDKTestOptions;
4242
import io.temporal.testing.internal.SDKTestWorkflowRule;
43-
import io.temporal.workflow.ChildWorkflowOptions;
44-
import io.temporal.workflow.ContinueAsNewOptions;
45-
import io.temporal.workflow.Workflow;
43+
import io.temporal.workflow.*;
4644
import io.temporal.workflow.shared.TestWorkflowWithCronScheduleImpl;
4745
import io.temporal.workflow.shared.TestWorkflows;
4846
import java.io.IOException;
@@ -79,10 +77,13 @@ public class WorkflowIdSignedPayloadsTest {
7977
public SDKTestWorkflowRule testWorkflowRule =
8078
SDKTestWorkflowRule.newBuilder()
8179
.setWorkflowTypes(
82-
SimpleWorkflowWithAnActivity.class, TestWorkflowWithCronScheduleImpl.class)
80+
SimpleWorkflowWithAnActivity.class,
81+
TestWorkflowWithCronScheduleImpl.class,
82+
DynamicWorkflowImpl.class)
8383
.setWorkflowClientOptions(
8484
WorkflowClientOptions.newBuilder().setDataConverter(codecDataConverter).build())
85-
.setActivityImplementations(heartbeatingActivity, manualCompletionActivity)
85+
.setActivityImplementations(
86+
heartbeatingActivity, manualCompletionActivity, new DynamicActivityImpl())
8687
.build();
8788

8889
@Rule public TestName testName = new TestName();
@@ -162,6 +163,16 @@ public void testSimpleCronWorkflow() {
162163
TestWorkflowWithCronScheduleImpl.lastFail.get().getMessage().contains("simulated error"));
163164
}
164165

166+
@Test
167+
public void testDynamicWorkflow() {
168+
WorkflowOptions options =
169+
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue());
170+
WorkflowStub workflow =
171+
testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub("workflowFoo", options);
172+
workflow.start("World");
173+
assertEquals("Hello World", workflow.getResult(String.class));
174+
}
175+
165176
@ActivityInterface
166177
public interface SimpleActivity {
167178
@ActivityMethod(name = "simple")
@@ -258,6 +269,24 @@ public String execute(String input) {
258269
}
259270
}
260271

272+
public static class DynamicWorkflowImpl implements DynamicWorkflow {
273+
@Override
274+
public Object execute(EncodedValues args) {
275+
String input = args.get(0, String.class);
276+
ActivityStub activity =
277+
Workflow.newUntypedActivityStub(
278+
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build());
279+
return activity.execute("dynamicActivity", String.class, input);
280+
}
281+
}
282+
283+
public static class DynamicActivityImpl implements DynamicActivity {
284+
@Override
285+
public Object execute(EncodedValues args) {
286+
return "Hello " + args.get(0, String.class);
287+
}
288+
}
289+
261290
private static class PayloadEncoderWithWorkflowIdSignature implements PayloadCodec {
262291
private final ByteString METADATA_ENCODING = ByteString.copyFromUtf8("signed");
263292

0 commit comments

Comments
 (0)