Skip to content

Commit d8bf112

Browse files
committed
Fix internal flags not always being written when they should be (#532)
* Update protos subtree to 84a8e55fa275185934f242616e3e0f0f84788971
1 parent c1728c0 commit d8bf112

File tree

26 files changed

+447
-210
lines changed

26 files changed

+447
-210
lines changed

client/src/raw.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -740,19 +740,19 @@ proxier! {
740740
}
741741
);
742742
(
743-
update_worker_build_id_ordering,
744-
UpdateWorkerBuildIdOrderingRequest,
745-
UpdateWorkerBuildIdOrderingResponse,
743+
update_worker_build_id_compatibility,
744+
UpdateWorkerBuildIdCompatibilityRequest,
745+
UpdateWorkerBuildIdCompatibilityResponse,
746746
|r| {
747747
let mut labels = AttachMetricLabels::namespace(r.get_ref().namespace.clone());
748748
labels.task_q_str(r.get_ref().task_queue.clone());
749749
r.extensions_mut().insert(labels);
750750
}
751751
);
752752
(
753-
get_worker_build_id_ordering,
754-
GetWorkerBuildIdOrderingRequest,
755-
GetWorkerBuildIdOrderingResponse,
753+
get_worker_build_id_compatibility,
754+
GetWorkerBuildIdCompatibilityRequest,
755+
GetWorkerBuildIdCompatibilityResponse,
756756
|r| {
757757
let mut labels = AttachMetricLabels::namespace(r.get_ref().namespace.clone());
758758
labels.task_q_str(r.get_ref().task_queue.clone());
@@ -768,6 +768,15 @@ proxier! {
768768
r.extensions_mut().insert(labels);
769769
}
770770
);
771+
(
772+
poll_workflow_execution_update,
773+
PollWorkflowExecutionUpdateRequest,
774+
PollWorkflowExecutionUpdateResponse,
775+
|r| {
776+
let labels = AttachMetricLabels::namespace(r.get_ref().namespace.clone());
777+
r.extensions_mut().insert(labels);
778+
}
779+
);
771780
(
772781
start_batch_operation,
773782
StartBatchOperationRequest,

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ dashmap = "5.0"
2828
derive_builder = "0.12"
2929
derive_more = "0.99"
3030
enum_dispatch = "0.3"
31+
enum-iterator = "1.4"
3132
flate2 = "1.0"
3233
futures = "0.3"
3334
futures-util = "0.3"

core/src/core_tests/workflow_tasks.rs

Lines changed: 20 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use crate::{
2-
advance_fut, job_assert,
2+
advance_fut,
3+
internal_flags::CoreInternalFlags,
4+
job_assert,
35
replay::TestHistoryBuilder,
46
test_help::{
57
build_fake_worker, build_mock_pollers, build_multihist_mock_sg, canned_histories,
@@ -14,9 +16,9 @@ use crate::{
1416
use futures::{stream, FutureExt};
1517
use rstest::{fixture, rstest};
1618
use std::{
17-
collections::{HashMap, VecDeque},
19+
collections::{HashMap, HashSet, VecDeque},
1820
sync::{
19-
atomic::{AtomicBool, AtomicU64, Ordering},
21+
atomic::{AtomicU64, Ordering},
2022
Arc,
2123
},
2224
time::Duration,
@@ -54,9 +56,7 @@ use temporal_sdk_core_protos::{
5456
},
5557
DEFAULT_ACTIVITY_TYPE, DEFAULT_WORKFLOW_TYPE,
5658
};
57-
use temporal_sdk_core_test_utils::{
58-
fanout_tasks, schedule_activity_cmd, start_timer_cmd, WorkerTestHelpers,
59-
};
59+
use temporal_sdk_core_test_utils::{fanout_tasks, start_timer_cmd, WorkerTestHelpers};
6060
use tokio::{
6161
join,
6262
sync::{Barrier, Semaphore},
@@ -2380,54 +2380,36 @@ async fn lang_internal_flags() {
23802380
core.shutdown().await;
23812381
}
23822382

2383-
// Verify we send flags to server when they're used
2383+
// Verify we send all core internal flags on the first non-replay WFT
23842384
#[tokio::test]
23852385
async fn core_internal_flags() {
23862386
let mut t = TestHistoryBuilder::default();
23872387
t.add_by_type(EventType::WorkflowExecutionStarted);
2388-
t.add_full_wf_task();
2389-
let act_scheduled_event_id = t.add_activity_task_scheduled("act-id");
2390-
let act_started_event_id = t.add_activity_task_started(act_scheduled_event_id);
2391-
t.add_activity_task_completed(
2392-
act_scheduled_event_id,
2393-
act_started_event_id,
2394-
Default::default(),
2395-
);
2396-
t.add_full_wf_task();
2397-
t.add_workflow_execution_completed();
2388+
t.add_workflow_task_scheduled_and_started();
23982389

23992390
let mut mh = MockPollCfg::from_resp_batches(
24002391
"fake_wf_id",
24012392
t,
2402-
[ResponseType::ToTaskNum(1), ResponseType::ToTaskNum(2)],
2393+
[ResponseType::ToTaskNum(1)],
24032394
mock_workflow_client(),
24042395
);
2405-
let first_poll = AtomicBool::new(true);
24062396
mh.completion_asserts = Some(Box::new(move |c| {
2407-
if !first_poll.load(Ordering::Acquire) {
2408-
assert_matches!(c.sdk_metadata.core_used_flags.as_slice(), &[1]);
2409-
}
2410-
first_poll.store(false, Ordering::Release);
2397+
assert_eq!(
2398+
c.sdk_metadata
2399+
.core_used_flags
2400+
.iter()
2401+
.copied()
2402+
.collect::<HashSet<_>>(),
2403+
CoreInternalFlags::all_except_too_high()
2404+
.into_iter()
2405+
.map(|f| f as u32)
2406+
.collect()
2407+
);
24112408
}));
24122409
let mut mock = build_mock_pollers(mh);
24132410
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
24142411
let core = mock_worker(mock);
24152412

2416-
let act = core.poll_workflow_activation().await.unwrap();
2417-
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
2418-
act.run_id,
2419-
schedule_activity_cmd(
2420-
1,
2421-
"whatever",
2422-
"act-id",
2423-
ActivityCancellationType::TryCancel,
2424-
Duration::from_secs(60),
2425-
Duration::from_secs(60),
2426-
),
2427-
))
2428-
.await
2429-
.unwrap();
2430-
24312413
let act = core.poll_workflow_activation().await.unwrap();
24322414
core.complete_execution(&act.run_id).await;
24332415
core.shutdown().await;

core/src/internal_flags.rs

Lines changed: 132 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
//! Utilities for and tracking of internal versions which alter history in incompatible ways
22
//! so that we can use older code paths for workflows executed on older core versions.
33
4-
use std::collections::{BTreeSet, HashSet};
4+
use itertools::Either;
5+
use std::{
6+
collections::{BTreeSet, HashSet},
7+
iter,
8+
};
59
use temporal_sdk_core_protos::temporal::api::{
610
history::v1::WorkflowTaskCompletedEventAttributes, sdk::v1::WorkflowTaskCompletedMetadata,
711
workflowservice::v1::get_system_info_response,
@@ -15,7 +19,7 @@ use temporal_sdk_core_protos::temporal::api::{
1519
/// that removing older variants does not create any change in existing values. Removed flag
1620
/// variants must be reserved forever (a-la protobuf), and should be called out in a comment.
1721
#[repr(u32)]
18-
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone, Debug)]
22+
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Copy, Clone, Debug, enum_iterator::Sequence)]
1923
pub(crate) enum CoreInternalFlags {
2024
/// In this flag additional checks were added to a number of state machines to ensure that
2125
/// the ID and type of activities, local activities, and child workflows match during replay.
@@ -28,83 +32,125 @@ pub(crate) enum CoreInternalFlags {
2832
}
2933

3034
#[derive(Debug, Clone, PartialEq, Eq)]
31-
pub(crate) struct InternalFlags {
32-
enabled: bool,
33-
core: BTreeSet<CoreInternalFlags>,
34-
lang: BTreeSet<u32>,
35-
core_since_last_complete: HashSet<CoreInternalFlags>,
36-
lang_since_last_complete: HashSet<u32>,
35+
pub(crate) enum InternalFlags {
36+
Enabled {
37+
core: BTreeSet<CoreInternalFlags>,
38+
lang: BTreeSet<u32>,
39+
core_since_last_complete: HashSet<CoreInternalFlags>,
40+
lang_since_last_complete: HashSet<u32>,
41+
},
42+
Disabled,
3743
}
3844

3945
impl InternalFlags {
4046
pub fn new(server_capabilities: &get_system_info_response::Capabilities) -> Self {
41-
Self {
42-
enabled: server_capabilities.sdk_metadata,
43-
core: Default::default(),
44-
lang: Default::default(),
45-
core_since_last_complete: Default::default(),
46-
lang_since_last_complete: Default::default(),
47+
match server_capabilities.sdk_metadata {
48+
true => Self::Enabled {
49+
core: Default::default(),
50+
lang: Default::default(),
51+
core_since_last_complete: Default::default(),
52+
lang_since_last_complete: Default::default(),
53+
},
54+
false => Self::Disabled,
4755
}
4856
}
4957

5058
pub fn add_from_complete(&mut self, e: &WorkflowTaskCompletedEventAttributes) {
51-
if !self.enabled {
52-
return;
53-
}
54-
55-
if let Some(metadata) = e.sdk_metadata.as_ref() {
56-
self.core.extend(
57-
metadata
58-
.core_used_flags
59-
.iter()
60-
.map(|u| CoreInternalFlags::from_u32(*u)),
61-
);
62-
self.lang.extend(metadata.lang_used_flags.iter());
59+
if let Self::Enabled { core, lang, .. } = self {
60+
if let Some(metadata) = e.sdk_metadata.as_ref() {
61+
core.extend(
62+
metadata
63+
.core_used_flags
64+
.iter()
65+
.map(|u| CoreInternalFlags::from_u32(*u)),
66+
);
67+
lang.extend(metadata.lang_used_flags.iter());
68+
}
6369
}
6470
}
6571

6672
pub fn add_lang_used(&mut self, flags: impl IntoIterator<Item = u32>) {
67-
if !self.enabled {
68-
return;
73+
if let Self::Enabled {
74+
lang_since_last_complete,
75+
..
76+
} = self
77+
{
78+
lang_since_last_complete.extend(flags.into_iter());
6979
}
70-
71-
self.lang_since_last_complete.extend(flags.into_iter());
7280
}
7381

7482
/// Returns true if this flag may currently be used. If `should_record` is true, always returns
7583
/// true and records the flag as being used, for taking later via
7684
/// [Self::gather_for_wft_complete].
7785
pub fn try_use(&mut self, core_patch: CoreInternalFlags, should_record: bool) -> bool {
78-
if !self.enabled {
86+
match self {
87+
Self::Enabled {
88+
core,
89+
core_since_last_complete,
90+
..
91+
} => {
92+
if should_record {
93+
core_since_last_complete.insert(core_patch);
94+
true
95+
} else {
96+
core.contains(&core_patch)
97+
}
98+
}
7999
// If the server does not support the metadata field, we must assume we can never use
80100
// any internal flags since they can't be recorded for future use
81-
return false;
101+
Self::Disabled => false,
82102
}
103+
}
83104

84-
if should_record {
85-
self.core_since_last_complete.insert(core_patch);
86-
true
87-
} else {
88-
self.core.contains(&core_patch)
105+
/// Writes all known core flags to the set which should be recorded in the current WFT if not
106+
/// already known. Must only be called if not replaying.
107+
pub fn write_all_known(&mut self) {
108+
if let Self::Enabled {
109+
core_since_last_complete,
110+
..
111+
} = self
112+
{
113+
core_since_last_complete.extend(CoreInternalFlags::all_except_too_high());
89114
}
90115
}
91116

92117
/// Wipes the recorded flags used during the current WFT and returns a partially filled
93118
/// sdk metadata message that can be combined with any existing data before sending the WFT
94119
/// complete
95120
pub fn gather_for_wft_complete(&mut self) -> WorkflowTaskCompletedMetadata {
96-
WorkflowTaskCompletedMetadata {
97-
core_used_flags: self
98-
.core_since_last_complete
99-
.drain()
100-
.map(|p| p as u32)
101-
.collect(),
102-
lang_used_flags: self.lang_since_last_complete.drain().collect(),
121+
match self {
122+
Self::Enabled {
123+
core_since_last_complete,
124+
lang_since_last_complete,
125+
core,
126+
lang,
127+
} => {
128+
let core_newly_used: Vec<_> = core_since_last_complete
129+
.iter()
130+
.filter(|f| !core.contains(f))
131+
.map(|p| *p as u32)
132+
.collect();
133+
let lang_newly_used: Vec<_> = lang_since_last_complete
134+
.iter()
135+
.filter(|f| !lang.contains(f))
136+
.copied()
137+
.collect();
138+
core.extend(core_since_last_complete.iter());
139+
lang.extend(lang_since_last_complete.iter());
140+
WorkflowTaskCompletedMetadata {
141+
core_used_flags: core_newly_used,
142+
lang_used_flags: lang_newly_used,
143+
}
144+
}
145+
Self::Disabled => WorkflowTaskCompletedMetadata::default(),
103146
}
104147
}
105148

106-
pub fn all_lang(&self) -> &BTreeSet<u32> {
107-
&self.lang
149+
pub fn all_lang(&self) -> impl Iterator<Item = u32> + '_ {
150+
match self {
151+
Self::Enabled { lang, .. } => Either::Left(lang.iter().copied()),
152+
Self::Disabled => Either::Right(iter::empty()),
153+
}
108154
}
109155
}
110156

@@ -116,6 +162,11 @@ impl CoreInternalFlags {
116162
_ => Self::TooHigh,
117163
}
118164
}
165+
166+
pub fn all_except_too_high() -> impl Iterator<Item = CoreInternalFlags> {
167+
enum_iterator::all::<CoreInternalFlags>()
168+
.filter(|f| !matches!(f, CoreInternalFlags::TooHigh))
169+
}
119170
}
120171

121172
#[cfg(test)]
@@ -138,4 +189,39 @@ mod tests {
138189
assert_matches!(gathered.core_used_flags.as_slice(), &[]);
139190
assert_matches!(gathered.lang_used_flags.as_slice(), &[]);
140191
}
192+
193+
#[test]
194+
fn all_have_u32_from_impl() {
195+
let all_known = CoreInternalFlags::all_except_too_high();
196+
for flag in all_known {
197+
let as_u32 = flag as u32;
198+
assert_eq!(CoreInternalFlags::from_u32(as_u32), flag);
199+
}
200+
}
201+
202+
#[test]
203+
fn only_writes_new_flags() {
204+
let mut f = InternalFlags::new(&Capabilities {
205+
sdk_metadata: true,
206+
..Default::default()
207+
});
208+
f.add_lang_used([1]);
209+
f.try_use(CoreInternalFlags::IdAndTypeDeterminismChecks, true);
210+
let gathered = f.gather_for_wft_complete();
211+
assert_matches!(gathered.core_used_flags.as_slice(), &[1]);
212+
assert_matches!(gathered.lang_used_flags.as_slice(), &[1]);
213+
214+
f.add_from_complete(&WorkflowTaskCompletedEventAttributes {
215+
sdk_metadata: Some(WorkflowTaskCompletedMetadata {
216+
core_used_flags: vec![2],
217+
lang_used_flags: vec![2],
218+
}),
219+
..Default::default()
220+
});
221+
f.add_lang_used([2]);
222+
f.try_use(CoreInternalFlags::UpsertSearchAttributeOnPatch, true);
223+
let gathered = f.gather_for_wft_complete();
224+
assert_matches!(gathered.core_used_flags.as_slice(), &[]);
225+
assert_matches!(gathered.lang_used_flags.as_slice(), &[]);
226+
}
141227
}

0 commit comments

Comments
 (0)