Skip to content

Commit 6533304

Browse files
authored
feat(meta): Add conflict-free sequence generator v1 (#18273)
Introduce a new conflict-free sequence generator (storage version 1) alongside the existing version 0 generator. The v1 generator eliminates conflicts during concurrent sequence number generation by using a new `FetchAddU64` operation instead of check-and-set semantics. Key changes: - V1 generator stores sequence values in external standalone keys - Uses atomic FetchAddU64 operation for conflict-free updates - V0 generator remains unchanged for backward compatibility The FetchAddU64 operation performs in-place atomic updates without requiring metadata assertions, eliminating conflicts between concurrent sequence requests and improving throughput for sequence generation workloads. Both storage versions coexist, allowing gradual migration from the conflict-prone v0 implementation to the high-concurrency v1 design. **Non-Breaking**: This Commit just add this function but does not used anywhere.
1 parent e486e3f commit 6533304

20 files changed

+576
-104
lines changed

src/meta/api/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ pub mod util;
3737

3838
pub mod crud;
3939
mod sequence_api_impl;
40+
pub(crate) mod sequence_nextval_impl;
4041

4142
pub use data_mask_api::DatamaskApi;
4243
pub use schema_api::SchemaApi;

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use databend_common_meta_app::schema::index_id_ident::IndexId;
4949
use databend_common_meta_app::schema::index_id_ident::IndexIdIdent;
5050
use databend_common_meta_app::schema::index_id_to_name_ident::IndexIdToNameIdent;
5151
use databend_common_meta_app::schema::least_visible_time_ident::LeastVisibleTimeIdent;
52+
use databend_common_meta_app::schema::sequence_storage::SequenceStorageIdent;
5253
use databend_common_meta_app::schema::table_niv::TableNIV;
5354
use databend_common_meta_app::schema::CatalogMeta;
5455
use databend_common_meta_app::schema::CatalogNameIdent;
@@ -346,7 +347,9 @@ impl SchemaApiTestSuite {
346347

347348
suite.get_table_name_by_id(&b.build().await).await?;
348349
suite.get_db_name_by_id(&b.build().await).await?;
349-
suite.test_sequence(&b.build().await).await?;
350+
351+
suite.test_sequence_0(&b.build().await).await?;
352+
suite.test_sequence_1(&b.build().await).await?;
350353

351354
suite.dictionary_create_list_drop(&b.build().await).await?;
352355
Ok(())
@@ -5764,9 +5767,28 @@ impl SchemaApiTestSuite {
57645767
}
57655768

57665769
#[fastrace::trace]
5767-
async fn test_sequence<MT: SchemaApi + SequenceApi + kvapi::AsKVApi<Error = MetaError>>(
5770+
async fn test_sequence_0<MT: SchemaApi + SequenceApi + kvapi::AsKVApi<Error = MetaError>>(
5771+
&self,
5772+
mt: &MT,
5773+
) -> anyhow::Result<()> {
5774+
self.test_sequence_with_version(mt, 0).await
5775+
}
5776+
5777+
#[fastrace::trace]
5778+
async fn test_sequence_1<MT: SchemaApi + SequenceApi + kvapi::AsKVApi<Error = MetaError>>(
57685779
&self,
57695780
mt: &MT,
5781+
) -> anyhow::Result<()> {
5782+
self.test_sequence_with_version(mt, 1).await
5783+
}
5784+
5785+
#[fastrace::trace]
5786+
async fn test_sequence_with_version<
5787+
MT: SchemaApi + SequenceApi + kvapi::AsKVApi<Error = MetaError>,
5788+
>(
5789+
&self,
5790+
mt: &MT,
5791+
storage_version: u64,
57705792
) -> anyhow::Result<()> {
57715793
let tenant = Tenant {
57725794
tenant: "tenant1".to_string(),
@@ -5782,6 +5804,7 @@ impl SchemaApiTestSuite {
57825804
ident: SequenceIdent::new(&tenant, sequence_name),
57835805
create_on,
57845806
comment: Some("seq".to_string()),
5807+
storage_version,
57855808
};
57865809

57875810
let _resp = mt.create_sequence(req).await?;
@@ -5803,14 +5826,16 @@ impl SchemaApiTestSuite {
58035826
ident: SequenceIdent::new(&tenant, "seq1"),
58045827
create_on,
58055828
comment: Some("seq1".to_string()),
5829+
storage_version,
58065830
};
58075831

5808-
let seqs = ["seq", "seq1"];
58095832
let _resp = mt.create_sequence(req).await?;
58105833
let values = mt.list_sequences(&tenant).await?;
5811-
for (i, (name, _)) in values.iter().enumerate() {
5812-
assert_eq!(name, seqs[i]);
5813-
}
5834+
assert_eq!(values.len(), 2);
5835+
assert_eq!(values[0].0, sequence_name);
5836+
assert_eq!(values[0].1.current, 1);
5837+
assert_eq!(values[1].0, "seq1");
5838+
assert_eq!(values[1].1.current, 1);
58145839
}
58155840

58165841
info!("--- get sequence nextval");
@@ -5821,7 +5846,7 @@ impl SchemaApiTestSuite {
58215846
};
58225847
let resp = mt.get_sequence_next_value(req).await?;
58235848
assert_eq!(resp.start, 1);
5824-
assert_eq!(resp.end, 10);
5849+
assert_eq!(resp.end, 11);
58255850
}
58265851

58275852
info!("--- get sequence after nextval");
@@ -5833,13 +5858,24 @@ impl SchemaApiTestSuite {
58335858
assert_eq!(resp.current, 11);
58345859
}
58355860

5861+
info!("--- list sequence after nextval");
5862+
{
5863+
let values = mt.list_sequences(&tenant).await?;
5864+
assert_eq!(values.len(), 2);
5865+
assert_eq!(values[0].0, sequence_name);
5866+
assert_eq!(values[0].1.current, 11);
5867+
assert_eq!(values[1].0, "seq1");
5868+
assert_eq!(values[1].1.current, 1);
5869+
}
5870+
58365871
info!("--- replace sequence");
58375872
{
58385873
let req = CreateSequenceReq {
58395874
create_option: CreateOption::CreateOrReplace,
58405875
ident: SequenceIdent::new(&tenant, sequence_name),
58415876
create_on,
58425877
comment: Some("seq1".to_string()),
5878+
storage_version,
58435879
};
58445880

58455881
let _resp = mt.create_sequence(req).await?;
@@ -5852,6 +5888,7 @@ impl SchemaApiTestSuite {
58525888
assert_eq!(resp.current, 1);
58535889
}
58545890

5891+
info!("--- drop sequence");
58555892
{
58565893
let req = DropSequenceReq {
58575894
ident: SequenceIdent::new(&tenant, sequence_name),
@@ -5863,6 +5900,24 @@ impl SchemaApiTestSuite {
58635900
let req = SequenceIdent::new(&tenant, sequence_name);
58645901
let resp = mt.get_sequence(&req).await?;
58655902
assert!(resp.is_none());
5903+
5904+
let storage_ident = SequenceStorageIdent::new_from(req);
5905+
let got = mt
5906+
.as_kv_api()
5907+
.get_kv(&storage_ident.to_string_key())
5908+
.await?;
5909+
assert!(
5910+
got.is_none(),
5911+
"storage_version==1 storage should be removed too"
5912+
);
5913+
}
5914+
5915+
info!("--- list sequence after drop");
5916+
{
5917+
let values = mt.list_sequences(&tenant).await?;
5918+
assert_eq!(values.len(), 1);
5919+
assert_eq!(values[0].0, "seq1");
5920+
assert_eq!(values[0].1.current, 1);
58665921
}
58675922

58685923
Ok(())

0 commit comments

Comments
 (0)