Skip to content

Commit 653f4ab

Browse files
committed
Avoid possible panic on autocompleting WFTs due to failure, and stop checking SA values for determinism (#527)
1 parent 28f627b commit 653f4ab

File tree

8 files changed

+135
-145
lines changed

8 files changed

+135
-145
lines changed

core/src/core_tests/determinism.rs

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@ use temporal_sdk::{
1313
ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, WfContext, WorkflowResult,
1414
};
1515
use temporal_sdk_core_protos::{
16-
temporal::api::{enums::v1::WorkflowTaskFailedCause, failure::v1::Failure},
17-
DEFAULT_ACTIVITY_TYPE,
16+
temporal::api::{
17+
enums::v1::{EventType, WorkflowTaskFailedCause},
18+
failure::v1::Failure,
19+
},
20+
TestHistoryBuilder, DEFAULT_ACTIVITY_TYPE,
1821
};
1922

2023
static DID_FAIL: AtomicBool = AtomicBool::new(false);
@@ -268,3 +271,47 @@ async fn child_wf_id_or_type_change_is_nondeterministic(
268271
.unwrap();
269272
worker.run_until_done().await.unwrap();
270273
}
274+
275+
/// Repros a situation where if, upon completing a task there is some internal error which causes
276+
/// us to want to auto-fail the workflow task while there is also an outstanding eviction, the wf
277+
/// would get evicted but then try to send some info down the completion channel afterward, causing
278+
/// a panic.
279+
#[tokio::test]
280+
async fn repro_channel_missing_because_nondeterminism() {
281+
for _ in 1..50 {
282+
let wf_id = "fakeid";
283+
let wf_type = DEFAULT_WORKFLOW_TYPE;
284+
let mut t = TestHistoryBuilder::default();
285+
t.add_by_type(EventType::WorkflowExecutionStarted);
286+
t.add_full_wf_task();
287+
t.add_has_change_marker("patch-1", false);
288+
let _ts = t.add_by_type(EventType::TimerStarted);
289+
t.add_workflow_task_scheduled_and_started();
290+
291+
let mock = mock_workflow_client();
292+
let mut mh =
293+
MockPollCfg::from_resp_batches(wf_id, t, [1.into(), ResponseType::AllHistory], mock);
294+
mh.num_expected_fails = 1;
295+
let mut worker = mock_sdk_cfg(mh, |cfg| {
296+
cfg.max_cached_workflows = 2;
297+
cfg.ignore_evicts_on_shutdown = false;
298+
});
299+
300+
worker.register_wf(wf_type.to_owned(), move |ctx: WfContext| async move {
301+
ctx.patched("wrongid");
302+
ctx.timer(Duration::from_secs(1)).await;
303+
Ok(().into())
304+
});
305+
306+
worker
307+
.submit_wf(
308+
wf_id.to_owned(),
309+
wf_type.to_owned(),
310+
vec![],
311+
WorkflowOptions::default(),
312+
)
313+
.await
314+
.unwrap();
315+
worker.run_until_done().await.unwrap();
316+
}
317+
}

core/src/internal_flags.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,6 @@ impl InternalFlags {
4747
}
4848
}
4949

