Skip to content

Commit 97277ad

Browse files
authored
refactor: add right bound when listing expired keys to avoid unnecessary data copy (#15334)
1 parent 6abee1b commit 97277ad

File tree

4 files changed

+62
-7
lines changed

4 files changed

+62
-7
lines changed

src/meta/raft-store/src/applier.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ impl<'a> Applier<'a> {
469469
info!("to clean expired kvs, log_time_ts: {}", log_time_ms);
470470

471471
let mut to_clean = vec![];
472-
let mut strm = self.sm.list_expire_index().await?;
472+
let mut strm = self.sm.list_expire_index(log_time_ms).await?;
473473

474474
{
475475
let mut strm = std::pin::pin!(strm);

src/meta/raft-store/src/sm_v002/leveled_store/level.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::ops::RangeBounds;
1919

2020
use databend_common_meta_types::KVMeta;
2121
use futures_util::StreamExt;
22+
use log::warn;
2223

2324
use crate::sm_v002::leveled_store::map_api::AsMap;
2425
use crate::sm_v002::leveled_store::map_api::KVResultStream;
@@ -109,6 +110,13 @@ impl MapApiRO<String> for Level {
109110
.map(|(k, v)| (k.clone(), v.clone()))
110111
.collect::<Vec<_>>();
111112

113+
if vec.len() > 1000 {
114+
warn!(
115+
"Level::<ExpireKey>::range() returns big range of len={}",
116+
vec.len()
117+
);
118+
}
119+
112120
let strm = futures::stream::iter(vec).map(Ok).boxed();
113121
Ok(strm)
114122
}
@@ -159,6 +167,13 @@ impl MapApiRO<ExpireKey> for Level {
159167
.map(|(k, v)| (*k, v.clone()))
160168
.collect::<Vec<_>>();
161169

170+
if vec.len() > 1000 {
171+
warn!(
172+
"Level::<ExpireKey>::range() returns big range of len={}",
173+
vec.len()
174+
);
175+
}
176+
162177
let strm = futures::stream::iter(vec).map(Ok).boxed();
163178
Ok(strm)
164179
}

src/meta/raft-store/src/sm_v002/sm_v002.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -299,12 +299,26 @@ impl SMV002 {
299299
self.expire_cursor = ExpireKey::new(log_time_ms, 0);
300300
}
301301

302-
/// List expiration index by expiration time.
302+
/// List expiration index by expiration time,
303+
/// upto current time(exclusive) in milli seconds.
304+
///
305+
/// Only records with expire time less than current time will be returned.
306+
/// Expire time that equals to current time is not considered expired.
303307
pub(crate) async fn list_expire_index(
304308
&self,
309+
curr_time_ms: u64,
305310
) -> Result<impl Stream<Item = Result<(ExpireKey, String), io::Error>> + '_, io::Error> {
306311
let start = self.expire_cursor;
307-
let strm = self.levels.expire_map().range(start..).await?;
312+
313+
// curr_time > expire_at => expired
314+
let end = ExpireKey::new(curr_time_ms, 0);
315+
316+
// There is chance the raft leader produce smaller timestamp for later logs.
317+
if start >= end {
318+
return Ok(futures::stream::empty().boxed());
319+
}
320+
321+
let strm = self.levels.expire_map().range(start..end).await?;
308322

309323
let strm = strm
310324
// Return only non-deleted records
@@ -313,7 +327,7 @@ impl SMV002 {
313327
future::ready(Ok(expire_entry))
314328
});
315329

316-
Ok(strm)
330+
Ok(strm.boxed())
317331
}
318332

319333
pub fn sys_data_ref(&self) -> &SysData {

src/meta/raft-store/src/sm_v002/sm_v002_test.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,8 +221,25 @@ async fn test_internal_expire_index() -> anyhow::Result<()> {
221221
async fn test_list_expire_index() -> anyhow::Result<()> {
222222
let mut sm = build_sm_with_expire().await?;
223223

224+
let curr_time_ms = 5000;
224225
let got = sm
225-
.list_expire_index()
226+
.list_expire_index(curr_time_ms)
227+
.await?
228+
.try_collect::<Vec<_>>()
229+
.await?;
230+
assert!(got.is_empty());
231+
232+
let curr_time_ms = 5001;
233+
let got = sm
234+
.list_expire_index(curr_time_ms)
235+
.await?
236+
.try_collect::<Vec<_>>()
237+
.await?;
238+
assert_eq!(got, vec![(ExpireKey::new(5000, 2), s("b")),]);
239+
240+
let curr_time_ms = 20_001;
241+
let got = sm
242+
.list_expire_index(curr_time_ms)
226243
.await?
227244
.try_collect::<Vec<_>>()
228245
.await?;
@@ -234,8 +251,9 @@ async fn test_list_expire_index() -> anyhow::Result<()> {
234251

235252
sm.update_expire_cursor(15000);
236253

254+
let curr_time_ms = 20_001;
237255
let got = sm
238-
.list_expire_index()
256+
.list_expire_index(curr_time_ms)
239257
.await?
240258
.try_collect::<Vec<_>>()
241259
.await?;
@@ -260,8 +278,16 @@ async fn test_inserting_expired_becomes_deleting() -> anyhow::Result<()> {
260278

261279
assert_eq!(sm.get_maybe_expired_kv("a").await?, None, "a is expired");
262280

281+
// List until 20_000 ms
282+
let got = sm
283+
.list_expire_index(20_000)
284+
.await?
285+
.try_collect::<Vec<_>>()
286+
.await?;
287+
assert!(got.is_empty());
288+
263289
let got = sm
264-
.list_expire_index()
290+
.list_expire_index(20_001)
265291
.await?
266292
.try_collect::<Vec<_>>()
267293
.await?;

0 commit comments

Comments
 (0)