Skip to content

Commit 3d57627

Browse files
committed
cleanup
1 parent e4f3f31 commit 3d57627

File tree

15 files changed

+55
-34
lines changed

15 files changed

+55
-34
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/util.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,6 @@ fn eq_filter(
312312
};
313313

314314
let min = RawExpr::Constant { span: None, scalar };
315-
// Make gte and lte function
316315
let eq_func = RawExpr::FunctionCall {
317316
span: None,
318317
name: "eq".to_string(),
@@ -322,9 +321,9 @@ fn eq_filter(
322321
let expr = type_check::check(&eq_func, &BUILTIN_FUNCTIONS)?;
323322

324323
// Fold
325-
// `Cast { expr: Constant { scalar: Number(50_u64), data_type: T }, dest_type: Nullable(T) }`
324+
// `Cast { expr: Constant { scalar: .., data_type: T }, dest_type: Nullable(T) }`
326325
// to
327-
// `Constant { scalar: Number(50_u64), data_type: Nullable(T) }`
326+
// `Constant { scalar: .., data_type: Nullable(T) }`
328327
// so that the expression can be utilized by bloom filter
329328
let (expr, _) = ConstantFolder::fold(&expr, func_ctx, &BUILTIN_FUNCTIONS);
330329
return Ok(Some(expr));

src/query/service/src/test_kits/block_writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ impl<'a> BlockWriter<'a> {
109109

110110
let bloom_index_cols = BloomIndexColumns::All;
111111
let bloom_columns_map =
112-
bloom_index_cols.bloom_index_fields(schema.clone(), BloomIndex::supported_type)?;
112+
bloom_index_cols.bloom_index_fields(schema.as_ref(), BloomIndex::supported_type)?;
113113
let maybe_bloom_index = BloomIndex::try_create(
114114
FunctionContext::default(),
115115
location.1,

src/query/service/tests/it/storages/fuse/operations/read_plan.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,15 @@ fn test_to_partitions() -> Result<()> {
112112
let column_nodes = ColumnNodes { column_nodes };
113113

114114
// CASE I: no projection
115-
let (s, parts) = FuseTable::to_partitions(None, &blocks_metas, &column_nodes, None, None, None);
115+
let bloom_index_cols = None;
116+
let (s, parts) = FuseTable::to_partitions(
117+
None,
118+
&blocks_metas,
119+
&column_nodes,
120+
None,
121+
None,
122+
bloom_index_cols,
123+
);
116124
assert_eq!(parts.len(), num_of_block as usize);
117125
let expected_block_size: u64 = cols_metas
118126
.values()
@@ -140,8 +148,15 @@ fn test_to_partitions() -> Result<()> {
140148
..Default::default()
141149
});
142150

143-
let (stats, parts) =
144-
FuseTable::to_partitions(None, &blocks_metas, &column_nodes, None, push_down, None);
151+
let bloom_index_cols = None;
152+
let (stats, parts) = FuseTable::to_partitions(
153+
None,
154+
&blocks_metas,
155+
&column_nodes,
156+
None,
157+
push_down,
158+
bloom_index_cols,
159+
);
145160
assert_eq!(parts.len(), num_of_block as usize);
146161
assert_eq!(expected_block_size * num_of_block, stats.read_bytes as u64);
147162

src/query/sql/src/planner/bloom_index.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use databend_common_ast::parser::tokenize_sql;
2020
use databend_common_ast::parser::Dialect;
2121
use databend_common_exception::ErrorCode;
2222
use databend_common_exception::Result;
23-
use databend_common_expression::is_stream_column_id;
23+
use databend_common_expression::{is_stream_column_id, TableSchema};
2424
use databend_common_expression::ComputedExpr;
2525
use databend_common_expression::FieldIndex;
2626
use databend_common_expression::TableDataType;
@@ -111,7 +111,7 @@ impl BloomIndexColumns {
111111
/// Get table field based on the BloomIndexColumns and schema.
112112
pub fn bloom_index_fields<F>(
113113
&self,
114-
schema: TableSchemaRef,
114+
schema: &TableSchema,
115115
verify_type: F,
116116
) -> Result<BTreeMap<FieldIndex, TableField>>
117117
where

src/query/storages/fuse/src/io/read/agg_index/agg_index_reader_native.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ impl AggIndexReader {
4646
.enumerate()
4747
.map(|(i, c)| (i as u32, ColumnMeta::Native(c)))
4848
.collect();
49+
let bloom_index_cols = None;
4950
let part = FuseBlockPartInfo::create(
5051
loc.to_string(),
5152
num_rows,
@@ -55,7 +56,7 @@ impl AggIndexReader {
5556
None,
5657
None,
5758
None,
58-
None,
59+
bloom_index_cols,
5960
);
6061
let res = self
6162
.reader
@@ -99,6 +100,7 @@ impl AggIndexReader {
99100
.enumerate()
100101
.map(|(i, c)| (i as u32, ColumnMeta::Native(c)))
101102
.collect();
103+
let bloom_index_cols = None;
102104
let part = FuseBlockPartInfo::create(
103105
loc.to_string(),
104106
num_rows,
@@ -108,7 +110,7 @@ impl AggIndexReader {
108110
None,
109111
None,
110112
None,
111-
None,
113+
bloom_index_cols,
112114
);
113115
let res = self
114116
.reader

src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ impl VirtualColumnReader {
8282
let (ranges, ignore_column_ids) = self.read_columns_meta(&schema, &columns_meta);
8383

8484
if !ranges.is_empty() {
85+
let bloom_index_cols = None;
8586
let part = FuseBlockPartInfo::create(
8687
loc.to_string(),
8788
row_group.num_rows() as u64,
@@ -91,7 +92,7 @@ impl VirtualColumnReader {
9192
None,
9293
None,
9394
None,
94-
None,
95+
bloom_index_cols,
9596
);
9697

9798
let merge_io_result =
@@ -125,6 +126,7 @@ impl VirtualColumnReader {
125126
let (ranges, ignore_column_ids) = self.read_columns_meta(&schema, &columns_meta);
126127

127128
if !ranges.is_empty() {
129+
let bloom_index_cols = None;
128130
let part = FuseBlockPartInfo::create(
129131
loc.to_string(),
130132
row_group.num_rows() as u64,
@@ -134,7 +136,7 @@ impl VirtualColumnReader {
134136
None,
135137
None,
136138
None,
137-
None,
139+
bloom_index_cols,
138140
);
139141

140142
let merge_io_result = BlockReader::merge_io_read(

src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ impl TransformSerializeBlock {
125125

126126
let bloom_columns_map = table
127127
.bloom_index_cols
128-
.bloom_index_fields(source_schema.clone(), BloomIndex::supported_type)?;
128+
.bloom_index_fields(source_schema.as_ref(), BloomIndex::supported_type)?;
129129
let block_builder = BlockBuilder {
130130
ctx,
131131
meta_locations: table.meta_location_generator().clone(),

src/query/storages/fuse/src/operations/merge.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl FuseTable {
8888
.into();
8989
let bloom_columns_map = self
9090
.bloom_index_cols()
91-
.bloom_index_fields(new_schema.clone(), BloomIndex::supported_type)?;
91+
.bloom_index_fields(new_schema.as_ref(), BloomIndex::supported_type)?;
9292
let block_builder = BlockBuilder {
9393
ctx: ctx.clone(),
9494
meta_locations: self.meta_location_generator().clone(),

src/query/storages/fuse/src/operations/read/native_data_source_reader.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ use crate::io::BlockReader;
3838
use crate::io::TableMetaLocationGenerator;
3939
use crate::io::VirtualColumnReader;
4040
use crate::operations::read::data_source_with_meta::DataSourceWithMeta;
41-
use crate::operations::read::runtime_filter_prunner::runtime_bloom_filter_pruner;
42-
use crate::operations::read::runtime_filter_prunner::runtime_range_filter_pruner;
41+
use crate::operations::read::runtime_filter_prunner::runtime_bloom_filter_prune;
42+
use crate::operations::read::runtime_filter_prunner::runtime_range_filter_prune;
4343
use crate::FuseBlockPartInfo;
4444

4545
pub struct ReadNativeDataSource<const BLOCKING_IO: bool> {
@@ -140,7 +140,7 @@ impl SyncSource for ReadNativeDataSource<true> {
140140
.ctx
141141
.get_min_max_runtime_filter_with_id(self.table_index),
142142
);
143-
if runtime_range_filter_pruner(
143+
if runtime_range_filter_prune(
144144
self.table_schema.clone(),
145145
&part,
146146
&filters,
@@ -247,7 +247,7 @@ impl Processor for ReadNativeDataSource<false> {
247247
);
248248
let mut native_part_infos = Vec::with_capacity(parts.len());
249249
for part in parts.into_iter() {
250-
if runtime_range_filter_pruner(
250+
if runtime_range_filter_prune(
251251
self.table_schema.clone(),
252252
&part,
253253
&filters,
@@ -256,7 +256,7 @@ impl Processor for ReadNativeDataSource<false> {
256256
continue;
257257
}
258258

259-
if runtime_bloom_filter_pruner(
259+
if runtime_bloom_filter_prune(
260260
self.table_schema.clone(),
261261
&part,
262262
&filters,

src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,13 +223,15 @@ impl<const BLOCKING_IO: bool> NativeRowsFetcher<BLOCKING_IO> {
223223
let block_idx = block_idx_in_segment(blocks.len(), block as usize);
224224
let block_meta = &blocks[block_idx];
225225
let page_size = block_meta.page_size();
226+
227+
let bloom_index_cols = None;
226228
let part_info = FuseTable::projection_part(
227229
block_meta,
228230
&None,
229231
&column_nodes,
230232
None,
231233
&self.projection,
232-
None, // TODO bloom filer desc
234+
bloom_index_cols,
233235
);
234236
self.part_map.insert(prefix, (part_info, page_size));
235237
}

0 commit comments

Comments
 (0)