@@ -65,9 +65,6 @@ use databend_common_meta_app::schema::CatalogIdToName;
65
65
use databend_common_meta_app:: schema:: CatalogInfo ;
66
66
use databend_common_meta_app:: schema:: CatalogMeta ;
67
67
use databend_common_meta_app:: schema:: CatalogNameIdent ;
68
- use databend_common_meta_app:: schema:: CountTablesKey ;
69
- use databend_common_meta_app:: schema:: CountTablesReply ;
70
- use databend_common_meta_app:: schema:: CountTablesReq ;
71
68
use databend_common_meta_app:: schema:: CreateCatalogReply ;
72
69
use databend_common_meta_app:: schema:: CreateCatalogReq ;
73
70
use databend_common_meta_app:: schema:: CreateDatabaseReply ;
@@ -204,7 +201,6 @@ use databend_common_meta_types::TxnGetRequest;
204
201
use databend_common_meta_types:: TxnGetResponse ;
205
202
use databend_common_meta_types:: TxnOp ;
206
203
use databend_common_meta_types:: TxnRequest ;
207
- use databend_common_meta_types:: UpsertKV ;
208
204
use futures:: TryStreamExt ;
209
205
use log:: debug;
210
206
use log:: error;
@@ -1521,7 +1517,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
1521
1517
1522
1518
debug ! ( req : ? =( & req) ; "SchemaApi: {}" , func_name!( ) ) ;
1523
1519
1524
- let tenant = req. name_ident . tenant ( ) ;
1525
1520
let tenant_dbname_tbname = & req. name_ident ;
1526
1521
let tenant_dbname = req. name_ident . db_name_ident ( ) ;
1527
1522
@@ -1554,15 +1549,11 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
1554
1549
table_name : req. name_ident . table_name . clone ( ) ,
1555
1550
} ;
1556
1551
1557
- // fixed
1558
- let key_table_count = CountTablesKey :: new ( tenant. name ( ) ) ;
1559
-
1560
1552
// The keys of values to re-fetch for every retry in this txn.
1561
1553
let keys = vec ! [
1562
1554
key_dbid. to_string_key( ) ,
1563
1555
key_dbid_tbname. to_string_key( ) ,
1564
1556
key_table_id_list. to_string_key( ) ,
1565
- key_table_count. to_string_key( ) ,
1566
1557
] ;
1567
1558
1568
1559
// Initialize required key-values
@@ -1574,21 +1565,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
1574
1565
. collect :: < Vec < _ > > ( )
1575
1566
} ;
1576
1567
1577
- // Initialize table count if needed
1578
- assert_eq ! ( data[ 3 ] . key, key_table_count. to_string_key( ) ) ;
1579
- if data[ 3 ] . value . is_none ( ) {
1580
- init_table_count ( self , & key_table_count) . await ?;
1581
-
1582
- // Re-fetch
1583
- data = {
1584
- let values = self . mget_kv ( & keys) . await ?;
1585
- keys. iter ( )
1586
- . zip ( values. into_iter ( ) )
1587
- . map ( |( k, v) | TxnGetResponse :: new ( k, v. map ( pb:: SeqV :: from) ) )
1588
- . collect :: < Vec < _ > > ( )
1589
- } ;
1590
- }
1591
-
1592
1568
let mut trials = txn_backoff ( None , func_name ! ( ) ) ;
1593
1569
loop {
1594
1570
trials. next ( ) . unwrap ( ) ?. await ;
@@ -1630,7 +1606,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
1630
1606
1631
1607
let mut condition = vec ! [ ] ;
1632
1608
let mut if_then = vec ! [ ] ;
1633
- let mut tb_count = None ;
1634
1609
1635
1610
let opt = {
1636
1611
let d = data. remove ( 0 ) ;
@@ -1660,7 +1635,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
1660
1635
db_id. data ,
1661
1636
false ,
1662
1637
false ,
1663
- & mut tb_count,
1664
1638
& mut condition,
1665
1639
& mut if_then,
1666
1640
)
@@ -1680,14 +1654,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
1680
1654
v. unwrap_or_default ( )
1681
1655
} ;
1682
1656
1683
- let mut tb_count = {
1684
- let d = data. remove ( 0 ) ;
1685
- let ( k, v) = deserialize_id_get_response :: < CountTablesKey > ( d) ?;
1686
- assert_eq ! ( key_table_count, k) ;
1687
-
1688
- v. unwrap_or_default ( )
1689
- } ;
1690
-
1691
1657
// Table id is unique and does not need to re-generate in every loop.
1692
1658
if key_table_id. is_none ( ) {
1693
1659
let id = fetch_id ( self , IdGenerator :: table_id ( ) ) . await ?;
@@ -1733,15 +1699,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
1733
1699
txn_op_put( & key_table_id_to_name, serialize_struct( & key_dbid_tbname) ?) , /* __fd_table_id_to_name/db_id/table_name -> DBIdTableName */
1734
1700
] ) ;
1735
1701
1736
- // tb_id_seq is 0 means that is a create operation, in this case need to update table count
1737
- if tb_id_seq == 0 {
1738
- tb_count. data . 0 += 1 ;
1739
- // update table count atomically
1740
- condition. push ( txn_cond_seq ( & key_table_count, Eq , tb_count. seq ) ) ;
1741
- // _fd_table_count/tenant -> tb_count
1742
- if_then. push ( txn_op_put ( & key_table_count, serialize_u64 ( tb_count. data ) ?) ) ;
1743
- }
1744
-
1745
1702
let txn_req = TxnRequest {
1746
1703
condition,
1747
1704
if_then,
@@ -1811,9 +1768,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
1811
1768
1812
1769
let tenant_dbname_tbname = & req. name_ident ;
1813
1770
let tenant_dbname = req. name_ident . db_name_ident ( ) ;
1814
- let mut tbcount_found = false ;
1815
- let mut tb_count = 0 ;
1816
- let mut tb_count_seq;
1817
1771
1818
1772
let mut trials = txn_backoff ( None , func_name ! ( ) ) ;
1819
1773
loop {
@@ -1883,22 +1837,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
1883
1837
let tbid = TableId { table_id } ;
1884
1838
let ( tb_meta_seq, tb_meta) : ( _ , Option < TableMeta > ) = get_pb_value ( self , & tbid) . await ?;
1885
1839
1886
- // get current table count from _fd_table_count/tenant
1887
- let tb_count_key = CountTablesKey {
1888
- tenant : tenant_dbname. tenant . name ( ) . to_string ( ) ,
1889
- } ;
1890
- ( tb_count_seq, tb_count) = {
1891
- let ( seq, count) = get_u64_value ( self , & tb_count_key) . await ?;
1892
- if seq > 0 {
1893
- ( seq, count)
1894
- } else if !tbcount_found {
1895
- // only count_tables for the first time.
1896
- tbcount_found = true ;
1897
- ( 0 , count_tables ( self , & tb_count_key) . await ?)
1898
- } else {
1899
- ( 0 , tb_count)
1900
- }
1901
- } ;
1902
1840
// add drop_on time on table meta
1903
1841
// (db_id, table_name) -> table_id
1904
1842
@@ -1928,8 +1866,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
1928
1866
txn_cond_seq( & dbid_tbname, Eq , tb_id_seq) ,
1929
1867
// table is not changed
1930
1868
txn_cond_seq( & tbid, Eq , tb_meta_seq) ,
1931
- // update table count atomically
1932
- txn_cond_seq( & tb_count_key, Eq , tb_count_seq) ,
1933
1869
] ,
1934
1870
if_then : vec ! [
1935
1871
// Changing a table in a db has to update the seq of db_meta,
@@ -1938,7 +1874,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
1938
1874
txn_op_put( & dbid_tbname, serialize_u64( table_id) ?) , /* (tenant, db_id, tb_name) -> tb_id */
1939
1875
// txn_op_put(&dbid_tbname_idlist, serialize_struct(&tb_id_list)?)?, // _fd_table_id_list/db_id/table_name -> tb_id_list
1940
1876
txn_op_put( & tbid, serialize_struct( & tb_meta) ?) , /* (tenant, db_id, tb_id) -> tb_meta */
1941
- txn_op_put( & tb_count_key, serialize_u64( tb_count + 1 ) ?) , /* _fd_table_count/tenant -> tb_count */
1942
1877
] ,
1943
1878
else_then : vec ! [ ] ,
1944
1879
} ;
@@ -2564,7 +2499,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
2564
2499
let table_id = req. tb_id ;
2565
2500
debug ! ( req : ? =( & table_id) ; "SchemaApi: {}" , func_name!( ) ) ;
2566
2501
2567
- let mut tb_count = None ;
2568
2502
let tenant = & req. tenant ;
2569
2503
2570
2504
let mut trials = txn_backoff ( None , func_name ! ( ) ) ;
@@ -2582,7 +2516,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
2582
2516
req. db_id ,
2583
2517
req. if_exists ,
2584
2518
true ,
2585
- & mut tb_count,
2586
2519
& mut condition,
2587
2520
& mut if_then,
2588
2521
)
@@ -3584,65 +3517,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
3584
3517
Ok ( GcDroppedTableResp { } )
3585
3518
}
3586
3519
3587
- /// Get the count of tables for one tenant.
3588
- ///
3589
- /// Accept tenant name and returns the count of tables for the tenant.
3590
- ///
3591
- /// It get the count from kv space first,
3592
- /// if not found, it will compute the count by listing all databases and table ids.
3593
- #[ logcall:: logcall( "debug" ) ]
3594
- #[ minitrace:: trace]
3595
- async fn count_tables ( & self , req : CountTablesReq ) -> Result < CountTablesReply , KVAppError > {
3596
- debug ! ( req : ? =( & req) ; "SchemaApi: {}" , func_name!( ) ) ;
3597
-
3598
- let key = CountTablesKey {
3599
- tenant : req. tenant . to_string_key ( ) ,
3600
- } ;
3601
-
3602
- let count = loop {
3603
- let ( seq, cnt) = {
3604
- // get the count from kv space first
3605
- let ( seq, c) = get_u64_value ( self , & key) . await ?;
3606
- if seq > 0 {
3607
- // if seq > 0, we can get the count directly
3608
- break c;
3609
- }
3610
-
3611
- // if not, we should compute the count from by listing all databases and table ids
3612
-
3613
- // this line of codes will only be executed once,
3614
- // because if `send_txn` failed, it means another txn will put the count value into the kv space,
3615
- // and then the next loop will get the count value through `get_u64_value`.
3616
- ( 0 , count_tables ( self , & key) . await ?)
3617
- } ;
3618
-
3619
- let key = CountTablesKey {
3620
- tenant : req. tenant . clone ( ) ,
3621
- } ;
3622
-
3623
- let txn_req = TxnRequest {
3624
- // table count should not be changed.
3625
- condition : vec ! [ txn_cond_seq( & key, Eq , seq) ] ,
3626
- if_then : vec ! [ txn_op_put( & key, serialize_u64( cnt) ?) ] ,
3627
- else_then : vec ! [ ] ,
3628
- } ;
3629
-
3630
- let ( succ, _) = send_txn ( self , txn_req) . await ?;
3631
- // if txn succeeds, count can be returned safely
3632
- if succ {
3633
- break cnt;
3634
- }
3635
- } ;
3636
-
3637
- debug ! (
3638
- tenant = & req. tenant,
3639
- count = count;
3640
- "count tables for a tenant"
3641
- ) ;
3642
-
3643
- Ok ( CountTablesReply { count } )
3644
- }
3645
-
3646
3520
#[ minitrace:: trace]
3647
3521
async fn list_lock_revisions (
3648
3522
& self ,
@@ -4275,7 +4149,6 @@ async fn construct_drop_table_txn_operations(
4275
4149
db_id : u64 ,
4276
4150
if_exists : bool ,
4277
4151
if_delete : bool ,
4278
- tb_count_opt : & mut Option < u64 > ,
4279
4152
condition : & mut Vec < TxnCondition > ,
4280
4153
if_then : & mut Vec < TxnOp > ,
4281
4154
) -> Result < ( Option < ( Vec < ShareSpec > , Vec < ShareTableInfoMap > ) > , u64 ) , KVAppError > {
@@ -4322,24 +4195,6 @@ async fn construct_drop_table_txn_operations(
4322
4195
} ;
4323
4196
}
4324
4197
4325
- // get current table count from _fd_table_count/<tenant>
4326
- let tb_count_key = CountTablesKey {
4327
- tenant : tenant. name ( ) . to_string ( ) ,
4328
- } ;
4329
- let ( tb_count_seq, tb_count) = {
4330
- let ( seq, count) = get_u64_value ( kv_api, & tb_count_key) . await ?;
4331
- if seq > 0 {
4332
- ( seq, count)
4333
- } else if tb_count_opt. is_none ( ) {
4334
- // only count_tables for the first time.
4335
- let count = count_tables ( kv_api, & tb_count_key) . await ?;
4336
- * tb_count_opt = Some ( count) ;
4337
- ( 0 , count)
4338
- } else {
4339
- ( 0 , tb_count_opt. unwrap ( ) )
4340
- }
4341
- } ;
4342
-
4343
4198
let ( db_meta_seq, db_meta) = get_db_by_id_or_err ( kv_api, db_id, "drop_table_by_id" ) . await ?;
4344
4199
4345
4200
// cannot operate on shared database
@@ -4385,11 +4240,6 @@ async fn construct_drop_table_txn_operations(
4385
4240
condition. push ( txn_cond_seq ( & dbid_tbname, Eq , tb_id_seq) ) ;
4386
4241
// (db_id, tb_name) -> tb_id
4387
4242
if_then. push ( txn_op_del ( & dbid_tbname) ) ;
4388
-
4389
- // update table count atomically
4390
- condition. push ( txn_cond_seq ( & tb_count_key, Eq , tb_count_seq) ) ;
4391
- // _fd_table_count/tenant -> tb_count
4392
- if_then. push ( txn_op_put ( & tb_count_key, serialize_u64 ( tb_count - 1 ) ?) ) ;
4393
4243
}
4394
4244
4395
4245
// remove table from share
@@ -4771,49 +4621,6 @@ fn table_has_to_not_exist(
4771
4621
}
4772
4622
}
4773
4623
4774
- /// Initialize count of tables for one tenant.
4775
- async fn init_table_count (
4776
- kv_api : & ( impl kvapi:: KVApi < Error = MetaError > + ?Sized ) ,
4777
- key : & CountTablesKey ,
4778
- ) -> Result < ( ) , KVAppError > {
4779
- let n = count_tables ( kv_api, key) . await ?;
4780
-
4781
- kv_api
4782
- . upsert_kv ( UpsertKV :: insert ( key. to_string_key ( ) , & serialize_u64 ( n) ?) )
4783
- . await ?;
4784
-
4785
- Ok ( ( ) )
4786
- }
4787
-
4788
- /// Get the count of tables for one tenant by listing databases and table ids.
4789
- ///
4790
- /// It returns (seq, `u64` value).
4791
- /// If the count value is not in the kv space, (0, `u64` value) is returned.
4792
- async fn count_tables (
4793
- kv_api : & ( impl kvapi:: KVApi < Error = MetaError > + ?Sized ) ,
4794
- key : & CountTablesKey ,
4795
- ) -> Result < u64 , KVAppError > {
4796
- // For backward compatibility:
4797
- // If the table count of a tenant is not found in kv space,,
4798
- // we should compute the count by listing all tables of the tenant.
4799
- let databases = kv_api
4800
- . list_databases ( ListDatabaseReq {
4801
- tenant : Tenant :: new_or_err ( & key. tenant , func_name ! ( ) ) ?,
4802
- filter : None ,
4803
- } )
4804
- . await ?;
4805
- let mut count = 0 ;
4806
- for db in databases. into_iter ( ) {
4807
- let dbid_tbname = DBIdTableName {
4808
- db_id : db. ident . db_id ,
4809
- table_name : "" . to_string ( ) ,
4810
- } ;
4811
- let ( _, ids) = list_u64_value ( kv_api, & dbid_tbname) . await ?;
4812
- count += ids. len ( ) as u64 ;
4813
- }
4814
- Ok ( count)
4815
- }
4816
-
4817
4624
async fn get_share_table_info_map (
4818
4625
kv_api : & ( impl kvapi:: KVApi < Error = MetaError > + ?Sized ) ,
4819
4626
table_meta : & TableMeta ,
0 commit comments