Skip to content

Commit 6c961a0

Browse files
Make CancellationScopeImpl more deterministic (#2512)
Make CancellationScopeImpl more deterministic
1 parent e7a7f0c commit 6c961a0

File tree

7 files changed

+354
-13
lines changed

7 files changed

+354
-13
lines changed

temporal-sdk/src/main/java/io/temporal/internal/common/SdkFlag.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ public enum SdkFlag {
1414
* Changes behavior of GetVersion to never yield.
1515
*/
1616
SKIP_YIELD_ON_VERSION(2),
17+
/*
18+
* Changes behavior of CancellationScope to cancel children in a deterministic order.
19+
*/
20+
DETERMINISTIC_CANCELLATION_SCOPE_ORDER(3),
1721
UNKNOWN(Integer.MAX_VALUE);
1822

1923
private final int value;

temporal-sdk/src/main/java/io/temporal/internal/sync/CancellationScopeImpl.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package io.temporal.internal.sync;
22

33
import io.temporal.workflow.*;
4-
import java.util.ArrayDeque;
5-
import java.util.Deque;
6-
import java.util.HashSet;
7-
import java.util.Set;
4+
import java.util.*;
85

96
class CancellationScopeImpl implements CancellationScope {
107

@@ -36,7 +33,9 @@ private static void popCurrent(CancellationScopeImpl expected) {
3633

3734
private final Runnable runnable;
3835
private CancellationScopeImpl parent;
39-
private final Set<CancellationScopeImpl> children = new HashSet<>();
36+
// We use a LinkedHashSet because we will iterate through the children, so we need to keep a
37+
// deterministic order.
38+
private final Set<CancellationScopeImpl> children;
4039

4140
/**
4241
* When disconnected scope has no parent and thus doesn't receive cancellation requests from it.
@@ -45,20 +44,37 @@ private static void popCurrent(CancellationScopeImpl expected) {
4544

4645
private String reason;
4746

48-
CancellationScopeImpl(boolean ignoreParentCancellation, Runnable runnable) {
49-
this(ignoreParentCancellation, runnable, current());
47+
CancellationScopeImpl(
48+
boolean ignoreParentCancellation, boolean deterministicOrder, Runnable runnable) {
49+
this(ignoreParentCancellation, deterministicOrder, runnable, current());
5050
}
5151

52-
CancellationScopeImpl(boolean detached, Runnable runnable, CancellationScopeImpl parent) {
52+
CancellationScopeImpl(
53+
boolean detached,
54+
boolean deterministicOrder,
55+
Runnable runnable,
56+
CancellationScopeImpl parent) {
5357
this.detached = detached;
5458
this.runnable = runnable;
59+
if (deterministicOrder) {
60+
this.children = new LinkedHashSet<>();
61+
} else {
62+
this.children = new HashSet<>();
63+
}
5564
setParent(parent);
5665
}
5766

5867
public CancellationScopeImpl(
59-
boolean ignoreParentCancellation, Functions.Proc1<CancellationScope> proc) {
68+
boolean ignoreParentCancellation,
69+
boolean deterministicOrder,
70+
Functions.Proc1<CancellationScope> proc) {
6071
this.detached = ignoreParentCancellation;
6172
this.runnable = () -> proc.apply(this);
73+
if (deterministicOrder) {
74+
this.children = new LinkedHashSet<>();
75+
} else {
76+
this.children = new HashSet<>();
77+
}
6278
setParent(current());
6379
}
6480

temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.google.common.primitives.Ints;
55
import io.temporal.common.context.ContextPropagator;
66
import io.temporal.internal.WorkflowThreadMarker;
7+
import io.temporal.internal.common.SdkFlag;
78
import io.temporal.internal.context.ContextThreadLocal;
89
import io.temporal.internal.worker.WorkflowExecutorCache;
910
import io.temporal.serviceclient.CheckedExceptionWrapper;
@@ -157,7 +158,12 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) {
157158
// a bad practice
158159
this.workflowContext.setRunner(this);
159160
this.cache = cache;
160-
this.runnerCancellationScope = new CancellationScopeImpl(true, null, null);
161+
boolean deterministicCancellationScopeOrder =
162+
workflowContext
163+
.getReplayContext()
164+
.checkSdkFlag(SdkFlag.DETERMINISTIC_CANCELLATION_SCOPE_ORDER);
165+
this.runnerCancellationScope =
166+
new CancellationScopeImpl(true, deterministicCancellationScopeOrder, null, null);
161167
this.rootRunnable = root;
162168
}
163169

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.temporal.internal.WorkflowThreadMarker;
2525
import io.temporal.internal.common.ActivityOptionUtils;
2626
import io.temporal.internal.common.NonIdempotentHandle;
27+
import io.temporal.internal.common.SdkFlag;
2728
import io.temporal.internal.common.SearchAttributesUtil;
2829
import io.temporal.internal.logging.ReplayAwareLogger;
2930
import io.temporal.internal.statemachines.UnsupportedContinueAsNewRequest;
@@ -546,12 +547,20 @@ public static Promise<Object> promiseAnyOf(Promise<?>... promises) {
546547
}
547548

548549
public static CancellationScope newCancellationScope(boolean detached, Runnable runnable) {
549-
return new CancellationScopeImpl(detached, runnable);
550+
boolean deterministicCancellationScopeOrder =
551+
getRootWorkflowContext()
552+
.getReplayContext()
553+
.checkSdkFlag(SdkFlag.DETERMINISTIC_CANCELLATION_SCOPE_ORDER);
554+
return new CancellationScopeImpl(detached, deterministicCancellationScopeOrder, runnable);
550555
}
551556

552557
public static CancellationScope newCancellationScope(
553558
boolean detached, Functions.Proc1<CancellationScope> proc) {
554-
return new CancellationScopeImpl(detached, proc);
559+
boolean deterministicCancellationScopeOrder =
560+
getRootWorkflowContext()
561+
.getReplayContext()
562+
.checkSdkFlag(SdkFlag.DETERMINISTIC_CANCELLATION_SCOPE_ORDER);
563+
return new CancellationScopeImpl(detached, deterministicCancellationScopeOrder, proc);
555564
}
556565

557566
public static CancellationScopeImpl currentCancellationScope() {

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.temporal.common.context.ContextPropagator;
55
import io.temporal.failure.CanceledFailure;
66
import io.temporal.internal.common.NonIdempotentHandle;
7+
import io.temporal.internal.common.SdkFlag;
78
import io.temporal.internal.context.ContextThreadLocal;
89
import io.temporal.internal.logging.LoggerTag;
910
import io.temporal.internal.replay.ReplayWorkflowContext;
@@ -55,7 +56,11 @@ class RunnableWrapper implements Runnable {
5556
this.threadContext = threadContext;
5657
this.replayWorkflowContext = replayWorkflowContext;
5758
this.name = name;
58-
this.cancellationScope = new CancellationScopeImpl(detached, runnable, parent);
59+
boolean deterministicCancellationScopeOrder =
60+
replayWorkflowContext.checkSdkFlag(SdkFlag.DETERMINISTIC_CANCELLATION_SCOPE_ORDER);
61+
this.cancellationScope =
62+
new CancellationScopeImpl(
63+
detached, deterministicCancellationScopeOrder, runnable, parent);
5964
Preconditions.checkState(
6065
context.getStatus() == Status.CREATED, "threadContext not in CREATED state");
6166
this.contextPropagators = contextPropagators;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package io.temporal.workflow.cancellationTests;
2+
3+
import io.temporal.activity.ActivityInterface;
4+
import io.temporal.activity.ActivityOptions;
5+
import io.temporal.client.WorkflowClient;
6+
import io.temporal.client.WorkflowStub;
7+
import io.temporal.common.WorkflowExecutionHistory;
8+
import io.temporal.internal.common.SdkFlag;
9+
import io.temporal.internal.statemachines.WorkflowStateMachines;
10+
import io.temporal.testing.WorkflowReplayer;
11+
import io.temporal.testing.internal.SDKTestWorkflowRule;
12+
import io.temporal.workflow.*;
13+
import java.time.Duration;
14+
import java.util.Arrays;
15+
import java.util.Collections;
16+
import org.junit.Before;
17+
import org.junit.Rule;
18+
import org.junit.Test;
19+
20+
public class WorkflowCancellationScopeDeterminism {
21+
@Rule
22+
public SDKTestWorkflowRule testWorkflowRule =
23+
SDKTestWorkflowRule.newBuilder()
24+
.setWorkflowTypes(TestWorkflowImpl.class)
25+
.setActivityImplementations(new TestActivityImpl())
26+
.build();
27+
28+
@Before
29+
public void setUp() {
30+
WorkflowStateMachines.initialFlags =
31+
Collections.unmodifiableList(Arrays.asList(SdkFlag.DETERMINISTIC_CANCELLATION_SCOPE_ORDER));
32+
}
33+
34+
@Test(timeout = 60000)
35+
public void replayCanceledWorkflow() throws Exception {
36+
for (int i = 0; i < 100; i++) {
37+
TestWorkflow testWorkflow = testWorkflowRule.newWorkflowStub(TestWorkflow.class);
38+
39+
WorkflowClient.start(testWorkflow::start);
40+
41+
WorkflowStub stub = WorkflowStub.fromTyped(testWorkflow);
42+
stub.cancel();
43+
try {
44+
stub.getResult(Void.class);
45+
} catch (Exception e) {
46+
// ignore; just blocking to make sure workflow is actually finished
47+
}
48+
49+
WorkflowExecutionHistory history =
50+
testWorkflowRule
51+
.getWorkflowClient()
52+
.fetchHistory(stub.getExecution().getWorkflowId(), stub.getExecution().getRunId());
53+
WorkflowReplayer.replayWorkflowExecution(history, testWorkflowRule.getWorker());
54+
}
55+
}
56+
57+
@Test
58+
public void replayTest() throws Exception {
59+
WorkflowReplayer.replayWorkflowExecutionFromResource(
60+
"cancellationScopeDeterminism.json", TestWorkflowImpl.class);
61+
}
62+
63+
@WorkflowInterface
64+
public interface TestWorkflow {
65+
@WorkflowMethod
66+
void start();
67+
}
68+
69+
@ActivityInterface
70+
public interface TestActivity {
71+
void doActivity();
72+
}
73+
74+
public static class TestActivityImpl implements TestActivity {
75+
@Override
76+
public void doActivity() {
77+
try {
78+
Thread.sleep(5000);
79+
} catch (InterruptedException e) {
80+
throw new RuntimeException(e);
81+
}
82+
}
83+
}
84+
85+
public static class TestWorkflowImpl implements TestWorkflow {
86+
87+
TestActivity activity =
88+
Workflow.newActivityStub(
89+
TestActivity.class,
90+
ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(60)).build());
91+
92+
@Override
93+
public void start() {
94+
CancellationScope scope = Workflow.newCancellationScope(() -> activity.doActivity());
95+
96+
Async.procedure(
97+
() -> {
98+
Workflow.sleep(Duration.ofMinutes(5));
99+
});
100+
101+
scope.run();
102+
}
103+
}
104+
}

0 commit comments

Comments
 (0)