Skip to content

Commit 6bb25f2

Browse files
authored
refactor: adopt get_pb() for get_background_task (#16308)
* refactor: get_db_or_error returns SeqV<T> * refactor: adopt `get_pb()` for get_background_task
1 parent 152589f commit 6bb25f2

File tree

6 files changed

+90
-69
lines changed

6 files changed

+90
-69
lines changed

src/meta/api/src/background_api_impl.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,11 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
293293
"BackgroundTaskApi: {}",
294294
func_name!()
295295
);
296-
let name = &req.name;
297-
let (_, resp) = get_background_task_by_name(self, name).await?;
298-
Ok(GetBackgroundTaskReply { task_info: resp })
296+
297+
let resp = self.get_pb(&req.name).await?;
298+
Ok(GetBackgroundTaskReply {
299+
task_info: resp.into_value(),
300+
})
299301
}
300302
}
301303

@@ -332,19 +334,18 @@ pub fn assert_background_job_exist(
332334
) -> Result<(), AppError> {
333335
if seq == 0 {
334336
debug!(seq = seq, name_ident :? =(name_ident); "background job does not exist");
335-
let unknown = UnknownBackgroundJob::new(name_ident.job_name(), format!("{:?}", name_ident));
336-
Err(AppError::UnknownBackgroundJob(unknown))
337+
let err = unknown_background_job(name_ident);
338+
Err(err)
337339
} else {
338340
Ok(())
339341
}
340342
}
341343

