Skip to content

Commit 79d8f07

Browse files
authored
feat(meta-service): Add millisecond precision for expiration times (#18303)
* feat(meta-service): Add millisecond precision for expiration times Extend expiration time support to accept both seconds and milliseconds while maintaining backward compatibility. The meta-service now stores both time formats in the existing `expire_at` field and automatically detects the format based on value magnitude. Detection logic: - Values > 100_000_000_000 are treated as milliseconds - Values โ‰ค 100_000_000_000 are treated as seconds This threshold corresponds to timestamps after ~1973, ensuring reliable differentiation between the two formats while supporting both legacy second-based expiration times and new millisecond-precision requirements. This is a non-breaking change that maintains full backward compatibility with existing configurations using second-based expiration times. * M Cargo.lock * M src/meta/types/src/cmd/meta_spec.rs * M src/meta/service/tests/it/grpc/metasrv_grpc_kv_read_v1.rs * M src/meta/service/tests/it/grpc/metasrv_grpc_kv_read_v1.rs * M src/meta/service/tests/it/grpc/metasrv_grpc_kv_api_restart_cluster.rs * M src/meta/types/src/proto_display/seqv_display.rs * M src/meta/types/src/proto_ext/seq_v_ext.rs * M src/meta/types/src/proto_ext/watch_ext/event_ext.rs * fix tests
1 parent 250fcfc commit 79d8f07

File tree

23 files changed

+649
-131
lines changed

23 files changed

+649
-131
lines changed

โ€ŽCargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

โ€Žsrc/meta/client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ tonic = { workspace = true }
4141
[dev-dependencies]
4242
anyhow = { workspace = true }
4343
databend-common-exception = { workspace = true }
44+
pretty_assertions = { workspace = true }
4445
rand = { workspace = true }
4546

4647
[lints]

โ€Žsrc/meta/client/src/lib.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,14 +150,17 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
150150
/// - 2025-06-11: since 1.2.756
151151
/// ๐Ÿ–ฅ server: add `TxnPutResponse::current`
152152
///
153-
/// - 2025-06-24: since TODO: add when merge
153+
/// - 2025-06-24: since 1.2.764
154154
/// ๐Ÿ–ฅ server: add `FetchAddU64` operation to the `TxnOp`
155155
///
156-
/// - 2025-06-26: since TODO: add when merge
156+
/// - 2025-06-26: since 1.2.764
157157
/// ๐Ÿ–ฅ server: add `FetchAddU64.match_seq`
158158
///
159-
/// - 2025-07-01: since TODO: add when enables sequence v1
160-
/// ๐Ÿ‘ฅ client: new sequence API depends on `FetchAddU64`.
159+
/// - 2025-07-01: since TODO: add when enables sequence storage v1
160+
/// ๐Ÿ‘ฅ client: new sequence API v1: depends on `FetchAddU64`.
161+
///
162+
/// - 2025-07-03: since TODO: add when merged
163+
/// ๐Ÿ–ฅ server: adaptive `expire_at` support both seconds and milliseconds.
161164
///
162165
/// Server feature set:
163166
/// ```yaml

โ€Žsrc/meta/client/src/required.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,23 @@ pub type Features = BTreeMap<&'static str, VersionTuple>;
2626
pub mod features {
2727
use crate::FeatureSpec;
2828

29-
pub const KV_API: FeatureSpec = ("kv_api", (1, 2, 259));
30-
pub const KV_READ_V1: FeatureSpec = ("kv_read_v1", (1, 2, 259));
31-
pub const TRANSACTION: FeatureSpec = ("transaction", (1, 2, 259));
32-
pub const EXPORT: FeatureSpec = ("export", (1, 2, 259));
29+
pub const KV_API: FeatureSpec = ("kv_api", (1, 2, 259));
30+
pub const KV_READ_V1: FeatureSpec = ("kv_read_v1", (1, 2, 259));
31+
pub const TRANSACTION: FeatureSpec = ("transaction", (1, 2, 259));
32+
pub const EXPORT: FeatureSpec = ("export", (1, 2, 259));
3333
// `export_v1` is not a required feature because the client fallback to `export` if `export_v1` is not available.
34-
pub const EXPORT_V1: FeatureSpec = ("export_v1", (1, 2, 315));
35-
pub const WATCH: FeatureSpec = ("watch", (1, 2, 259));
36-
pub const WATCH_INITIAL_FLUSH: FeatureSpec = ("watch/initial_flush", (1, 2, 677));
34+
pub const EXPORT_V1: FeatureSpec = ("export_v1", (1, 2, 315));
35+
pub const WATCH: FeatureSpec = ("watch", (1, 2, 259));
36+
pub const WATCH_INITIAL_FLUSH: FeatureSpec = ("watch/initial_flush", (1, 2, 677));
3737
/// WatchResponse contains a flag to indicate whether the event is initialization event or change event.
38-
pub const WATCH_INIT_FLAG: FeatureSpec = ("watch/init_flag", (1, 2, 736));
39-
pub const MEMBER_LIST: FeatureSpec = ("member_list", (1, 2, 259));
40-
pub const GET_CLUSTER_STATUS: FeatureSpec = ("get_cluster_status", (1, 2, 259));
41-
pub const GET_CLIENT_INFO: FeatureSpec = ("get_client_info", (1, 2, 259));
38+
pub const WATCH_INIT_FLAG: FeatureSpec = ("watch/init_flag", (1, 2, 736));
39+
pub const MEMBER_LIST: FeatureSpec = ("member_list", (1, 2, 259));
40+
pub const GET_CLUSTER_STATUS: FeatureSpec = ("get_cluster_status", (1, 2, 259));
41+
pub const GET_CLIENT_INFO: FeatureSpec = ("get_client_info", (1, 2, 259));
42+
/// TxnPutResponse contains the `current` state of the key after the put operation.
43+
pub const PUT_RESPONSE_CURRENT: FeatureSpec = ("put_response/current", (1, 2, 756));
44+
/// Txn support a FetchAdd operation for json encoded u64 values.
45+
pub const FETCH_ADD_U64: FeatureSpec = ("fetch_add_u64", (1, 2, 764));
4246
}
4347

4448
/// All features that are ever defined and their least required server version.
@@ -56,6 +60,8 @@ pub fn all() -> &'static [FeatureSpec] {
5660
features::MEMBER_LIST,
5761
features::GET_CLUSTER_STATUS,
5862
features::GET_CLIENT_INFO,
63+
features::PUT_RESPONSE_CURRENT,
64+
features::FETCH_ADD_U64,
5965
];
6066

