Skip to content

Commit d901add

Browse files
committed
feat(storage): use distinct count calculated by the xor hash function
1 parent 81e8b52 commit d901add

File tree

11 files changed

+98
-48
lines changed

11 files changed

+98
-48
lines changed

src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -663,9 +663,10 @@ impl CompactSegmentTestFixture {
663663
let mut stats_acc = StatisticsAccumulator::default();
664664
for block in blocks {
665665
let block = block?;
666-
let col_stats = gen_columns_statistics(&block)?;
666+
let col_stats = gen_columns_statistics(&block, None)?;
667667

668-
let mut block_statistics = BlockStatistics::from(&block, "".to_owned(), None)?;
668+
let mut block_statistics =
669+
BlockStatistics::from(&block, "".to_owned(), None, None)?;
669670
let block_meta = block_writer.write(block, col_stats, None).await?;
670671
block_statistics.block_file_location = block_meta.location.0.clone();
671672

src/query/service/tests/it/storages/fuse/statistics.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use crate::storages::fuse::table_test_fixture::TestFixture;
4848
fn test_ft_stats_block_stats() -> common_exception::Result<()> {
4949
let schema = DataSchemaRefExt::create(vec![DataField::new("a", i32::to_data_type())]);
5050
let block = DataBlock::create(schema, vec![Series::from_data(vec![1, 2, 3])]);
51-
let r = gen_columns_statistics(&block)?;
51+
let r = gen_columns_statistics(&block, None)?;
5252
assert_eq!(1, r.len());
5353
let col_stats = r.get(&0).unwrap();
5454
assert_eq!(col_stats.min, DataValue::Int64(1));
@@ -69,7 +69,7 @@ fn test_ft_tuple_stats_block_stats() -> common_exception::Result<()> {
6969
let column = StructColumn::from_data(inner_columns, tuple_data_type).arc();
7070

7171
let block = DataBlock::create(schema, vec![column]);
72-
let r = gen_columns_statistics(&block)?;
72+
let r = gen_columns_statistics(&block, None)?;
7373
assert_eq!(2, r.len());
7474
let col0_stats = r.get(&0).unwrap();
7575
assert_eq!(col0_stats.min, DataValue::Int64(1));
@@ -89,7 +89,7 @@ fn test_ft_stats_col_stats_reduce() -> common_exception::Result<()> {
8989
let blocks = TestFixture::gen_sample_blocks_ex(num_of_blocks, rows_per_block, val_start_with);
9090
let col_stats = blocks
9191
.iter()
92-
.map(|b| gen_columns_statistics(&b.clone().unwrap()))
92+
.map(|b| gen_columns_statistics(&b.clone().unwrap(), None))
9393
.collect::<common_exception::Result<Vec<_>>>()?;
9494
let r = reducers::reduce_block_statistics(&col_stats, None);
9595
assert!(r.is_ok());
@@ -158,8 +158,9 @@ async fn test_accumulator() -> common_exception::Result<()> {
158158

159159
for item in blocks {
160160
let block = item?;
161-
let col_stats = gen_columns_statistics(&block)?;
162-
let block_statistics = BlockStatistics::from(&block, "does_not_matter".to_owned(), None)?;
161+
let col_stats = gen_columns_statistics(&block, None)?;
162+
let block_statistics =
163+
BlockStatistics::from(&block, "does_not_matter".to_owned(), None, None)?;
163164
let block_writer = BlockWriter::new(&operator, &loc_generator);
164165
let block_meta = block_writer.write(block, col_stats, None).await?;
165166
stats_acc.add_with_block_meta(block_meta, block_statistics)?;
@@ -344,7 +345,7 @@ fn test_ft_stats_block_stats_string_columns_trimming_using_eval() -> common_exce
344345
let max_expr = max_col.get(0);
345346

346347
// generate the statistics of column
347-
let stats_of_columns = gen_columns_statistics(&block).unwrap();
348+
let stats_of_columns = gen_columns_statistics(&block, None).unwrap();
348349

349350
// check if the max value (untrimmed) is in degenerated condition:
350351
// - the length of string value is larger or equal than STRING_PREFIX_LEN

src/query/service/tests/it/storages/statistics/column_statistics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,15 @@ fn test_column_traverse() -> Result<()> {
8888
let cols = traverse::traverse_columns_dfs(sample_block.columns())?;
8989

9090
assert_eq!(5, cols.len());
91-
(0..5).for_each(|i| assert_eq!(cols[i], sample_cols[i], "checking col {}", i));
91+
(0..5).for_each(|i| assert_eq!(cols[i].1, sample_cols[i], "checking col {}", i));
9292

9393
Ok(())
9494
}
9595

9696
#[test]
9797
fn test_column_statistic() -> Result<()> {
9898
let (sample_block, sample_cols) = gen_sample_block();
99-
let col_stats = gen_columns_statistics(&sample_block)?;
99+
let col_stats = gen_columns_statistics(&sample_block, None)?;
100100

101101
assert_eq!(5, col_stats.len());
102102

src/query/storages/fuse/fuse-result/src/result_table_sink.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ impl Processor for ResultTableSink {
176176
match std::mem::replace(&mut self.state, State::None) {
177177
State::NeedSerialize(block) => {
178178
let location = self.locations.gen_block_location();
179-
let block_statistics = BlockStatistics::from(&block, location.clone(), None)?;
179+
let block_statistics = BlockStatistics::from(&block, location.clone(), None, None)?;
180180

181181
let mut data = Vec::with_capacity(100 * 1024 * 1024);
182182
let schema = block.schema().clone();

src/query/storages/fuse/fuse-result/src/writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ impl ResultTableWriter {
101101
pub async fn append_block(&mut self, block: DataBlock) -> Result<PartInfoPtr> {
102102
let location = self.locations.gen_block_location();
103103
let mut data = Vec::with_capacity(100 * 1024 * 1024);
104-
let block_statistics = BlockStatistics::from(&block, location.clone(), None)?;
104+
let block_statistics = BlockStatistics::from(&block, location.clone(), None, None)?;
105105
let schema = block.schema().clone();
106106
let (size, meta_data) = serialize_data_blocks(vec![block], &schema, &mut data)?;
107107

src/query/storages/fuse/fuse/src/operations/fuse_sink.rs

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::any::Any;
16+
use std::collections::HashMap;
1617
use std::sync::Arc;
1718

1819
use async_trait::async_trait;
@@ -51,6 +52,33 @@ struct BloomIndexState {
5152
location: Location,
5253
}
5354

55+
impl BloomIndexState {
56+
pub fn try_create(
57+
block: &DataBlock,
58+
location: Location,
59+
) -> Result<(Self, HashMap<usize, usize>)> {
60+
// write index
61+
let bloom_index = BlockFilter::try_create(&[block])?;
62+
let index_block = bloom_index.filter_block;
63+
let mut data = Vec::with_capacity(100 * 1024);
64+
let index_block_schema = &bloom_index.filter_schema;
65+
let (size, _) = serialize_data_blocks_with_compression(
66+
vec![index_block],
67+
index_block_schema,
68+
&mut data,
69+
CompressionOptions::Uncompressed,
70+
)?;
71+
Ok((
72+
Self {
73+
data,
74+
size,
75+
location,
76+
},
77+
bloom_index.column_distinct_count,
78+
))
79+
}
80+
}
81+
5482
enum State {
5583
None,
5684
NeedSerialize(DataBlock),
@@ -168,28 +196,16 @@ impl Processor for FuseTableSink {
168196

169197
let (block_location, block_id) = self.meta_locations.gen_block_location();
170198

171-
let bloom_index_state = {
172-
// write index
173-
let bloom_index = BlockFilter::try_create(&[&block])?;
174-
let index_block = bloom_index.filter_block;
175-
let location = self.meta_locations.block_bloom_index_location(&block_id);
176-
let mut data = Vec::with_capacity(100 * 1024);
177-
let index_block_schema = &bloom_index.filter_schema;
178-
let (size, _) = serialize_data_blocks_with_compression(
179-
vec![index_block],
180-
index_block_schema,
181-
&mut data,
182-
CompressionOptions::Uncompressed,
183-
)?;
184-
BloomIndexState {
185-
data,
186-
size,
187-
location,
188-
}
189-
};
199+
let location = self.meta_locations.block_bloom_index_location(&block_id);
200+
let (bloom_index_state, column_distinct_count) =
201+
BloomIndexState::try_create(&block, location)?;
190202

191-
let block_statistics =
192-
BlockStatistics::from(&block, block_location.0, cluster_stats)?;
203+
let block_statistics = BlockStatistics::from(
204+
&block,
205+
block_location.0,
206+
cluster_stats,
207+
Some(column_distinct_count),
208+
)?;
193209
// we need a configuration of block size threshold here
194210
let mut data = Vec::with_capacity(100 * 1024 * 1024);
195211
let schema = block.schema().clone();

src/query/storages/fuse/fuse/src/operations/mutation/deletion_mutator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl DeletionMutator {
8383
&self.base_mutator.data_accessor,
8484
&self.base_mutator.location_generator,
8585
);
86-
let col_stats = gen_columns_statistics(&replace_with)?;
86+
let col_stats = gen_columns_statistics(&replace_with, None)?;
8787
let cluster_stats = self
8888
.cluster_stats_gen
8989
.gen_with_origin_stats(&replace_with, origin_stats)?;

src/query/storages/fuse/fuse/src/statistics/block_statistics.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,16 @@ impl BlockStatistics {
3434
data_block: &DataBlock,
3535
location: String,
3636
cluster_stats: Option<ClusterStatistics>,
37+
column_distinct_count: Option<HashMap<usize, usize>>,
3738
) -> common_exception::Result<BlockStatistics> {
3839
Ok(BlockStatistics {
3940
block_file_location: location,
4041
block_rows_size: data_block.num_rows() as u64,
4142
block_bytes_size: data_block.memory_size() as u64,
42-
block_column_statistics: column_statistic::gen_columns_statistics(data_block)?,
43+
block_column_statistics: column_statistic::gen_columns_statistics(
44+
data_block,
45+
column_distinct_count,
46+
)?,
4347
block_cluster_statistics: cluster_stats,
4448
})
4549
}

src/query/storages/fuse/fuse/src/statistics/column_statistic.rs

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
1516
use std::sync::Arc;
1617

1718
use common_datablocks::DataBlock;
1819
use common_datavalues::Column;
20+
use common_datavalues::ColumnRef;
1921
use common_datavalues::ColumnWithField;
2022
use common_datavalues::DataField;
2123
use common_datavalues::DataValue;
@@ -39,16 +41,19 @@ pub fn calc_column_distinct_of_values(
3941
distinct_values.get(0).as_u64()
4042
}
4143

42-
pub fn get_traverse_columns_dfs(data_block: &DataBlock) -> Result<Vec<Arc<dyn Column>>> {
44+
pub fn get_traverse_columns_dfs(data_block: &DataBlock) -> Result<Vec<(Option<usize>, ColumnRef)>> {
4345
traverse::traverse_columns_dfs(data_block.columns())
4446
}
4547

46-
pub fn gen_columns_statistics(data_block: &DataBlock) -> Result<StatisticsOfColumns> {
48+
pub fn gen_columns_statistics(
49+
data_block: &DataBlock,
50+
column_distinct_count: Option<HashMap<usize, usize>>,
51+
) -> Result<StatisticsOfColumns> {
4752
let mut statistics = StatisticsOfColumns::new();
4853

4954
let leaves = traverse::traverse_columns_dfs(data_block.columns())?;
5055

51-
for (idx, col) in leaves.iter().enumerate() {
56+
for (idx, (col_idx, col)) in leaves.iter().enumerate() {
5257
let col_data_type = col.data_type();
5358
if !MinMaxIndex::is_supported_type(&col_data_type) {
5459
continue;
@@ -81,7 +86,17 @@ pub fn gen_columns_statistics(data_block: &DataBlock) -> Result<StatisticsOfColu
8186
}
8287
}
8388

84-
let distinct_of_values = calc_column_distinct_of_values(col, column_field)?;
89+
// use distinct count calculated by the xor hash function to avoid repetitive operation.
90+
let distinct_of_values = match (col_idx, &column_distinct_count) {
91+
(Some(col_idx), Some(ref column_distinct_count)) => {
92+
if let Some(value) = column_distinct_count.get(col_idx) {
93+
*value as u64
94+
} else {
95+
calc_column_distinct_of_values(col, column_field)?
96+
}
97+
}
98+
(_, _) => calc_column_distinct_of_values(col, column_field)?,
99+
};
85100

86101
let (is_all_null, bitmap) = col.validity();
87102
let unset_bits = match (is_all_null, bitmap) {
@@ -113,25 +128,29 @@ pub mod traverse {
113128
use super::*;
114129

115130
// traverses columns and collects the leaves in depth first manner
116-
pub fn traverse_columns_dfs(columns: &[ColumnRef]) -> Result<Vec<ColumnRef>> {
131+
pub fn traverse_columns_dfs(columns: &[ColumnRef]) -> Result<Vec<(Option<usize>, ColumnRef)>> {
117132
let mut leaves = vec![];
118-
for f in columns {
119-
traverse_recursive(f, &mut leaves)?;
133+
for (idx, col) in columns.iter().enumerate() {
134+
traverse_recursive(Some(idx), col, &mut leaves)?;
120135
}
121136
Ok(leaves)
122137
}
123138

124-
fn traverse_recursive(column: &ColumnRef, leaves: &mut Vec<ColumnRef>) -> Result<()> {
139+
fn traverse_recursive(
140+
idx: Option<usize>,
141+
column: &ColumnRef,
142+
leaves: &mut Vec<(Option<usize>, ColumnRef)>,
143+
) -> Result<()> {
125144
match column.data_type() {
126145
DataTypeImpl::Struct(_) => {
127146
let full_column = column.convert_full_column();
128147
let struct_col: &StructColumn = Series::check_get(&full_column)?;
129-
for f in struct_col.values() {
130-
traverse_recursive(f, leaves)?
148+
for col in struct_col.values() {
149+
traverse_recursive(None, col, leaves)?
131150
}
132151
}
133152
_ => {
134-
leaves.push(column.clone());
153+
leaves.push((idx, column.clone()));
135154
}
136155
}
137156
Ok(())

src/query/storages/fuse/fuse/src/statistics/reducers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ pub fn reduce_block_statistics<T: Borrow<StatisticsOfColumns>>(
9595

9696
let distinct_of_values = match data_block {
9797
Some(_data_block) => {
98-
if let Some(col) = leaves.as_ref().unwrap().get(*id as usize) {
98+
if let Some((_, col)) = leaves.as_ref().unwrap().get(*id as usize) {
9999
let col_data_type = col.data_type();
100100
let data_field = DataField::new("", col_data_type);
101101
let column_field = ColumnWithField::new(col.clone(), data_field);

0 commit comments

Comments (0)