50-
#[cfg(test)]
51-
pub fn all_core_enabled() -> Self {
52-
Self {
53-
enabled: true,
54-
core: BTreeSet::from([
55-
CoreInternalFlags::IdAndTypeDeterminismChecks,
56-
CoreInternalFlags::UpsertSearchAttributeOnPatch,
57-
]),
58-
lang: Default::default(),
59-
core_since_last_complete: Default::default(),
60-
lang_since_last_complete: Default::default(),
61-
}
62-
}
63-
6450
pub fn add_from_complete(&mut self, e: &WorkflowTaskCompletedEventAttributes) {
6551
if !self.enabled {
6652
return;

core/src/worker/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,7 @@ impl Worker {
538538
self.workflows
539539
.activation_completed(
540540
completion,
541+
false,
541542
self.post_activate_hook
542543
.as_ref()
543544
.map(|h| |data: PostActivateHookData| h(self, data)),

core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs

Lines changed: 23 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ fsm! {
3838
// upon observing a history event indicating that the command has been recorded. Note that this
3939
// does not imply that the command has been _executed_, only that it _will be_ executed at some
4040
// point in the future.
41-
CommandIssued --(CommandRecorded(CmdRecDat), shared on_command_recorded) --> Done;
41+
CommandIssued --(CommandRecorded, on_command_recorded) --> Done;
4242
}
4343

4444
/// Instantiates an UpsertSearchAttributesMachine and packs it together with an initial command
@@ -62,37 +62,21 @@ pub(super) fn upsert_search_attrs(
6262
);
6363
// We must still create the command to preserve compatability with anyone previously doing
6464
// this.
65-
create_new(Default::default(), true, internal_flags)
65+
create_new(Default::default())
6666
} else {
67-
create_new(attribs.search_attributes.into(), false, internal_flags)
67+
create_new(attribs.search_attributes.into())
6868
}
6969
}
7070

7171
/// May be used by other state machines / internal needs which desire upserting search attributes.
7272
pub(super) fn upsert_search_attrs_internal(
7373
attribs: UpsertWorkflowSearchAttributesCommandAttributes,
74-
internal_flags: InternalFlagsRef,
7574
) -> NewMachineWithCommand {
76-
create_new(
77-
attribs.search_attributes.unwrap_or_default(),
78-
true,
79-
internal_flags,
80-
)
75+
create_new(attribs.search_attributes.unwrap_or_default())
8176
}
8277

83-
fn create_new(
84-
sa_map: SearchAttributes,
85-
should_skip_determinism: bool,
86-
internal_flags: InternalFlagsRef,
87-
) -> NewMachineWithCommand {
88-
let sm = UpsertSearchAttributesMachine::from_parts(
89-
Created {}.into(),
90-
SharedState {
91-
sa_map: sa_map.clone(),
92-
should_skip_determinism,
93-
internal_flags,
94-
},
95-
);
78+
fn create_new(sa_map: SearchAttributes) -> NewMachineWithCommand {
79+
let sm = UpsertSearchAttributesMachine::from_parts(Created {}.into(), SharedState {});
9680
let cmd = Command {
9781
command_type: CommandType::UpsertWorkflowSearchAttributes as i32,
9882
attributes: Some(
@@ -110,11 +94,7 @@ fn create_new(
11094
}
11195

11296
#[derive(Clone)]
113-
pub(super) struct SharedState {
114-
should_skip_determinism: bool,
115-
sa_map: SearchAttributes,
116-
internal_flags: InternalFlagsRef,
117-
}
97+
pub(super) struct SharedState {}
11898

11999
/// The state-machine-specific set of commands that are the results of state transition in the
120100
/// UpsertSearchAttributesMachine. There are none of these because this state machine emits the
@@ -131,30 +111,9 @@ pub(super) struct Created {}
131111
/// higher-level machinery, it transitions into this state.
132112
#[derive(Debug, Default, Clone, derive_more::Display)]
133113
pub(super) struct CommandIssued {}
134-
pub(super) struct CmdRecDat {
135-
sa_map: Option<SearchAttributes>,
136-
replaying: bool,
137-
}
138114

139115
impl CommandIssued {
140-
pub(super) fn on_command_recorded(
141-
self,
142-
shared: &mut SharedState,
143-
dat: CmdRecDat,
144-
) -> UpsertSearchAttributesMachineTransition<Done> {
145-
if shared.internal_flags.borrow_mut().try_use(
146-
CoreInternalFlags::UpsertSearchAttributeOnPatch,
147-
!dat.replaying,
148-
) {
149-
let sa = dat.sa_map.unwrap_or_default();
150-
if !shared.should_skip_determinism && shared.sa_map != sa {
151-
return TransitionResult::Err(WFMachinesError::Nondeterminism(format!(
152-
"Search attribute upsert calls must remain deterministic, but {:?} does not \
153-
match the attributes from history: {:?}",
154-
shared.sa_map, sa
155-
)));
156-
}
157-
}
116+
pub(super) fn on_command_recorded(self) -> UpsertSearchAttributesMachineTransition<Done> {
158117
TransitionResult::default()
159118
}
160119
}
@@ -198,14 +157,9 @@ impl TryFrom<HistEventData> for UpsertSearchAttributesMachineEvents {
198157

199158
fn try_from(e: HistEventData) -> Result<Self, Self::Error> {
200159
match e.event.attributes {
201-
Some(history_event::Attributes::UpsertWorkflowSearchAttributesEventAttributes(
202-
attrs,
203-
)) => Ok(UpsertSearchAttributesMachineEvents::CommandRecorded(
204-
CmdRecDat {
205-
sa_map: attrs.search_attributes,
206-
replaying: e.replaying,
207-
},
208-
)),
160+
Some(history_event::Attributes::UpsertWorkflowSearchAttributesEventAttributes(_)) => {
161+
Ok(UpsertSearchAttributesMachineEvents::CommandRecorded)
162+
}
209163
_ => Err(Self::Error::Nondeterminism(format!(
210164
"UpsertWorkflowSearchAttributesMachine does not handle {e}"
211165
))),
@@ -241,16 +195,15 @@ impl From<Created> for CommandIssued {
241195
mod tests {
242196
use super::{super::OnEventWrapper, *};
243197
use crate::{
244-
internal_flags::InternalFlags,
245198
replay::TestHistoryBuilder,
246199
test_help::{build_mock_pollers, mock_worker, MockPollCfg, ResponseType},
247200
worker::{
248201
client::mocks::mock_workflow_client,
249202
workflow::{machines::patch_state_machine::VERSION_SEARCH_ATTR_KEY, ManagedWFFunc},
250203
},
251204
};
252-
use rustfsm::{MachineError, StateMachine};
253-
use std::{cell::RefCell, collections::HashMap, rc::Rc};
205+
use rustfsm::StateMachine;
206+
use std::collections::HashMap;
254207
use temporal_sdk::{WfContext, WorkflowFunction};
255208
use temporal_sdk_core_api::Worker;
256209
use temporal_sdk_core_protos::{
@@ -317,25 +270,14 @@ mod tests {
317270
}
318271

319272
#[rstest::rstest]
320-
fn upsert_search_attrs_sm(#[values(true, false)] nondetermistic: bool) {
321-
let mut sm = UpsertSearchAttributesMachine::from_parts(
322-
Created {}.into(),
323-
SharedState {
324-
sa_map: Default::default(),
325-
should_skip_determinism: false,
326-
internal_flags: Rc::new(RefCell::new(InternalFlags::all_core_enabled())),
327-
},
328-
);
329-
330-
let sa_attribs = if nondetermistic {
331-
UpsertWorkflowSearchAttributesEventAttributes {
332-
workflow_task_completed_event_id: 0,
333-
search_attributes: Some(SearchAttributes {
334-
indexed_fields: HashMap::from([("Yo".to_string(), Payload::default())]),
335-
}),
336-
}
337-
} else {
338-
Default::default()
273+
fn upsert_search_attrs_sm() {
274+
let mut sm = UpsertSearchAttributesMachine::from_parts(Created {}.into(), SharedState {});
275+
276+
let sa_attribs = UpsertWorkflowSearchAttributesEventAttributes {
277+
workflow_task_completed_event_id: 0,
278+
search_attributes: Some(SearchAttributes {
279+
indexed_fields: HashMap::from([("Yo".to_string(), Payload::default())]),
280+
}),
339281
};
340282
let recorded_history_event = HistoryEvent {
341283
event_type: EventType::UpsertWorkflowSearchAttributes as i32,
@@ -365,15 +307,8 @@ mod tests {
365307
assert_eq!(CommandIssued {}.to_string(), sm.state().to_string());
366308

367309
let recorded_res = OnEventWrapper::on_event_mut(&mut sm, cmd_recorded_sm_event);
368-
if nondetermistic {
369-
assert_matches!(
370-
recorded_res.unwrap_err(),
371-
MachineError::Underlying(WFMachinesError::Nondeterminism(_))
372-
);
373-
} else {
374-
recorded_res.expect("CommandRecorded should transition CommandIssued -> Done");
375-
assert_eq!(Done {}.to_string(), sm.state().to_string());
376-
}
310+
recorded_res.expect("CommandRecorded should transition CommandIssued -> Done");
311+
assert_eq!(Done {}.to_string(), sm.state().to_string());
377312
}
378313

379314
#[rstest::rstest]

core/src/worker/workflow/machines/workflow_machines.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -924,10 +924,7 @@ impl WorkflowMachines {
924924
}
925925
ProtoCmdAttrs::UpsertWorkflowSearchAttributesCommandAttributes(attrs) => {
926926
self.add_cmd_to_wf_task(
927-
upsert_search_attrs_internal(
928-
attrs,
929-
self.observed_internal_flags.clone(),
930-
),
927+
upsert_search_attrs_internal(attrs),
931928
CommandIdKind::NeverResolves,
932929
);
933930
}

core/src/worker/workflow/managed_run.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -550,8 +550,15 @@ impl ManagedRun {
550550

551551
/// Delete the currently tracked workflow activation and return it, if any. Should be called
552552
/// after the processing of the activation completion, and WFT reporting.
553-
pub(super) fn delete_activation(&mut self) -> Option<OutstandingActivation> {
554-
self.activation.take()
553+
pub(super) fn delete_activation(
554+
&mut self,
555+
pred: impl FnOnce(&OutstandingActivation) -> bool,
556+
) -> Option<OutstandingActivation> {
557+
if self.activation().map(pred).unwrap_or_default() {
558+
self.activation.take()
559+
} else {
560+
None
561+
}
555562
}
556563

557564
/// Called when local activities resolve
@@ -929,6 +936,7 @@ impl ManagedRun {
929936
}
930937

931938
fn insert_outstanding_activation(&mut self, act: &ActivationOrAuto) {
939+
warn!("Inserting {:?}", act);
932940
let act_type = match &act {
933941
ActivationOrAuto::LangActivation(act) | ActivationOrAuto::ReadyForQueries(act) => {
934942
if act.is_legacy_query() {

0 commit comments

Comments
 (0)