6167
REQUIRES

โ€Žsrc/meta/kvapi-test-suite/src/test_suite.rs

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use fastrace::func_name;
4848
use fastrace::func_path;
4949
use log::debug;
5050
use log::info;
51+
use tokio::time::sleep;
5152

5253
pub struct TestSuite {}
5354

@@ -86,6 +87,7 @@ impl TestSuite {
8687
self.kv_delete(&builder.build().await).await?;
8788
self.kv_update(&builder.build().await).await?;
8889
self.kv_timeout(&builder.build().await).await?;
90+
self.kv_expire_sec_or_ms(&builder.build().await).await?;
8991
self.kv_upsert_with_ttl(&builder.build().await).await?;
9092
self.kv_meta(&builder.build().await).await?;
9193
self.kv_list(&builder.build().await).await?;
@@ -244,7 +246,7 @@ impl TestSuite {
244246

245247
#[fastrace::trace]
246248
pub async fn kv_timeout<KV: kvapi::KVApi>(&self, kv: &KV) -> anyhow::Result<()> {
247-
info!("--- kvapi::KVApiTestSuite::kv_timeout() start");
249+
info!("--- {} start", func_name!());
248250

249251
// - Test get expired and non-expired.
250252
// - Test mget expired and non-expired.
@@ -307,7 +309,7 @@ impl TestSuite {
307309
assert_eq!(v2.seq, 3);
308310
assert_eq!(v2.data, b("v2"));
309311
let v2_meta = v2.meta.unwrap();
310-
let expire_at_sec = v2_meta.get_expire_at_ms().unwrap() / 1000;
312+
let expire_at_sec = v2_meta.expires_at_sec_opt().unwrap();
311313
let want = now_sec + 10;
312314
assert!((want..want + 2).contains(&expire_at_sec));
313315
}
@@ -337,6 +339,80 @@ impl TestSuite {
337339
Ok(())
338340
}
339341

342+
/// Test expire time in seconds or milliseconds.
343+
#[fastrace::trace]
344+
pub async fn kv_expire_sec_or_ms<KV: kvapi::KVApi>(&self, kv: &KV) -> anyhow::Result<()> {
345+
info!("--- {} start", func_name!());
346+
347+
// 100_000_000_000 is the max value in seconds.
348+
kv.upsert_kv(UpsertKV::update("k1", b"v1").with(MetaSpec::new_expire(100_000_000_000)))
349+
.await?;
350+
// dbg!("upsert non expired k1", _res);
351+
352+
info!("--- get unexpired");
353+
{
354+
let res = kv.get_kv("k1").await?;
355+
let meta = res.unwrap().meta.unwrap();
356+
assert_eq!(meta.expires_at_sec_opt(), Some(100_000_000_000));
357+
}
358+
359+
info!("--- expired in milliseconds");
360+
{
361+
let now_sec = SeqV::<()>::now_sec();
362+
363+
// expire time in milliseconds
364+
kv.upsert_kv(
365+
UpsertKV::update("k1", b"v1").with(MetaSpec::new_expire((now_sec - 2) * 1000)),
366+
)
367+
.await?;
368+
369+
let res = kv.get_kv("k1").await?;
370+
assert!(res.is_none(), "expired");
371+
}
372+
373+
info!("--- unexpired in milliseconds");
374+
{
375+
// 2035-01-04 18:12:23
376+
let millis = 2051518343000;
377+
378+
// expire time in milliseconds
379+
kv.upsert_kv(UpsertKV::update("k1", b"v1").with(MetaSpec::new_expire(millis)))
380+
.await?;
381+
382+
let res = kv.get_kv("k1").await?;
383+
let meta = res.unwrap().meta.unwrap();
384+
assert_eq!(meta.get_expire_at_ms(), Some(millis));
385+
}
386+
387+
info!("--- ttl in milliseconds: 1 sec");
388+
{
389+
kv.upsert_kv(
390+
UpsertKV::update("k1", b"v1").with(MetaSpec::new_ttl(Duration::from_secs(1))),
391+
)
392+
.await?;
393+
394+
sleep(Duration::from_millis(2_000)).await;
395+
396+
let res = kv.get_kv("k1").await?;
397+
assert!(res.is_none());
398+
}
399+
400+
info!("--- ttl in milliseconds: 5 sec");
401+
{
402+
kv.upsert_kv(
403+
UpsertKV::update("k1", b"v1").with(MetaSpec::new_ttl(Duration::from_secs(5))),
404+
)
405+
.await?;
406+
407+
sleep(Duration::from_millis(2_000)).await;
408+
409+
let res = kv.get_kv("k1").await?;
410+
assert!(res.is_some());
411+
}
412+
413+
Ok(())
414+
}
415+
340416
#[fastrace::trace]
341417
pub async fn kv_upsert_with_ttl<KV: kvapi::KVApi>(&self, kv: &KV) -> anyhow::Result<()> {
342418
// - Add with ttl
@@ -409,7 +485,7 @@ impl TestSuite {
409485
assert_eq!(res.data, b("v1"));
410486

411487
let meta = res.meta.unwrap();
412-
let expire_at_sec = meta.get_expire_at_ms().unwrap() / 1000;
488+
let expire_at_sec = meta.expires_at_sec_opt().unwrap();
413489
let want = now_sec + 20;
414490
assert!((want..want + 2).contains(&expire_at_sec));
415491
}
@@ -425,7 +501,7 @@ impl TestSuite {
425501
assert_eq!(res.data, b("v1"));
426502

427503
let meta = res.meta.unwrap();
428-
let expire_at_sec = meta.get_expire_at_ms().unwrap() / 1000;
504+
let expire_at_sec = meta.expires_at_sec_opt().unwrap();
429505
let want = now_sec + 20;
430506
assert!((want..want + 2).contains(&expire_at_sec));
431507
}

โ€Žsrc/meta/raft-store/src/leveled_store/leveled_map/leveled_map_test.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -379,18 +379,18 @@ async fn build_2_level_with_meta() -> anyhow::Result<LeveledMap> {
379379

380380
// internal_seq: 0
381381
l.str_map_mut()
382-
.set(s("a"), Some((b("a0"), Some(KVMeta::new_expire(1)))))
382+
.set(s("a"), Some((b("a0"), Some(KVMeta::new_expires_at(1)))))
383383
.await?;
384384
l.str_map_mut().set(s("b"), Some((b("b0"), None))).await?;
385385
l.str_map_mut()
386-
.set(s("c"), Some((b("c0"), Some(KVMeta::new_expire(2)))))
386+
.set(s("c"), Some((b("c0"), Some(KVMeta::new_expires_at(2)))))
387387
.await?;
388388

389389
l.freeze_writable();
390390

391391
// internal_seq: 3
392392
l.str_map_mut()
393-
.set(s("b"), Some((b("b1"), Some(KVMeta::new_expire(10)))))
393+
.set(s("b"), Some((b("b1"), Some(KVMeta::new_expires_at(10)))))
394394
.await?;
395395
l.str_map_mut().set(s("c"), Some((b("c1"), None))).await?;
396396

@@ -455,17 +455,17 @@ async fn test_two_level_update_value() -> anyhow::Result<()> {
455455
let (prev, result) = MapApiExt::upsert_value(&mut l, s("a"), b("a1")).await?;
456456
assert_eq!(
457457
prev,
458-
Marked::new_with_meta(1, b("a0"), Some(KVMeta::new_expire(1)))
458+
Marked::new_with_meta(1, b("a0"), Some(KVMeta::new_expires_at(1)))
459459
);
460460
assert_eq!(
461461
result,
462-
Marked::new_with_meta(6, b("a1"), Some(KVMeta::new_expire(1)))
462+
Marked::new_with_meta(6, b("a1"), Some(KVMeta::new_expires_at(1)))
463463
);
464464

465465
let got = l.str_map().get(&s("a")).await?;
466466
assert_eq!(
467467
got,
468-
Marked::new_with_meta(6, b("a1"), Some(KVMeta::new_expire(1)))
468+
Marked::new_with_meta(6, b("a1"), Some(KVMeta::new_expires_at(1)))
469469
);
470470
}
471471

@@ -476,17 +476,17 @@ async fn test_two_level_update_value() -> anyhow::Result<()> {
476476
let (prev, result) = MapApiExt::upsert_value(&mut l, s("b"), b("x1")).await?;
477477
assert_eq!(
478478
prev,
479-
Marked::new_normal(4, b("b1")).with_meta(Some(KVMeta::new_expire(10)))
479+
Marked::new_normal(4, b("b1")).with_meta(Some(KVMeta::new_expires_at(10)))
480480
);
481481
assert_eq!(
482482
result,
483-
Marked::new_normal(6, b("x1")).with_meta(Some(KVMeta::new_expire(10)))
483+
Marked::new_normal(6, b("x1")).with_meta(Some(KVMeta::new_expires_at(10)))
484484
);
485485

486486
let got = l.str_map().get(&s("b")).await?;
487487
assert_eq!(
488488
got,
489-
Marked::new_normal(6, b("x1")).with_meta(Some(KVMeta::new_expire(10)))
489+
Marked::new_normal(6, b("x1")).with_meta(Some(KVMeta::new_expires_at(10)))
490490
);
491491
}
492492

@@ -512,20 +512,20 @@ async fn test_two_level_update_meta() -> anyhow::Result<()> {
512512
let mut l = build_2_level_with_meta().await?;
513513

514514
let (prev, result) =
515-
MapApiExt::update_meta(&mut l, s("a"), Some(KVMeta::new_expire(2))).await?;
515+
MapApiExt::update_meta(&mut l, s("a"), Some(KVMeta::new_expires_at(2))).await?;
516516
assert_eq!(
517517
prev,
518-
Marked::new_with_meta(1, b("a0"), Some(KVMeta::new_expire(1)))
518+
Marked::new_with_meta(1, b("a0"), Some(KVMeta::new_expires_at(1)))
519519
);
520520
assert_eq!(
521521
result,
522-
Marked::new_with_meta(6, b("a0"), Some(KVMeta::new_expire(2)))
522+
Marked::new_with_meta(6, b("a0"), Some(KVMeta::new_expires_at(2)))
523523
);
524524

525525
let got = l.str_map().get(&s("a")).await?;
526526
assert_eq!(
527527
got,
528-
Marked::new_with_meta(6, b("a0"), Some(KVMeta::new_expire(2)))
528+
Marked::new_with_meta(6, b("a0"), Some(KVMeta::new_expires_at(2)))
529529
);
530530
}
531531

@@ -536,7 +536,7 @@ async fn test_two_level_update_meta() -> anyhow::Result<()> {
536536
let (prev, result) = MapApiExt::update_meta(&mut l, s("b"), None).await?;
537537
assert_eq!(
538538
prev,
539-
Marked::new_with_meta(4, b("b1"), Some(KVMeta::new_expire(10)))
539+
Marked::new_with_meta(4, b("b1"), Some(KVMeta::new_expires_at(10)))
540540
);
541541
assert_eq!(result, Marked::new_with_meta(6, b("b1"), None));
542542

@@ -549,17 +549,17 @@ async fn test_two_level_update_meta() -> anyhow::Result<()> {
549549
let mut l = build_2_level_with_meta().await?;
550550

551551
let (prev, result) =
552-
MapApiExt::update_meta(&mut l, s("c"), Some(KVMeta::new_expire(20))).await?;
552+
MapApiExt::update_meta(&mut l, s("c"), Some(KVMeta::new_expires_at(20))).await?;
553553
assert_eq!(prev, Marked::new_with_meta(5, b("c1"), None));
554554
assert_eq!(
555555
result,
556-
Marked::new_normal(6, b("c1")).with_meta(Some(KVMeta::new_expire(20)))
556+
Marked::new_normal(6, b("c1")).with_meta(Some(KVMeta::new_expires_at(20)))
557557
);
558558

559559
let got = l.str_map().get(&s("c")).await?;
560560
assert_eq!(
561561
got,
562-
Marked::new_normal(6, b("c1")).with_meta(Some(KVMeta::new_expire(20)))
562+
Marked::new_normal(6, b("c1")).with_meta(Some(KVMeta::new_expires_at(20)))
563563
);
564564
}
565565

@@ -568,7 +568,7 @@ async fn test_two_level_update_meta() -> anyhow::Result<()> {
568568
let mut l = build_2_level_with_meta().await?;
569569

570570
let (prev, result) =
571-
MapApiExt::update_meta(&mut l, s("d"), Some(KVMeta::new_expire(2))).await?;
571+
MapApiExt::update_meta(&mut l, s("d"), Some(KVMeta::new_expires_at(2))).await?;
572572
assert_eq!(prev, Marked::new_tombstone(0));
573573
assert_eq!(result, Marked::new_tombstone(0));
574574

โ€Žsrc/meta/raft-store/src/leveled_store/rotbl_seq_mark_impl.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ mod tests {
133133
);
134134

135135
t_string_try_from(
136-
Marked::<String>::new_with_meta(1, s("hello"), Some(KVMeta::new_expire(20))),
136+
Marked::<String>::new_with_meta(1, s("hello"), Some(KVMeta::new_expires_at(20))),
137137
SeqMarked::new_normal(1, b("\x01\x10{\"expire_at\":20}\x05hello")),
138138
);
139139

@@ -160,7 +160,7 @@ mod tests {
160160
);
161161

162162
t_try_from(
163-
Marked::new_with_meta(1, b("hello"), Some(KVMeta::new_expire(20))),
163+
Marked::new_with_meta(1, b("hello"), Some(KVMeta::new_expires_at(20))),
164164
SeqMarked::new_normal(1, b("\x01\x10{\"expire_at\":20}\x05hello")),
165165
);
166166

โ€Žsrc/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,15 @@ async fn test_compact_expire_index() -> anyhow::Result<()> {
101101
//
102102
(
103103
s("a"),
104-
Marked::new_with_meta(4, b("a1"), Some(KVMeta::new_expire(15)))
104+
Marked::new_with_meta(4, b("a1"), Some(KVMeta::new_expires_at(15)))
105105
),
106106
(
107107
s("b"),
108-
Marked::new_with_meta(2, b("b0"), Some(KVMeta::new_expire(5)))
108+
Marked::new_with_meta(2, b("b0"), Some(KVMeta::new_expires_at(5)))
109109
),
110110
(
111111
s("c"),
112-
Marked::new_with_meta(3, b("c0"), Some(KVMeta::new_expire(20)))
112+
Marked::new_with_meta(3, b("c0"), Some(KVMeta::new_expires_at(20)))
113113
),
114114
]);
115115

0 commit comments

Comments
ย (0)