342-
async fn get_background_task_by_name(
343-
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
344-
id: &BackgroundTaskIdent,
345-
) -> Result<(u64, Option<BackgroundTaskInfo>), KVAppError> {
346-
let (seq, res) = get_pb_value(kv_api, id).await?;
347-
Ok((seq, res))
344+
pub fn unknown_background_job(name_ident: &BackgroundJobIdent) -> AppError {
345+
AppError::UnknownBackgroundJob(UnknownBackgroundJob::new(
346+
name_ident.job_name(),
347+
format!("{:?}", name_ident),
348+
))
348349
}
349350

350351
async fn update_background_job<F: FnOnce(&mut BackgroundJobInfo) -> bool>(

src/meta/api/src/schema_api_impl.rs

Lines changed: 47 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ use databend_common_meta_app::app_error::VirtualColumnAlreadyExists;
6464
use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent;
6565
use databend_common_meta_app::data_mask::MaskpolicyTableIdList;
6666
use databend_common_meta_app::id_generator::IdGenerator;
67-
use databend_common_meta_app::primitive::Id;
6867
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
6968
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdentRaw;
7069
use databend_common_meta_app::schema::tenant_dictionary_ident::TenantDictionaryIdent;
@@ -753,10 +752,10 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
753752

754753
let name_key = &req.inner;
755754

756-
let (_, db_id, db_meta) = get_db_or_err(self, name_key, "get_database").await?;
755+
let (seq_db_id, db_meta) = get_db_or_err(self, name_key, "get_database").await?;
757756

758757
let db = DatabaseInfo {
759-
database_id: DatabaseId::new(db_id),
758+
database_id: seq_db_id.data,
760759
name_ident: name_key.clone(),
761760
meta: db_meta,
762761
};
@@ -1583,8 +1582,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
15831582
let seq_db_id = get_db_id_or_err(self, &tenant_dbname, "create_table").await?;
15841583

15851584
// fixed
1586-
let key_dbid = seq_db_id.data.into_inner();
1587-
let save_db_id = seq_db_id.data.into_inner();
1585+
let key_dbid = seq_db_id.data;
1586+
let save_db_id = seq_db_id.data;
15881587

15891588
// fixed
15901589
let key_dbid_tbname = DBIdTableName {
@@ -1899,7 +1898,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
18991898

19001899
// Get db by name to ensure presence
19011900

1902-
let (_, db_id, db_meta) = get_db_or_err(self, &tenant_dbname, "rename_table").await?;
1901+
let (seq_db_id, db_meta) = get_db_or_err(self, &tenant_dbname, "rename_table").await?;
19031902

19041903
// cannot operate on shared database
19051904
if let Some(from_share) = &db_meta.from_share {
@@ -1911,7 +1910,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
19111910
// Get table by db_id, table_name to assert presence.
19121911

19131912
let dbid_tbname = DBIdTableName {
1914-
db_id,
1913+
db_id: *seq_db_id.data,
19151914
table_name: tenant_dbname_tbname.table_name.clone(),
19161915
};
19171916

@@ -1934,7 +1933,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
19341933

19351934
// get table id list from _fd_table_id_list/db_id/table_name
19361935
let dbid_tbname_idlist = TableIdHistoryIdent {
1937-
database_id: db_id,
1936+
database_id: *seq_db_id.data,
19381937
table_name: req.name_ident.table_name.clone(),
19391938
};
19401939
let (tb_id_list_seq, tb_id_list_opt): (_, Option<TableIdList>) =
@@ -1981,20 +1980,20 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
19811980
// Get the renaming target db to ensure presence.
19821981

19831982
let tenant_newdbname = DatabaseNameIdent::new(tenant_dbname.tenant(), &req.new_db_name);
1984-
let (_, new_db_id, new_db_meta) =
1983+
let (new_seq_db_id, new_db_meta) =
19851984
get_db_or_err(self, &tenant_newdbname, "rename_table: new db").await?;
19861985

19871986
// Get the renaming target table to ensure absence
19881987

19891988
let newdbid_newtbname = DBIdTableName {
1990-
db_id: new_db_id,
1989+
db_id: *new_seq_db_id.data,
19911990
table_name: req.new_table_name.clone(),
19921991
};
19931992
let (new_tb_id_seq, _new_tb_id) = get_u64_value(self, &newdbid_newtbname).await?;
19941993
table_has_to_not_exist(new_tb_id_seq, &tenant_newdbname_newtbname, "rename_table")?;
19951994

19961995
let new_dbid_tbname_idlist = TableIdHistoryIdent {
1997-
database_id: new_db_id,
1996+
database_id: *new_seq_db_id.data,
19981997
table_name: req.new_table_name.clone(),
19991998
};
20001999
let (new_tb_id_list_seq, new_tb_id_list_opt): (_, Option<TableIdList>) =
@@ -2012,7 +2011,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
20122011
let (table_id_to_name_seq, _): (_, Option<DBIdTableName>) =
20132012
get_pb_value(self, &table_id_to_name_key).await?;
20142013
let db_id_table_name = DBIdTableName {
2015-
db_id: new_db_id,
2014+
db_id: *new_seq_db_id.data,
20162015
table_name: req.new_table_name.clone(),
20172016
};
20182017

@@ -2025,8 +2024,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
20252024
condition: vec![
20262025
// db has not to change, i.e., no new table is created.
20272026
// Renaming db is OK and does not affect the seq of db_meta.
2028-
txn_cond_seq(&DatabaseId { db_id }, Eq, db_meta.seq),
2029-
txn_cond_seq(&DatabaseId { db_id: new_db_id }, Eq, new_db_meta.seq),
2027+
txn_cond_seq(&seq_db_id.data, Eq, db_meta.seq),
2028+
txn_cond_seq(&new_seq_db_id.data, Eq, new_db_meta.seq),
20302029
// table_name->table_id does not change.
20312030
// Updating the table meta is ok.
20322031
txn_cond_seq(&dbid_tbname, Eq, tb_id_seq),
@@ -2041,18 +2040,18 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
20412040
txn_op_put(&newdbid_newtbname, serialize_u64(table_id)?), /* (db_id, new_tb_name) -> tb_id */
20422041
// Changing a table in a db has to update the seq of db_meta,
20432042
// to block the batch-delete-tables when deleting a db.
2044-
txn_op_put(&DatabaseId { db_id }, serialize_struct(&*db_meta)?), /* (db_id) -> db_meta */
2043+
txn_op_put(&seq_db_id.data, serialize_struct(&*db_meta)?), /* (db_id) -> db_meta */
20452044
txn_op_put(&dbid_tbname_idlist, serialize_struct(&tb_id_list)?), /* _fd_table_id_list/db_id/old_table_name -> tb_id_list */
20462045
txn_op_put(&new_dbid_tbname_idlist, serialize_struct(&new_tb_id_list)?), /* _fd_table_id_list/db_id/new_table_name -> tb_id_list */
20472046
txn_op_put(&table_id_to_name_key, serialize_struct(&db_id_table_name)?), /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */
20482047
],
20492048
else_then: vec![],
20502049
};
20512050

2052-
if db_id != new_db_id {
2051+
if *seq_db_id.data != *new_seq_db_id.data {
20532052
txn.if_then.push(
20542053
txn_op_put(
2055-
&DatabaseId { db_id: new_db_id },
2054+
&new_seq_db_id.data,
20562055
serialize_struct(&*new_db_meta)?,
20572056
), // (db_id) -> db_meta
20582057
);
@@ -2104,7 +2103,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
21042103
txn.if_then
21052104
.push(txn_op_put(&tbid, serialize_struct(&table_meta)?));
21062105

2107-
let share_object = ReplyShareObject::Table(db_id, table_id);
2106+
let share_object = ReplyShareObject::Table(*seq_db_id.data, table_id);
21082107
Some((spec_vec, share_object))
21092108
} else {
21102109
None
@@ -2153,7 +2152,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
21532152
)
21542153
.await;
21552154

2156-
let (_db_id_seq, db_id, db_meta) = match res {
2155+
let (seq_db_id, db_meta) = match res {
21572156
Ok(x) => x,
21582157
Err(e) => {
21592158
return Err(e);
@@ -2176,7 +2175,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
21762175
// Get table by tenant,db_id, table_name to assert presence.
21772176

21782177
let dbid_tbname = DBIdTableName {
2179-
db_id,
2178+
db_id: *seq_db_id.data,
21802179
table_name: tenant_dbname_tbname.table_name.clone(),
21812180
};
21822181

@@ -2245,7 +2244,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
22452244
)
22462245
.await;
22472246

2248-
let (_db_id_seq, db_id, _db_meta) = match res {
2247+
let (seq_db_id, _db_meta) = match res {
22492248
Ok(x) => x,
22502249
Err(e) => {
22512250
return Err(e);
@@ -2254,7 +2253,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
22542253

22552254
// List tables by tenant, db_id, table_name.
22562255
let table_id_history_ident = TableIdHistoryIdent {
2257-
database_id: db_id,
2256+
database_id: *seq_db_id.data,
22582257
table_name: "dummy".to_string(),
22592258
};
22602259

@@ -2268,7 +2267,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
22682267
.iter()
22692268
.map(|table_id_list_key| {
22702269
TableIdHistoryIdent {
2271-
database_id: db_id,
2270+
database_id: *seq_db_id.data,
22722271
table_name: table_id_list_key.table_name.clone(),
22732272
}
22742273
.to_string_key()
@@ -2373,15 +2372,15 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
23732372
)
23742373
.await;
23752374

2376-
let (_db_id_seq, db_id, db_meta) = match res {
2375+
let (seq_db_id, db_meta) = match res {
23772376
Ok(x) => x,
23782377
Err(e) => {
23792378
return Err(e);
23802379
}
23812380
};
23822381

23832382
let tb_infos = match &db_meta.from_share {
2384-
None => list_tables_from_unshare_db(self, db_id, tenant_dbname).await?,
2383+
None => list_tables_from_unshare_db(self, *seq_db_id.data, tenant_dbname).await?,
23852384
Some(share) => {
23862385
let share_ident = share.clone().to_tident(());
23872386
error!(
@@ -3594,7 +3593,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
35943593
)
35953594
.await;
35963595

3597-
let (_db_id_seq, db_id, db_meta) = match res {
3596+
let (seq_db_id, db_meta) = match res {
35983597
Ok(x) => x,
35993598
Err(e) => {
36003599
return Err(e);
@@ -3610,7 +3609,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
36103609
}
36113610

36123611
let db_info = Arc::new(DatabaseInfo {
3613-
database_id: DatabaseId::new(db_id),
3612+
database_id: seq_db_id.data,
36143613
name_ident: req.inner.clone(),
36153614
meta: db_meta,
36163615
});
@@ -4752,7 +4751,7 @@ async fn drop_database_meta(
47524751
)
47534752
.await;
47544753

4755-
let (db_id_seq, db_id, mut db_meta) = match res {
4754+
let (seq_db_id, mut db_meta) = match res {
47564755
Ok(x) => x,
47574756
Err(e) => {
47584757
if let KVAppError::AppError(AppError::UnknownDatabase(_)) = e {
@@ -4768,14 +4767,15 @@ async fn drop_database_meta(
47684767
// remove db_name -> db id
47694768
if drop_name_key {
47704769
txn.condition
4771-
.push(txn_cond_seq(tenant_dbname, Eq, db_id_seq));
4770+
.push(txn_cond_seq(tenant_dbname, Eq, seq_db_id.seq));
47724771
txn.if_then.push(txn_op_del(tenant_dbname)); // (tenant, db_name) -> db_id
47734772
}
47744773

47754774
// remove db from share
47764775
let mut share_specs = Vec::with_capacity(db_meta.shared_by.len());
47774776
for share_id in &db_meta.shared_by {
4778-
let res = remove_db_from_share(kv_api, *share_id, db_id, tenant_dbname, txn).await;
4777+
let res =
4778+
remove_db_from_share(kv_api, *share_id, *seq_db_id.data, tenant_dbname, txn).await;
47794779

47804780
match res {
47814781
Ok((share_name, share_meta)) => {
@@ -4800,7 +4800,7 @@ async fn drop_database_meta(
48004800

48014801
let (removed, _from_share) = is_db_need_to_be_remove(
48024802
kv_api,
4803-
db_id,
4803+
*seq_db_id.data,
48044804
// remove db directly if created from share
48054805
|db_meta| db_meta.from_share.is_some(),
48064806
txn,
@@ -4811,7 +4811,7 @@ async fn drop_database_meta(
48114811
// if db create from share then remove it directly and remove db id from share
48124812
debug!(
48134813
name :? =(tenant_dbname),
4814-
id :? =(&DatabaseId { db_id });
4814+
id :? =(&seq_db_id );
48154815
"drop_database from share"
48164816
);
48174817

@@ -4827,7 +4827,7 @@ async fn drop_database_meta(
48274827
db_id_list_opt.unwrap_or(DbIdList::new())
48284828
};
48294829
if let Some(last_db_id) = db_id_list.last() {
4830-
if *last_db_id == db_id {
4830+
if *last_db_id == *seq_db_id.data {
48314831
db_id_list.pop();
48324832
txn.condition
48334833
.push(txn_cond_seq(&dbid_idlist, Eq, db_id_list_seq));
@@ -4840,10 +4840,10 @@ async fn drop_database_meta(
48404840
// del (tenant, db_name) -> db_id
48414841
// set db_meta.drop_on = now and update (db_id) -> db_meta
48424842

4843-
let db_id_key = DatabaseId { db_id };
4843+
let db_id_key = seq_db_id.data;
48444844

48454845
debug!(
4846-
db_id = db_id,
4846+
seq_db_id :? = seq_db_id,
48474847
name_key :? =(tenant_dbname);
48484848
"drop_database"
48494849
);
@@ -4873,12 +4873,12 @@ async fn drop_database_meta(
48734873

48744874
if db_id_list_seq == 0 || db_id_list_opt.is_none() {
48754875
warn!(
4876-
"drop db:{:?}, db_id:{:?} has no DbIdListKey",
4877-
tenant_dbname, db_id
4876+
"drop db:{:?}, seq_db_id:{:?} has no DbIdListKey",
4877+
tenant_dbname, seq_db_id
48784878
);
48794879

48804880
let mut db_id_list = DbIdList::new();
4881-
db_id_list.append(db_id);
4881+
db_id_list.append(*seq_db_id.data);
48824882

48834883
txn.condition
48844884
.push(txn_cond_seq(&dbid_idlist, Eq, db_id_list_seq));
@@ -4889,9 +4889,9 @@ async fn drop_database_meta(
48894889
}
48904890

48914891
if share_specs.is_empty() {
4892-
Ok((db_id, None))
4892+
Ok((*seq_db_id.data, None))
48934893
} else {
4894-
Ok((db_id, Some(share_specs)))
4894+
Ok((*seq_db_id.data, Some(share_specs)))
48954895
}
48964896
}
48974897

@@ -4975,20 +4975,20 @@ pub(crate) async fn get_db_id_or_err(
49754975
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
49764976
name_key: &DatabaseNameIdent,
49774977
msg: impl Display,
4978-
) -> Result<SeqV<Id<DatabaseId>>, KVAppError> {
4978+
) -> Result<SeqV<DatabaseId>, KVAppError> {
49794979
let seq_db_id = kv_api.get_pb(name_key).await?;
49804980

49814981
let seq_db_id = seq_db_id.ok_or_else(|| unknown_database_error(name_key, msg))?;
4982-
Ok(seq_db_id)
4982+
4983+
Ok(seq_db_id.map(|x| x.into_inner()))
49834984
}
49844985

49854986
/// Returns (db_id_seq, db_id, db_meta_seq, db_meta)
49864987
pub(crate) async fn get_db_or_err(
49874988
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
49884989
name_key: &DatabaseNameIdent,
49894990
msg: impl Display,
4990-
) -> Result<(u64, u64, SeqV<DatabaseMeta>), KVAppError> {
4991-
// TODO: change to returning SeqV
4991+
) -> Result<(SeqV<DatabaseId>, SeqV<DatabaseMeta>), KVAppError> {
49924992
let seq_db_id = kv_api.get_pb(name_key).await?;
49934993
let seq_db_id = seq_db_id.ok_or_else(|| unknown_database_error(name_key, &msg))?;
49944994

@@ -4997,7 +4997,7 @@ pub(crate) async fn get_db_or_err(
49974997
let seq_db_meta = kv_api.get_pb(&id_key).await?;
49984998
let seq_db_meta = seq_db_meta.ok_or_else(|| unknown_database_error(name_key, &msg))?;
49994999

5000-
Ok((seq_db_id.seq, *seq_db_id.data, seq_db_meta))
5000+
Ok((seq_db_id.map(|x| x.into_inner()), seq_db_meta))
50015001
}
50025002

50035003
/// Returns (db_meta_seq, db_meta)
@@ -5867,9 +5867,9 @@ impl UndropTableStrategy for UndropTableReq {
58675867
kv_api: &'a (impl kvapi::KVApi<Error = MetaError> + ?Sized),
58685868
) -> Result<(u64, SeqV<DatabaseMeta>), KVAppError> {
58695869
// for plain un-drop table (by name), database meta is refreshed by name
5870-
let (_, db_id, db_meta) =
5870+
let (seq_db_id, db_meta) =
58715871
get_db_or_err(kv_api, &self.name_ident.db_name_ident(), "undrop_table").await?;
5872-
Ok((db_id, db_meta))
5872+
Ok((*seq_db_id.data, db_meta))
58735873
}
58745874

58755875
fn extract_and_validate_table_id(

0 commit comments

Comments
 (0)