Skip to content

Commit 820d63b

Browse files
authored
refactor: replace find_database_table_filter to extract_leveled_strings (#18297)
delete find_eq_filter
1 parent 7c985c2 commit 820d63b

File tree

10 files changed

+98
-141
lines changed

10 files changed

+98
-141
lines changed

src/query/storages/system/src/columns_table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use log::warn;
5151
use crate::generate_catalog_meta;
5252
use crate::table::AsyncOneBlockSystemTable;
5353
use crate::table::AsyncSystemTable;
54-
use crate::util::find_database_table_filters;
54+
use crate::util::extract_leveled_strings;
5555

5656
pub struct ColumnsTable {
5757
table_info: TableInfo,
@@ -352,7 +352,7 @@ pub(crate) async fn dump_tables(
352352
if let Some(filter) = push_downs.filters.as_ref().map(|f| &f.filter) {
353353
let expr = filter.as_expr(&BUILTIN_FUNCTIONS);
354354
(filtered_db_names, filtered_table_names) =
355-
find_database_table_filters(&expr, &func_ctx)?;
355+
extract_leveled_strings(&expr, &["database", "table"], &func_ctx)?;
356356
}
357357
}
358358

src/query/storages/system/src/databases_table.rs

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use databend_common_expression::types::TimestampType;
2626
use databend_common_expression::types::UInt64Type;
2727
use databend_common_expression::utils::FromData;
2828
use databend_common_expression::DataBlock;
29-
use databend_common_expression::Scalar;
3029
use databend_common_expression::TableDataType;
3130
use databend_common_expression::TableField;
3231
use databend_common_expression::TableSchemaRefExt;
@@ -44,7 +43,7 @@ use log::warn;
4443

4544
use crate::table::AsyncOneBlockSystemTable;
4645
use crate::table::AsyncSystemTable;
47-
use crate::util::find_eq_filter;
46+
use crate::util::extract_leveled_strings;
4847
use crate::util::generate_catalog_meta;
4948

5049
pub type DatabasesTableWithHistory = DatabasesTable<true>;
@@ -112,14 +111,9 @@ where DatabasesTable<WITH_HISTORY>: HistoryAware
112111
if let Some(push_downs) = push_downs {
113112
if let Some(filter) = push_downs.filters.as_ref().map(|f| &f.filter) {
114113
let expr = filter.as_expr(&BUILTIN_FUNCTIONS);
115-
find_eq_filter(&expr, &mut |col_name, scalar| {
116-
if col_name == "catalog" {
117-
if let Scalar::String(catalog) = scalar {
118-
filter_catalog_name = Some(catalog.clone());
119-
}
120-
}
121-
Ok(())
122-
});
114+
let (catalog_name, _) =
115+
extract_leveled_strings(&expr, &["catalog"], &ctx.get_function_context()?)?;
116+
filter_catalog_name = Some(catalog_name);
123117
}
124118
}
125119

@@ -132,15 +126,16 @@ where DatabasesTable<WITH_HISTORY>: HistoryAware
132126
let catalog_dbs = visibility_checker
133127
.as_ref()
134128
.and_then(|c| c.get_visibility_database());
135-
let catalogs = if let Some(filter_catalog_name) = filter_catalog_name {
136-
let mut res = vec![];
137-
if filter_catalog_name == self.get_table_info().catalog() {
138-
res.push((filter_catalog_name, ctl));
139-
}
140-
// If empty return empty result
141-
res
142-
} else {
129+
130+
let current_catalog_name = self.get_table_info().catalog();
131+
// If filter_catalog_name is None (i.e., there is no filtering condition), the current directory is always included.
132+
// If filter_catalog_name is Empty (i.e., where name like '%sys%'), the current directory is always included.
133+
let catalogs = if filter_catalog_name.as_ref().is_none_or(|filter_names| {
134+
filter_names.is_empty() || filter_names.iter().any(|name| name == current_catalog_name)
135+
}) {
143136
vec![(ctl.name(), ctl)]
137+
} else {
138+
vec![]
144139
};
145140

146141
let user_api = UserApiProvider::instance();

src/query/storages/system/src/indexes_table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use log::warn;
3636

3737
use crate::table::AsyncOneBlockSystemTable;
3838
use crate::table::AsyncSystemTable;
39-
use crate::util::find_database_table_filters;
39+
use crate::util::extract_leveled_strings;
4040

4141
const POINT_GET_TABLE_LIMIT: usize = 20;
4242

@@ -64,7 +64,7 @@ impl AsyncSystemTable for IndexesTable {
6464
if let Some(filters) = push_downs.and_then(|info| info.filters) {
6565
let expr = filters.filter.as_expr(&BUILTIN_FUNCTIONS);
6666
(filtered_db_names, filtered_table_names) =
67-
find_database_table_filters(&expr, &func_ctx)?;
67+
extract_leveled_strings(&expr, &["database", "table"], &func_ctx)?;
6868
}
6969

7070
let filtered_db_names = if filtered_db_names.is_empty() {

src/query/storages/system/src/locks_table.rs

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@ use databend_common_catalog::plan::PushDownInfo;
1919
use databend_common_catalog::table::Table;
2020
use databend_common_catalog::table_context::TableContext;
2121
use databend_common_exception::Result;
22+
use databend_common_expression::filter_helper::FilterHelpers;
23+
use databend_common_expression::type_check::check_number;
2224
use databend_common_expression::types::number::UInt64Type;
2325
use databend_common_expression::types::NumberDataType;
2426
use databend_common_expression::types::StringType;
2527
use databend_common_expression::types::TimestampType;
2628
use databend_common_expression::utils::FromData;
29+
use databend_common_expression::Constant;
2730
use databend_common_expression::DataBlock;
28-
use databend_common_expression::Scalar;
31+
use databend_common_expression::Expr;
2932
use databend_common_expression::TableDataType;
3033
use databend_common_expression::TableField;
3134
use databend_common_expression::TableSchemaRef;
@@ -41,7 +44,6 @@ use databend_common_meta_app::tenant::Tenant;
4144

4245
use crate::table::AsyncOneBlockSystemTable;
4346
use crate::table::AsyncSystemTable;
44-
use crate::util::find_eq_filter;
4547
use crate::util::generate_catalog_meta;
4648

4749
pub struct LocksTable {
@@ -88,18 +90,30 @@ impl AsyncSystemTable for LocksTable {
8890
if let Some(push_downs) = &push_downs {
8991
if let Some(filter) = push_downs.filters.as_ref().map(|f| &f.filter) {
9092
let expr = filter.as_expr(&BUILTIN_FUNCTIONS);
91-
find_eq_filter(&expr, &mut |col_name, scalar| {
92-
if col_name == "table_id" {
93-
if let Scalar::Number(s) = scalar {
94-
if let Some(v) = s.as_u_int64() {
95-
if !table_ids.contains(v) {
96-
table_ids.push(*v);
97-
}
98-
}
93+
let func_ctx = ctx.get_function_context()?;
94+
95+
let leveld_results = FilterHelpers::find_leveled_eq_filters(
96+
&expr,
97+
&["table_id"],
98+
&func_ctx,
99+
&BUILTIN_FUNCTIONS,
100+
)?;
101+
102+
for scalars in leveld_results {
103+
for r in scalars.iter() {
104+
let e = Expr::Constant(Constant {
105+
span: None,
106+
scalar: r.clone(),
107+
data_type: r.as_ref().infer_data_type(),
108+
});
109+
110+
if let Ok(s) =
111+
check_number::<u64, usize>(None, &func_ctx, &e, &BUILTIN_FUNCTIONS)
112+
{
113+
table_ids.push(s);
99114
}
100115
}
101-
Ok(())
102-
});
116+
}
103117
}
104118
}
105119

src/query/storages/system/src/notification_history_table.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use databend_common_expression::types::StringType;
3131
use databend_common_expression::types::TimestampType;
3232
use databend_common_expression::DataBlock;
3333
use databend_common_expression::FromData;
34-
use databend_common_expression::Scalar;
3534
use databend_common_functions::BUILTIN_FUNCTIONS;
3635
use databend_common_meta_app::schema::TableIdent;
3736
use databend_common_meta_app::schema::TableInfo;
@@ -40,11 +39,9 @@ use databend_common_sql::plans::notification_history_schema;
4039

4140
use crate::table::AsyncOneBlockSystemTable;
4241
use crate::table::AsyncSystemTable;
43-
use crate::util::find_eq_filter;
42+
use crate::util::extract_leveled_strings;
4443

45-
fn parse_history_to_block(
46-
histories: Vec<NotificationHistory>,
47-
) -> databend_common_exception::Result<DataBlock> {
44+
fn parse_history_to_block(histories: Vec<NotificationHistory>) -> Result<DataBlock> {
4845
let mut created_on: Vec<i64> = Vec::with_capacity(histories.len());
4946
let mut processed: Vec<Option<i64>> = Vec::with_capacity(histories.len());
5047
let mut message_source: Vec<String> = Vec::with_capacity(histories.len());
@@ -112,14 +109,13 @@ impl AsyncSystemTable for NotificationHistoryTable {
112109
if let Some(push_downs) = push_downs {
113110
if let Some(filter) = push_downs.filters.as_ref().map(|f| &f.filter) {
114111
let expr = filter.as_expr(&BUILTIN_FUNCTIONS);
115-
find_eq_filter(&expr, &mut |col_name, scalar| {
116-
if col_name == "integration_name" {
117-
if let Scalar::String(s) = scalar {
118-
notification_name = Some(s.clone());
119-
}
120-
}
121-
Ok(())
122-
});
112+
let func_ctx = ctx.get_function_context()?;
113+
let (name, _) = extract_leveled_strings(&expr, &["integration_name"], &func_ctx)?;
114+
// find_filters will collect integration_name = xx or integration_name = yy.
115+
// So if name.len() != 1 integration_name should be None.
116+
if name.len() == 1 {
117+
notification_name = Some(name[0].clone())
118+
};
123119
}
124120
}
125121

src/query/storages/system/src/streams_table.rs

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use databend_common_expression::types::StringType;
3030
use databend_common_expression::types::TimestampType;
3131
use databend_common_expression::utils::FromData;
3232
use databend_common_expression::DataBlock;
33-
use databend_common_expression::Scalar;
3433
use databend_common_expression::TableDataType;
3534
use databend_common_expression::TableField;
3635
use databend_common_expression::TableSchemaRef;
@@ -51,7 +50,7 @@ use log::warn;
5150

5251
use crate::table::AsyncOneBlockSystemTable;
5352
use crate::table::AsyncSystemTable;
54-
use crate::util::find_eq_filter;
53+
use crate::util::extract_leveled_strings;
5554
use crate::util::generate_catalog_meta;
5655

5756
pub type FullStreamsTable = StreamsTable<true>;
@@ -111,19 +110,10 @@ impl<const T: bool> AsyncSystemTable for StreamsTable<T> {
111110

112111
let mut dbs = Vec::new();
113112
if let Some(push_downs) = &push_downs {
114-
let mut db_name = Vec::new();
115113
if let Some(filter) = push_downs.filters.as_ref().map(|f| &f.filter) {
116114
let expr = filter.as_expr(&BUILTIN_FUNCTIONS);
117-
find_eq_filter(&expr, &mut |col_name, scalar| {
118-
if col_name == "database" {
119-
if let Scalar::String(database) = scalar {
120-
if !db_name.contains(database) {
121-
db_name.push(database.clone());
122-
}
123-
}
124-
}
125-
Ok(())
126-
});
115+
let func_ctx = ctx.get_function_context()?;
116+
let (db_name, _) = extract_leveled_strings(&expr, &["database"], &func_ctx)?;
127117
for db in db_name {
128118
match ctl.get_database(&tenant, db.as_str()).await {
129119
Ok(database) => dbs.push(database),

src/query/storages/system/src/tables_table.rs

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use databend_common_expression::utils::FromData;
3737
use databend_common_expression::Constant;
3838
use databend_common_expression::DataBlock;
3939
use databend_common_expression::Expr;
40-
use databend_common_expression::Scalar;
40+
use databend_common_expression::FunctionContext;
4141
use databend_common_expression::TableDataType;
4242
use databend_common_expression::TableField;
4343
use databend_common_expression::TableSchemaRef;
@@ -62,7 +62,7 @@ use log::warn;
6262

6363
use crate::table::AsyncOneBlockSystemTable;
6464
use crate::table::AsyncSystemTable;
65-
use crate::util::find_eq_filter;
65+
use crate::util::extract_leveled_strings;
6666
use crate::util::generate_catalog_meta;
6767

6868
pub struct TablesTable<const WITH_HISTORY: bool, const WITHOUT_VIEW: bool> {
@@ -176,8 +176,9 @@ where TablesTable<WITH_HISTORY, WITHOUT_VIEW>: HistoryAware
176176
.disable_table_info_refresh()?;
177177

178178
// Optimization target: Fast path for known iceberg catalog SHOW TABLES
179+
let func_ctx = ctx.get_function_context()?;
179180
if let Some((catalog_name, db_name)) =
180-
self.is_external_show_tables_query(&push_downs, &catalog)
181+
self.is_external_show_tables_query(&func_ctx, &push_downs, &catalog)
181182
{
182183
self.show_tables_from_external_catalog(ctx, catalog_name, db_name)
183184
.await
@@ -1047,11 +1048,11 @@ where TablesTable<WITH_HISTORY, WITHOUT_VIEW>: HistoryAware
10471048

10481049
fn is_external_show_tables_query(
10491050
&self,
1051+
func_ctx: &FunctionContext,
10501052
push_downs: &Option<PushDownInfo>,
10511053
catalog: &Arc<dyn Catalog>,
10521054
) -> Option<(String, String)> {
10531055
if !WITH_HISTORY && WITHOUT_VIEW {
1054-
let mut database_name = None;
10551056
// Check projection
10561057
if let Some(push_downs) = push_downs {
10571058
if let Some(Projection::Columns(projection_indices)) = &push_downs.projection {
@@ -1074,34 +1075,26 @@ where TablesTable<WITH_HISTORY, WITHOUT_VIEW>: HistoryAware
10741075
return None;
10751076
}
10761077

1077-
// Check filters (catalog name)
1078-
let mut catalog_name = None;
1078+
let mut filtered_catalog_names = vec![];
1079+
let mut filtered_db_names = vec![];
10791080

10801081
if let Some(filter) = push_downs.filters.as_ref().map(|f| &f.filter) {
10811082
let expr = filter.as_expr(&BUILTIN_FUNCTIONS);
1082-
find_eq_filter(&expr, &mut |col_name, scalar| {
1083-
if col_name == "catalog" {
1084-
if let Scalar::String(catalog) = scalar {
1085-
catalog_name = Some(catalog.to_string());
1086-
}
1087-
}
1088-
if col_name == "database" {
1089-
if let Scalar::String(db) = scalar {
1090-
database_name = Some(db.to_string());
1091-
}
1092-
}
1093-
Ok(())
1094-
});
1083+
(filtered_catalog_names, filtered_db_names) =
1084+
extract_leveled_strings(&expr, &["catalog", "database"], func_ctx)
1085+
.ok()?;
10951086
}
10961087

10971088
// Check iceberg catalog existence
1098-
if let Some(catalog_name) = catalog_name {
1099-
if let Some(database_name) = database_name {
1100-
if catalog.name() == catalog_name {
1101-
if let CatalogType::Iceberg = catalog.info().catalog_type() {
1102-
return Some((catalog_name, database_name));
1103-
}
1104-
}
1089+
if filtered_catalog_names.len() == 1
1090+
&& filtered_db_names.len() == 1
1091+
&& catalog.name() == filtered_catalog_names[0].clone()
1092+
{
1093+
if let CatalogType::Iceberg = catalog.info().catalog_type() {
1094+
return Some((
1095+
filtered_catalog_names[0].clone(),
1096+
filtered_db_names[0].clone(),
1097+
));
11051098
}
11061099
}
11071100
}

src/query/storages/system/src/task_history_table.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use jiff::tz::TimeZone;
4444

4545
use crate::table::AsyncOneBlockSystemTable;
4646
use crate::table::AsyncSystemTable;
47-
use crate::util::find_eq_filter;
47+
use crate::util::extract_leveled_strings;
4848
use crate::util::find_gt_filter;
4949
use crate::util::find_lt_filter;
5050

@@ -152,14 +152,13 @@ impl AsyncSystemTable for TaskHistoryTable {
152152
if let Some(push_downs) = push_downs {
153153
if let Some(filter) = push_downs.filters.as_ref().map(|f| &f.filter) {
154154
let expr = filter.as_expr(&BUILTIN_FUNCTIONS);
155-
find_eq_filter(&expr, &mut |col_name, scalar| {
156-
if col_name == "name" {
157-
if let Scalar::String(s) = scalar {
158-
task_name = Some(s.clone());
159-
}
160-
}
161-
Ok(())
162-
});
155+
let func_ctx = ctx.get_function_context()?;
156+
let (name, _) = extract_leveled_strings(&expr, &["name"], &func_ctx)?;
157+
// find_filters will collect name = xx or name = yy.
158+
// So if name.len() != 1 task_name should be None.
159+
if name.len() == 1 {
160+
task_name = Some(name[0].clone())
161+
};
163162
find_lt_filter(&expr, &mut |col_name, scalar| {
164163
if col_name == "scheduled_time" {
165164
if let Scalar::Timestamp(s) = scalar {

0 commit comments

Comments
 (0)