Skip to content

Commit c45b4b9

Browse files
authored
fix(meta-service): Remove transient expire_cursor to prevent state inconsistency (#18318)
* refactor(meta-service): write test to expose a bug with expiration consistency Tests timestamp-based expiration consistency across snapshot boundaries. **Scenario:** - Log-1 has timestamp T+180s, included in snapshot - Log-2 has timestamp T+60s with expiration T+120s, not in snapshot **Issue:** After restart, Log-2 uses current time instead of last-seen time (from Log-1) for expiration checks, potentially causing state inconsistencies between nodes. This bug raises in real world if: - When meta-service restarts, there is one log(`A`) to re-apply that has smaller `expire_at`(`t1`) than the biggest timestamp(`LogEntry.time_ms`) in the snapshot, - and all the log entries before `A` have smaller timestamp(`LogEntry.time_ms`) than `t1`. * fix(meta-service): Remove transient `expire_cursor` to prevent state inconsistency Fix potential state machine inconsistencies caused by the non-persisted `expire_cursor` field. While inconsistencies were rare, the transient state could cause meta-service nodes to have divergent views of the expiration processing state. Root cause: The `expire_cursor` was maintained as transient state outside the replicated state machine, making it possible for nodes to have different cursor positions after restarts or network partitions. Solution: Remove the non-persisted `expire_cursor` entirely to ensure all state machine operations are fully deterministic and consistently replicated across all meta-service nodes. This change eliminates the race condition and guarantees state machine consistency regardless of node restart patterns or network conditions.
1 parent 820d63b commit c45b4b9

File tree

7 files changed

+158
-81
lines changed

7 files changed

+158
-81
lines changed

src/meta/raft-store/src/applier/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -688,8 +688,6 @@ where SM: StateMachineApi + 'static
688688
}
689689
}
690690

691-
self.sm.update_expire_cursor(log_time_ms);
692-
693691
Ok(())
694692
}
695693

