
Commit 1404cc7

improve stream write

1 parent 3e6445f commit 1404cc7

6 files changed: +558 -263 lines changed

src/query/storages/fuse/src/io/write/stream/block_builder.rs

Lines changed: 17 additions & 9 deletions
@@ -35,6 +35,7 @@ use databend_common_expression::TableSchemaRef;
 use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COLUMN_ID;
 use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
 use databend_common_native::write::NativeWriter;
+use databend_common_sql::executor::physical_plans::MutationKind;
 use databend_storages_common_index::BloomIndex;
 use databend_storages_common_index::BloomIndexBuilder;
 use databend_storages_common_index::Index;
@@ -52,7 +53,7 @@ use parquet::file::properties::WriterProperties;
 use crate::io::create_inverted_index_builders;
 use crate::io::write::stream::cluster_statistics::ClusterStatisticsBuilder;
 use crate::io::write::stream::cluster_statistics::ClusterStatisticsState;
-use crate::io::write::stream::column_statistics::ColumnStatisticsState;
+use crate::io::write::stream::ColumnStatisticsState;
 use crate::io::write::InvertedIndexState;
 use crate::io::BlockSerialization;
 use crate::io::BloomIndexState;
@@ -235,7 +236,7 @@ impl StreamBlockBuilder {
     pub fn need_flush(&self) -> bool {
         let file_size = self.block_writer.compressed_size();
         self.row_count >= self.properties.block_thresholds.min_rows_per_block
-            || self.block_size >= self.properties.block_thresholds.max_bytes_per_block
+            || self.block_size >= self.properties.block_thresholds.min_bytes_per_block * 2
             || (file_size >= self.properties.block_thresholds.min_compressed_per_block
                 && self.block_size >= self.properties.block_thresholds.min_bytes_per_block)
     }
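
For reference, a minimal standalone sketch of the revised flush predicate. The Thresholds struct and the numbers in main are illustrative stand-ins, not the real BlockThresholds type or its defaults:

// Illustrative stand-in for BlockThresholds; field names mirror the ones used above.
struct Thresholds {
    min_rows_per_block: usize,
    min_bytes_per_block: usize,
    min_compressed_per_block: usize,
}

// Flush on enough rows, on roughly twice the minimum in-memory byte size,
// or on a large enough compressed size backed by at least the minimum in-memory size.
fn need_flush(t: &Thresholds, row_count: usize, block_size: usize, file_size: usize) -> bool {
    row_count >= t.min_rows_per_block
        || block_size >= t.min_bytes_per_block * 2
        || (file_size >= t.min_compressed_per_block && block_size >= t.min_bytes_per_block)
}

fn main() {
    // Hypothetical threshold values, chosen only for this example.
    let t = Thresholds {
        min_rows_per_block: 1_000_000,
        min_bytes_per_block: 50 * 1024 * 1024,
        min_compressed_per_block: 16 * 1024 * 1024,
    };
    // A block twice the minimum byte size now triggers a flush even though the
    // row-count and compressed-size thresholds are not yet reached.
    assert!(need_flush(&t, 10_000, 100 * 1024 * 1024, 4 * 1024 * 1024));
    // A smaller block with little compressed output does not flush yet.
    assert!(!need_flush(&t, 10_000, 30 * 1024 * 1024, 4 * 1024 * 1024));
}

The change caps the in-memory block size relative to min_bytes_per_block instead of the larger max_bytes_per_block, so blocks are flushed earlier.
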
@@ -355,8 +356,8 @@ pub struct StreamBlockProperties {
     source_schema: TableSchemaRef,

     cluster_stats_builder: Arc<ClusterStatisticsBuilder>,
-    stats_columns: Vec<ColumnId>,
-    distinct_columns: Vec<ColumnId>,
+    stats_columns: Vec<(ColumnId, DataType)>,
+    distinct_columns: Vec<(ColumnId, DataType)>,
     bloom_columns_map: BTreeMap<FieldIndex, TableField>,
     ngram_args: Vec<NgramArgs>,
     inverted_index_builders: Vec<InvertedIndexBuilder>,
@@ -367,16 +368,23 @@ impl StreamBlockProperties {
     pub fn try_create(
         ctx: Arc<dyn TableContext>,
         table: &FuseTable,
+        kind: MutationKind,
         table_meta_timestamps: TableMetaTimestamps,
     ) -> Result<Arc<Self>> {
         // remove virtual computed fields.
-        let fields = table
+        let mut fields = table
            .schema()
            .fields()
            .iter()
            .filter(|f| !matches!(f.computed_expr(), Some(ComputedExpr::Virtual(_))))
            .cloned()
            .collect::<Vec<_>>();
+        if !matches!(kind, MutationKind::Insert | MutationKind::Replace) {
+            // add stream fields.
+            for stream_column in table.stream_columns().iter() {
+                fields.push(stream_column.table_field());
+            }
+        }

         let source_schema = Arc::new(TableSchema {
             fields,
@@ -407,12 +415,12 @@
         let leaf_fields = source_schema.leaf_fields();
         for field in leaf_fields.iter() {
             let column_id = field.column_id();
-            if RangeIndex::supported_type(&DataType::from(field.data_type()))
-                && column_id != ORIGIN_BLOCK_ROW_NUM_COLUMN_ID
+            let data_type = DataType::from(field.data_type());
+            if RangeIndex::supported_type(&data_type) && column_id != ORIGIN_BLOCK_ROW_NUM_COLUMN_ID
             {
-                stats_columns.push(column_id);
+                stats_columns.push((column_id, data_type.clone()));
                 if !bloom_column_ids.contains(&column_id) {
-                    distinct_columns.push(column_id);
+                    distinct_columns.push((column_id, data_type));
                 }
             }
         }
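
Carrying the DataType next to each ColumnId lets later stages pick a type-specialized code path (for example the NDV estimators added by this commit) without looking the type up in the schema again. A small standalone sketch of that pattern, using stand-in ColumnId and DataType definitions and a hypothetical factory in place of create_column_ndv_estimator:

use std::collections::HashMap;

// Stand-ins for databend's ColumnId and DataType, for illustration only.
type ColumnId = u32;

#[derive(Clone, Debug)]
enum DataType {
    Int64,
    String,
}

// Hypothetical factory; in the commit this role is played by
// create_column_ndv_estimator, which returns a boxed type-specialized estimator.
fn estimator_kind(data_type: &DataType) -> &'static str {
    match data_type {
        DataType::Int64 => "int64-ndv-estimator",
        DataType::String => "string-ndv-estimator",
    }
}

fn main() {
    // The shape of the new bookkeeping: (column id, data type) pairs.
    let distinct_columns: Vec<(ColumnId, DataType)> =
        vec![(0, DataType::Int64), (2, DataType::String)];

    // One estimator per distinct column, chosen directly from the stored type.
    let estimators: HashMap<ColumnId, &'static str> = distinct_columns
        .iter()
        .map(|(id, ty)| (*id, estimator_kind(ty)))
        .collect();

    assert_eq!(estimators[&2], "string-ndv-estimator");
}
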
Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::Hash;
use std::marker::PhantomData;

use databend_common_expression::types::boolean::TrueIdxIter;
use databend_common_expression::types::DataType;
use databend_common_expression::types::DateType;
use databend_common_expression::types::Decimal128Type;
use databend_common_expression::types::Decimal256Type;
use databend_common_expression::types::Decimal64Type;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::types::NumberType;
use databend_common_expression::types::StringType;
use databend_common_expression::types::TimestampType;
use databend_common_expression::types::ValueType;
use databend_common_expression::with_number_mapped_type;
use databend_common_expression::Column;
use databend_common_expression::ScalarRef;
use databend_common_expression::SELECTIVITY_THRESHOLD;
use databend_storages_common_table_meta::meta::ColumnDistinctHLL;

pub trait ColumnNDVEstimator: Send + Sync {
    fn update_column(&mut self, column: &Column);
    fn update_scalar(&mut self, scalar: &ScalarRef);
    fn finalize(&self) -> u64;
}

pub fn create_column_ndv_estimator(data_type: &DataType) -> Box<dyn ColumnNDVEstimator> {
    let inner_type = data_type.remove_nullable();
    with_number_mapped_type!(|NUM_TYPE| match inner_type {
        DataType::Number(NumberDataType::NUM_TYPE) => {
            ColumnNDVEstimatorImpl::<NumberType<NUM_TYPE>>::create()
        }
        DataType::String => {
            ColumnNDVEstimatorImpl::<StringType>::create()
        }
        DataType::Date => {
            ColumnNDVEstimatorImpl::<DateType>::create()
        }
        DataType::Timestamp => {
            ColumnNDVEstimatorImpl::<TimestampType>::create()
        }
        DataType::Decimal(size) => {
            if size.can_carried_by_64() {
                ColumnNDVEstimatorImpl::<Decimal64Type>::create()
            } else if size.can_carried_by_128() {
                ColumnNDVEstimatorImpl::<Decimal128Type>::create()
            } else {
                ColumnNDVEstimatorImpl::<Decimal256Type>::create()
            }
        }
        _ => unreachable!("Unsupported data type: {:?}", data_type),
    })
}

pub struct ColumnNDVEstimatorImpl<T>
where
    T: ValueType + Send + Sync,
    for<'a> T::ScalarRef<'a>: Hash,
{
    hll: ColumnDistinctHLL,
    _phantom: PhantomData<T>,
}

impl<T> ColumnNDVEstimatorImpl<T>
where
    T: ValueType + Send + Sync,
    for<'a> T::ScalarRef<'a>: Hash,
{
    pub fn create() -> Box<dyn ColumnNDVEstimator> {
        Box::new(Self {
            hll: ColumnDistinctHLL::new(),
            _phantom: Default::default(),
        })
    }
}

impl<T> ColumnNDVEstimator for ColumnNDVEstimatorImpl<T>
where
    T: ValueType + Send + Sync,
    for<'a> T::ScalarRef<'a>: Hash,
{
    fn update_column(&mut self, column: &Column) {
        let (column, validity) = if let Column::Nullable(box inner) = column {
            let validity = if inner.validity.null_count() == 0 {
                None
            } else {
                Some(&inner.validity)
            };
            (&inner.column, validity)
        } else {
            (column, None)
        };

        let column = T::try_downcast_column(column).unwrap();
        if let Some(v) = validity {
            if v.true_count() as f64 / v.len() as f64 >= SELECTIVITY_THRESHOLD {
                for (data, valid) in T::iter_column(&column).zip(v.iter()) {
                    if valid {
                        self.hll.add_object(&data);
                    }
                }
            } else {
                TrueIdxIter::new(v.len(), Some(v)).for_each(|idx| {
                    let val = unsafe { T::index_column_unchecked(&column, idx) };
                    self.hll.add_object(&val);
                })
            }
        } else {
            for value in T::iter_column(&column) {
                self.hll.add_object(&value);
            }
        }
    }

    fn update_scalar(&mut self, scalar: &ScalarRef) {
        if matches!(scalar, ScalarRef::Null) {
            return;
        }

        let val = T::try_downcast_scalar(scalar).unwrap();
        self.hll.add_object(&val);
    }

    fn finalize(&self) -> u64 {
        self.hll.count() as u64
    }
}
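
A self-contained toy mirroring the shape of the estimator above: one generic implementation monomorphized per value type, handed out behind a trait object. A HashSet stands in for ColumnDistinctHLL and plain slices stand in for Column and ScalarRef; this is only a sketch of the pattern, not the real API:

use std::collections::HashSet;
use std::hash::Hash;

// Trait-object interface, analogous to ColumnNDVEstimator.
trait NdvEstimator: Send + Sync {
    fn finalize(&self) -> u64;
}

// Generic implementation, monomorphized per value type. The real code keeps a
// ColumnDistinctHLL here so memory stays bounded; this HashSet is exact but
// grows with the number of distinct values.
struct NdvEstimatorImpl<T: Eq + Hash + Clone + Send + Sync> {
    seen: HashSet<T>,
}

impl<T: Eq + Hash + Clone + Send + Sync> NdvEstimatorImpl<T> {
    fn create() -> Self {
        Self { seen: HashSet::new() }
    }

    // Analogous to update_column: nulls (None) are skipped, values are added.
    fn update_values(&mut self, values: &[Option<T>]) {
        for v in values.iter().flatten() {
            self.seen.insert(v.clone());
        }
    }
}

impl<T: Eq + Hash + Clone + Send + Sync> NdvEstimator for NdvEstimatorImpl<T> {
    fn finalize(&self) -> u64 {
        self.seen.len() as u64
    }
}

fn main() {
    let mut est = NdvEstimatorImpl::<i64>::create();
    est.update_values(&[Some(1), Some(2), None, Some(2), Some(3)]);
    let boxed: Box<dyn NdvEstimator> = Box::new(est);
    assert_eq!(boxed.finalize(), 3);
}

The design keeps the per-value loop statically dispatched for each concrete column type while callers only hold a boxed trait object, and the HyperLogLog in the real implementation trades exactness for constant memory.
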
