Skip to content

Commit b182d78

Browse files
Ignore history events with worker_may_ignore: true. (#2000)
## What was changed * Unknown event types with `worker_may_ignore: true` are skipped * All other unknown event types continue to throw * Added new replay tests verifying that `worker_may_ignore` is handled correctly ## Why? The new `worker_may_ignore` flag is intended to mark events that can be handled as no-ops if the SDK doesn't know the event type.
1 parent ad1dabc commit b182d78

File tree

5 files changed

+564
-51
lines changed

5 files changed

+564
-51
lines changed

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 78 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@
3030
import com.google.common.base.Strings;
3131
import com.google.protobuf.Any;
3232
import io.temporal.api.command.v1.*;
33-
import io.temporal.api.common.v1.Payloads;
34-
import io.temporal.api.common.v1.SearchAttributes;
35-
import io.temporal.api.common.v1.WorkflowExecution;
33+
import io.temporal.api.common.v1.*;
3634
import io.temporal.api.enums.v1.EventType;
3735
import io.temporal.api.failure.v1.Failure;
3836
import io.temporal.api.history.v1.*;
@@ -60,7 +58,8 @@ enum HandleEventStatus {
6058

6159
/** Initial set of SDK flags that will be set on all new workflow executions. */
6260
private static final List<SdkFlag> initialFlags =
63-
Collections.unmodifiableList(Arrays.asList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));
61+
Collections.unmodifiableList(
62+
Collections.singletonList(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION));
6463

6564
/**
6665
* EventId of the WorkflowTaskStarted event of the Workflow Task that was picked up by a worker
@@ -159,7 +158,7 @@ enum HandleEventStatus {
159158

160159
private final WFTBuffer wftBuffer = new WFTBuffer();
161160

162-
private List<Message> messages = new ArrayList<Message>();
161+
private List<Message> messages = new ArrayList<>();
163162

164163
private final SdkFlags flags;
165164

@@ -358,8 +357,8 @@ private void handleSingleEventLookahead(HistoryEvent event) {
358357
}
359358

360359
private List<Message> takeLTE(long eventId) {
361-
List<Message> m = new ArrayList<Message>();
362-
List<Message> remainingMessages = new ArrayList<Message>();
360+
List<Message> m = new ArrayList<>();
361+
List<Message> remainingMessages = new ArrayList<>();
363362
for (Message msg : this.messages) {
364363
if (msg.getEventId() > eventId) {
365364
remainingMessages.add(msg);
@@ -430,12 +429,16 @@ private void handleSingleEvent(HistoryEvent event, boolean hasNextEvent) {
430429
replaying = false;
431430
}
432431

433-
Long initialCommandEventId = getInitialCommandEventId(event);
434-
EntityStateMachine c = stateMachines.get(initialCommandEventId);
432+
final OptionalLong initialCommandEventId = getInitialCommandEventId(event);
433+
if (!initialCommandEventId.isPresent()) {
434+
return;
435+
}
436+
437+
EntityStateMachine c = stateMachines.get(initialCommandEventId.getAsLong());
435438
if (c != null) {
436439
c.handleEvent(event, hasNextEvent);
437440
if (c.isFinalState()) {
438-
stateMachines.remove(initialCommandEventId);
441+
stateMachines.remove(initialCommandEventId.getAsLong());
439442
}
440443
} else {
441444
handleNonStatefulEvent(event, hasNextEvent);
@@ -585,9 +588,7 @@ public void sendMessage(Message message) {
585588

586589
public List<Message> takeMessages() {
587590
List<Message> result = new ArrayList<>(messageOutbox.size());
588-
for (Message message : messageOutbox) {
589-
result.add(message);
590-
}
591+
result.addAll(messageOutbox);
591592
messageOutbox.clear();
592593
return result;
593594
}
@@ -960,10 +961,9 @@ public boolean getVersion(
960961
VersionStateMachine stateMachine =
961962
versions.computeIfAbsent(
962963
changeId,
963-
(idKey) -> {
964-
return VersionStateMachine.newInstance(
965-
changeId, this::isReplaying, commandSink, stateMachineSink);
966-
});
964+
(idKey) ->
965+
VersionStateMachine.newInstance(
966+
changeId, this::isReplaying, commandSink, stateMachineSink));
967967
return stateMachine.getVersion(
968968
minSupported,
969969
maxSupported,
@@ -1194,60 +1194,85 @@ public void updateRunId(String currentRunId) {
11941194
}
11951195
}
11961196

1197-
private long getInitialCommandEventId(HistoryEvent event) {
1197+
/**
1198+
* Extracts the eventId of the "initial command" for the given event.
1199+
*
1200+
* <p>The "initial command" is the event which started a group of related events:
1201+
* ActivityTaskScheduled, TimerStarted, and so on; for events which are not part of a group, the
1202+
* event's own eventId is returned. If the event has an unknown type but is marked as ignorable,
1203+
* then {@link OptionalLong#empty()} is returned instead.
1204+
*
1205+
* @return the eventId of the initial command, or {@link OptionalLong#empty()}
1206+
*/
1207+
private OptionalLong getInitialCommandEventId(HistoryEvent event) {
11981208
switch (event.getEventType()) {
11991209
case EVENT_TYPE_ACTIVITY_TASK_STARTED:
1200-
return event.getActivityTaskStartedEventAttributes().getScheduledEventId();
1210+
return OptionalLong.of(event.getActivityTaskStartedEventAttributes().getScheduledEventId());
12011211
case EVENT_TYPE_ACTIVITY_TASK_COMPLETED:
1202-
return event.getActivityTaskCompletedEventAttributes().getScheduledEventId();
1212+
return OptionalLong.of(
1213+
event.getActivityTaskCompletedEventAttributes().getScheduledEventId());
12031214
case EVENT_TYPE_ACTIVITY_TASK_FAILED:
1204-
return event.getActivityTaskFailedEventAttributes().getScheduledEventId();
1215+
return OptionalLong.of(event.getActivityTaskFailedEventAttributes().getScheduledEventId());
12051216
case EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
1206-
return event.getActivityTaskTimedOutEventAttributes().getScheduledEventId();
1217+
return OptionalLong.of(
1218+
event.getActivityTaskTimedOutEventAttributes().getScheduledEventId());
12071219
case EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED:
1208-
return event.getActivityTaskCancelRequestedEventAttributes().getScheduledEventId();
1220+
return OptionalLong.of(
1221+
event.getActivityTaskCancelRequestedEventAttributes().getScheduledEventId());
12091222
case EVENT_TYPE_ACTIVITY_TASK_CANCELED:
1210-
return event.getActivityTaskCanceledEventAttributes().getScheduledEventId();
1223+
return OptionalLong.of(
1224+
event.getActivityTaskCanceledEventAttributes().getScheduledEventId());
12111225
case EVENT_TYPE_TIMER_FIRED:
1212-
return event.getTimerFiredEventAttributes().getStartedEventId();
1226+
return OptionalLong.of(event.getTimerFiredEventAttributes().getStartedEventId());
12131227
case EVENT_TYPE_TIMER_CANCELED:
1214-
return event.getTimerCanceledEventAttributes().getStartedEventId();
1228+
return OptionalLong.of(event.getTimerCanceledEventAttributes().getStartedEventId());
12151229
case EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
1216-
return event
1217-
.getRequestCancelExternalWorkflowExecutionFailedEventAttributes()
1218-
.getInitiatedEventId();
1230+
return OptionalLong.of(
1231+
event
1232+
.getRequestCancelExternalWorkflowExecutionFailedEventAttributes()
1233+
.getInitiatedEventId());
12191234
case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
1220-
return event
1221-
.getExternalWorkflowExecutionCancelRequestedEventAttributes()
1222-
.getInitiatedEventId();
1235+
return OptionalLong.of(
1236+
event
1237+
.getExternalWorkflowExecutionCancelRequestedEventAttributes()
1238+
.getInitiatedEventId());
12231239
case EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED:
1224-
return event.getStartChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId();
1240+
return OptionalLong.of(
1241+
event.getStartChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId());
12251242
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED:
1226-
return event.getChildWorkflowExecutionStartedEventAttributes().getInitiatedEventId();
1243+
return OptionalLong.of(
1244+
event.getChildWorkflowExecutionStartedEventAttributes().getInitiatedEventId());
12271245
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED:
1228-
return event.getChildWorkflowExecutionCompletedEventAttributes().getInitiatedEventId();
1246+
return OptionalLong.of(
1247+
event.getChildWorkflowExecutionCompletedEventAttributes().getInitiatedEventId());
12291248
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED:
1230-
return event.getChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId();
1249+
return OptionalLong.of(
1250+
event.getChildWorkflowExecutionFailedEventAttributes().getInitiatedEventId());
12311251
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED:
1232-
return event.getChildWorkflowExecutionCanceledEventAttributes().getInitiatedEventId();
1252+
return OptionalLong.of(
1253+
event.getChildWorkflowExecutionCanceledEventAttributes().getInitiatedEventId());
12331254
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT:
1234-
return event.getChildWorkflowExecutionTimedOutEventAttributes().getInitiatedEventId();
1255+
return OptionalLong.of(
1256+
event.getChildWorkflowExecutionTimedOutEventAttributes().getInitiatedEventId());
12351257
case EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED:
1236-
return event.getChildWorkflowExecutionTerminatedEventAttributes().getInitiatedEventId();
1258+
return OptionalLong.of(
1259+
event.getChildWorkflowExecutionTerminatedEventAttributes().getInitiatedEventId());
12371260
case EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED:
1238-
return event
1239-
.getSignalExternalWorkflowExecutionFailedEventAttributes()
1240-
.getInitiatedEventId();
1261+
return OptionalLong.of(
1262+
event.getSignalExternalWorkflowExecutionFailedEventAttributes().getInitiatedEventId());
12411263
case EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED:
1242-
return event.getExternalWorkflowExecutionSignaledEventAttributes().getInitiatedEventId();
1264+
return OptionalLong.of(
1265+
event.getExternalWorkflowExecutionSignaledEventAttributes().getInitiatedEventId());
12431266
case EVENT_TYPE_WORKFLOW_TASK_STARTED:
1244-
return event.getWorkflowTaskStartedEventAttributes().getScheduledEventId();
1267+
return OptionalLong.of(event.getWorkflowTaskStartedEventAttributes().getScheduledEventId());
12451268
case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
1246-
return event.getWorkflowTaskCompletedEventAttributes().getScheduledEventId();
1269+
return OptionalLong.of(
1270+
event.getWorkflowTaskCompletedEventAttributes().getScheduledEventId());
12471271
case EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
1248-
return event.getWorkflowTaskTimedOutEventAttributes().getScheduledEventId();
1272+
return OptionalLong.of(
1273+
event.getWorkflowTaskTimedOutEventAttributes().getScheduledEventId());
12491274
case EVENT_TYPE_WORKFLOW_TASK_FAILED:
1250-
return event.getWorkflowTaskFailedEventAttributes().getScheduledEventId();
1275+
return OptionalLong.of(event.getWorkflowTaskFailedEventAttributes().getScheduledEventId());
12511276

12521277
case EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
12531278
case EVENT_TYPE_TIMER_STARTED:
@@ -1266,12 +1291,14 @@ private long getInitialCommandEventId(HistoryEvent event) {
12661291
case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT:
12671292
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED:
12681293
case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
1269-
return event.getEventId();
1270-
case UNRECOGNIZED:
1271-
case EVENT_TYPE_UNSPECIFIED:
1294+
return OptionalLong.of(event.getEventId());
1295+
1296+
default:
1297+
if (event.getWorkerMayIgnore()) {
1298+
return OptionalLong.empty();
1299+
}
12721300
throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
12731301
}
1274-
throw new IllegalStateException("unreachable");
12751302
}
12761303

12771304
/**
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.replay;
22+
23+
import static io.temporal.testing.WorkflowHistoryLoader.readHistoryFromResource;
24+
25+
import io.temporal.activity.*;
26+
import io.temporal.client.WorkflowClient;
27+
import io.temporal.client.WorkflowOptions;
28+
import io.temporal.common.WorkflowExecutionHistory;
29+
import io.temporal.testing.TestWorkflowEnvironment;
30+
import io.temporal.worker.Worker;
31+
import io.temporal.workflow.*;
32+
import java.time.Duration;
33+
import org.junit.*;
34+
import org.junit.rules.Timeout;
35+
36+
public class UnknownHistoryEventReplayerTest {
37+
38+
public static final String TASK_QUEUE = "unknown-history-event";
39+
public static final String RES_CLEAN = "testUnknownHistoryEventClean.json";
40+
public static final String RES_MAY_IGNORE = "testUnknownHistoryEventMayIgnore.json";
41+
public static final String RES_MAY_NOT_IGNORE = "testUnknownHistoryEventMayNotIgnore.json";
42+
43+
@Rule public Timeout testTimeout = Timeout.seconds(10);
44+
45+
private TestWorkflowEnvironment testEnvironment;
46+
private Worker worker;
47+
48+
@Before
49+
public void setUp() {
50+
testEnvironment = TestWorkflowEnvironment.newInstance();
51+
worker = testEnvironment.newWorker(TASK_QUEUE);
52+
worker.registerWorkflowImplementationTypes(MyWorkflowImpl.class);
53+
worker.registerActivitiesImplementations(new MyActivityImpl());
54+
testEnvironment.start();
55+
}
56+
57+
@After
58+
public void tearDown() {
59+
testEnvironment.close();
60+
}
61+
62+
@Test
63+
public void testRun() {
64+
WorkflowClient client = testEnvironment.getWorkflowClient();
65+
WorkflowOptions options =
66+
WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).setWorkflowId("plain-run").build();
67+
MyWorkflow stub = client.newWorkflowStub(MyWorkflow.class, options);
68+
stub.execute();
69+
WorkflowExecutionHistory history = client.fetchHistory("plain-run");
70+
System.out.println(history.toJson(true));
71+
}
72+
73+
@Test
74+
public void testClean() throws Exception {
75+
WorkflowExecutionHistory history = readHistoryFromResource(RES_CLEAN);
76+
worker.replayWorkflowExecution(history);
77+
}
78+
79+
@Test
80+
public void testMayIgnore() throws Exception {
81+
WorkflowExecutionHistory history = readHistoryFromResource(RES_MAY_IGNORE);
82+
worker.replayWorkflowExecution(history);
83+
}
84+
85+
@Test(expected = RuntimeException.class)
86+
public void testMayNotIgnore() throws Exception {
87+
WorkflowExecutionHistory history = readHistoryFromResource(RES_MAY_NOT_IGNORE);
88+
worker.replayWorkflowExecution(history);
89+
}
90+
91+
@WorkflowInterface
92+
public interface MyWorkflow {
93+
94+
@WorkflowMethod
95+
void execute();
96+
}
97+
98+
@ActivityInterface
99+
public interface MyActivity {
100+
101+
@ActivityMethod
102+
void execute();
103+
}
104+
105+
public static class MyWorkflowImpl implements MyWorkflow {
106+
107+
@Override
108+
public void execute() {
109+
MyActivity activity =
110+
Workflow.newLocalActivityStub(
111+
MyActivity.class,
112+
LocalActivityOptions.newBuilder()
113+
.setScheduleToCloseTimeout(Duration.ofSeconds(1))
114+
.build());
115+
activity.execute();
116+
}
117+
}
118+
119+
public static class MyActivityImpl implements MyActivity {
120+
121+
@Override
122+
public void execute() {}
123+
}
124+
}

0 commit comments

Comments
 (0)