From 51249bd6023428d943f74f9bc25d783a83ab6b0d Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Thu, 18 Apr 2024 09:34:49 -0700 Subject: [PATCH 1/4] Refactored ActivityOptions merging --- .../io/temporal/activity/ActivityOptions.java | 1 + .../common/ActivityOptionsWithDefault.java | 53 +++++++++++++++++++ .../sync/ActivityInvocationHandler.java | 21 +++----- .../internal/sync/SyncWorkflowContext.java | 43 +++++++-------- .../internal/sync/WorkflowInternal.java | 24 ++------- .../TestActivityEnvironmentInternal.java | 10 +++- 6 files changed, 92 insertions(+), 60 deletions(-) create mode 100644 temporal-sdk/src/main/java/io/temporal/internal/common/ActivityOptionsWithDefault.java diff --git a/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java b/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java index fb498e28b..95856a8de 100644 --- a/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/activity/ActivityOptions.java @@ -243,6 +243,7 @@ public Builder setVersioningIntent(VersioningIntent versioningIntent) { return this; } + /** Merge options with override parameter having higher precedence. */ public Builder mergeActivityOptions(ActivityOptions override) { if (override == null) { return this; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/ActivityOptionsWithDefault.java b/temporal-sdk/src/main/java/io/temporal/internal/common/ActivityOptionsWithDefault.java new file mode 100644 index 000000000..d11fdb515 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/ActivityOptionsWithDefault.java @@ -0,0 +1,53 @@ +package io.temporal.internal.common; + +import io.temporal.activity.ActivityOptions; +import java.util.HashMap; +import java.util.Map; + +public final class ActivityOptionsWithDefault { + private ActivityOptions defaultOptions; + + private Map optionsMap; + + private final ActivityOptionsWithDefault overridden; + + public ActivityOptionsWithDefault( + ActivityOptionsWithDefault overridden, + ActivityOptions defaultOptions, + Map optionsMap) { + this.overridden = overridden; + this.defaultOptions = defaultOptions; + this.optionsMap = new HashMap<>(optionsMap); + } + + public ActivityOptionsWithDefault(ActivityOptionsWithDefault overridden) { + this.overridden = overridden; + defaultOptions = null; + optionsMap = null; + } + + public void setDefaultOptions(ActivityOptions defaultOptions) { + this.defaultOptions = defaultOptions; + } + + public void setOptionsMap(Map optionsMap) { + this.optionsMap = optionsMap; + } + + public ActivityOptions getMergedOptions(String activityType) { + ActivityOptions overrideOptions = overridden.getMergedOptions(activityType); + return merge(overrideOptions, defaultOptions, optionsMap.get(activityType)); + } + + /** later options override the previous ones */ + private static ActivityOptions merge(ActivityOptions... options) { + if (options == null || options.length == 0) { + return null; + } + ActivityOptions result = options[0]; + for (int i = 1; i < options.length; i++) { + result = result.toBuilder().mergeActivityOptions(options[i]).build(); + } + return result; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java index 18ef07f8c..1837bafe8 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java @@ -24,41 +24,36 @@ import io.temporal.activity.ActivityOptions; import io.temporal.common.MethodRetry; import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; +import io.temporal.internal.common.ActivityOptionsWithDefault; import io.temporal.workflow.ActivityStub; import io.temporal.workflow.Functions; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; -import java.util.HashMap; -import java.util.Map; import java.util.function.Function; @VisibleForTesting public class ActivityInvocationHandler extends ActivityInvocationHandlerBase { - private final ActivityOptions options; - private final Map activityMethodOptions; + private final ActivityOptionsWithDefault options; private final WorkflowOutboundCallsInterceptor activityExecutor; private final Functions.Proc assertReadOnly; @VisibleForTesting public static InvocationHandler newInstance( Class activityInterface, - ActivityOptions options, - Map methodOptions, + ActivityOptionsWithDefault options, WorkflowOutboundCallsInterceptor activityExecutor, Functions.Proc assertReadOnly) { return new ActivityInvocationHandler( - activityInterface, activityExecutor, options, methodOptions, assertReadOnly); + activityInterface, activityExecutor, options, assertReadOnly); } private ActivityInvocationHandler( Class activityInterface, WorkflowOutboundCallsInterceptor activityExecutor, - ActivityOptions options, - Map methodOptions, + ActivityOptionsWithDefault options, Functions.Proc assertReadOnly) { super(activityInterface); this.options = options; - this.activityMethodOptions = (methodOptions == null) ? new HashMap<>() : methodOptions; this.activityExecutor = activityExecutor; this.assertReadOnly = assertReadOnly; } @@ -67,11 +62,7 @@ private ActivityInvocationHandler( protected Function getActivityFunc( Method method, MethodRetry methodRetry, String activityName) { Function function; - ActivityOptions merged = - ActivityOptions.newBuilder(options) - .mergeActivityOptions(this.activityMethodOptions.get(activityName)) - .mergeMethodRetry(methodRetry) - .build(); + ActivityOptions merged = options.getMergedOptions(activityName); if (merged.getStartToCloseTimeout() == null && merged.getScheduleToCloseTimeout() == null) { throw new IllegalArgumentException( "Both StartToCloseTimeout and ScheduleToCloseTimeout aren't specified for " diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 78a3b714b..b018488c6 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -56,6 +56,7 @@ import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; import io.temporal.failure.*; import io.temporal.internal.common.ActivityOptionUtils; +import io.temporal.internal.common.ActivityOptionsWithDefault; import io.temporal.internal.common.HeaderUtils; import io.temporal.internal.common.OptionsUtils; import io.temporal.internal.common.ProtobufTimeUtils; @@ -119,8 +120,15 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall private WorkflowInboundCallsInterceptor headInboundInterceptor; private WorkflowOutboundCallsInterceptor headOutboundInterceptor; - private ActivityOptions defaultActivityOptions = null; - private Map activityOptionsMap; + /** Activity options specified through {@link WorkflowImplementationOptions}. */ + private ActivityOptionsWithDefault workflowImplementationOptionsActivityOptions; + + /** + * Activity options specified through {@link Workflow#setDefaultActivityOptions(ActivityOptions)} + * and {@link Workflow#applyActivityOptions(Map)} + */ + private ActivityOptionsWithDefault workflowActivityOptions; + private LocalActivityOptions defaultLocalActivityOptions = null; private Map localActivityOptionsMap; private boolean readOnly = false; @@ -145,8 +153,11 @@ public SyncWorkflowContext( this.queryDispatcher = queryDispatcher; this.updateDispatcher = updateDispatcher; if (workflowImplementationOptions != null) { - this.defaultActivityOptions = workflowImplementationOptions.getDefaultActivityOptions(); - this.activityOptionsMap = new HashMap<>(workflowImplementationOptions.getActivityOptions()); + this.workflowImplementationOptionsActivityOptions = + new ActivityOptionsWithDefault( + null, + workflowImplementationOptions.getDefaultActivityOptions(), + workflowImplementationOptions.getActivityOptions()); this.defaultLocalActivityOptions = workflowImplementationOptions.getDefaultLocalActivityOptions(); this.localActivityOptionsMap = @@ -200,14 +211,8 @@ public void initHeadInboundCallsInterceptor(WorkflowInboundCallsInterceptor head updateDispatcher.setInboundCallsInterceptor(head); } - public ActivityOptions getDefaultActivityOptions() { - return defaultActivityOptions; - } - - public @Nonnull Map getActivityOptions() { - return activityOptionsMap != null - ? Collections.unmodifiableMap(activityOptionsMap) - : Collections.emptyMap(); + public ActivityOptionsWithDefault getActivityOptions() { + return workflowImplementationOptionsActivityOptions; } public LocalActivityOptions getDefaultLocalActivityOptions() { @@ -221,21 +226,11 @@ public LocalActivityOptions getDefaultLocalActivityOptions() { } public void setDefaultActivityOptions(ActivityOptions defaultActivityOptions) { - this.defaultActivityOptions = - (this.defaultActivityOptions == null) - ? defaultActivityOptions - : this.defaultActivityOptions.toBuilder() - .mergeActivityOptions(defaultActivityOptions) - .build(); + workflowActivityOptions.setDefaultOptions(defaultActivityOptions); } public void applyActivityOptions(Map activityTypeToOption) { - Objects.requireNonNull(activityTypeToOption); - if (this.activityOptionsMap == null) { - this.activityOptionsMap = new HashMap<>(activityTypeToOption); - return; - } - ActivityOptionUtils.mergePredefinedActivityOptions(activityOptionsMap, activityTypeToOption); + workflowActivityOptions.setOptionsMap(activityTypeToOption); } public void setDefaultLocalActivityOptions(LocalActivityOptions defaultLocalActivityOptions) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 03c4da2b2..1ce9d28b2 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -42,6 +42,7 @@ import io.temporal.common.metadata.POJOWorkflowMethodMetadata; import io.temporal.internal.WorkflowThreadMarker; import io.temporal.internal.common.ActivityOptionUtils; +import io.temporal.internal.common.ActivityOptionsWithDefault; import io.temporal.internal.common.NonIdempotentHandle; import io.temporal.internal.common.SearchAttributesUtil; import io.temporal.internal.logging.ReplayAwareLogger; @@ -300,29 +301,14 @@ public static T newActivityStub( // Merge the activity options we may have received from the workflow with the options we may // have received in WorkflowImplementationOptions. SyncWorkflowContext context = getRootWorkflowContext(); - options = (options == null) ? context.getDefaultActivityOptions() : options; - - Map mergedActivityOptionsMap; - @Nonnull Map predefinedActivityOptions = context.getActivityOptions(); - if (activityMethodOptions != null - && !activityMethodOptions.isEmpty() - && predefinedActivityOptions.isEmpty()) { - // we need to merge only in this case - mergedActivityOptionsMap = new HashMap<>(predefinedActivityOptions); - ActivityOptionUtils.mergePredefinedActivityOptions( - mergedActivityOptionsMap, activityMethodOptions); - } else { - mergedActivityOptionsMap = - MoreObjects.firstNonNull( - activityMethodOptions, - MoreObjects.firstNonNull(predefinedActivityOptions, Collections.emptyMap())); - } + ActivityOptionsWithDefault optionsWithDefault = + new ActivityOptionsWithDefault( + context.getActivityOptions(), options, activityMethodOptions); InvocationHandler invocationHandler = ActivityInvocationHandler.newInstance( activityInterface, - options, - mergedActivityOptionsMap, + optionsWithDefault, context.getWorkflowOutboundInterceptor(), () -> assertNotReadOnly("schedule activity")); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index 1760edf5c..8b65121b5 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -48,6 +48,7 @@ import io.temporal.internal.activity.ActivityExecutionContextFactory; import io.temporal.internal.activity.ActivityExecutionContextFactoryImpl; import io.temporal.internal.activity.ActivityTaskHandlerImpl; +import io.temporal.internal.common.ActivityOptionsWithDefault; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.sync.*; import io.temporal.internal.testservice.InProcessGRPCServer; @@ -188,9 +189,11 @@ public T newActivityStub(Class activityInterface) { .setScheduleToCloseTimeout(Duration.ofDays(1)) .setHeartbeatTimeout(Duration.ofSeconds(1)) .build(); + ActivityOptionsWithDefault optionsWithDefault = + new ActivityOptionsWithDefault(null, options, null); InvocationHandler invocationHandler = ActivityInvocationHandler.newInstance( - activityInterface, options, null, new TestActivityExecutor(), () -> {}); + activityInterface, optionsWithDefault, new TestActivityExecutor(), () -> {}); invocationHandler = new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); @@ -204,9 +207,12 @@ public T newActivityStub(Class activityInterface) { */ @Override public T newActivityStub(Class activityInterface, ActivityOptions options) { + ActivityOptionsWithDefault optionsWithDefault = + new ActivityOptionsWithDefault(null, options, null); + InvocationHandler invocationHandler = ActivityInvocationHandler.newInstance( - activityInterface, options, null, new TestActivityExecutor(), () -> {}); + activityInterface, optionsWithDefault, new TestActivityExecutor(), () -> {}); invocationHandler = new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); From 45f1f41a7fda1a1e4a0da6f1149d735c78c10cb9 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Thu, 18 Apr 2024 09:54:49 -0700 Subject: [PATCH 2/4] Renamed ActivityOptionsWithDefault to MergedActivityOptions --- ...efault.java => MergedActivityOptions.java} | 27 +++++++++++++++---- .../sync/ActivityInvocationHandler.java | 8 +++--- .../internal/sync/SyncWorkflowContext.java | 26 ++++++++---------- .../internal/sync/WorkflowInternal.java | 9 +++---- .../TestActivityEnvironmentInternal.java | 12 ++++----- 5 files changed, 46 insertions(+), 36 deletions(-) rename temporal-sdk/src/main/java/io/temporal/internal/common/{ActivityOptionsWithDefault.java => MergedActivityOptions.java} (63%) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/ActivityOptionsWithDefault.java b/temporal-sdk/src/main/java/io/temporal/internal/common/MergedActivityOptions.java similarity index 63% rename from temporal-sdk/src/main/java/io/temporal/internal/common/ActivityOptionsWithDefault.java rename to temporal-sdk/src/main/java/io/temporal/internal/common/MergedActivityOptions.java index d11fdb515..d9e8002a9 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/ActivityOptionsWithDefault.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/MergedActivityOptions.java @@ -4,15 +4,31 @@ import java.util.HashMap; import java.util.Map; -public final class ActivityOptionsWithDefault { +/** + * The chain of ActivityOptions and per type options maps. Used to merge options specified at the + * following layers: + * + *
+ *     * WorkflowImplementationOptions
+ *     * Workflow
+ *     * ActivityStub
+ * 
+ * + * Each next layer overrides specific options specified at the previous layer. + */ +public final class MergedActivityOptions { + + /** Common options across all activity types. */ private ActivityOptions defaultOptions; + /** Per activity type options. These override defaultOptions. */ private Map optionsMap; - private final ActivityOptionsWithDefault overridden; + /** The options specified at the previous layer. They are overriden by this object. */ + private final MergedActivityOptions overridden; - public ActivityOptionsWithDefault( - ActivityOptionsWithDefault overridden, + public MergedActivityOptions( + MergedActivityOptions overridden, ActivityOptions defaultOptions, Map optionsMap) { this.overridden = overridden; @@ -20,7 +36,7 @@ public ActivityOptionsWithDefault( this.optionsMap = new HashMap<>(optionsMap); } - public ActivityOptionsWithDefault(ActivityOptionsWithDefault overridden) { + public MergedActivityOptions(MergedActivityOptions overridden) { this.overridden = overridden; defaultOptions = null; optionsMap = null; @@ -34,6 +50,7 @@ public void setOptionsMap(Map optionsMap) { this.optionsMap = optionsMap; } + /** Get merged options for the given activityType. */ public ActivityOptions getMergedOptions(String activityType) { ActivityOptions overrideOptions = overridden.getMergedOptions(activityType); return merge(overrideOptions, defaultOptions, optionsMap.get(activityType)); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java index 1837bafe8..931fa0442 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/ActivityInvocationHandler.java @@ -24,7 +24,7 @@ import io.temporal.activity.ActivityOptions; import io.temporal.common.MethodRetry; import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; -import io.temporal.internal.common.ActivityOptionsWithDefault; +import io.temporal.internal.common.MergedActivityOptions; import io.temporal.workflow.ActivityStub; import io.temporal.workflow.Functions; import java.lang.reflect.InvocationHandler; @@ -33,14 +33,14 @@ @VisibleForTesting public class ActivityInvocationHandler extends ActivityInvocationHandlerBase { - private final ActivityOptionsWithDefault options; + private final MergedActivityOptions options; private final WorkflowOutboundCallsInterceptor activityExecutor; private final Functions.Proc assertReadOnly; @VisibleForTesting public static InvocationHandler newInstance( Class activityInterface, - ActivityOptionsWithDefault options, + MergedActivityOptions options, WorkflowOutboundCallsInterceptor activityExecutor, Functions.Proc assertReadOnly) { return new ActivityInvocationHandler( @@ -50,7 +50,7 @@ public static InvocationHandler newInstance( private ActivityInvocationHandler( Class activityInterface, WorkflowOutboundCallsInterceptor activityExecutor, - ActivityOptionsWithDefault options, + MergedActivityOptions options, Functions.Proc assertReadOnly) { super(activityInterface); this.options = options; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index b018488c6..4faddc14b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -56,8 +56,8 @@ import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; import io.temporal.failure.*; import io.temporal.internal.common.ActivityOptionUtils; -import io.temporal.internal.common.ActivityOptionsWithDefault; import io.temporal.internal.common.HeaderUtils; +import io.temporal.internal.common.MergedActivityOptions; import io.temporal.internal.common.OptionsUtils; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.common.SdkFlag; @@ -120,14 +120,7 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall private WorkflowInboundCallsInterceptor headInboundInterceptor; private WorkflowOutboundCallsInterceptor headOutboundInterceptor; - /** Activity options specified through {@link WorkflowImplementationOptions}. */ - private ActivityOptionsWithDefault workflowImplementationOptionsActivityOptions; - - /** - * Activity options specified through {@link Workflow#setDefaultActivityOptions(ActivityOptions)} - * and {@link Workflow#applyActivityOptions(Map)} - */ - private ActivityOptionsWithDefault workflowActivityOptions; + private final MergedActivityOptions activityOptions; private LocalActivityOptions defaultLocalActivityOptions = null; private Map localActivityOptionsMap; @@ -152,9 +145,10 @@ public SyncWorkflowContext( this.signalDispatcher = signalDispatcher; this.queryDispatcher = queryDispatcher; this.updateDispatcher = updateDispatcher; + MergedActivityOptions activityOptionsFromWorkflowImplementationOptions = null; if (workflowImplementationOptions != null) { - this.workflowImplementationOptionsActivityOptions = - new ActivityOptionsWithDefault( + activityOptionsFromWorkflowImplementationOptions = + new MergedActivityOptions( null, workflowImplementationOptions.getDefaultActivityOptions(), workflowImplementationOptions.getActivityOptions()); @@ -163,6 +157,8 @@ public SyncWorkflowContext( this.localActivityOptionsMap = new HashMap<>(workflowImplementationOptions.getLocalActivityOptions()); } + this.activityOptions = + new MergedActivityOptions(activityOptionsFromWorkflowImplementationOptions); this.workflowImplementationOptions = workflowImplementationOptions == null ? WorkflowImplementationOptions.getDefaultInstance() @@ -211,8 +207,8 @@ public void initHeadInboundCallsInterceptor(WorkflowInboundCallsInterceptor head updateDispatcher.setInboundCallsInterceptor(head); } - public ActivityOptionsWithDefault getActivityOptions() { - return workflowImplementationOptionsActivityOptions; + public MergedActivityOptions getActivityOptions() { + return activityOptions; } public LocalActivityOptions getDefaultLocalActivityOptions() { @@ -226,11 +222,11 @@ public LocalActivityOptions getDefaultLocalActivityOptions() { } public void setDefaultActivityOptions(ActivityOptions defaultActivityOptions) { - workflowActivityOptions.setDefaultOptions(defaultActivityOptions); + activityOptions.setDefaultOptions(defaultActivityOptions); } public void applyActivityOptions(Map activityTypeToOption) { - workflowActivityOptions.setOptionsMap(activityTypeToOption); + activityOptions.setOptionsMap(activityTypeToOption); } public void setDefaultLocalActivityOptions(LocalActivityOptions defaultLocalActivityOptions) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 1ce9d28b2..e5695e08b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -42,7 +42,7 @@ import io.temporal.common.metadata.POJOWorkflowMethodMetadata; import io.temporal.internal.WorkflowThreadMarker; import io.temporal.internal.common.ActivityOptionUtils; -import io.temporal.internal.common.ActivityOptionsWithDefault; +import io.temporal.internal.common.MergedActivityOptions; import io.temporal.internal.common.NonIdempotentHandle; import io.temporal.internal.common.SearchAttributesUtil; import io.temporal.internal.logging.ReplayAwareLogger; @@ -301,14 +301,13 @@ public static T newActivityStub( // Merge the activity options we may have received from the workflow with the options we may // have received in WorkflowImplementationOptions. SyncWorkflowContext context = getRootWorkflowContext(); - ActivityOptionsWithDefault optionsWithDefault = - new ActivityOptionsWithDefault( - context.getActivityOptions(), options, activityMethodOptions); + MergedActivityOptions activityOptions = + new MergedActivityOptions(context.getActivityOptions(), options, activityMethodOptions); InvocationHandler invocationHandler = ActivityInvocationHandler.newInstance( activityInterface, - optionsWithDefault, + activityOptions, context.getWorkflowOutboundInterceptor(), () -> assertNotReadOnly("schedule activity")); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index 8b65121b5..e421a4e75 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -48,7 +48,7 @@ import io.temporal.internal.activity.ActivityExecutionContextFactory; import io.temporal.internal.activity.ActivityExecutionContextFactoryImpl; import io.temporal.internal.activity.ActivityTaskHandlerImpl; -import io.temporal.internal.common.ActivityOptionsWithDefault; +import io.temporal.internal.common.MergedActivityOptions; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.sync.*; import io.temporal.internal.testservice.InProcessGRPCServer; @@ -189,11 +189,10 @@ public T newActivityStub(Class activityInterface) { .setScheduleToCloseTimeout(Duration.ofDays(1)) .setHeartbeatTimeout(Duration.ofSeconds(1)) .build(); - ActivityOptionsWithDefault optionsWithDefault = - new ActivityOptionsWithDefault(null, options, null); + MergedActivityOptions activityOptions = new MergedActivityOptions(null, options, null); InvocationHandler invocationHandler = ActivityInvocationHandler.newInstance( - activityInterface, optionsWithDefault, new TestActivityExecutor(), () -> {}); + activityInterface, activityOptions, new TestActivityExecutor(), () -> {}); invocationHandler = new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); @@ -207,12 +206,11 @@ public T newActivityStub(Class activityInterface) { */ @Override public T newActivityStub(Class activityInterface, ActivityOptions options) { - ActivityOptionsWithDefault optionsWithDefault = - new ActivityOptionsWithDefault(null, options, null); + MergedActivityOptions activityOptions = new MergedActivityOptions(null, options, null); InvocationHandler invocationHandler = ActivityInvocationHandler.newInstance( - activityInterface, optionsWithDefault, new TestActivityExecutor(), () -> {}); + activityInterface, activityOptions, new TestActivityExecutor(), () -> {}); invocationHandler = new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); From 53bf5b878ad06674507bd61e9dc74716bf2752ab Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Thu, 18 Apr 2024 11:45:01 -0700 Subject: [PATCH 3/4] Rewrote the unit test --- .../internal/common/ActivityOptionUtils.java | 8 - .../common/MergedActivityOptions.java | 44 ++++- .../internal/sync/SyncWorkflowContext.java | 2 +- .../ActivityOptionsPrecedenceTest.java | 127 ++++++++++++++ ...faultActivityOptionsSetOnWorkflowTest.java | 157 ------------------ 5 files changed, 165 insertions(+), 173 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/activity/ActivityOptionsPrecedenceTest.java delete mode 100644 temporal-sdk/src/test/java/io/temporal/activity/DefaultActivityOptionsSetOnWorkflowTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/ActivityOptionUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/ActivityOptionUtils.java index d62854563..76f901e6a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/ActivityOptionUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/ActivityOptionUtils.java @@ -20,19 +20,11 @@ package io.temporal.internal.common; -import io.temporal.activity.ActivityOptions; import io.temporal.activity.LocalActivityOptions; import java.util.Map; import javax.annotation.Nonnull; public class ActivityOptionUtils { - public static void mergePredefinedActivityOptions( - @Nonnull Map mergeTo, - @Nonnull Map override) { - override.forEach( - (key, value) -> - mergeTo.merge(key, value, (o1, o2) -> o1.toBuilder().mergeActivityOptions(o2).build())); - } public static void mergePredefinedLocalActivityOptions( @Nonnull Map mergeTo, diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/MergedActivityOptions.java b/temporal-sdk/src/main/java/io/temporal/internal/common/MergedActivityOptions.java index d9e8002a9..bb1df403a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/MergedActivityOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/MergedActivityOptions.java @@ -1,3 +1,23 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.temporal.internal.common; import io.temporal.activity.ActivityOptions; @@ -22,7 +42,7 @@ public final class MergedActivityOptions { private ActivityOptions defaultOptions; /** Per activity type options. These override defaultOptions. */ - private Map optionsMap; + private final Map optionsMap = new HashMap<>(); /** The options specified at the previous layer. They are overriden by this object. */ private final MergedActivityOptions overridden; @@ -33,26 +53,32 @@ public MergedActivityOptions( Map optionsMap) { this.overridden = overridden; this.defaultOptions = defaultOptions; - this.optionsMap = new HashMap<>(optionsMap); + if (optionsMap != null) { + this.optionsMap.putAll(optionsMap); + } } public MergedActivityOptions(MergedActivityOptions overridden) { this.overridden = overridden; defaultOptions = null; - optionsMap = null; } public void setDefaultOptions(ActivityOptions defaultOptions) { this.defaultOptions = defaultOptions; } - public void setOptionsMap(Map optionsMap) { - this.optionsMap = optionsMap; + public void applyOptionsMap(Map optionsMap) { + if (optionsMap != null) { + this.optionsMap.putAll(optionsMap); + } } /** Get merged options for the given activityType. */ public ActivityOptions getMergedOptions(String activityType) { - ActivityOptions overrideOptions = overridden.getMergedOptions(activityType); + ActivityOptions overrideOptions = null; + if (overridden != null) { + overrideOptions = overridden.getMergedOptions(activityType); + } return merge(overrideOptions, defaultOptions, optionsMap.get(activityType)); } @@ -63,7 +89,11 @@ private static ActivityOptions merge(ActivityOptions... options) { } ActivityOptions result = options[0]; for (int i = 1; i < options.length; i++) { - result = result.toBuilder().mergeActivityOptions(options[i]).build(); + if (result == null) { + result = options[i]; + } else { + result = result.toBuilder().mergeActivityOptions(options[i]).build(); + } } return result; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 4faddc14b..155c99dd9 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -226,7 +226,7 @@ public void setDefaultActivityOptions(ActivityOptions defaultActivityOptions) { } public void applyActivityOptions(Map activityTypeToOption) { - activityOptions.setOptionsMap(activityTypeToOption); + activityOptions.applyOptionsMap(activityTypeToOption); } public void setDefaultLocalActivityOptions(LocalActivityOptions defaultLocalActivityOptions) { diff --git a/temporal-sdk/src/test/java/io/temporal/activity/ActivityOptionsPrecedenceTest.java b/temporal-sdk/src/test/java/io/temporal/activity/ActivityOptionsPrecedenceTest.java new file mode 100644 index 000000000..8d575fa3c --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/activity/ActivityOptionsPrecedenceTest.java @@ -0,0 +1,127 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.activity; + +import static org.junit.Assert.assertEquals; + +import io.temporal.client.WorkflowOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestActivities.TestActivity; +import io.temporal.workflow.shared.TestActivities.TestActivityImpl; +import io.temporal.workflow.shared.TestWorkflows.TestWorkflowReturnMap; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.junit.Rule; +import org.junit.Test; + +public class ActivityOptionsPrecedenceTest { + + private static final ActivityOptions implOptions = + ActivityOptions.newBuilder() + .setHeartbeatTimeout(Duration.ofSeconds(10)) + .setStartToCloseTimeout(Duration.ofHours(10)) + .setScheduleToCloseTimeout(Duration.ofDays(10)) + .build(); + + private static final Map implOptionsMap = + Collections.singletonMap( + "Activity1", + ActivityOptions.newBuilder().setHeartbeatTimeout(Duration.ofSeconds(11)).build()); + + private static final ActivityOptions workflowOps = + ActivityOptions.newBuilder() + .setHeartbeatTimeout(Duration.ofSeconds(20)) + .setStartToCloseTimeout(Duration.ofHours(20)) + .build(); + + private static final Map workflowOptionsMap = + Collections.singletonMap( + "Activity1", + ActivityOptions.newBuilder().setHeartbeatTimeout(Duration.ofSeconds(22)).build()); + + private static final ActivityOptions stubOptions = + ActivityOptions.newBuilder().setHeartbeatTimeout(Duration.ofSeconds(30)).build(); + + private static final Map stubOptionsMap = + Collections.singletonMap( + "Activity2", + ActivityOptions.newBuilder().setHeartbeatTimeout(Duration.ofSeconds(33)).build()); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + WorkflowImplementationOptions.newBuilder() + .setDefaultActivityOptions(implOptions) + .setActivityOptions(implOptionsMap) + .build(), + TestSetDefaultActivityOptionsWorkflowImpl.class) + .setActivityImplementations(new TestActivityImpl()) + .build(); + + @Test + public void testSetWorkflowImplementationOptions() { + TestWorkflowReturnMap workflowStub = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub( + TestWorkflowReturnMap.class, + WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build()); + Map> result = workflowStub.execute(); + + Map activity1Values = result.get("Activity1"); + Duration a1Heartbeat = activity1Values.get("HeartbeatTimeout"); + Duration a1StartToClose = activity1Values.get("StartToCloseTimeout"); + Duration a1ScheduleToClose = activity1Values.get("ScheduleToCloseTimeout"); + + assertEquals(stubOptions.getHeartbeatTimeout(), a1Heartbeat); + assertEquals(workflowOps.getStartToCloseTimeout(), a1StartToClose); + assertEquals(implOptions.getScheduleToCloseTimeout(), a1ScheduleToClose); + + Map activity2Values = result.get("Activity2"); + Duration a2Heartbeat = activity2Values.get("HeartbeatTimeout"); + Duration a2StartToClose = activity2Values.get("StartToCloseTimeout"); + Duration a2ScheduleToClose = activity2Values.get("ScheduleToCloseTimeout"); + + assertEquals(stubOptionsMap.get("Activity2").getHeartbeatTimeout(), a2Heartbeat); + assertEquals(workflowOps.getStartToCloseTimeout(), a2StartToClose); + assertEquals(implOptions.getScheduleToCloseTimeout(), a2ScheduleToClose); + } + + public static class TestSetDefaultActivityOptionsWorkflowImpl implements TestWorkflowReturnMap { + @Override + public Map> execute() { + Workflow.setDefaultActivityOptions(workflowOps); + Workflow.applyActivityOptions(workflowOptionsMap); + TestActivity activities = + Workflow.newActivityStub(TestActivity.class, stubOptions, stubOptionsMap); + + Map> result = new HashMap<>(); + result.put("Activity1", activities.activity1()); + result.put("Activity2", activities.activity2()); + return result; + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/activity/DefaultActivityOptionsSetOnWorkflowTest.java b/temporal-sdk/src/test/java/io/temporal/activity/DefaultActivityOptionsSetOnWorkflowTest.java deleted file mode 100644 index b8b07cd63..000000000 --- a/temporal-sdk/src/test/java/io/temporal/activity/DefaultActivityOptionsSetOnWorkflowTest.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. - * - * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this material except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.temporal.activity; - -import static org.junit.Assert.assertEquals; - -import io.temporal.client.WorkflowOptions; -import io.temporal.testing.internal.SDKTestOptions; -import io.temporal.testing.internal.SDKTestWorkflowRule; -import io.temporal.worker.WorkflowImplementationOptions; -import io.temporal.workflow.Workflow; -import io.temporal.workflow.shared.TestActivities.TestActivity; -import io.temporal.workflow.shared.TestActivities.TestActivityImpl; -import io.temporal.workflow.shared.TestActivities.TestLocalActivity; -import io.temporal.workflow.shared.TestActivities.TestLocalActivityImpl; -import io.temporal.workflow.shared.TestWorkflows.TestWorkflowReturnMap; -import java.time.Duration; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import org.junit.Rule; -import org.junit.Test; - -public class DefaultActivityOptionsSetOnWorkflowTest { - - private static final ActivityOptions workflowOps = ActivityTestOptions.newActivityOptions1(); - private static final ActivityOptions workerOps = ActivityTestOptions.newActivityOptions2(); - private static final ActivityOptions activity2Ops = - SDKTestOptions.newActivityOptions20sScheduleToClose(); - private static final Map activity2options = - Collections.singletonMap("Activity2", activity2Ops); - private static final Map defaultActivity2options = - Collections.singletonMap( - "Activity2", - ActivityOptions.newBuilder().setHeartbeatTimeout(Duration.ofSeconds(2)).build()); - - // local activity options - private static final LocalActivityOptions localActivityWorkflowOps = - ActivityTestOptions.newLocalActivityOptions1(); - private static final LocalActivityOptions localActivityWorkerOps = - ActivityTestOptions.newLocalActivityOptions2(); - private static final LocalActivityOptions localActivity2Ops = - SDKTestOptions.newLocalActivityOptions20sScheduleToClose(); - private static final Map localActivity2options = - Collections.singletonMap("LocalActivity2", localActivity2Ops); - private static final Map defaultLocalActivity2options = - Collections.singletonMap( - "LocalActivity2", - LocalActivityOptions.newBuilder().setDoNotIncludeArgumentsIntoMarker(false).build()); - - @Rule - public SDKTestWorkflowRule testWorkflowRule = - SDKTestWorkflowRule.newBuilder() - .setWorkflowTypes( - WorkflowImplementationOptions.newBuilder() - .setDefaultActivityOptions(workerOps) - .setActivityOptions(activity2options) - .setDefaultLocalActivityOptions(localActivityWorkerOps) - .setLocalActivityOptions(localActivity2options) - .build(), - TestSetDefaultActivityOptionsWorkflowImpl.class) - .setActivityImplementations(new TestActivityImpl(), new TestLocalActivityImpl()) - .build(); - - @Test - public void testSetWorkflowImplementationOptions() { - TestWorkflowReturnMap workflowStub = - testWorkflowRule - .getWorkflowClient() - .newWorkflowStub( - TestWorkflowReturnMap.class, - WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build()); - Map> result = workflowStub.execute(); - - // Check that activity1 has default workerOptions options that were partially overwritten with - // workflow. - Map activity1Values = result.get("Activity1"); - assertEquals(workerOps.getHeartbeatTimeout(), activity1Values.get("HeartbeatTimeout")); - assertEquals( - workflowOps.getScheduleToCloseTimeout(), activity1Values.get("ScheduleToCloseTimeout")); - assertEquals(workflowOps.getStartToCloseTimeout(), activity1Values.get("StartToCloseTimeout")); - - // Check that default options for activity2 were overwritten. - Map activity2Values = result.get("Activity2"); - assertEquals( - defaultActivity2options.get("Activity2").getHeartbeatTimeout(), - activity2Values.get("HeartbeatTimeout")); - assertEquals( - activity2Ops.getScheduleToCloseTimeout(), activity2Values.get("ScheduleToCloseTimeout")); - assertEquals(workflowOps.getStartToCloseTimeout(), activity2Values.get("StartToCloseTimeout")); - } - - @Test - public void testSetLocalActivityWorkflowImplementationOptions() { - TestWorkflowReturnMap workflowStub = - testWorkflowRule - .getWorkflowClient() - .newWorkflowStub( - TestWorkflowReturnMap.class, - WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build()); - Map> result = workflowStub.execute(); - - // Check that local activity1 has default workerOptions options that were partially overwritten - // with workflow. - Map localActivity1Values = result.get("LocalActivity1"); - assertEquals( - localActivityWorkflowOps.getScheduleToCloseTimeout(), - localActivity1Values.get("ScheduleToCloseTimeout")); - assertEquals( - localActivityWorkflowOps.getStartToCloseTimeout(), - localActivity1Values.get("StartToCloseTimeout")); - // Check that default options for local activity2 were overwritten. - Map localActivity2Values = result.get("LocalActivity2"); - assertEquals( - localActivity2Ops.getScheduleToCloseTimeout(), - localActivity2Values.get("ScheduleToCloseTimeout")); - assertEquals( - localActivityWorkflowOps.getStartToCloseTimeout(), - localActivity2Values.get("StartToCloseTimeout")); - } - - public static class TestSetDefaultActivityOptionsWorkflowImpl implements TestWorkflowReturnMap { - @Override - public Map> execute() { - Workflow.setDefaultActivityOptions(workflowOps); - Workflow.applyActivityOptions(defaultActivity2options); - Workflow.setDefaultLocalActivityOptions(localActivityWorkflowOps); - Workflow.applyLocalActivityOptions(defaultLocalActivity2options); - Map> result = new HashMap<>(); - TestActivity activities = Workflow.newActivityStub(TestActivity.class); - TestLocalActivity localActivities = Workflow.newLocalActivityStub(TestLocalActivity.class); - result.put("Activity1", activities.activity1()); - result.put("Activity2", activities.activity2()); - result.put("LocalActivity1", localActivities.localActivity1()); - result.put("LocalActivity2", localActivities.localActivity2()); - return result; - } - } -} From be32eeb4f5764416a15f7888f84475a550dea464 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Thu, 18 Apr 2024 20:27:28 -0700 Subject: [PATCH 4/4] Fixed precedence of LocalActivityOptions --- .../internal/common/ActivityOptionUtils.java | 36 ------ .../common/MergedLocalActivityOptions.java | 100 ++++++++++++++++ .../sync/LocalActivityInvocationHandler.java | 21 +--- .../internal/sync/SyncWorkflowContext.java | 48 +++----- .../internal/sync/WorkflowInternal.java | 28 +---- .../activity/ActivityOptionsTest.java | 73 +++++++++++- .../LocalActivityOptionsPrecedenceTest.java | 107 ++++++++++++++++++ .../testing/TestActivityEnvironment.java | 14 +++ .../TestActivityEnvironmentInternal.java | 21 +++- 9 files changed, 335 insertions(+), 113 deletions(-) delete mode 100644 temporal-sdk/src/main/java/io/temporal/internal/common/ActivityOptionUtils.java create mode 100644 temporal-sdk/src/main/java/io/temporal/internal/common/MergedLocalActivityOptions.java create mode 100644 temporal-sdk/src/test/java/io/temporal/activity/LocalActivityOptionsPrecedenceTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/ActivityOptionUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/ActivityOptionUtils.java deleted file mode 100644 index 76f901e6a..000000000 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/ActivityOptionUtils.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. - * - * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this material except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.temporal.internal.common; - -import io.temporal.activity.LocalActivityOptions; -import java.util.Map; -import javax.annotation.Nonnull; - -public class ActivityOptionUtils { - - public static void mergePredefinedLocalActivityOptions( - @Nonnull Map mergeTo, - @Nonnull Map override) { - override.forEach( - (key, value) -> - mergeTo.merge(key, value, (o1, o2) -> o1.toBuilder().mergeActivityOptions(o2).build())); - } -} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/MergedLocalActivityOptions.java b/temporal-sdk/src/main/java/io/temporal/internal/common/MergedLocalActivityOptions.java new file mode 100644 index 000000000..09963114c --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/MergedLocalActivityOptions.java @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.common; + +import io.temporal.activity.LocalActivityOptions; +import java.util.HashMap; +import java.util.Map; + +/** + * The chain of LocalActivityOptions and per type options maps. Used to merge options specified at + * the following layers: + * + *
+ *     * WorkflowImplementationOptions
+ *     * Workflow
+ *     * ActivityStub
+ * 
+ * + * Each next layer overrides specific options specified at the previous layer. + */ +public final class MergedLocalActivityOptions { + + /** Common options across all activity types. */ + private LocalActivityOptions defaultOptions; + + /** Per activity type options. These override defaultOptions. */ + private final Map optionsMap = new HashMap<>(); + + /** The options specified at the previous layer. They are overriden by this object. */ + private final MergedLocalActivityOptions overridden; + + public MergedLocalActivityOptions( + MergedLocalActivityOptions overridden, + LocalActivityOptions defaultOptions, + Map optionsMap) { + this.overridden = overridden; + this.defaultOptions = defaultOptions; + if (optionsMap != null) { + this.optionsMap.putAll(optionsMap); + } + } + + public MergedLocalActivityOptions(MergedLocalActivityOptions overridden) { + this.overridden = overridden; + defaultOptions = null; + } + + public void setDefaultOptions(LocalActivityOptions defaultOptions) { + this.defaultOptions = defaultOptions; + } + + public void applyOptionsMap(Map optionsMap) { + if (optionsMap != null) { + this.optionsMap.putAll(optionsMap); + } + } + + /** Get merged options for the given activityType. */ + public LocalActivityOptions getMergedOptions(String activityType) { + LocalActivityOptions overrideOptions = null; + if (overridden != null) { + overrideOptions = overridden.getMergedOptions(activityType); + } + return merge(overrideOptions, defaultOptions, optionsMap.get(activityType)); + } + + /** later options override the previous ones */ + private static LocalActivityOptions merge(LocalActivityOptions... options) { + if (options == null || options.length == 0) { + return null; + } + LocalActivityOptions result = options[0]; + for (int i = 1; i < options.length; i++) { + if (result == null) { + result = options[i]; + } else { + result = result.toBuilder().mergeActivityOptions(options[i]).build(); + } + } + return result; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java index 6d12850f8..3942ebfe7 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/LocalActivityInvocationHandler.java @@ -24,41 +24,36 @@ import io.temporal.activity.LocalActivityOptions; import io.temporal.common.MethodRetry; import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; +import io.temporal.internal.common.MergedLocalActivityOptions; import io.temporal.workflow.ActivityStub; import io.temporal.workflow.Functions; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; -import java.util.HashMap; -import java.util.Map; import java.util.function.Function; @VisibleForTesting public class LocalActivityInvocationHandler extends ActivityInvocationHandlerBase { - private final LocalActivityOptions options; - private final Map activityMethodOptions; + private final MergedLocalActivityOptions options; private final WorkflowOutboundCallsInterceptor activityExecutor; private final Functions.Proc assertReadOnly; @VisibleForTesting public static InvocationHandler newInstance( Class activityInterface, - LocalActivityOptions options, - Map methodOptions, + MergedLocalActivityOptions options, WorkflowOutboundCallsInterceptor activityExecutor, Functions.Proc assertReadOnly) { return new LocalActivityInvocationHandler( - activityInterface, activityExecutor, options, methodOptions, assertReadOnly); + activityInterface, activityExecutor, options, assertReadOnly); } private LocalActivityInvocationHandler( Class activityInterface, WorkflowOutboundCallsInterceptor activityExecutor, - LocalActivityOptions options, - Map methodOptions, + MergedLocalActivityOptions options, Functions.Proc assertReadOnly) { super(activityInterface); this.options = options; - this.activityMethodOptions = (methodOptions == null) ? new HashMap<>() : methodOptions; this.activityExecutor = activityExecutor; this.assertReadOnly = assertReadOnly; } @@ -68,11 +63,7 @@ private LocalActivityInvocationHandler( public Function getActivityFunc( Method method, MethodRetry methodRetry, String activityName) { Function function; - LocalActivityOptions mergedOptions = - LocalActivityOptions.newBuilder(options) - .mergeActivityOptions(activityMethodOptions.get(activityName)) - .setMethodRetry(methodRetry) - .build(); + LocalActivityOptions mergedOptions = options.getMergedOptions(activityName); ActivityStub stub = LocalActivityStubImpl.newInstance(mergedOptions, activityExecutor, assertReadOnly); function = diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 155c99dd9..83225b8de 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -55,9 +55,9 @@ import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; import io.temporal.failure.*; -import io.temporal.internal.common.ActivityOptionUtils; import io.temporal.internal.common.HeaderUtils; import io.temporal.internal.common.MergedActivityOptions; +import io.temporal.internal.common.MergedLocalActivityOptions; import io.temporal.internal.common.OptionsUtils; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.common.SdkFlag; @@ -121,9 +121,8 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall private WorkflowOutboundCallsInterceptor headOutboundInterceptor; private final MergedActivityOptions activityOptions; + private final MergedLocalActivityOptions localActivityOptions; - private LocalActivityOptions defaultLocalActivityOptions = null; - private Map localActivityOptionsMap; private boolean readOnly = false; public SyncWorkflowContext( @@ -145,20 +144,22 @@ public SyncWorkflowContext( this.signalDispatcher = signalDispatcher; this.queryDispatcher = queryDispatcher; this.updateDispatcher = updateDispatcher; - MergedActivityOptions activityOptionsFromWorkflowImplementationOptions = null; + MergedActivityOptions activityOptions = null; + MergedLocalActivityOptions localActivityOptions = null; if (workflowImplementationOptions != null) { - activityOptionsFromWorkflowImplementationOptions = + activityOptions = new MergedActivityOptions( null, workflowImplementationOptions.getDefaultActivityOptions(), workflowImplementationOptions.getActivityOptions()); - this.defaultLocalActivityOptions = - workflowImplementationOptions.getDefaultLocalActivityOptions(); - this.localActivityOptionsMap = - new HashMap<>(workflowImplementationOptions.getLocalActivityOptions()); + localActivityOptions = + new MergedLocalActivityOptions( + null, + workflowImplementationOptions.getDefaultLocalActivityOptions(), + workflowImplementationOptions.getLocalActivityOptions()); } - this.activityOptions = - new MergedActivityOptions(activityOptionsFromWorkflowImplementationOptions); + this.activityOptions = new MergedActivityOptions(activityOptions); + this.localActivityOptions = new MergedLocalActivityOptions(localActivityOptions); this.workflowImplementationOptions = workflowImplementationOptions == null ? WorkflowImplementationOptions.getDefaultInstance() @@ -211,14 +212,8 @@ public MergedActivityOptions getActivityOptions() { return activityOptions; } - public LocalActivityOptions getDefaultLocalActivityOptions() { - return defaultLocalActivityOptions; - } - - public @Nonnull Map getLocalActivityOptions() { - return localActivityOptionsMap != null - ? Collections.unmodifiableMap(localActivityOptionsMap) - : Collections.emptyMap(); + public MergedLocalActivityOptions getLocalActivityOptions() { + return localActivityOptions; } public void setDefaultActivityOptions(ActivityOptions defaultActivityOptions) { @@ -230,22 +225,11 @@ public void applyActivityOptions(Map activityTypeToOpti } public void setDefaultLocalActivityOptions(LocalActivityOptions defaultLocalActivityOptions) { - this.defaultLocalActivityOptions = - (this.defaultLocalActivityOptions == null) - ? defaultLocalActivityOptions - : this.defaultLocalActivityOptions.toBuilder() - .mergeActivityOptions(defaultLocalActivityOptions) - .build(); + localActivityOptions.setDefaultOptions(defaultLocalActivityOptions); } public void applyLocalActivityOptions(Map activityTypeToOption) { - Objects.requireNonNull(activityTypeToOption); - if (this.localActivityOptionsMap == null) { - this.localActivityOptionsMap = new HashMap<>(activityTypeToOption); - return; - } - ActivityOptionUtils.mergePredefinedLocalActivityOptions( - localActivityOptionsMap, activityTypeToOption); + localActivityOptions.applyOptionsMap(activityTypeToOption); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index e5695e08b..0898d303a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -24,7 +24,6 @@ import static io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal; import com.google.common.base.Joiner; -import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.uber.m3.tally.Scope; import io.temporal.activity.ActivityOptions; @@ -41,8 +40,8 @@ import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata; import io.temporal.common.metadata.POJOWorkflowMethodMetadata; import io.temporal.internal.WorkflowThreadMarker; -import io.temporal.internal.common.ActivityOptionUtils; import io.temporal.internal.common.MergedActivityOptions; +import io.temporal.internal.common.MergedLocalActivityOptions; import io.temporal.internal.common.NonIdempotentHandle; import io.temporal.internal.common.SearchAttributesUtil; import io.temporal.internal.logging.ReplayAwareLogger; @@ -328,31 +327,14 @@ public static T newLocalActivityStub( // Merge the activity options we may have received from the workflow with the options we may // have received in WorkflowImplementationOptions. SyncWorkflowContext context = getRootWorkflowContext(); - options = (options == null) ? context.getDefaultLocalActivityOptions() : options; - - Map mergedLocalActivityOptionsMap; - @Nonnull - Map predefinedLocalActivityOptions = - context.getLocalActivityOptions(); - if (activityMethodOptions != null - && !activityMethodOptions.isEmpty() - && predefinedLocalActivityOptions.isEmpty()) { - // we need to merge only in this case - mergedLocalActivityOptionsMap = new HashMap<>(predefinedLocalActivityOptions); - ActivityOptionUtils.mergePredefinedLocalActivityOptions( - mergedLocalActivityOptionsMap, activityMethodOptions); - } else { - mergedLocalActivityOptionsMap = - MoreObjects.firstNonNull( - activityMethodOptions, - MoreObjects.firstNonNull(predefinedLocalActivityOptions, Collections.emptyMap())); - } + MergedLocalActivityOptions activityOptions = + new MergedLocalActivityOptions( + context.getLocalActivityOptions(), options, activityMethodOptions); InvocationHandler invocationHandler = LocalActivityInvocationHandler.newInstance( activityInterface, - options, - mergedLocalActivityOptionsMap, + activityOptions, WorkflowInternal.getWorkflowOutboundInterceptor(), () -> assertNotReadOnly("schedule local activity")); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); diff --git a/temporal-sdk/src/test/java/io/temporal/activity/ActivityOptionsTest.java b/temporal-sdk/src/test/java/io/temporal/activity/ActivityOptionsTest.java index 7bd9d8760..e0fc5d2e3 100644 --- a/temporal-sdk/src/test/java/io/temporal/activity/ActivityOptionsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/activity/ActivityOptionsTest.java @@ -27,11 +27,13 @@ import io.temporal.common.MethodRetry; import io.temporal.common.RetryOptions; +import io.temporal.internal.common.MergedActivityOptions; import io.temporal.testing.TestActivityEnvironment; import io.temporal.workflow.shared.TestActivities.TestActivity; import io.temporal.workflow.shared.TestActivities.TestActivityImpl; import java.lang.reflect.Method; import java.time.Duration; +import java.util.HashMap; import java.util.Map; import org.junit.*; import org.junit.rules.Timeout; @@ -85,6 +87,76 @@ public void testActivityOptionsMerge() { Assert.assertEquals(methodOps1, merged); } + @Test + public void testMergedActivityOptions() { + MergedActivityOptions options0 = new MergedActivityOptions(null, null, null); + + ActivityOptions a1 = + ActivityOptions.newBuilder() + .setScheduleToStartTimeout(Duration.ofMillis(10)) + .setScheduleToCloseTimeout(Duration.ofDays(10)) + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(10).build()) + .setCancellationType(ActivityCancellationType.TRY_CANCEL) + .build(); + + Map map1 = new HashMap<>(); + map1.put( + "Activity1", + ActivityOptions.newBuilder() + .setScheduleToStartTimeout(Duration.ofMillis(11)) + .setScheduleToCloseTimeout(Duration.ofDays(11)) + .setStartToCloseTimeout(Duration.ofSeconds(11)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(11).build()) + .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) + .build()); + map1.put( + "Activity2", + ActivityOptions.newBuilder() + .setScheduleToStartTimeout(Duration.ofMillis(12)) + .setStartToCloseTimeout(Duration.ofSeconds(12)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(12).build()) + .setCancellationType(ActivityCancellationType.ABANDON) + .build()); + + MergedActivityOptions options1 = new MergedActivityOptions(options0, a1, map1); + + ActivityOptions a2 = + ActivityOptions.newBuilder() + .setScheduleToStartTimeout(Duration.ofMillis(20)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(20).build()) + .setCancellationType(ActivityCancellationType.TRY_CANCEL) + .build(); + + MergedActivityOptions options2 = new MergedActivityOptions(options1, a2, null); + + Map map3 = new HashMap<>(); + map3.put( + "Activity1", + ActivityOptions.newBuilder() + .setScheduleToStartTimeout(Duration.ofMillis(31)) + .setScheduleToCloseTimeout(Duration.ofDays(31)) + .setStartToCloseTimeout(Duration.ofSeconds(31)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(31).build()) + .build()); + + MergedActivityOptions options3 = new MergedActivityOptions(options2, null, map3); + + // From a1 + Assert.assertEquals( + Duration.ofDays(10), options3.getMergedOptions("Activity2").getScheduleToCloseTimeout()); + // From map1 + Assert.assertEquals( + Duration.ofSeconds(12), options3.getMergedOptions("Activity2").getStartToCloseTimeout()); + // From a2 + assertEquals( + ActivityCancellationType.TRY_CANCEL, + options3.getMergedOptions("Activity2").getCancellationType()); + // From map3 + assertEquals( + Duration.ofMillis(31), options3.getMergedOptions("Activity1").getScheduleToStartTimeout()); + } + @Test public void testActivityOptionsDefaultInstance() { testEnv.registerActivitiesImplementations(new TestActivityImpl()); @@ -101,7 +173,6 @@ public void testActivityOptionsDefaultInstance() { @Test public void testOnlyAnnotationsPresent() throws NoSuchMethodException { Method method = ActivityOptionsTest.class.getMethod("activityAndRetryOptions"); - ActivityMethod a = method.getAnnotation(ActivityMethod.class); MethodRetry r = method.getAnnotation(MethodRetry.class); ActivityOptions o = ActivityOptions.newBuilder().build(); ActivityOptions merged = ActivityOptions.newBuilder(o).mergeMethodRetry(r).build(); diff --git a/temporal-sdk/src/test/java/io/temporal/activity/LocalActivityOptionsPrecedenceTest.java b/temporal-sdk/src/test/java/io/temporal/activity/LocalActivityOptionsPrecedenceTest.java new file mode 100644 index 000000000..57c6cedc8 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/activity/LocalActivityOptionsPrecedenceTest.java @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.activity; + +import static org.junit.Assert.assertEquals; + +import io.temporal.client.WorkflowOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestActivities.TestActivity; +import io.temporal.workflow.shared.TestActivities.TestActivityImpl; +import io.temporal.workflow.shared.TestWorkflows.TestWorkflowReturnMap; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.junit.Rule; +import org.junit.Test; + +public class LocalActivityOptionsPrecedenceTest { + + private static final LocalActivityOptions implOptions = + LocalActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofHours(10)) + .setScheduleToCloseTimeout(Duration.ofDays(10)) + .build(); + + private static final Map workflowOptionsMap = + Collections.singletonMap( + "Activity1", + LocalActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofHours(20)).build()); + + private static final Map stubOptionsMap = + Collections.singletonMap( + "Activity2", + LocalActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofDays(30)).build()); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + WorkflowImplementationOptions.newBuilder() + .setDefaultLocalActivityOptions(implOptions) + .build(), + TestSetDefaultLocalActivityOptionsWorkflowImpl.class) + .setActivityImplementations(new TestActivityImpl()) + .build(); + + @Test + public void testSetWorkflowImplementationOptions() { + TestWorkflowReturnMap workflowStub = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub( + TestWorkflowReturnMap.class, + WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build()); + Map> result = workflowStub.execute(); + + Map activity1Values = result.get("Activity1"); + Duration a1StartToClose = activity1Values.get("StartToCloseTimeout"); + Duration a1ScheduleToClose = activity1Values.get("ScheduleToCloseTimeout"); + + assertEquals(workflowOptionsMap.get("Activity1").getStartToCloseTimeout(), a1StartToClose); + assertEquals(implOptions.getScheduleToCloseTimeout(), a1ScheduleToClose); + + Map activity2Values = result.get("Activity2"); + Duration a2StartToClose = activity2Values.get("StartToCloseTimeout"); + Duration a2ScheduleToClose = activity2Values.get("ScheduleToCloseTimeout"); + + assertEquals(implOptions.getStartToCloseTimeout(), a2StartToClose); + assertEquals(stubOptionsMap.get("Activity2").getScheduleToCloseTimeout(), a2ScheduleToClose); + } + + public static class TestSetDefaultLocalActivityOptionsWorkflowImpl + implements TestWorkflowReturnMap { + @Override + public Map> execute() { + Workflow.applyLocalActivityOptions(workflowOptionsMap); + TestActivity activities = + Workflow.newLocalActivityStub(TestActivity.class, null, stubOptionsMap); + + Map> result = new HashMap<>(); + result.put("Activity1", activities.activity1()); + result.put("Activity2", activities.activity2()); + return result; + } + } +} diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java index 96065966e..55c0be350 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java @@ -99,6 +99,20 @@ static TestActivityEnvironment newInstance(TestEnvironmentOptions options) { */ T newActivityStub(Class activityInterface, ActivityOptions options); + /** + * Creates a stub that can be used to invoke activities registered through {@link + * #registerActivitiesImplementations(Object...)}. + * + * @param activityInterface interface type implemented by activities + * @param options options that together with the properties of {@link + * io.temporal.activity.ActivityMethod} specify the activity invocation parameters + * @param activityMethodOptions activity method-specific invocation parameters + */ + T newActivityStub( + Class activityInterface, + ActivityOptions options, + Map activityMethodOptions); + /** * Creates a stub that can be used to invoke activities registered through {@link * #registerActivitiesImplementations(Object...)}. diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index e421a4e75..bd6e87926 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -49,6 +49,7 @@ import io.temporal.internal.activity.ActivityExecutionContextFactoryImpl; import io.temporal.internal.activity.ActivityTaskHandlerImpl; import io.temporal.internal.common.MergedActivityOptions; +import io.temporal.internal.common.MergedLocalActivityOptions; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.sync.*; import io.temporal.internal.testservice.InProcessGRPCServer; @@ -206,7 +207,16 @@ public T newActivityStub(Class activityInterface) { */ @Override public T newActivityStub(Class activityInterface, ActivityOptions options) { - MergedActivityOptions activityOptions = new MergedActivityOptions(null, options, null); + return newActivityStub(activityInterface, options, null); + } + + @Override + public T newActivityStub( + Class activityInterface, + ActivityOptions options, + Map activityMethodOptions) { + MergedActivityOptions activityOptions = + new MergedActivityOptions(null, options, activityMethodOptions); InvocationHandler invocationHandler = ActivityInvocationHandler.newInstance( @@ -228,13 +238,12 @@ public T newLocalActivityStub( Class activityInterface, LocalActivityOptions options, Map activityMethodOptions) { + MergedLocalActivityOptions activityOptions = + new MergedLocalActivityOptions(null, options, activityMethodOptions); + InvocationHandler invocationHandler = LocalActivityInvocationHandler.newInstance( - activityInterface, - options, - activityMethodOptions, - new TestActivityExecutor(), - () -> {}); + activityInterface, activityOptions, new TestActivityExecutor(), () -> {}); invocationHandler = new DeterministicRunnerWrapper(invocationHandler, deterministicRunnerExecutor::submit); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);