Skip to content

Commit 09530b8

Browse files
authored
refactor: fuse table funcs (#16149)
* refact: rename generic type names * wip * refact: fuse_block * refact: fuse_column * wip * wip * introduce common arg func * refactor: fuse_block/columns * refactor: fuse_segment * refactor: fuse_snapshot * resolve merge conflict * refact: simple arg func * refactor: cluster infor funcs * refactor: fuse_encoding/stats * refactor: squash mods * cargo clippy * cleanup * fix logic test 09_0014_func_clustering_information_function.test * fix: logic test 09_0006_func_fuse_history.test Change error code from 1001 (internal error) to 4000( storage error), if fuse_snapshot is applied to non-fuse tables
1 parent 5048be8 commit 09530b8

File tree

37 files changed

+925
-2297
lines changed

37 files changed

+925
-2297
lines changed

src/query/service/src/interpreters/interpreter_select.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,13 @@ impl SelectInterpreter {
145145
.main_pipeline
146146
.set_on_finished(move |info: &ExecutionInfo| match &info.res {
147147
Ok(_) => GlobalIORuntime::instance().block_on(async move {
148-
info!("Updating the stream meta to consume data");
149-
150148
match update_stream_metas {
151149
Some(streams) => {
152150
let r = UpdateMultiTableMetaReq {
153151
update_table_metas: streams.update_table_metas,
154152
..Default::default()
155153
};
154+
info!("Updating the stream meta to consume data");
156155
catalog.update_multi_table_meta(r).await.map(|_| ())
157156
}
158157
None => Ok(()),

src/query/service/src/table_functions/table_function_factory.rs

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ use databend_common_catalog::table_args::TableArgs;
1919
use databend_common_exception::ErrorCode;
2020
use databend_common_exception::Result;
2121
use databend_common_meta_types::MetaId;
22-
use databend_common_storages_fuse::table_functions::ClusteringStatisticsTable;
22+
use databend_common_storages_fuse::table_functions::ClusteringStatisticsFunc;
2323
use databend_common_storages_fuse::table_functions::FuseAmendTable;
24-
use databend_common_storages_fuse::table_functions::FuseColumnTable;
25-
use databend_common_storages_fuse::table_functions::FuseEncodingTable;
26-
use databend_common_storages_fuse::table_functions::SetCacheCapacity;
24+
use databend_common_storages_fuse::table_functions::FuseBlockFunc;
25+
use databend_common_storages_fuse::table_functions::FuseColumnFunc;
26+
use databend_common_storages_fuse::table_functions::FuseEncodingFunc;
27+
use databend_common_storages_fuse::table_functions::FuseStatisticsFunc;
2728
use databend_common_storages_fuse::table_functions::TableFunctionTemplate;
2829
use databend_common_storages_stream::stream_status_table_func::StreamStatusTable;
2930
use itertools::Itertools;
@@ -35,11 +36,10 @@ use super::SuggestedBackgroundTasksTable;
3536
use super::TenantQuotaTable;
3637
use crate::catalogs::SYS_TBL_FUC_ID_END;
3738
use crate::catalogs::SYS_TBL_FUNC_ID_BEGIN;
38-
use crate::storages::fuse::table_functions::ClusteringInformationTable;
39-
use crate::storages::fuse::table_functions::FuseBlockTable;
40-
use crate::storages::fuse::table_functions::FuseSegmentTable;
41-
use crate::storages::fuse::table_functions::FuseSnapshotTable;
42-
use crate::storages::fuse::table_functions::FuseStatisticTable;
39+
use crate::storages::fuse::table_functions::ClusteringInformationFunc;
40+
use crate::storages::fuse::table_functions::FuseSegmentFunc;
41+
use crate::storages::fuse::table_functions::FuseSnapshotFunc;
42+
use crate::storages::fuse::table_functions::SetCacheCapacityFunc;
4343
use crate::table_functions::async_crash_me::AsyncCrashMeTable;
4444
use crate::table_functions::cloud::TaskDependentsEnableTable;
4545
use crate::table_functions::cloud::TaskDependentsTable;
@@ -121,7 +121,10 @@ impl TableFunctionFactory {
121121

122122
creators.insert(
123123
"fuse_snapshot".to_string(),
124-
(next_id(), Arc::new(FuseSnapshotTable::create)),
124+
(
125+
next_id(),
126+
Arc::new(TableFunctionTemplate::<FuseSnapshotFunc>::create),
127+
),
125128
);
126129

127130
creators.insert(
@@ -136,34 +139,56 @@ impl TableFunctionFactory {
136139
"set_cache_capacity".to_string(),
137140
(
138141
next_id(),
139-
Arc::new(TableFunctionTemplate::<SetCacheCapacity>::create),
142+
Arc::new(TableFunctionTemplate::<SetCacheCapacityFunc>::create),
140143
),
141144
);
142145

143146
creators.insert(
144147
"fuse_segment".to_string(),
145-
(next_id(), Arc::new(FuseSegmentTable::create)),
148+
(
149+
next_id(),
150+
Arc::new(TableFunctionTemplate::<FuseSegmentFunc>::create),
151+
),
146152
);
153+
147154
creators.insert(
148155
"fuse_block".to_string(),
149-
(next_id(), Arc::new(FuseBlockTable::create)),
156+
(
157+
next_id(),
158+
Arc::new(TableFunctionTemplate::<FuseBlockFunc>::create),
159+
),
150160
);
161+
151162
creators.insert(
152163
"fuse_column".to_string(),
153-
(next_id(), Arc::new(FuseColumnTable::create)),
164+
(
165+
next_id(),
166+
Arc::new(TableFunctionTemplate::<FuseColumnFunc>::create),
167+
),
154168
);
169+
155170
creators.insert(
156171
"fuse_statistic".to_string(),
157-
(next_id(), Arc::new(FuseStatisticTable::create)),
172+
(
173+
next_id(),
174+
Arc::new(TableFunctionTemplate::<FuseStatisticsFunc>::create),
175+
),
158176
);
159177

160178
creators.insert(
161179
"clustering_information".to_string(),
162-
(next_id(), Arc::new(ClusteringInformationTable::create)),
180+
(
181+
next_id(),
182+
Arc::new(TableFunctionTemplate::<ClusteringInformationFunc>::create),
183+
),
163184
);
185+
164186
creators.insert(
165187
"clustering_statistics".to_string(),
166-
(next_id(), Arc::new(ClusteringStatisticsTable::create)),
188+
(
189+
next_id(),
190+
Arc::new(TableFunctionTemplate::<ClusteringStatisticsFunc>::create),
191+
),
167192
);
168193

169194
creators.insert(
@@ -232,7 +257,10 @@ impl TableFunctionFactory {
232257

233258
creators.insert(
234259
"fuse_encoding".to_string(),
235-
(next_id(), Arc::new(FuseEncodingTable::create)),
260+
(
261+
next_id(),
262+
Arc::new(TableFunctionTemplate::<FuseEncodingFunc>::create),
263+
),
236264
);
237265

238266
creators.insert(

src/query/storages/fuse/src/table_functions/cache_admin/mod.rs

Lines changed: 0 additions & 17 deletions
This file was deleted.
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ use std::collections::HashMap;
1919
use std::sync::Arc;
2020

2121
use chrono::Utc;
22+
use databend_common_catalog::catalog::CATALOG_DEFAULT;
23+
use databend_common_catalog::plan::DataSourcePlan;
24+
use databend_common_catalog::table_args::TableArgs;
2225
use databend_common_exception::ErrorCode;
2326
use databend_common_exception::Result;
2427
use databend_common_expression::compare_scalars;
@@ -42,6 +45,7 @@ use databend_common_expression::Scalar;
4245
use databend_common_expression::TableDataType;
4346
use databend_common_expression::TableField;
4447
use databend_common_expression::TableSchema;
48+
use databend_common_expression::TableSchemaRef;
4549
use databend_common_expression::TableSchemaRefExt;
4650
use databend_common_expression::Value;
4751
use databend_common_functions::BUILTIN_FUNCTIONS;
@@ -57,9 +61,81 @@ use serde_json::Value as JsonValue;
5761
use crate::io::SegmentsIO;
5862
use crate::sessions::TableContext;
5963
use crate::table_functions::cmp_with_null;
64+
use crate::table_functions::parse_db_tb_opt_args;
65+
use crate::table_functions::string_literal;
66+
use crate::table_functions::SimpleArgFunc;
67+
use crate::table_functions::SimpleArgFuncTemplate;
6068
use crate::FuseTable;
6169
use crate::Table;
6270

71+
pub struct ClusteringInformationArgs {
72+
database_name: String,
73+
table_name: String,
74+
cluster_key: Option<String>,
75+
}
76+
77+
impl From<&ClusteringInformationArgs> for TableArgs {
78+
fn from(args: &ClusteringInformationArgs) -> Self {
79+
let mut tbl_args = Vec::new();
80+
tbl_args.push(string_literal(args.database_name.as_str()));
81+
tbl_args.push(string_literal(args.table_name.as_str()));
82+
if let Some(arg_cluster_key) = &args.cluster_key {
83+
tbl_args.push(string_literal(arg_cluster_key));
84+
}
85+
TableArgs::new_positioned(tbl_args)
86+
}
87+
}
88+
impl TryFrom<(&str, TableArgs)> for ClusteringInformationArgs {
89+
type Error = ErrorCode;
90+
fn try_from(
91+
(func_name, table_args): (&str, TableArgs),
92+
) -> std::result::Result<Self, Self::Error> {
93+
let (database_name, table_name, cluster_key) =
94+
parse_db_tb_opt_args(&table_args, func_name)?;
95+
96+
Ok(Self {
97+
database_name,
98+
table_name,
99+
cluster_key,
100+
})
101+
}
102+
}
103+
104+
pub type ClusteringInformationFunc = SimpleArgFuncTemplate<ClusteringInformationNew>;
105+
pub struct ClusteringInformationNew;
106+
107+
#[async_trait::async_trait]
108+
impl SimpleArgFunc for ClusteringInformationNew {
109+
type Args = ClusteringInformationArgs;
110+
111+
fn schema() -> TableSchemaRef {
112+
ClusteringInformation::schema()
113+
}
114+
115+
async fn apply(
116+
ctx: &Arc<dyn TableContext>,
117+
args: &Self::Args,
118+
_plan: &DataSourcePlan,
119+
) -> Result<DataBlock> {
120+
let tenant_id = ctx.get_tenant();
121+
let tbl = ctx
122+
.get_catalog(CATALOG_DEFAULT)
123+
.await?
124+
.get_table(
125+
&tenant_id,
126+
args.database_name.as_str(),
127+
args.table_name.as_str(),
128+
)
129+
.await?;
130+
131+
let tbl = FuseTable::try_from_table(tbl.as_ref())?;
132+
133+
ClusteringInformation::new(ctx.clone(), tbl, args.cluster_key.clone())
134+
.get_clustering_info()
135+
.await
136+
}
137+
}
138+
63139
pub struct ClusteringInformation<'a> {
64140
pub ctx: Arc<dyn TableContext>,
65141
pub table: &'a FuseTable,

0 commit comments

Comments
 (0)