Skip to content

Commit 7590fd9

Browse files
committed
Add column NDV (number of distinct values) estimator
1 parent 5761706 commit 7590fd9

File tree

2 files changed

+97
-88
lines changed

2 files changed

+97
-88
lines changed

src/query/storages/fuse/src/io/write/stream/block_builder.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ pub struct StreamBlockProperties {
359359

360360
cluster_stats_builder: Arc<ClusterStatisticsBuilder>,
361361
stats_columns: Vec<ColumnId>,
362-
distinct_columns: Vec<ColumnId>,
362+
distinct_columns: Vec<(ColumnId, DataType)>,
363363
bloom_columns_map: BTreeMap<FieldIndex, TableField>,
364364
ngram_args: Vec<NgramArgs>,
365365
inverted_index_builders: Vec<InvertedIndexBuilder>,
@@ -414,12 +414,12 @@ impl StreamBlockProperties {
414414
let leaf_fields = source_schema.leaf_fields();
415415
for field in leaf_fields.iter() {
416416
let column_id = field.column_id();
417-
if RangeIndex::supported_type(&DataType::from(field.data_type()))
418-
&& column_id != ORIGIN_BLOCK_ROW_NUM_COLUMN_ID
417+
let data_type = DataType::from(field.data_type());
418+
if RangeIndex::supported_type(&data_type) && column_id != ORIGIN_BLOCK_ROW_NUM_COLUMN_ID
419419
{
420420
stats_columns.push(column_id);
421421
if !bloom_column_ids.contains(&column_id) {
422-
distinct_columns.push(column_id);
422+
distinct_columns.push((column_id, data_type));
423423
}
424424
}
425425
}

src/query/storages/fuse/src/io/write/stream/column_statistics.rs

Lines changed: 93 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,20 @@
1313
// limitations under the License.
1414

1515
use std::collections::HashMap;
16+
use std::hash::Hash;
17+
use std::marker::PhantomData;
1618

1719
use databend_common_exception::Result;
18-
use databend_common_expression::types::AccessType;
20+
use databend_common_expression::types::boolean::TrueIdxIter;
1921
use databend_common_expression::types::DataType;
2022
use databend_common_expression::types::DateType;
21-
use databend_common_expression::types::DecimalColumn;
22-
use databend_common_expression::types::DecimalScalar;
23+
use databend_common_expression::types::Decimal128Type;
24+
use databend_common_expression::types::Decimal256Type;
2325
use databend_common_expression::types::NumberDataType;
2426
use databend_common_expression::types::NumberType;
2527
use databend_common_expression::types::StringType;
2628
use databend_common_expression::types::TimestampType;
29+
use databend_common_expression::types::ValueType;
2730
use databend_common_expression::with_number_mapped_type;
2831
use databend_common_expression::Column;
2932
use databend_common_expression::ColumnId;
@@ -32,6 +35,7 @@ use databend_common_expression::Scalar;
3235
use databend_common_expression::ScalarRef;
3336
use databend_common_expression::TableSchemaRef;
3437
use databend_common_expression::Value;
38+
use databend_common_expression::SELECTIVITY_THRESHOLD;
3539
use databend_common_functions::aggregates::eval_aggr;
3640
use databend_storages_common_table_meta::meta::ColumnDistinctHLL;
3741
use databend_storages_common_table_meta::meta::ColumnStatistics;
@@ -43,19 +47,19 @@ use crate::statistics::Trim;
4347

4448
pub struct ColumnStatisticsState {
4549
col_stats: HashMap<ColumnId, Vec<ColumnStatistics>>,
46-
distinct_columns: HashMap<ColumnId, ColumnDistinctHLL>,
50+
distinct_columns: HashMap<ColumnId, Box<dyn ColumnNDVEstimator>>,
4751
}
4852

4953
impl ColumnStatisticsState {
50-
pub fn new(stats_columns: &[ColumnId], distinct_columns: &[ColumnId]) -> Self {
54+
pub fn new(stats_columns: &[ColumnId], distinct_columns: &[(ColumnId, DataType)]) -> Self {
5155
let col_stats = stats_columns
5256
.iter()
5357
.map(|&col_id| (col_id, Vec::new()))
5458
.collect();
5559

5660
let distinct_columns = distinct_columns
5761
.iter()
58-
.map(|&col_id| (col_id, ColumnDistinctHLL::default()))
62+
.map(|(col_id, data_type)| (*col_id, create_estimator(data_type)))
5963
.collect();
6064

6165
Self {
@@ -80,8 +84,8 @@ impl ColumnStatisticsState {
8084
in_memory_size as u64,
8185
None,
8286
);
83-
if let Some(hll) = self.distinct_columns.get_mut(&column_id) {
84-
scalar_update_hll_cardinality(&s.as_ref(), &data_type, hll);
87+
if let Some(estimator) = self.distinct_columns.get_mut(&column_id) {
88+
estimator.update_scalar(&s.as_ref());
8589
}
8690
self.col_stats.get_mut(&column_id).unwrap().push(col_stats);
8791
}
@@ -128,8 +132,8 @@ impl ColumnStatisticsState {
128132
self.col_stats.get_mut(&column_id).unwrap().push(col_stats);
129133

130134
// use distinct count calculated by the xor hash function to avoid repetitive operation.
131-
if let Some(hll) = self.distinct_columns.get_mut(&column_id) {
132-
column_update_hll_cardinality(&col, &data_type, hll);
135+
if let Some(estimator) = self.distinct_columns.get_mut(&column_id) {
136+
estimator.update_column(&col);
133137
}
134138
}
135139
}
@@ -146,102 +150,107 @@ impl ColumnStatisticsState {
146150
let mut col_stats = reduce_column_statistics(stats);
147151
if let Some(count) = column_distinct_count.get(id) {
148152
col_stats.distinct_of_values = Some(*count as u64);
149-
} else if let Some(hll) = self.distinct_columns.get(id) {
150-
col_stats.distinct_of_values = Some(hll.count() as u64);
153+
} else if let Some(estimator) = self.distinct_columns.get(id) {
154+
col_stats.distinct_of_values = Some(estimator.finalize());
151155
}
152156
statistics.insert(*id, col_stats);
153157
}
154158
Ok(statistics)
155159
}
156160
}
157161

158-
fn column_update_hll_cardinality(col: &Column, ty: &DataType, hll: &mut ColumnDistinctHLL) {
159-
if let DataType::Nullable(inner) = ty {
160-
let col = col.as_nullable().unwrap();
161-
for (i, v) in col.validity.iter().enumerate() {
162-
if v {
163-
let scalar = unsafe { col.column.index_unchecked(i) };
164-
scalar_update_hll_cardinality(&scalar, inner, hll);
165-
}
166-
}
167-
return;
168-
}
162+
pub trait ColumnNDVEstimator: Send + Sync {
163+
fn update_column(&mut self, column: &Column);
164+
fn update_scalar(&mut self, scalar: &ScalarRef);
165+
fn finalize(&self) -> u64;
166+
}
169167

170-
with_number_mapped_type!(|NUM_TYPE| match ty {
168+
pub fn create_estimator(data_type: &DataType) -> Box<dyn ColumnNDVEstimator> {
169+
let inner_type = data_type.remove_nullable();
170+
with_number_mapped_type!(|NUM_TYPE| match inner_type {
171171
DataType::Number(NumberDataType::NUM_TYPE) => {
172-
let col = NumberType::<NUM_TYPE>::try_downcast_column(col).unwrap();
173-
for v in col.iter() {
174-
hll.add_object(v);
175-
}
172+
ColumnNDVEstimatorImpl::<NumberType<NUM_TYPE>>::create()
176173
}
177174
DataType::String => {
178-
let col = StringType::try_downcast_column(col).unwrap();
179-
for v in col.iter() {
180-
hll.add_object(&v);
181-
}
175+
ColumnNDVEstimatorImpl::<StringType>::create()
182176
}
183177
DataType::Date => {
184-
let col = DateType::try_downcast_column(col).unwrap();
185-
for v in col.iter() {
186-
hll.add_object(v);
187-
}
178+
ColumnNDVEstimatorImpl::<DateType>::create()
188179
}
189180
DataType::Timestamp => {
190-
let col = TimestampType::try_downcast_column(col).unwrap();
191-
for v in col.iter() {
192-
hll.add_object(v);
193-
}
181+
ColumnNDVEstimatorImpl::<TimestampType>::create()
182+
}
183+
DataType::Decimal(s) if s.can_carried_by_128() => {
184+
ColumnNDVEstimatorImpl::<Decimal128Type>::create()
194185
}
195186
DataType::Decimal(_) => {
196-
match col {
197-
Column::Decimal(DecimalColumn::Decimal128(col, _)) => {
198-
for v in col.iter() {
199-
hll.add_object(v);
200-
}
201-
}
202-
Column::Decimal(DecimalColumn::Decimal256(col, _)) => {
203-
for v in col.iter() {
204-
hll.add_object(v);
205-
}
206-
}
207-
_ => unreachable!(),
208-
};
187+
ColumnNDVEstimatorImpl::<Decimal256Type>::create()
209188
}
210-
_ => unreachable!("Unsupported data type: {:?}", ty),
211-
});
189+
_ => unreachable!("Unsupported data type: {:?}", data_type),
190+
})
212191
}
213192

214-
fn scalar_update_hll_cardinality(scalar: &ScalarRef, ty: &DataType, hll: &mut ColumnDistinctHLL) {
215-
if matches!(scalar, ScalarRef::Null) {
216-
return;
217-
}
193+
pub struct ColumnNDVEstimatorImpl<T>
194+
where
195+
T: ValueType + Send + Sync,
196+
T::Scalar: Hash,
197+
{
198+
hll: ColumnDistinctHLL,
199+
_phantom: PhantomData<T>,
200+
}
218201

219-
let ty = ty.remove_nullable();
202+
impl<T> ColumnNDVEstimatorImpl<T>
203+
where
204+
T: ValueType + Send + Sync,
205+
T::Scalar: Hash,
206+
{
207+
pub fn create() -> Box<dyn ColumnNDVEstimator> {
208+
Box::new(Self {
209+
hll: ColumnDistinctHLL::new(),
210+
_phantom: Default::default(),
211+
})
212+
}
213+
}
220214

221-
with_number_mapped_type!(|NUM_TYPE| match ty {
222-
DataType::Number(NumberDataType::NUM_TYPE) => {
223-
let val = NumberType::<NUM_TYPE>::try_downcast_scalar(scalar).unwrap();
224-
hll.add_object(&val);
225-
}
226-
DataType::String => {
227-
let val = StringType::try_downcast_scalar(scalar).unwrap();
228-
hll.add_object(&val);
229-
}
230-
DataType::Date => {
231-
let val = DateType::try_downcast_scalar(scalar).unwrap();
232-
hll.add_object(&val);
233-
}
234-
DataType::Timestamp => {
235-
let val = TimestampType::try_downcast_scalar(scalar).unwrap();
236-
hll.add_object(&val);
237-
}
238-
DataType::Decimal(_) => {
239-
match scalar {
240-
ScalarRef::Decimal(DecimalScalar::Decimal128(v, _)) => hll.add_object(&v),
241-
ScalarRef::Decimal(DecimalScalar::Decimal256(v, _)) => hll.add_object(&v),
242-
_ => unreachable!(),
215+
impl<T> ColumnNDVEstimator for ColumnNDVEstimatorImpl<T>
216+
where
217+
T: ValueType + Send + Sync,
218+
T::Scalar: Hash,
219+
{
220+
fn update_column(&mut self, column: &Column) {
221+
if let Column::Nullable(box inner) = column {
222+
let validity_len = inner.validity.len();
223+
let column = T::try_downcast_column(&inner.column).unwrap();
224+
if inner.validity.true_count() as f64 / validity_len as f64 >= SELECTIVITY_THRESHOLD {
225+
for (data, valid) in T::iter_column(&column).zip(inner.validity.iter()) {
226+
if valid {
227+
self.hll.add_object(&T::to_owned_scalar(data));
228+
}
229+
}
230+
} else {
231+
TrueIdxIter::new(validity_len, Some(&inner.validity)).for_each(|idx| {
232+
let val = unsafe { T::index_column_unchecked(&column, idx) };
233+
self.hll.add_object(&T::to_owned_scalar(val));
234+
})
235+
}
236+
} else {
237+
let column = T::try_downcast_column(column).unwrap();
238+
for value in T::iter_column(&column) {
239+
self.hll.add_object(&T::to_owned_scalar(value));
243240
}
244241
}
245-
_ => unreachable!("Unsupported data type: {:?}", ty),
246-
});
242+
}
243+
244+
fn update_scalar(&mut self, scalar: &ScalarRef) {
245+
if matches!(scalar, ScalarRef::Null) {
246+
return;
247+
}
248+
249+
let val = T::try_downcast_scalar(scalar).unwrap();
250+
self.hll.add_object(&T::to_owned_scalar(val));
251+
}
252+
253+
fn finalize(&self) -> u64 {
254+
self.hll.count() as u64
255+
}
247256
}

0 commit comments

Comments
 (0)