Skip to content

Commit ef0fdc9

Browse files
authored
refactor: get_table_by_id() returns an Option<Seqv<TableMeta>> (#15388)
* refactor: get_table_by_id() returns an `Option<Seqv<TableMeta>>` Make `get` methods behave in a conventional way: return `None` if the subject is not found. Other changes: Extend expire time to survive slow CI * chore: fix typos
1 parent 81ecce8 commit ef0fdc9

File tree

30 files changed

+92
-91
lines changed

30 files changed

+92
-91
lines changed

โ€Žsrc/common/hashtable/src/traits.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ pub trait HashJoinHashtableLike {
556556
/// 3. `vec_ptr` is RowPtr Array, we use this one to record the matched row in chunks
557557
/// 4. `occupied` is the length for vec_ptr
558558
/// 5. `capacity` is the capacity of vec_ptr
559-
/// 6. return macthed rows count and next ptr which need to test in the future.
559+
/// 6. return matched rows count and next ptr which need to test in the future.
560560
/// if the capacity is enough, the next ptr is zero, otherwise next ptr is valid.
561561
fn next_probe(
562562
&self,

โ€Žsrc/common/hashtable/src/utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ pub mod sse {
214214
// potentially leading to the target table being unsuitable for use as a build table in the future.
215215
// 2. Requires a significant amount of memory to be efficient and currently does not support spill operations.
216216
// for now we just support sql like below:
217-
// `merge into t using source on xxx when matched then update xxx when not macthed then insert xxx.
217+
// `merge into t using source on xxx when matched then update xxx when not matched then insert xxx.
218218
// for merge into:
219219
// we use MergeIntoBlockInfoIndex to maintain an index for the block info in chunks.
220220

@@ -331,7 +331,7 @@ impl MergeIntoBlockInfoIndex {
331331
let left = self.search_idx(start as u32);
332332
let right = self.search_idx((end - 1) as u32);
333333
if left == right {
334-
// macthed only one block.
334+
// matched only one block.
335335
if self.intervals[left].0 == (start as u32)
336336
&& self.intervals[right].1 == (end - 1) as u32
337337
{

โ€Žsrc/common/metrics/src/metrics/storage.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -416,12 +416,12 @@ pub fn metrics_inc_merge_into_split_milliseconds(c: u64) {
416416
MERGE_INTO_SPLIT_MILLISECONDS.observe(c as f64);
417417
}
418418

419-
// after merge_source_split, record the time of not macthed clauses (processor_merge_into_not_matched)
419+
// after merge_source_split, record the time of not matched clauses (processor_merge_into_not_matched)
420420
pub fn merge_into_not_matched_operation_milliseconds(c: u64) {
421421
MERGE_INTO_NOT_MATCHED_OPERATION_MILLISECONDS.observe(c as f64);
422422
}
423423

424-
// after merge_source_split, record the time of macthed clauses (processor_merge_into_matched_and_split)
424+
// after merge_source_split, record the time of matched clauses (processor_merge_into_matched_and_split)
425425
pub fn merge_into_matched_operation_milliseconds(c: u64) {
426426
MERGE_INTO_MATCHED_OPERATION_MILLISECONDS.observe(c as f64);
427427
}

โ€Žsrc/meta/api/src/schema_api.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ use databend_common_meta_app::schema::SetLVTReply;
7676
use databend_common_meta_app::schema::SetLVTReq;
7777
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply;
7878
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq;
79-
use databend_common_meta_app::schema::TableIdent;
8079
use databend_common_meta_app::schema::TableInfo;
8180
use databend_common_meta_app::schema::TableMeta;
8281
use databend_common_meta_app::schema::TruncateTableReply;
@@ -96,7 +95,9 @@ use databend_common_meta_app::schema::UpdateVirtualColumnReq;
9695
use databend_common_meta_app::schema::UpsertTableOptionReply;
9796
use databend_common_meta_app::schema::UpsertTableOptionReq;
9897
use databend_common_meta_app::schema::VirtualColumnMeta;
98+
use databend_common_meta_types::MetaError;
9999
use databend_common_meta_types::MetaId;
100+
use databend_common_meta_types::SeqV;
100101

101102
use crate::kv_app_error::KVAppError;
102103

@@ -203,10 +204,11 @@ pub trait SchemaApi: Send + Sync {
203204

204205
async fn list_tables(&self, req: ListTableReq) -> Result<Vec<Arc<TableInfo>>, KVAppError>;
205206

206-
async fn get_table_by_id(
207-
&self,
208-
table_id: MetaId,
209-
) -> Result<(TableIdent, Arc<TableMeta>), KVAppError>;
207+
/// Return TableMeta by table_id.
208+
///
209+
/// It returns None instead of KVAppError, if table_id does not exist
210+
async fn get_table_by_id(&self, table_id: MetaId)
211+
-> Result<Option<SeqV<TableMeta>>, MetaError>;
210212

211213
async fn mget_table_names_by_ids(
212214
&self,

โ€Žsrc/meta/api/src/schema_api_impl.rs

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ use crate::get_share_table_info;
226226
use crate::get_u64_value;
227227
use crate::is_db_need_to_be_remove;
228228
use crate::kv_app_error::KVAppError;
229+
use crate::kv_pb_api::KVPbApi;
229230
use crate::list_keys;
230231
use crate::list_u64_value;
231232
use crate::remove_db_from_share;
@@ -2235,25 +2236,13 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
22352236
async fn get_table_by_id(
22362237
&self,
22372238
table_id: MetaId,
2238-
) -> Result<(TableIdent, Arc<TableMeta>), KVAppError> {
2239+
) -> Result<Option<SeqV<TableMeta>>, MetaError> {
22392240
debug!(req :? =(&table_id); "SchemaApi: {}", func_name!());
22402241

2241-
let tbid = TableId { table_id };
2242-
2243-
let (tb_meta_seq, table_meta): (_, Option<TableMeta>) = get_pb_value(self, &tbid).await?;
2244-
2245-
debug!(ident :% =(&tbid); "get_table_by_id");
2246-
2247-
if tb_meta_seq == 0 || table_meta.is_none() {
2248-
return Err(KVAppError::AppError(AppError::UnknownTableId(
2249-
UnknownTableId::new(table_id, "get_table_by_id"),
2250-
)));
2251-
}
2242+
let id = TableId { table_id };
22522243

2253-
Ok((
2254-
TableIdent::new(table_id, tb_meta_seq),
2255-
Arc::new(table_meta.unwrap()),
2256-
))
2244+
let seq_table_meta = self.get_pb(&id).await?;
2245+
Ok(seq_table_meta)
22572246
}
22582247

22592248
#[logcall::logcall("debug")]

โ€Žsrc/meta/api/src/schema_api_test_suite.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5029,20 +5029,16 @@ impl SchemaApiTestSuite {
50295029
.await
50305030
.unwrap();
50315031

5032-
let (table_id, table_meta) = mt.get_table_by_id(table.ident.table_id).await?;
5033-
5032+
let seqv = mt.get_table_by_id(table.ident.table_id).await?.unwrap();
5033+
let table_meta = seqv.data;
50345034
assert_eq!(table_meta.options.get("optโ€1"), Some(&"val-1".into()));
5035-
assert_eq!(table_id.table_id, table.ident.table_id);
50365035
}
50375036

50385037
info!("--- get_table_by_id with not exists table_id");
50395038
{
5040-
let got = mt.get_table_by_id(1024).await;
5039+
let got = mt.get_table_by_id(1024).await?;
50415040

5042-
let err = got.unwrap_err();
5043-
let err = ErrorCode::from(err);
5044-
5045-
assert_eq!(ErrorCode::UNKNOWN_TABLE_ID, err.code());
5041+
assert!(got.is_none());
50465042
}
50475043
}
50485044
Ok(())
@@ -5910,7 +5906,8 @@ impl SchemaApiTestSuite {
59105906

59115907
{
59125908
info!("--- check table index");
5913-
let (_, table_meta) = mt.get_table_by_id(table_id).await?;
5909+
let seqv = mt.get_table_by_id(table_id).await?.unwrap();
5910+
let table_meta = seqv.data;
59145911
assert_eq!(table_meta.indexes.len(), 2);
59155912

59165913
let index1 = table_meta.indexes.get(&index_name_1);
@@ -5953,7 +5950,8 @@ impl SchemaApiTestSuite {
59535950

59545951
{
59555952
info!("--- check table index after drop");
5956-
let (_, table_meta) = mt.get_table_by_id(table_id).await?;
5953+
let seqv = mt.get_table_by_id(table_id).await?.unwrap();
5954+
let table_meta = seqv.data;
59575955
assert_eq!(table_meta.indexes.len(), 1);
59585956

59595957
let index1 = table_meta.indexes.get(&index_name_1);

โ€Žsrc/meta/raft-store/tests/it/state_machine/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ async fn test_state_machine_apply_non_dup_generic_kv_upsert_get() -> anyhow::Res
236236
"wow",
237237
MatchSeq::GE(0),
238238
"y",
239-
Some(1000),
239+
Some(5_000),
240240
None,
241241
Some((6, "y")),
242242
now_ms,

โ€Žsrc/query/catalog/src/catalog/interface.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ use databend_common_meta_app::schema::RenameTableReply;
7373
use databend_common_meta_app::schema::RenameTableReq;
7474
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply;
7575
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq;
76-
use databend_common_meta_app::schema::TableIdent;
7776
use databend_common_meta_app::schema::TableInfo;
7877
use databend_common_meta_app::schema::TableMeta;
7978
use databend_common_meta_app::schema::TruncateTableReply;
@@ -95,6 +94,7 @@ use databend_common_meta_app::schema::UpsertTableOptionReq;
9594
use databend_common_meta_app::schema::VirtualColumnMeta;
9695
use databend_common_meta_app::tenant::Tenant;
9796
use databend_common_meta_types::MetaId;
97+
use databend_common_meta_types::SeqV;
9898
use dyn_clone::DynClone;
9999

100100
use crate::database::Database;
@@ -195,8 +195,8 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
195195
// Build a `Arc<dyn Table>` from `TableInfo`.
196196
fn get_table_by_info(&self, table_info: &TableInfo) -> Result<Arc<dyn Table>>;
197197

198-
// Get the table meta by meta id.
199-
async fn get_table_meta_by_id(&self, table_id: MetaId) -> Result<(TableIdent, Arc<TableMeta>)>;
198+
/// Get the table meta by table id.
199+
async fn get_table_meta_by_id(&self, table_id: MetaId) -> Result<Option<SeqV<TableMeta>>>;
200200

201201
// Get the table name by meta id.
202202
async fn get_table_name_by_id(&self, table_id: MetaId) -> Result<String>;

โ€Žsrc/query/catalog/src/catalog/session_catalog.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ use databend_common_meta_app::schema::RenameTableReply;
7272
use databend_common_meta_app::schema::RenameTableReq;
7373
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply;
7474
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq;
75-
use databend_common_meta_app::schema::TableIdent;
7675
use databend_common_meta_app::schema::TableInfo;
7776
use databend_common_meta_app::schema::TableMeta;
7877
use databend_common_meta_app::schema::TruncateTableReply;
@@ -94,6 +93,7 @@ use databend_common_meta_app::schema::UpsertTableOptionReq;
9493
use databend_common_meta_app::schema::VirtualColumnMeta;
9594
use databend_common_meta_app::tenant::Tenant;
9695
use databend_common_meta_types::MetaId;
96+
use databend_common_meta_types::SeqV;
9797
use databend_storages_common_txn::TxnManagerRef;
9898
use databend_storages_common_txn::TxnState;
9999

@@ -225,13 +225,13 @@ impl Catalog for SessionCatalog {
225225
}
226226

227227
// Get the table meta by meta id.
228-
async fn get_table_meta_by_id(&self, table_id: MetaId) -> Result<(TableIdent, Arc<TableMeta>)> {
228+
async fn get_table_meta_by_id(&self, table_id: MetaId) -> Result<Option<SeqV<TableMeta>>> {
229229
let state = self.txn_mgr.lock().state();
230230
match state {
231231
TxnState::Active => {
232232
let mutated_table = self.txn_mgr.lock().get_table_from_buffer_by_id(table_id);
233233
if let Some(t) = mutated_table {
234-
Ok((t.ident, Arc::new(t.meta.clone())))
234+
Ok(Some(SeqV::new(t.ident.seq, t.meta.clone())))
235235
} else {
236236
self.inner.get_table_meta_by_id(table_id).await
237237
}

โ€Žsrc/query/catalog/src/table.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use databend_common_expression::TableSchema;
2828
use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
2929
use databend_common_io::constants::DEFAULT_BLOCK_MAX_ROWS;
3030
use databend_common_io::constants::DEFAULT_BLOCK_MIN_ROWS;
31+
use databend_common_meta_app::schema::TableIdent;
3132
use databend_common_meta_app::schema::TableInfo;
3233
use databend_common_meta_app::schema::UpdateStreamMetaReq;
3334
use databend_common_meta_app::schema::UpsertTableCopiedFileReq;
@@ -414,10 +415,15 @@ pub trait TableExt: Table {
414415
let table_info = self.get_table_info();
415416
let tid = table_info.ident.table_id;
416417
let catalog = ctx.get_catalog(table_info.catalog()).await?;
417-
let (ident, meta) = catalog.get_table_meta_by_id(tid).await?;
418+
419+
let seqv = catalog.get_table_meta_by_id(tid).await?.ok_or_else(|| {
420+
let err = UnknownTableId::new(tid, "TableExt::refresh");
421+
AppError::from(err)
422+
})?;
423+
418424
let table_info = TableInfo {
419-
ident,
420-
meta: meta.as_ref().clone(),
425+
ident: TableIdent::new(tid, seqv.seq),
426+
meta: seqv.data,
421427
..table_info.clone()
422428
};
423429
catalog.get_table_by_info(&table_info)
@@ -515,6 +521,9 @@ pub struct NavigationDescriptor {
515521

516522
use std::collections::HashMap;
517523

524+
use databend_common_meta_app::app_error::AppError;
525+
use databend_common_meta_app::app_error::UnknownTableId;
526+
518527
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)]
519528
pub struct ParquetTableColumnStatisticsProvider {
520529
column_stats: HashMap<ColumnId, Option<BasicColumnStatistics>>,

0 commit comments

Comments
ย (0)