Skip to content

Commit 1215ffd

Browse files
committed
fix virtual column builder
1 parent 7574cd4 commit 1215ffd

File tree

7 files changed

+347
-39
lines changed

7 files changed

+347
-39
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/storages/fuse/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ backoff = { workspace = true, features = ["futures", "tokio"] }
5050
bytes = { workspace = true }
5151
chrono = { workspace = true }
5252
enum-as-inner = { workspace = true }
53+
enum_dispatch = { workspace = true }
5354
fastrace = { workspace = true }
5455
futures = { workspace = true }
5556
futures-util = { workspace = true }

src/query/storages/fuse/src/io/write/stream/block_builder.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use databend_common_expression::TableSchemaRef;
3535
use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COLUMN_ID;
3636
use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
3737
use databend_common_native::write::NativeWriter;
38+
use databend_common_native::write::WriteOptions;
3839
use databend_common_sql::executor::physical_plans::MutationKind;
3940
use databend_storages_common_index::BloomIndex;
4041
use databend_storages_common_index::BloomIndexBuilder;
@@ -54,6 +55,7 @@ use crate::io::create_inverted_index_builders;
5455
use crate::io::write::stream::cluster_statistics::ClusterStatisticsBuilder;
5556
use crate::io::write::stream::cluster_statistics::ClusterStatisticsState;
5657
use crate::io::write::stream::ColumnStatisticsState;
58+
use crate::io::write::stream::VirtualColumnWriter;
5759
use crate::io::write::InvertedIndexState;
5860
use crate::io::BlockSerialization;
5961
use crate::io::BloomIndexState;
@@ -189,7 +191,7 @@ impl StreamBlockBuilder {
189191
let writer = NativeWriter::new(
190192
buffer,
191193
properties.source_schema.as_ref().clone(),
192-
databend_common_native::write::WriteOptions {
194+
WriteOptions {
193195
default_compression: properties.write_settings.table_compression.into(),
194196
max_page_size: Some(properties.write_settings.max_page_size),
195197
default_compress_ratio,
@@ -234,11 +236,18 @@ impl StreamBlockBuilder {
234236
ClusterStatisticsState::new(properties.cluster_stats_builder.clone());
235237
let column_stats_state =
236238
ColumnStatisticsState::new(&properties.stats_columns, &properties.distinct_columns);
239+
let virtual_column_writer = properties.virtual_column_builder.as_ref().map(|builder| {
240+
VirtualColumnWriter::create(
241+
builder.clone(),
242+
properties.write_settings.table_compression,
243+
)
244+
});
237245

238246
Ok(StreamBlockBuilder {
239247
properties,
240248
block_writer,
241249
inverted_index_writers,
250+
virtual_column_writer,
242251
bloom_index_builder,
243252
virtual_column_builder,
244253
row_count: 0,
@@ -333,6 +342,12 @@ impl StreamBlockBuilder {
333342
None
334343
};
335344

345+
let virtual_column_state = if let Some(writer) = self.virtual_column_writer.take() {
346+
Some(writer.finalize(&block_location)?)
347+
} else {
348+
None
349+
};
350+
336351
let col_metas = self.block_writer.finish(&self.properties.source_schema)?;
337352
let block_raw_data = mem::take(self.block_writer.inner_mut());
338353

@@ -363,7 +378,10 @@ impl StreamBlockBuilder {
363378
compression: self.properties.write_settings.table_compression.into(),
364379
inverted_index_size,
365380
create_on: Some(Utc::now()),
366-
ngram_filter_index_size: None,
381+
ngram_filter_index_size: bloom_index_state
382+
.as_ref()
383+
.map(|v| v.ngram_size)
384+
.unwrap_or_default(),
367385
virtual_block_meta: None,
368386
};
369387
let serialized = BlockSerialization {
@@ -391,6 +409,7 @@ pub struct StreamBlockProperties {
391409
bloom_columns_map: BTreeMap<FieldIndex, TableField>,
392410
ngram_args: Vec<NgramArgs>,
393411
inverted_index_builders: Vec<InvertedIndexBuilder>,
412+
virtual_column_builder: Option<VirtualColumnBuilder>,
394413
table_meta_timestamps: TableMetaTimestamps,
395414
support_virtual_columns: bool,
396415
}
@@ -437,6 +456,15 @@ impl StreamBlockProperties {
437456
.collect::<HashSet<_>>();
438457

439458
let inverted_index_builders = create_inverted_index_builders(&table.table_info.meta);
459+
let virtual_column_builder = if ctx
460+
.get_settings()
461+
.get_enable_refresh_virtual_column_after_write()
462+
.unwrap_or_default()
463+
{
464+
VirtualColumnBuilder::try_create(ctx.clone(), table, source_schema.clone()).ok()
465+
} else {
466+
None
467+
};
440468

441469
let cluster_stats_builder =
442470
ClusterStatisticsBuilder::try_create(table, ctx.clone(), &source_schema)?;
@@ -463,6 +491,7 @@ impl StreamBlockProperties {
463491
source_schema,
464492
write_settings,
465493
cluster_stats_builder,
494+
virtual_column_builder,
466495
stats_columns,
467496
distinct_columns,
468497
bloom_columns_map,

src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs

Lines changed: 107 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -24,56 +24,135 @@ use databend_common_expression::types::Decimal;
2424
use databend_common_expression::types::Decimal128Type;
2525
use databend_common_expression::types::Decimal256Type;
2626
use databend_common_expression::types::Decimal64Type;
27+
use databend_common_expression::types::Float32Type;
28+
use databend_common_expression::types::Float64Type;
29+
use databend_common_expression::types::Int16Type;
30+
use databend_common_expression::types::Int32Type;
31+
use databend_common_expression::types::Int64Type;
32+
use databend_common_expression::types::Int8Type;
2733
use databend_common_expression::types::NumberDataType;
28-
use databend_common_expression::types::NumberType;
2934
use databend_common_expression::types::StringType;
3035
use databend_common_expression::types::TimestampType;
36+
use databend_common_expression::types::UInt16Type;
37+
use databend_common_expression::types::UInt32Type;
38+
use databend_common_expression::types::UInt64Type;
39+
use databend_common_expression::types::UInt8Type;
3140
use databend_common_expression::types::ValueType;
32-
use databend_common_expression::with_number_mapped_type;
3341
use databend_common_expression::Column;
3442
use databend_common_expression::Scalar;
3543
use databend_common_expression::ScalarRef;
3644
use databend_common_expression::SELECTIVITY_THRESHOLD;
3745
use databend_storages_common_table_meta::meta::ColumnStatistics;
46+
use enum_dispatch::enum_dispatch;
3847

3948
use crate::statistics::Trim;
4049

41-
pub trait ColumnStatisticsBuilder: Send + Sync {
42-
fn update_column(&mut self, column: &Column);
50+
pub type CommonBuilder<T> = GenericColumnStatisticsBuilder<T, CommonAdapter>;
51+
pub type DecimalBuilder<T> = GenericColumnStatisticsBuilder<T, DecimalAdapter>;
52+
53+
#[enum_dispatch(ColumnStatsOps)]
54+
pub enum ColumnStatisticsBuilder {
55+
Int8(CommonBuilder<Int8Type>),
56+
Int16(CommonBuilder<Int16Type>),
57+
Int32(CommonBuilder<Int32Type>),
58+
Int64(CommonBuilder<Int64Type>),
59+
UInt8(CommonBuilder<UInt8Type>),
60+
UInt16(CommonBuilder<UInt16Type>),
61+
UInt32(CommonBuilder<UInt32Type>),
62+
UInt64(CommonBuilder<UInt64Type>),
63+
Float32(CommonBuilder<Float32Type>),
64+
Float64(CommonBuilder<Float64Type>),
65+
String(CommonBuilder<StringType>),
66+
Date(CommonBuilder<DateType>),
67+
Timestamp(CommonBuilder<TimestampType>),
68+
Decimal64(DecimalBuilder<Decimal64Type>),
69+
Decimal128(DecimalBuilder<Decimal128Type>),
70+
Decimal256(DecimalBuilder<Decimal256Type>),
71+
}
4372

73+
#[enum_dispatch]
74+
pub trait ColumnStatsOps {
75+
fn update_column(&mut self, column: &Column);
4476
fn update_scalar(&mut self, scalar: &ScalarRef, num_rows: usize, data_type: &DataType);
77+
fn finalize(self) -> Result<ColumnStatistics>;
78+
}
79+
80+
impl<T, A> ColumnStatsOps for GenericColumnStatisticsBuilder<T, A>
81+
where
82+
T: ValueType + Send + Sync,
83+
T::Scalar: Send + Sync,
84+
A: ColumnStatisticsAdapter<T> + 'static,
85+
for<'a, 'b> T::ScalarRef<'a>: PartialOrd<T::ScalarRef<'b>>,
86+
{
87+
fn update_column(&mut self, column: &Column) {
88+
GenericColumnStatisticsBuilder::update_column(self, column);
89+
}
90+
91+
fn update_scalar(&mut self, scalar: &ScalarRef, num_rows: usize, data_type: &DataType) {
92+
GenericColumnStatisticsBuilder::update_scalar(self, scalar, num_rows, data_type);
93+
}
94+
95+
fn finalize(self) -> Result<ColumnStatistics> {
96+
GenericColumnStatisticsBuilder::finalize(self)
97+
}
98+
}
4599

46-
fn finalize(self: Box<Self>) -> Result<ColumnStatistics>;
100+
macro_rules! create_builder_for_type {
101+
($data_type:expr, $variant:ident, $type:ty) => {
102+
ColumnStatisticsBuilder::$variant(CommonBuilder::<$type>::create($data_type))
103+
};
104+
($data_type:expr, $variant:ident, $type:ty, decimal) => {
105+
ColumnStatisticsBuilder::$variant(DecimalBuilder::<$type>::create($data_type))
106+
};
47107
}
48108

49-
pub fn create_column_stats_builder(data_type: &DataType) -> Box<dyn ColumnStatisticsBuilder> {
109+
pub fn create_column_stats_builder(data_type: &DataType) -> ColumnStatisticsBuilder {
50110
let inner_type = data_type.remove_nullable();
51-
with_number_mapped_type!(|NUM_TYPE| match inner_type {
52-
DataType::Number(NumberDataType::NUM_TYPE) => {
53-
GenericColumnStatisticsBuilder::<NumberType<NUM_TYPE>, CommonAdapter>::create(
54-
inner_type,
55-
)
111+
match inner_type {
112+
DataType::Number(NumberDataType::Int8) => {
113+
create_builder_for_type!(inner_type, Int8, Int8Type)
114+
}
115+
DataType::Number(NumberDataType::Int16) => {
116+
create_builder_for_type!(inner_type, Int16, Int16Type)
117+
}
118+
DataType::Number(NumberDataType::Int32) => {
119+
create_builder_for_type!(inner_type, Int32, Int32Type)
120+
}
121+
DataType::Number(NumberDataType::Int64) => {
122+
create_builder_for_type!(inner_type, Int64, Int64Type)
56123
}
57-
DataType::String => {
58-
GenericColumnStatisticsBuilder::<StringType, CommonAdapter>::create(inner_type)
124+
DataType::Number(NumberDataType::UInt8) => {
125+
create_builder_for_type!(inner_type, UInt8, UInt8Type)
59126
}
60-
DataType::Date => {
61-
GenericColumnStatisticsBuilder::<DateType, CommonAdapter>::create(inner_type)
127+
DataType::Number(NumberDataType::UInt16) => {
128+
create_builder_for_type!(inner_type, UInt16, UInt16Type)
62129
}
63-
DataType::Timestamp => {
64-
GenericColumnStatisticsBuilder::<TimestampType, CommonAdapter>::create(inner_type)
130+
DataType::Number(NumberDataType::UInt32) => {
131+
create_builder_for_type!(inner_type, UInt32, UInt32Type)
65132
}
133+
DataType::Number(NumberDataType::UInt64) => {
134+
create_builder_for_type!(inner_type, UInt64, UInt64Type)
135+
}
136+
DataType::Number(NumberDataType::Float32) => {
137+
create_builder_for_type!(inner_type, Float32, Float32Type)
138+
}
139+
DataType::Number(NumberDataType::Float64) => {
140+
create_builder_for_type!(inner_type, Float64, Float64Type)
141+
}
142+
DataType::String => create_builder_for_type!(inner_type, String, StringType),
143+
DataType::Date => create_builder_for_type!(inner_type, Date, DateType),
144+
DataType::Timestamp => create_builder_for_type!(inner_type, Timestamp, TimestampType),
66145
DataType::Decimal(size) => {
67146
if size.can_carried_by_64() {
68-
GenericColumnStatisticsBuilder::<Decimal64Type, DecimalAdapter>::create(inner_type)
147+
create_builder_for_type!(inner_type, Decimal64, Decimal64Type, decimal)
69148
} else if size.can_carried_by_128() {
70-
GenericColumnStatisticsBuilder::<Decimal128Type, DecimalAdapter>::create(inner_type)
149+
create_builder_for_type!(inner_type, Decimal128, Decimal128Type, decimal)
71150
} else {
72-
GenericColumnStatisticsBuilder::<Decimal256Type, DecimalAdapter>::create(inner_type)
151+
create_builder_for_type!(inner_type, Decimal256, Decimal256Type, decimal)
73152
}
74153
}
75154
_ => unreachable!("Unsupported data type: {:?}", data_type),
76-
})
155+
}
77156
}
78157

79158
pub trait ColumnStatisticsAdapter<T: ValueType>: Send + Sync {
@@ -86,7 +165,7 @@ pub trait ColumnStatisticsAdapter<T: ValueType>: Send + Sync {
86165
fn update_value(value: &mut Self::Value, scalar: T::ScalarRef<'_>, ordering: Ordering);
87166
}
88167

89-
struct CommonAdapter;
168+
pub struct CommonAdapter;
90169

91170
impl<T> ColumnStatisticsAdapter<T> for CommonAdapter
92171
where
@@ -111,7 +190,7 @@ where
111190
}
112191
}
113192

114-
struct DecimalAdapter;
193+
pub struct DecimalAdapter;
115194

116195
impl<T> ColumnStatisticsAdapter<T> for DecimalAdapter
117196
where
@@ -137,7 +216,7 @@ where
137216
}
138217
}
139218

140-
struct GenericColumnStatisticsBuilder<T, A>
219+
pub struct GenericColumnStatisticsBuilder<T, A>
141220
where
142221
T: ValueType,
143222
A: ColumnStatisticsAdapter<T>,
@@ -158,15 +237,15 @@ where
158237
A: ColumnStatisticsAdapter<T> + 'static,
159238
for<'a, 'b> T::ScalarRef<'a>: PartialOrd<T::ScalarRef<'b>>,
160239
{
161-
fn create(data_type: DataType) -> Box<dyn ColumnStatisticsBuilder> {
162-
Box::new(Self {
240+
fn create(data_type: DataType) -> Self {
241+
Self {
163242
min: None,
164243
max: None,
165244
null_count: 0,
166245
in_memory_size: 0,
167246
data_type,
168247
_phantom: PhantomData,
169-
})
248+
}
170249
}
171250

172251
fn add_batch<'a, I>(&mut self, mut iter: I)
@@ -201,15 +280,7 @@ where
201280
self.max = Some(A::scalar_to_value(max));
202281
}
203282
}
204-
}
205283

206-
impl<T, A> ColumnStatisticsBuilder for GenericColumnStatisticsBuilder<T, A>
207-
where
208-
T: ValueType + Send + Sync,
209-
T::Scalar: Send + Sync,
210-
A: ColumnStatisticsAdapter<T> + 'static,
211-
for<'a, 'b> T::ScalarRef<'a>: PartialOrd<T::ScalarRef<'b>>,
212-
{
213284
fn update_column(&mut self, column: &Column) {
214285
self.in_memory_size += column.memory_size();
215286
if column.len() == 0 {
@@ -265,7 +336,7 @@ where
265336
self.add(val.clone(), val);
266337
}
267338

268-
fn finalize(self: Box<Self>) -> Result<ColumnStatistics> {
339+
fn finalize(self) -> Result<ColumnStatistics> {
269340
let min = if let Some(v) = self.min {
270341
let v = A::value_to_scalar(v);
271342
// safe unwrap.

src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ use crate::io::write::stream::create_column_ndv_estimator;
2626
use crate::io::write::stream::create_column_stats_builder;
2727
use crate::io::write::stream::ColumnNDVEstimator;
2828
use crate::io::write::stream::ColumnStatisticsBuilder;
29+
use crate::io::write::stream::ColumnStatsOps;
2930
use crate::statistics::traverse_values_dfs;
3031

3132
pub struct ColumnStatisticsState {
32-
col_stats: HashMap<ColumnId, Box<dyn ColumnStatisticsBuilder>>,
33+
col_stats: HashMap<ColumnId, ColumnStatisticsBuilder>,
3334
distinct_columns: HashMap<ColumnId, Box<dyn ColumnNDVEstimator>>,
3435
}
3536

src/query/storages/fuse/src/io/write/stream/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@ mod cluster_statistics;
1717
mod column_ndv_estimator;
1818
mod column_statistics_builder;
1919
mod column_statistics_state;
20+
mod virtual_column_builder;
2021

2122
pub(crate) use block_builder::StreamBlockBuilder;
2223
pub(crate) use block_builder::StreamBlockProperties;
2324
pub(crate) use column_ndv_estimator::create_column_ndv_estimator;
2425
pub(crate) use column_ndv_estimator::ColumnNDVEstimator;
2526
pub(crate) use column_statistics_builder::create_column_stats_builder;
2627
pub(crate) use column_statistics_builder::ColumnStatisticsBuilder;
28+
pub(crate) use column_statistics_builder::ColumnStatsOps;
2729
pub(crate) use column_statistics_state::ColumnStatisticsState;
30+
pub(crate) use virtual_column_builder::VirtualColumnWriter;

0 commit comments

Comments
 (0)