src/meta/raft-store/src/sm_v003/sm_v003.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,6 @@ pub struct SMV003 {
4646
impl StateMachineApi for SMV003 {
4747
type Map = LeveledMap;
4848

49-
fn get_expire_cursor(&self) -> ExpireKey {
50-
self.expire_cursor
51-
}
52-
53-
fn set_expire_cursor(&mut self, cursor: ExpireKey) {
54-
self.expire_cursor = cursor;
55-
}
56-
5749
fn map_ref(&self) -> &Self::Map {
5850
&self.levels
5951
}

src/meta/raft-store/src/sm_v003/sm_v003_test.rs

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use databend_common_meta_types::seq_value::SeqV;
1616
use databend_common_meta_types::seq_value::SeqValue;
17+
use databend_common_meta_types::CmdContext;
1718
use databend_common_meta_types::UpsertKV;
1819
use futures_util::TryStreamExt;
1920
use map_api::map_api_ro::MapApiRO;
@@ -137,26 +138,6 @@ async fn test_two_level_upsert_get_range() -> anyhow::Result<()> {
137138
Ok(())
138139
}
139140

140-
#[tokio::test]
141-
async fn test_update_expire_index() -> anyhow::Result<()> {
142-
let mut sm = SMV003::default();
143-
144-
sm.update_expire_cursor(1);
145-
assert_eq!(sm.get_expire_cursor(), ExpireKey::new(1, 0));
146-
147-
sm.update_expire_cursor(2);
148-
assert_eq!(sm.get_expire_cursor(), ExpireKey::new(2, 0));
149-
150-
sm.update_expire_cursor(1);
151-
assert_eq!(
152-
sm.get_expire_cursor(),
153-
ExpireKey::new(2, 0),
154-
"expire cursor can not go back"
155-
);
156-
157-
Ok(())
158-
}
159-
160141
/// The subscript is internal_seq:
161142
///
162143
/// | kv | expire
@@ -221,7 +202,7 @@ async fn test_internal_expire_index() -> anyhow::Result<()> {
221202

222203
#[tokio::test]
223204
async fn test_list_expire_index() -> anyhow::Result<()> {
224-
let mut sm = build_sm_with_expire().await?;
205+
let sm = build_sm_with_expire().await?;
225206

226207
let curr_time_ms = 5000;
227208
let got = sm
@@ -251,15 +232,14 @@ async fn test_list_expire_index() -> anyhow::Result<()> {
251232
(ExpireKey::new(20000, 3), s("c")),
252233
]);
253234

254-
sm.update_expire_cursor(15000);
255-
256235
let curr_time_ms = 20_001;
257236
let got = sm
258237
.list_expire_index(curr_time_ms)
259238
.await?
260239
.try_collect::<Vec<_>>()
261240
.await?;
262241
assert_eq!(got, vec![
242+
(ExpireKey::new(5000, 2), s("b")),
263243
(ExpireKey::new(15000, 4), s("a")),
264244
(ExpireKey::new(20000, 3), s("c")),
265245
]);
@@ -270,9 +250,8 @@ async fn test_list_expire_index() -> anyhow::Result<()> {
270250
async fn test_inserting_expired_becomes_deleting() -> anyhow::Result<()> {
271251
let mut sm = build_sm_with_expire().await?;
272252

273-
sm.update_expire_cursor(15_000);
274-
275253
let mut a = sm.new_applier();
254+
a.cmd_ctx = CmdContext::from_millis(15_000);
276255

277256
// Inserting an expired entry will delete it.
278257
a.upsert_kv(&UpsertKV::update("a", b"a1").with_expire_sec(10))
@@ -290,7 +269,7 @@ async fn test_inserting_expired_becomes_deleting() -> anyhow::Result<()> {
290269
.await?
291270
.try_collect::<Vec<_>>()
292271
.await?;
293-
assert!(got.is_empty());
272+
assert_eq!(got, vec![(ExpireKey::new(5_000, 2), s("b")),]);
294273

295274
let got = sm
296275
.list_expire_index(20_001)
@@ -299,6 +278,7 @@ async fn test_inserting_expired_becomes_deleting() -> anyhow::Result<()> {
299278
.await?;
300279
assert_eq!(got, vec![
301280
//
281+
(ExpireKey::new(5_000, 2), s("b")),
302282
(ExpireKey::new(20_000, 3), s("c")),
303283
]);
304284

src/meta/raft-store/src/state_machine_api.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,20 +50,6 @@ where T: SMEventSender
5050
pub trait StateMachineApi: Send + Sync {
5151
type Map: MapApi<String, KVMeta> + MapApi<ExpireKey, KVMeta> + 'static;
5252

53-
/// Returns the current expire key cursor position.
54-
///
55-
/// The expiry key cursor marks a boundary in the key space:
56-
/// - All keys before this cursor (exclusive) have already been processed and deleted
57-
/// - This cursor position is used to track progress when incrementally cleaning up expired keys
58-
fn get_expire_cursor(&self) -> ExpireKey;
59-
60-
/// Updates the expiry key cursor position.
61-
///
62-
/// This method is called after a batch of expired keys have been processed and deleted.
63-
/// The new cursor position indicates that all keys before it (exclusive) have been
64-
/// successfully cleaned up.
65-
fn set_expire_cursor(&mut self, cursor: ExpireKey);
66-
6753
/// Returns a reference to the map that stores application data.
6854
///
6955
/// This method provides read-only access to the underlying key-value store

src/meta/raft-store/src/state_machine_api_ext.rs

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,12 @@ pub trait StateMachineApiExt: StateMachineApi {
7373
};
7474

7575
let expire_ms = kv_meta.expires_at_ms();
76-
if expire_ms < self.get_expire_cursor().time_ms {
76+
let curr_time_ms = cmd_ctx.time().millis();
77+
if expire_ms < curr_time_ms {
7778
warn!(
78-
"upsert_kv_primary_index: expired key inserted: {} < expire-cursor: {}; key: {}",
79+
"upsert_kv_primary_index: expired key inserted: {} < timestamp in log entry: {}; key: {}",
7980
Duration::from_millis(expire_ms).display_unix_timestamp_short(),
80-
Duration::from_millis(self.get_expire_cursor().time_ms)
81+
Duration::from_millis(curr_time_ms)
8182
.display_unix_timestamp_short(),
8283
upsert_kv.key
8384
);
@@ -177,42 +178,21 @@ pub trait StateMachineApiExt: StateMachineApi {
177178
&self,
178179
curr_time_ms: u64,
179180
) -> Result<IOResultStream<(ExpireKey, String)>, io::Error> {
180-
let start = self.get_expire_cursor();
181-
182181
// curr_time > expire_at => expired
183182
let end = ExpireKey::new(curr_time_ms, 0);
184183

185-
// There is chance the raft leader produce smaller timestamp for later logs.
186-
if start >= end {
187-
return Ok(futures::stream::empty().boxed());
188-
}
189-
190-
let strm = self.map_ref().expire_map().range(start..end).await?;
184+
let strm = self.map_ref().expire_map().range(..end).await?;
191185

192-
let strm =
193-
add_cooperative_yielding(strm, format!("list_expire_index since {start} to {end}"))
194-
// Return only non-deleted records
195-
.try_filter_map(|(k, marked)| {
196-
let expire_entry = marked.unpack().map(|(v, _v_meta)| (k, v));
197-
future::ready(Ok(expire_entry))
198-
});
186+
let strm = add_cooperative_yielding(strm, format!("list_expire_index up to {end}"))
187+
// Return only non-deleted records
188+
.try_filter_map(|(k, marked)| {
189+
let expire_entry = marked.unpack().map(|(v, _v_meta)| (k, v));
190+
future::ready(Ok(expire_entry))
191+
});
199192

200193
Ok(strm.boxed())
201194
}
202195

203-
fn update_expire_cursor(&mut self, log_time_ms: u64) {
204-
if log_time_ms < self.get_expire_cursor().time_ms {
205-
warn!(
206-
"update_last_cleaned: log_time_ms {} < last_cleaned_expire.time_ms {}",
207-
log_time_ms,
208-
self.get_expire_cursor().time_ms
209-
);
210-
return;
211-
}
212-
213-
self.set_expire_cursor(ExpireKey::new(log_time_ms, 0));
214-
}
215-
216196
/// Get a cloned value by key.
217197
///
218198
/// It does not check expiration of the returned entry.

src/meta/service/tests/it/meta_node/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ pub(crate) mod meta_node_kv_api_expire;
1717
pub(crate) mod meta_node_lifecycle;
1818
pub(crate) mod meta_node_replication;
1919
pub(crate) mod meta_node_request_forwarding;
20+
pub(crate) mod t90_time_revert_cross_snapshot_boundary;
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::time::Duration;
16+
17+
use databend_common_meta_kvapi::kvapi::KVApi;
18+
use databend_common_meta_types::raft_types::new_log_id;
19+
use databend_common_meta_types::Cmd;
20+
use databend_common_meta_types::LogEntry;
21+
use databend_common_meta_types::SeqV;
22+
use databend_common_meta_types::TxnOp;
23+
use databend_common_meta_types::TxnRequest;
24+
use databend_meta::meta_service::MetaNode;
25+
use log::info;
26+
use maplit::btreeset;
27+
use test_harness::test;
28+
use tokio::time::sleep;
29+
30+
use crate::testing::meta_service_test_harness;
31+
use crate::tests::meta_node::start_meta_node_cluster;
32+
use crate::tests::meta_node::timeout;
33+
34+
/// Tests timestamp-based expiration consistency across snapshot boundaries.
35+
///
36+
/// **Scenario:**
37+
/// - Log-1 has timestamp T+180s, included in snapshot
38+
/// - Log-2 has timestamp T+60s with expiration T+120s, not in snapshot
39+
///
40+
/// **Issue:** After restart, Log-2 uses current time instead of last-seen time (from Log-1)
41+
/// for expiration checks, potentially causing state inconsistencies between nodes.
42+
///
43+
/// This bug raises in real world if:
44+
/// - When meta-service restarts, there is one log(`A`) to re-apply that has smaller `expire_at`(`t1`)
45+
/// than the biggest timestamp(`LogEntry.time_ms`) in the snapshot,
46+
/// - and all the log entries before `A` have smaller timestamp(`LogEntry.time_ms`) than `t1`.
47+
#[test(harness = meta_service_test_harness)]
48+
#[fastrace::trace]
49+
async fn test_meta_node_log_time_revert_cross_snapshot_boundary() -> anyhow::Result<()> {
50+
let now_ms = SeqV::<()>::now_ms();
51+
52+
// Log with later timestamp (T+180s) - will be included in snapshot
53+
let log_later = LogEntry {
54+
txid: None,
55+
time_ms: Some(now_ms + 180_000),
56+
cmd: Cmd::Transaction(TxnRequest::new(vec![], vec![TxnOp::put(
57+
"k1",
58+
b"v1".to_vec(),
59+
)])),
60+
};
61+
62+
// Log with earlier timestamp (T+60s) but expires at T+120s - not in snapshot
63+
let log_earlier = LogEntry {
64+
txid: None,
65+
time_ms: Some(now_ms + 60_000),
66+
cmd: Cmd::Transaction(TxnRequest::new(vec![], vec![TxnOp::put(
67+
"k1",
68+
b"v2".to_vec(),
69+
)
70+
.with_expires_at_ms(Some(now_ms + 120_000))])),
71+
};
72+
73+
let result_with_restart = write_two_logs(log_later.clone(), log_earlier.clone(), true).await?;
74+
let result_without_restart = write_two_logs(log_later, log_earlier, false).await?;
75+
76+
assert_eq!(
77+
result_with_restart.is_some(),
78+
result_without_restart.is_some()
79+
);
80+
81+
Ok(())
82+
}
83+
84+
/// Writes two logs with optional snapshot and restart between them.
85+
///
86+
/// Tests how state machine behavior differs when last-seen time context is lost
87+
/// during restart. Returns the final value of key "k1".
88+
async fn write_two_logs(
89+
first_log: LogEntry,
90+
second_log: LogEntry,
91+
restart: bool,
92+
) -> anyhow::Result<Option<SeqV>> {
93+
info!("Testing log sequence with restart={}", restart);
94+
95+
let (mut log_index, mut test_contexts) =
96+
start_meta_node_cluster(btreeset![0], btreeset![]).await?;
97+
98+
let mut tc0 = test_contexts.remove(0);
99+
let mut meta_node = tc0.meta_node.take().unwrap();
100+
101+
// Apply first log
102+
meta_node.raft.client_write(first_log).await?;
103+
log_index += 1;
104+
105+
// Restart node with snapshot if requested
106+
if restart {
107+
info!("Taking snapshot at log_index={}", log_index);
108+
meta_node.raft.trigger().snapshot().await?;
109+
meta_node
110+
.raft
111+
.wait(timeout())
112+
.snapshot(new_log_id(1, 0, log_index), "purged")
113+
.await?;
114+
115+
info!("Restarting meta-node");
116+
meta_node.stop().await?;
117+
drop(meta_node);
118+
119+
sleep(Duration::from_secs(2)).await;
120+
121+
meta_node = MetaNode::open(&tc0.config.raft_config).await?;
122+
}
123+
124+
// Apply second log
125+
meta_node.raft.client_write(second_log).await?;
126+
127+
// Check final state
128+
let result = meta_node
129+
.raft_store
130+
.state_machine
131+
.read()
132+
.await
133+
.kv_api()
134+
.get_kv("k1")
135+
.await?;
136+
137+
info!("Final state for k1 (restart={}): {:?}", restart, result);
138+
139+
Ok(result)
140+
}

0 commit comments

Comments
 (0)