diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java index 7cb9616442..2c4435733d 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java @@ -598,6 +598,8 @@ R mutableSideEffect( int getVersion(String changeId, int minSupported, int maxSupported); + int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported); + void continueAsNew(ContinueAsNewInput input); void registerQuery(RegisterQueryInput input); diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java index e2d4223556..771dd21456 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java @@ -113,6 +113,11 @@ public int getVersion(String changeId, int minSupported, int maxSupported) { return next.getVersion(changeId, minSupported, maxSupported); } + @Override + public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) { + return next.getVersion(seriesId, iterationId, minSupported, maxSupported); + } + @Override public void continueAsNew(ContinueAsNewInput input) { next.continueAsNew(input); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 3ed169b5bc..975570495f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -125,6 +125,8 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall private Map runningUpdateHandlers = new HashMap<>(); // Map of all running signal handlers. Key is the event Id of the signal event. private Map runningSignalHandlers = new HashMap<>(); + // Current versions for the getVersion call that supports iterationId. + private final Map currentVersions = new HashMap<>(); public SyncWorkflowContext( @Nonnull String namespace, @@ -991,6 +993,46 @@ public int getVersion(String changeId, int minSupported, int maxSupported) { } } + @Override + public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) { + Integer currentVersion = currentVersions.get(seriesId); + // When replaying check if there is a marker (by calling getVersion) for each iteration. + if (isReplaying()) { + int iVersion = getVersion(seriesId + "/" + iterationId, minSupported, maxSupported); + if (currentVersion != null) { + if (iVersion < currentVersion) { + throw new IllegalArgumentException( + "getVersion for changeId '" + + seriesId + + "/" + + iterationId + + "' returned " + + iVersion + + " which is smaller than previously found version of " + + currentVersion); + } + if (iVersion != DEFAULT_VERSION) { + currentVersions.put(seriesId, iVersion); + return iVersion; + } + return currentVersion; + } + return iVersion; + } else { + // When not replaying, only insert a marker (by calling getVersion) if the maxSupported is + // larger than the already recorded one. + if (currentVersion == null || (currentVersion != null && maxSupported > currentVersion)) { + int iVersion = getVersion(seriesId + "/" + iterationId, minSupported, maxSupported); + if (iVersion != maxSupported) { + throw new RuntimeException("getVersion returned wrong version: " + iVersion); + } + currentVersions.put(seriesId, iVersion); + return iVersion; + } + return currentVersion; + } + } + @Override public void registerQuery(RegisterQueryInput request) { queryDispatcher.registerQueryHandlers(request); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 6c4d55d29e..c367f78059 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -545,6 +545,13 @@ public static int getVersion(String changeId, int minSupported, int maxSupported return getWorkflowOutboundInterceptor().getVersion(changeId, minSupported, maxSupported); } + public static int getVersion( + String seriesId, String iterationId, int minSupported, int maxSupported) { + assertNotReadOnly("get version"); + return getWorkflowOutboundInterceptor() + .getVersion(seriesId, iterationId, minSupported, maxSupported); + } + public static Promise promiseAllOf(Iterable> promises) { return new AllOfPromise(promises); } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index fa30673b57..6ada589823 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -1000,6 +1000,92 @@ public static int getVersion(String changeId, int minSupported, int maxSupported return WorkflowInternal.getVersion(changeId, minSupported, maxSupported); } + /** + * Used to perform workflow safe code changes in code that is called repeatedly like body of a + * loop or a signal handler. + * + *

Consider the following example: + * + *

+   * for (int i=0; i<100; i++) {
+   *     if (getVersion("fix1", DEFAULT_VERSION, 1) == DEFAULT_VERSION) {
+   *         // OLD CODE
+   *     } else {
+   *         // NEW CODE
+   *     }
+   * }
+   * 
+ * + * If the change is introduced after the loop starts executing, all iterations are going to use + * the default version, even if most of them happen after the change. This happens because the * + * getVersion call returns the same version for all calls that share a changeId. The same issue * + * arises when changing code in callbacks like signal or update handlers. Frequently, there is a + * need for a new version used for newer iterations (or signal handler invocations). + * + *

The following solution supports updating the version of each iteration separately, as it + * uses a different changeId for each iteration: + * + *

+   * for (int i=0; i<100; i++) {
+   *     if (getVersion("fix1-" + i, DEFAULT_VERSION, 1) == DEFAULT_VERSION) {
+   *         // OLD CODE
+   *     } else {
+   *         // NEW CODE
+   *     }
+   * }
+   * 
+ * + * The drawback is a marker event as well as a search attribute update for each iteration. So, it + * is not practical for the large number of iterations. + * + *

This method provides an efficient alternative to the solution that uses a different changeId + * for each iteration. It only inserts a marker when a version changes. + * + *

Here is how it could be used: + * + *

+   * for (int i=0; i<100; i++) {
+   *     if (getVersion("fix1", String.valueOf(i), DEFAULT_VERSION, 1) == DEFAULT_VERSION) {
+   *         // OLD CODE
+   *     } else {
+   *         // NEW CODE
+   *     }
+   * }
+   * 
+ * + * Adding more branches later is OK, assuming that the series stays the same. During replay, the + * iterationId should return the same values as in the original execution. + * + *
+   * for (int i=0; i<100; i++) {
+   *     int v = getVersion("fix1", String.valueOf(i), DEFAULT_VERSION, 2);
+   *     if (v == DEFAULT_VERSION) {
+   *         // OLD CODE
+   *     } else if (v == 1) {
+   *         // CODE FOR THE FIRST CHANGE
+   *     } else {
+   *         // CODE FOR THE LAST CHANGE
+   *     }
+   * }
+   * 
+ * + * All calls with the same seriesId and iterationId argument return the same value. But only if + * they follow each other. The moment a call with a different iteration is made, the version + * changes. + * + * @param seriesId identifier of a series of changes. + * @param iterationId identifier of each iteration over the changed code. + * @param minSupported min version supported for the change + * @param maxSupported max version supported for the change, this version is used as the current + * one during the original execution. + * @return {@code maxSupported} when is originally executed. Original version recorded in the + * history on replays. + */ + public static int getVersion( + String seriesId, String iterationId, int minSupported, int maxSupported) { + return WorkflowInternal.getVersion(seriesId, iterationId, minSupported, maxSupported); + } + /** * Get scope for reporting business metrics in workflow logic. This should be used instead of * creating new metrics scopes as it is able to dedupe metrics during replay. diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionSeriesTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionSeriesTest.java new file mode 100644 index 0000000000..1efa9da5a0 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionSeriesTest.java @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.versionTests; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.client.WorkflowStub; +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.WorkerOptions; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl; +import io.temporal.workflow.shared.TestActivities.VariousTestActivities; +import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1; +import io.temporal.workflow.unsafe.WorkflowUnsafe; +import java.time.Duration; +import java.util.List; +import org.junit.Rule; +import org.junit.Test; + +public class GetVersionSeriesTest { + + private static boolean hasReplayed; + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestGetVersionSeriesWorkflowImpl.class) + .setActivityImplementations(new TestActivitiesImpl()) + // Forcing a replay. Full history arrived from a normal queue causing a replay. + .setWorkerOptions( + WorkerOptions.newBuilder() + .setStickyQueueScheduleToStartTimeout(Duration.ZERO) + .build()) + .build(); + + @Test + public void testGetVersion() { + TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + assertTrue(hasReplayed); + assertEquals("foo", result); + WorkflowStub untyped = WorkflowStub.fromTyped(workflowStub); + List markers = + testWorkflowRule.getHistoryEvents( + untyped.getExecution().getWorkflowId(), EventType.EVENT_TYPE_MARKER_RECORDED); + assertEquals(10, markers.size()); + } + + public static class TestGetVersionSeriesWorkflowImpl implements TestWorkflow1 { + + @Override + public String execute(String taskQueue) { + VariousTestActivities testActivities = + Workflow.newActivityStub( + VariousTestActivities.class, + SDKTestOptions.newActivityOptionsForTaskQueue(taskQueue)); + + for (int i = 0; i < 20; i++) { + // Test adding a version check in non-replay code. + int maxSupported = i / 2 + 1; + int version = + Workflow.getVersion( + "s1", String.valueOf(maxSupported), Workflow.DEFAULT_VERSION, maxSupported); + assertEquals(version, maxSupported); + testActivities.activity2("activity2", 2); + } + + // Test adding a version check in replay code. + if (WorkflowUnsafe.isReplaying()) { + hasReplayed = true; + } + // Force replay + Workflow.sleep(1000); + return "foo"; + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionTest.java index e59464faa4..97e160ee1c 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionTest.java @@ -57,9 +57,6 @@ public class GetVersionTest { public void testGetVersion() { TestWorkflow1 workflowStub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); - String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); - assertTrue(hasReplayed); - assertEquals("activity22activity1activity1activity1", result); testWorkflowRule .getInterceptor(TracingWorkerInterceptor.class) .setExpected( @@ -77,6 +74,10 @@ public void testGetVersion() { "getVersion", "executeActivity customActivity1", "activity customActivity1"); + + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + assertTrue(hasReplayed); + assertEquals("activity22activity1activity1activity1", result); } @Test diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index a8adb759d5..648af6ca7a 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -436,6 +436,11 @@ public int getVersion(String changeId, int minSupported, int maxSupported) { throw new UnsupportedOperationException("not implemented"); } + @Override + public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public void continueAsNew(ContinueAsNewInput input) { throw new UnsupportedOperationException("not implemented"); diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java index 6282b7d2a1..4449395989 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java @@ -282,6 +282,14 @@ public int getVersion(String changeId, int minSupported, int maxSupported) { return next.getVersion(changeId, minSupported, maxSupported); } + @Override + public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) { + if (!WorkflowUnsafe.isReplaying()) { + trace.add("getVersionSeries"); + } + return next.getVersion(seriesId, iterationId, minSupported, maxSupported); + } + @Override public void continueAsNew(ContinueAsNewInput input) { if (!WorkflowUnsafe.isReplaying()) {