Skip to content

Commit 9cdff7a

Browse files
Fix UnsupportedOperationException in handleSingleEventLookahead (#2061)
1 parent a41c64e commit 9cdff7a

File tree

2 files changed

+87
-51
lines changed

2 files changed

+87
-51
lines changed

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ public void setReplaying(boolean replaying) {
260260
}
261261

262262
public void setMessages(List<Message> messages) {
263-
this.messages = messages;
263+
this.messages = new ArrayList<>(messages);
264264
}
265265

266266
/**

temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java

Lines changed: 86 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,7 @@
4343
import io.temporal.internal.common.ProtobufTimeUtils;
4444
import io.temporal.internal.common.UpdateMessage;
4545
import java.time.Duration;
46-
import java.util.ArrayList;
47-
import java.util.Arrays;
48-
import java.util.List;
49-
import java.util.Optional;
46+
import java.util.*;
5047
import org.junit.AfterClass;
5148
import org.junit.Test;
5249

@@ -98,6 +95,12 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
9895
builder
9996
.add(
10097
(r) -> {
98+
if (message.getMessage().getId().startsWith("reject")) {
99+
message
100+
.getCallbacks()
101+
.reject(converter.exceptionToFailure(new RuntimeException()));
102+
return;
103+
}
101104
message.getCallbacks().accept();
102105
})
103106
.<HistoryEvent>add1(
@@ -122,8 +125,8 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
122125
2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
123126
3: EVENT_TYPE_WORKFLOW_TASK_STARTED
124127
4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
125-
5: EVENT_TYPE_TIMER_STARTED
126-
6: EVENT_TYPE_WORKFLOW_EXECUTION_ACCEPTED
128+
5: EVENT_TYPE_WORKFLOW_EXECUTION_ACCEPTED
129+
6: EVENT_TYPE_TIMER_STARTED
127130
7: EVENT_TYPE_TIMER_FIRED
128131
8: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
129132
9: EVENT_TYPE_WORKFLOW_TASK_STARTED
@@ -194,6 +197,27 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
194197
List<Command> commands = h.handleWorkflowTaskTakeCommands(stateMachines);
195198
assertEquals(0, commands.size());
196199
}
200+
{
201+
TestEntityManagerListenerBase listener = new TestUpdateListener();
202+
stateMachines = newStateMachines(listener);
203+
stateMachines.setMessages(
204+
Collections.unmodifiableList(
205+
Arrays.asList(
206+
Message.newBuilder()
207+
.setProtocolInstanceId("reject_update_id")
208+
.setId("reject")
209+
.setEventId(9)
210+
.setBody(
211+
Any.pack(
212+
Request.newBuilder()
213+
.setInput(Input.newBuilder().setName("updateName").build())
214+
.build()))
215+
.build())));
216+
List<Command> commands = h.handleWorkflowTaskTakeCommands(stateMachines, 1, 2);
217+
assertEquals(CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE, commands.get(0).getCommandType());
218+
assertEquals(
219+
CommandType.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, commands.get(1).getCommandType());
220+
}
197221
}
198222

199223
@Test
@@ -205,17 +229,19 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
205229
builder.add(
206230
(r) -> {
207231
stateMachines.setMessages(
208-
Arrays.asList(
209-
Message.newBuilder()
210-
.setProtocolInstanceId("protocol_id")
211-
.setId("id")
212-
.setEventId(2)
213-
.setBody(
214-
Any.pack(
215-
Request.newBuilder()
216-
.setInput(Input.newBuilder().setName("updateName").build())
217-
.build()))
218-
.build()));
232+
Collections.unmodifiableList(
233+
Arrays.asList(
234+
Message.newBuilder()
235+
.setProtocolInstanceId("protocol_id")
236+
.setId("id")
237+
.setEventId(2)
238+
.setBody(
239+
Any.pack(
240+
Request.newBuilder()
241+
.setInput(
242+
Input.newBuilder().setName("updateName").build())
243+
.build()))
244+
.build())));
219245
});
220246
}
221247

@@ -387,15 +413,16 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
387413
.setArgs(converter.toPayloads("arg").get()))
388414
.build());
389415
stateMachines.setMessages(
390-
Arrays.asList(
391-
new Message[] {
392-
Message.newBuilder()
393-
.setProtocolInstanceId("protocol_id")
394-
.setId("id")
395-
.setEventId(0)
396-
.setBody(messageBody)
397-
.build(),
398-
}));
416+
Collections.unmodifiableList(
417+
Arrays.asList(
418+
new Message[] {
419+
Message.newBuilder()
420+
.setProtocolInstanceId("protocol_id")
421+
.setId("id")
422+
.setEventId(0)
423+
.setBody(messageBody)
424+
.build(),
425+
})));
399426
List<Command> commands = h.handleWorkflowTaskTakeCommands(stateMachines, 1);
400427
assertEquals(0, commands.size());
401428
}
@@ -494,17 +521,19 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
494521
builder.add(
495522
(r) -> {
496523
stateMachines.setMessages(
497-
Arrays.asList(
498-
Message.newBuilder()
499-
.setProtocolInstanceId("message_updateID")
500-
.setId("message_updateID/request")
501-
.setEventId(6)
502-
.setBody(
503-
Any.pack(
504-
Request.newBuilder()
505-
.setInput(Input.newBuilder().setName("updateName").build())
506-
.build()))
507-
.build()));
524+
Collections.unmodifiableList(
525+
Arrays.asList(
526+
Message.newBuilder()
527+
.setProtocolInstanceId("message_updateID")
528+
.setId("message_updateID/request")
529+
.setEventId(6)
530+
.setBody(
531+
Any.pack(
532+
Request.newBuilder()
533+
.setInput(
534+
Input.newBuilder().setName("updateName").build())
535+
.build()))
536+
.build())));
508537
});
509538
}
510539

@@ -528,8 +557,8 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
528557
2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
529558
3: EVENT_TYPE_WORKFLOW_TASK_STARTED
530559
4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
531-
5: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
532-
6: EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED
560+
5: EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED
561+
6: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
533562
7: EVENT_TYPE_WORKFLOW_TASK_STARTED
534563
*/
535564
TestHistoryBuilder h = new TestHistoryBuilder();
@@ -551,6 +580,12 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
551580
h.addWorkflowTaskScheduled();
552581
h.addWorkflowTaskStarted();
553582
}
583+
{
584+
// Full replay
585+
TestEntityManagerListenerBase listener = new TestUpdateListener();
586+
stateMachines = newStateMachines(listener);
587+
List<Command> commands = h.handleWorkflowTaskTakeCommands(stateMachines, 1, 2);
588+
}
554589
{
555590
// Full replay
556591
TestEntityManagerListenerBase listener = new TestUpdateListener();
@@ -840,17 +875,18 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
840875
TestEntityManagerListenerBase listener = new TestUpdateListener();
841876
stateMachines = newStateMachines(listener);
842877
stateMachines.setMessages(
843-
Arrays.asList(
844-
Message.newBuilder()
845-
.setProtocolInstanceId("message_update")
846-
.setId("message_update/request")
847-
.setEventId(7)
848-
.setBody(
849-
Any.pack(
850-
Request.newBuilder()
851-
.setInput(Input.newBuilder().setName("updateName").build())
852-
.build()))
853-
.build()));
878+
Collections.unmodifiableList(
879+
Arrays.asList(
880+
Message.newBuilder()
881+
.setProtocolInstanceId("message_update")
882+
.setId("message_update/request")
883+
.setEventId(7)
884+
.setBody(
885+
Any.pack(
886+
Request.newBuilder()
887+
.setInput(Input.newBuilder().setName("updateName").build())
888+
.build()))
889+
.build())));
854890
List<Command> commands = h.handleWorkflowTaskTakeCommands(stateMachines, 1, 2);
855891
assertEquals(6, commands.size());
856892
assertEquals(CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE, commands.get(0).getCommandType());

0 commit comments

Comments
 (0)