Skip to content

Commit e3921b6

Browse files
Add support for workflow init in Springboot (#2470)
Add support for workflow init in Springboot
1 parent b3b7806 commit e3921b6

File tree

6 files changed

+111
-26
lines changed

6 files changed

+111
-26
lines changed

temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import io.temporal.worker.WorkflowImplementationOptions;
5151
import io.temporal.workflow.DynamicWorkflow;
5252
import io.temporal.workflow.Functions;
53-
import io.temporal.workflow.Functions.Func;
5453
import java.lang.reflect.Constructor;
5554
import java.lang.reflect.InvocationTargetException;
5655
import java.lang.reflect.Method;
@@ -91,7 +90,7 @@ public final class POJOWorkflowImplementationFactory implements ReplayWorkflowFa
9190
workflowDefinitions = Collections.synchronizedMap(new HashMap<>());
9291

9392
/** Factories providing instances of workflow classes. */
94-
private final Map<Class<?>, Functions.Func<?>> workflowInstanceFactories =
93+
private final Map<Class<?>, Functions.Func1<EncodedValues, ?>> workflowInstanceFactories =
9594
Collections.synchronizedMap(new HashMap<>());
9695

9796
/** If present then it is called for any unknown workflow type. */
@@ -146,15 +145,17 @@ public void registerWorkflowImplementationTypes(
146145
*/
147146
@SuppressWarnings("unchecked")
148147
public <R> void addWorkflowImplementationFactory(
149-
WorkflowImplementationOptions options, Class<R> clazz, Functions.Func<R> factory) {
148+
WorkflowImplementationOptions options,
149+
Class<R> clazz,
150+
Functions.Func1<EncodedValues, R> factory) {
150151
if (DynamicWorkflow.class.isAssignableFrom(clazz)) {
151152
if (dynamicWorkflowImplementationFactory != null) {
152153
throw new TypeAlreadyRegisteredException(
153154
"DynamicWorkflow",
154155
"An implementation of DynamicWorkflow or its factory is already registered with the worker");
155156
}
156157
dynamicWorkflowImplementationFactory =
157-
(unused) -> ((Func<? extends DynamicWorkflow>) factory).apply();
158+
(Functions.Func1<EncodedValues, ? extends DynamicWorkflow>) factory;
158159
return;
159160
}
160161
workflowInstanceFactories.put(clazz, factory);
@@ -433,16 +434,17 @@ public WorkflowOutput execute(WorkflowInput input) {
433434
}
434435

435436
protected void newInstance(Optional<Payloads> input) {
436-
Func<?> factory = workflowInstanceFactories.get(workflowImplementationClass);
437+
Functions.Func1<EncodedValues, ?> factory =
438+
workflowInstanceFactories.get(workflowImplementationClass);
437439
if (factory != null) {
438-
workflow = factory.apply();
440+
workflow = factory.apply(new EncodedValues(input, dataConverterWithWorkflowContext));
439441
} else {
440442
// Historically any exception thrown from the constructor was wrapped into Error causing a
441443
// workflow task failure.
442444
// This is not consistent with throwing exception from the workflow method which can
443445
// causes a workflow failure depending on the exception type.
444446
// To preserve backwards compatibility we only change behaviour if a constructor is
445-
// annotated with WorkflowInit.
447+
// annotated with @WorkflowInit.
446448
if (ctor != null) {
447449
try {
448450
workflow =

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.temporal.api.taskqueue.v1.TaskQueue;
2727
import io.temporal.client.WorkflowClient;
2828
import io.temporal.common.converter.DataConverter;
29+
import io.temporal.common.converter.EncodedValues;
2930
import io.temporal.internal.activity.ActivityExecutionContextFactory;
3031
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
3132
import io.temporal.internal.activity.LocalActivityExecutionContextFactoryImpl;
@@ -38,6 +39,7 @@
3839
import io.temporal.worker.tuning.SlotSupplier;
3940
import io.temporal.worker.tuning.WorkflowSlotInfo;
4041
import io.temporal.workflow.Functions.Func;
42+
import io.temporal.workflow.Functions.Func1;
4143
import java.lang.reflect.Type;
4244
import java.time.Duration;
4345
import java.util.Objects;
@@ -165,6 +167,11 @@ public void registerWorkflowImplementationTypes(
165167

166168
public <R> void registerWorkflowImplementationFactory(
167169
WorkflowImplementationOptions options, Class<R> clazz, Func<R> factory) {
170+
this.factory.addWorkflowImplementationFactory(options, clazz, unused -> factory.apply());
171+
}
172+
173+
public <R> void registerWorkflowImplementationFactory(
174+
WorkflowImplementationOptions options, Class<R> clazz, Func1<EncodedValues, R> factory) {
168175
this.factory.addWorkflowImplementationFactory(options, clazz, factory);
169176
}
170177

temporal-sdk/src/main/java/io/temporal/worker/Worker.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,14 @@
3131
import io.temporal.common.WorkflowExecutionHistory;
3232
import io.temporal.common.context.ContextPropagator;
3333
import io.temporal.common.converter.DataConverter;
34+
import io.temporal.common.converter.EncodedValues;
3435
import io.temporal.failure.TemporalFailure;
3536
import io.temporal.internal.sync.WorkflowInternal;
3637
import io.temporal.internal.sync.WorkflowThreadExecutor;
3738
import io.temporal.internal.worker.*;
3839
import io.temporal.serviceclient.MetricsTag;
3940
import io.temporal.worker.tuning.*;
41+
import io.temporal.workflow.Functions;
4042
import io.temporal.workflow.Functions.Func;
4143
import io.temporal.workflow.WorkflowMethod;
4244
import java.time.Duration;
@@ -325,6 +327,14 @@ public <R> void registerWorkflowImplementationFactory(
325327
workflowWorker.registerWorkflowImplementationFactory(options, workflowInterface, factory);
326328
}
327329

330+
@VisibleForTesting
331+
public <R> void registerWorkflowImplementationFactory(
332+
Class<R> workflowInterface,
333+
Functions.Func1<EncodedValues, R> factory,
334+
WorkflowImplementationOptions options) {
335+
workflowWorker.registerWorkflowImplementationFactory(options, workflowInterface, factory);
336+
}
337+
328338
/**
329339
* Configures a factory to use when an instance of a workflow implementation is created.
330340
*

temporal-spring-boot-autoconfigure/build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ dependencies {
3434
testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}"
3535

3636
testImplementation "org.springframework.boot:spring-boot-starter-test"
37+
38+
testImplementation('org.slf4j:slf4j-api') {
39+
version {
40+
strictly "${slf4jVersion}"
41+
}
42+
}
3743
}
3844

3945
tasks.test {

temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/template/WorkersTemplate.java

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import io.temporal.spring.boot.autoconfigure.properties.NamespaceProperties;
3838
import io.temporal.spring.boot.autoconfigure.properties.WorkerProperties;
3939
import io.temporal.worker.*;
40+
import java.lang.reflect.Constructor;
41+
import java.lang.reflect.InvocationTargetException;
4042
import java.util.ArrayList;
4143
import java.util.Collection;
4244
import java.util.HashMap;
@@ -510,7 +512,6 @@ private void configureWorkflowImplementationAutoDiscovery(
510512

511513
@SuppressWarnings("unchecked")
512514
private <T> void configureWorkflowImplementation(Worker worker, Class<?> clazz) {
513-
514515
POJOWorkflowImplMetadata workflowMetadata =
515516
POJOWorkflowImplMetadata.newInstanceForWorkflowFactory(clazz);
516517
List<POJOWorkflowMethodMetadata> workflowMethods = workflowMetadata.getWorkflowMethods();
@@ -527,7 +528,18 @@ private <T> void configureWorkflowImplementation(Worker worker, Class<?> clazz)
527528

528529
WorkerDeploymentOptions deploymentOptions = worker.getWorkerOptions().getDeploymentOptions();
529530

530-
for (POJOWorkflowMethodMetadata workflowMethod : workflowMetadata.getWorkflowMethods()) {
531+
// If the workflow implementation class has a constructor annotated with @WorkflowInit,
532+
// we need to register it as a workflow factory.
533+
if (workflowMetadata.getWorkflowInit() != null) {
534+
// Currently, we only support one workflow method in a class with a constructor annotated with
535+
// @WorkflowInit.
536+
if (workflowMethods.size() > 1) {
537+
throw new BeanDefinitionValidationException(
538+
"Workflow implementation class "
539+
+ clazz
540+
+ " has more then one workflow method and a constructor annotated with @WorkflowInit.");
541+
}
542+
POJOWorkflowMethodMetadata workflowMethod = workflowMetadata.getWorkflowMethods().get(0);
531543
if (deploymentOptions != null && deploymentOptions.isUsingVersioning()) {
532544
POJOWorkflowImplementationFactory.validateVersioningBehavior(
533545
clazz,
@@ -538,10 +550,43 @@ private <T> void configureWorkflowImplementation(Worker worker, Class<?> clazz)
538550

539551
worker.registerWorkflowImplementationFactory(
540552
(Class<T>) workflowMethod.getWorkflowInterface(),
541-
() -> (T) beanFactory.createBean(clazz),
553+
(encodedValues) -> {
554+
try {
555+
Constructor<?> ctor = workflowMetadata.getWorkflowInit();
556+
Object[] parameters = new Object[ctor.getParameterCount()];
557+
for (int i = 0; i < ctor.getParameterCount(); i++) {
558+
parameters[i] =
559+
encodedValues.get(
560+
i, ctor.getParameterTypes()[i], ctor.getGenericParameterTypes()[i]);
561+
}
562+
T workflowInstance = (T) workflowMetadata.getWorkflowInit().newInstance(parameters);
563+
beanFactory.autowireBean(workflowInstance);
564+
return workflowInstance;
565+
} catch (InstantiationException
566+
| IllegalAccessException
567+
| InvocationTargetException e) {
568+
throw new RuntimeException(e);
569+
}
570+
},
542571
workflowImplementationOptions);
543572
addRegisteredWorkflowImpl(
544573
worker, workflowMethod.getWorkflowInterface().getName(), workflowMetadata);
574+
} else {
575+
for (POJOWorkflowMethodMetadata workflowMethod : workflowMetadata.getWorkflowMethods()) {
576+
if (deploymentOptions != null && deploymentOptions.isUsingVersioning()) {
577+
POJOWorkflowImplementationFactory.validateVersioningBehavior(
578+
clazz,
579+
workflowMethod,
580+
deploymentOptions.getDefaultVersioningBehavior(),
581+
deploymentOptions.isUsingVersioning());
582+
}
583+
worker.registerWorkflowImplementationFactory(
584+
(Class<T>) workflowMethod.getWorkflowInterface(),
585+
() -> (T) beanFactory.createBean(clazz),
586+
workflowImplementationOptions);
587+
addRegisteredWorkflowImpl(
588+
worker, workflowMethod.getWorkflowInterface().getName(), workflowMetadata);
589+
}
545590
}
546591
}
547592

temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/bytaskqueue/TestWorkflowImpl.java

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,29 +26,44 @@
2626
import io.temporal.workflow.NexusOperationOptions;
2727
import io.temporal.workflow.NexusServiceOptions;
2828
import io.temporal.workflow.Workflow;
29+
import io.temporal.workflow.WorkflowInit;
2930
import java.time.Duration;
31+
import org.springframework.beans.factory.annotation.Autowired;
32+
import org.springframework.context.ConfigurableApplicationContext;
3033

3134
@WorkflowImpl(taskQueues = {"${default-queue.name:UnitTest}"})
3235
public class TestWorkflowImpl implements TestWorkflow {
36+
private final TestNexusService nexusService;
37+
private final TestActivity activity;
38+
39+
@Autowired private ConfigurableApplicationContext applicationContext;
40+
41+
@WorkflowInit
42+
public TestWorkflowImpl(String input) {
43+
nexusService =
44+
Workflow.newNexusServiceStub(
45+
TestNexusService.class,
46+
NexusServiceOptions.newBuilder()
47+
.setEndpoint("AutoDiscoveryByTaskQueueEndpoint")
48+
.setOperationOptions(
49+
NexusOperationOptions.newBuilder()
50+
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
51+
.build())
52+
.build());
53+
54+
activity =
55+
Workflow.newActivityStub(
56+
TestActivity.class,
57+
ActivityOptions.newBuilder()
58+
.setStartToCloseTimeout(Duration.ofSeconds(1))
59+
.validateAndBuildWithDefaults());
60+
}
61+
3362
@Override
3463
public String execute(String input) {
3564
if (input.equals("nexus")) {
36-
Workflow.newNexusServiceStub(
37-
TestNexusService.class,
38-
NexusServiceOptions.newBuilder()
39-
.setEndpoint("AutoDiscoveryByTaskQueueEndpoint")
40-
.setOperationOptions(
41-
NexusOperationOptions.newBuilder()
42-
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
43-
.build())
44-
.build())
45-
.operation(input);
65+
nexusService.operation(input);
4666
}
47-
return Workflow.newActivityStub(
48-
TestActivity.class,
49-
ActivityOptions.newBuilder()
50-
.setStartToCloseTimeout(Duration.ofSeconds(1))
51-
.validateAndBuildWithDefaults())
52-
.execute("done");
67+
return activity.execute("done");
5368
}
5469
}

0 commit comments

Comments
 (0)