Skip to content

Commit 6fc4e0c

Browse files
committed
fix virtual column builder
1 parent ef16cfe commit 6fc4e0c

File tree

8 files changed

+458
-139
lines changed

8 files changed

+458
-139
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/storages/fuse/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ backoff = { workspace = true, features = ["futures", "tokio"] }
5050
bytes = { workspace = true }
5151
chrono = { workspace = true }
5252
enum-as-inner = { workspace = true }
53+
enum_dispatch = { workspace = true }
5354
fastrace = { workspace = true }
5455
futures = { workspace = true }
5556
futures-util = { workspace = true }

src/query/storages/fuse/src/io/write/stream/block_builder.rs

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use databend_common_expression::TableSchemaRef;
3535
use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COLUMN_ID;
3636
use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
3737
use databend_common_native::write::NativeWriter;
38+
use databend_common_native::write::WriteOptions;
3839
use databend_common_sql::executor::physical_plans::MutationKind;
3940
use databend_storages_common_index::BloomIndex;
4041
use databend_storages_common_index::BloomIndexBuilder;
@@ -54,12 +55,14 @@ use crate::io::create_inverted_index_builders;
5455
use crate::io::write::stream::cluster_statistics::ClusterStatisticsBuilder;
5556
use crate::io::write::stream::cluster_statistics::ClusterStatisticsState;
5657
use crate::io::write::stream::ColumnStatisticsState;
58+
use crate::io::write::stream::VirtualColumnWriter;
5759
use crate::io::write::InvertedIndexState;
5860
use crate::io::BlockSerialization;
5961
use crate::io::BloomIndexState;
6062
use crate::io::InvertedIndexBuilder;
6163
use crate::io::InvertedIndexWriter;
6264
use crate::io::TableMetaLocationGenerator;
65+
use crate::io::VirtualColumnBuilder;
6366
use crate::io::WriteSettings;
6467
use crate::operations::column_parquet_metas;
6568
use crate::FuseStorageFormat;
@@ -149,6 +152,7 @@ pub struct StreamBlockBuilder {
149152
block_writer: BlockWriterImpl,
150153
inverted_index_writers: Vec<InvertedIndexWriter>,
151154
bloom_index_builder: BloomIndexBuilder,
155+
virtual_column_writer: Option<VirtualColumnWriter>,
152156

153157
cluster_stats_state: ClusterStatisticsState,
154158
column_stats_state: ColumnStatisticsState,
@@ -187,7 +191,7 @@ impl StreamBlockBuilder {
187191
let writer = NativeWriter::new(
188192
buffer,
189193
properties.source_schema.as_ref().clone(),
190-
databend_common_native::write::WriteOptions {
194+
WriteOptions {
191195
default_compression: properties.write_settings.table_compression.into(),
192196
max_page_size: Some(properties.write_settings.max_page_size),
193197
default_compress_ratio,
@@ -216,11 +220,18 @@ impl StreamBlockBuilder {
216220
ClusterStatisticsState::new(properties.cluster_stats_builder.clone());
217221
let column_stats_state =
218222
ColumnStatisticsState::new(&properties.stats_columns, &properties.distinct_columns);
223+
let virtual_column_writer = properties.virtual_column_builder.as_ref().map(|builder| {
224+
VirtualColumnWriter::create(
225+
builder.clone(),
226+
properties.write_settings.table_compression,
227+
)
228+
});
219229

220230
Ok(StreamBlockBuilder {
221231
properties,
222232
block_writer,
223233
inverted_index_writers,
234+
virtual_column_writer,
224235
bloom_index_builder,
225236
row_count: 0,
226237
block_size: 0,
@@ -257,6 +268,9 @@ impl StreamBlockBuilder {
257268
for writer in self.inverted_index_writers.iter_mut() {
258269
writer.add_block(&self.properties.source_schema, &block)?;
259270
}
271+
if let Some(writer) = self.virtual_column_writer.as_mut() {
272+
writer.add_block(&block)?;
273+
}
260274

261275
self.row_count += block.num_rows();
262276
self.block_size += block.estimate_block_size();
@@ -303,6 +317,12 @@ impl StreamBlockBuilder {
303317
inverted_index_states.push(inverted_index_state);
304318
}
305319

320+
let virtual_column_state = if let Some(writer) = self.virtual_column_writer.take() {
321+
Some(writer.finalize(&block_location)?)
322+
} else {
323+
None
324+
};
325+
306326
let col_metas = self.block_writer.finish(&self.properties.source_schema)?;
307327
let block_raw_data = mem::take(self.block_writer.inner_mut());
308328

@@ -333,15 +353,18 @@ impl StreamBlockBuilder {
333353
compression: self.properties.write_settings.table_compression.into(),
334354
inverted_index_size,
335355
create_on: Some(Utc::now()),
336-
ngram_filter_index_size: None,
356+
ngram_filter_index_size: bloom_index_state
357+
.as_ref()
358+
.map(|v| v.ngram_size)
359+
.unwrap_or_default(),
337360
virtual_block_meta: None,
338361
};
339362
let serialized = BlockSerialization {
340363
block_raw_data,
341364
block_meta,
342365
bloom_index_state,
343366
inverted_index_states,
344-
virtual_column_state: None,
367+
virtual_column_state,
345368
};
346369
Ok(serialized)
347370
}
@@ -361,6 +384,7 @@ pub struct StreamBlockProperties {
361384
bloom_columns_map: BTreeMap<FieldIndex, TableField>,
362385
ngram_args: Vec<NgramArgs>,
363386
inverted_index_builders: Vec<InvertedIndexBuilder>,
387+
virtual_column_builder: Option<VirtualColumnBuilder>,
364388
table_meta_timestamps: TableMetaTimestamps,
365389
}
366390

@@ -406,6 +430,15 @@ impl StreamBlockProperties {
406430
.collect::<HashSet<_>>();
407431

408432
let inverted_index_builders = create_inverted_index_builders(&table.table_info.meta);
433+
let virtual_column_builder = if ctx
434+
.get_settings()
435+
.get_enable_refresh_virtual_column_after_write()
436+
.unwrap_or_default()
437+
{
438+
VirtualColumnBuilder::try_create(ctx.clone(), table, source_schema.clone()).ok()
439+
} else {
440+
None
441+
};
409442

410443
let cluster_stats_builder =
411444
ClusterStatisticsBuilder::try_create(table, ctx.clone(), &source_schema)?;
@@ -432,6 +465,7 @@ impl StreamBlockProperties {
432465
source_schema,
433466
write_settings,
434467
cluster_stats_builder,
468+
virtual_column_builder,
435469
stats_columns,
436470
distinct_columns,
437471
bloom_columns_map,

src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs

Lines changed: 107 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -24,56 +24,135 @@ use databend_common_expression::types::Decimal;
2424
use databend_common_expression::types::Decimal128Type;
2525
use databend_common_expression::types::Decimal256Type;
2626
use databend_common_expression::types::Decimal64Type;
27+
use databend_common_expression::types::Float32Type;
28+
use databend_common_expression::types::Float64Type;
29+
use databend_common_expression::types::Int16Type;
30+
use databend_common_expression::types::Int32Type;
31+
use databend_common_expression::types::Int64Type;
32+
use databend_common_expression::types::Int8Type;
2733
use databend_common_expression::types::NumberDataType;
28-
use databend_common_expression::types::NumberType;
2934
use databend_common_expression::types::StringType;
3035
use databend_common_expression::types::TimestampType;
36+
use databend_common_expression::types::UInt16Type;
37+
use databend_common_expression::types::UInt32Type;
38+
use databend_common_expression::types::UInt64Type;
39+
use databend_common_expression::types::UInt8Type;
3140
use databend_common_expression::types::ValueType;
32-
use databend_common_expression::with_number_mapped_type;
3341
use databend_common_expression::Column;
3442
use databend_common_expression::Scalar;
3543
use databend_common_expression::ScalarRef;
3644
use databend_common_expression::SELECTIVITY_THRESHOLD;
3745
use databend_storages_common_table_meta::meta::ColumnStatistics;
46+
use enum_dispatch::enum_dispatch;
3847

3948
use crate::statistics::Trim;
4049

41-
pub trait ColumnStatisticsBuilder: Send + Sync {
42-
fn update_column(&mut self, column: &Column);
50+
pub type CommonBuilder<T> = GenericColumnStatisticsBuilder<T, CommonAdapter>;
51+
pub type DecimalBuilder<T> = GenericColumnStatisticsBuilder<T, DecimalAdapter>;
52+
53+
#[enum_dispatch(ColumnStatsOps)]
54+
pub enum ColumnStatisticsBuilder {
55+
Int8(CommonBuilder<Int8Type>),
56+
Int16(CommonBuilder<Int16Type>),
57+
Int32(CommonBuilder<Int32Type>),
58+
Int64(CommonBuilder<Int64Type>),
59+
UInt8(CommonBuilder<UInt8Type>),
60+
UInt16(CommonBuilder<UInt16Type>),
61+
UInt32(CommonBuilder<UInt32Type>),
62+
UInt64(CommonBuilder<UInt64Type>),
63+
Float32(CommonBuilder<Float32Type>),
64+
Float64(CommonBuilder<Float64Type>),
65+
String(CommonBuilder<StringType>),
66+
Date(CommonBuilder<DateType>),
67+
Timestamp(CommonBuilder<TimestampType>),
68+
Decimal64(DecimalBuilder<Decimal64Type>),
69+
Decimal128(DecimalBuilder<Decimal128Type>),
70+
Decimal256(DecimalBuilder<Decimal256Type>),
71+
}
4372

73+
#[enum_dispatch]
74+
pub trait ColumnStatsOps {
75+
fn update_column(&mut self, column: &Column);
4476
fn update_scalar(&mut self, scalar: &ScalarRef, num_rows: usize, data_type: &DataType);
77+
fn finalize(self) -> Result<ColumnStatistics>;
78+
}
79+
80+
impl<T, A> ColumnStatsOps for GenericColumnStatisticsBuilder<T, A>
81+
where
82+
T: ValueType + Send + Sync,
83+
T::Scalar: Send + Sync,
84+
A: ColumnStatisticsAdapter<T> + 'static,
85+
for<'a, 'b> T::ScalarRef<'a>: PartialOrd<T::ScalarRef<'b>>,
86+
{
87+
fn update_column(&mut self, column: &Column) {
88+
GenericColumnStatisticsBuilder::update_column(self, column);
89+
}
90+
91+
fn update_scalar(&mut self, scalar: &ScalarRef, num_rows: usize, data_type: &DataType) {
92+
GenericColumnStatisticsBuilder::update_scalar(self, scalar, num_rows, data_type);
93+
}
94+
95+
fn finalize(self) -> Result<ColumnStatistics> {
96+
GenericColumnStatisticsBuilder::finalize(self)
97+
}
98+
}
4599

46-
fn finalize(self: Box<Self>) -> Result<ColumnStatistics>;
100+
macro_rules! create_builder_for_type {
101+
($data_type:expr, $variant:ident, $type:ty) => {
102+
ColumnStatisticsBuilder::$variant(CommonBuilder::<$type>::create($data_type))
103+
};
104+
($data_type:expr, $variant:ident, $type:ty, decimal) => {
105+
ColumnStatisticsBuilder::$variant(DecimalBuilder::<$type>::create($data_type))
106+
};
47107
}
48108

49-
pub fn create_column_stats_builder(data_type: &DataType) -> Box<dyn ColumnStatisticsBuilder> {
109+
pub fn create_column_stats_builder(data_type: &DataType) -> ColumnStatisticsBuilder {
50110
let inner_type = data_type.remove_nullable();
51-
with_number_mapped_type!(|NUM_TYPE| match inner_type {
52-
DataType::Number(NumberDataType::NUM_TYPE) => {
53-
GenericColumnStatisticsBuilder::<NumberType<NUM_TYPE>, CommonAdapter>::create(
54-
inner_type,
55-
)
111+
match inner_type {
112+
DataType::Number(NumberDataType::Int8) => {
113+
create_builder_for_type!(inner_type, Int8, Int8Type)
114+
}
115+
DataType::Number(NumberDataType::Int16) => {
116+
create_builder_for_type!(inner_type, Int16, Int16Type)
117+
}
118+
DataType::Number(NumberDataType::Int32) => {
119+
create_builder_for_type!(inner_type, Int32, Int32Type)
120+
}
121+
DataType::Number(NumberDataType::Int64) => {
122+
create_builder_for_type!(inner_type, Int64, Int64Type)
56123
}
57-
DataType::String => {
58-
GenericColumnStatisticsBuilder::<StringType, CommonAdapter>::create(inner_type)
124+
DataType::Number(NumberDataType::UInt8) => {
125+
create_builder_for_type!(inner_type, UInt8, UInt8Type)
59126
}
60-
DataType::Date => {
61-
GenericColumnStatisticsBuilder::<DateType, CommonAdapter>::create(inner_type)
127+
DataType::Number(NumberDataType::UInt16) => {
128+
create_builder_for_type!(inner_type, UInt16, UInt16Type)
62129
}
63-
DataType::Timestamp => {
64-
GenericColumnStatisticsBuilder::<TimestampType, CommonAdapter>::create(inner_type)
130+
DataType::Number(NumberDataType::UInt32) => {
131+
create_builder_for_type!(inner_type, UInt32, UInt32Type)
65132
}
133+
DataType::Number(NumberDataType::UInt64) => {
134+
create_builder_for_type!(inner_type, UInt64, UInt64Type)
135+
}
136+
DataType::Number(NumberDataType::Float32) => {
137+
create_builder_for_type!(inner_type, Float32, Float32Type)
138+
}
139+
DataType::Number(NumberDataType::Float64) => {
140+
create_builder_for_type!(inner_type, Float64, Float64Type)
141+
}
142+
DataType::String => create_builder_for_type!(inner_type, String, StringType),
143+
DataType::Date => create_builder_for_type!(inner_type, Date, DateType),
144+
DataType::Timestamp => create_builder_for_type!(inner_type, Timestamp, TimestampType),
66145
DataType::Decimal(size) => {
67146
if size.can_carried_by_64() {
68-
GenericColumnStatisticsBuilder::<Decimal64Type, DecimalAdapter>::create(inner_type)
147+
create_builder_for_type!(inner_type, Decimal64, Decimal64Type, decimal)
69148
} else if size.can_carried_by_128() {
70-
GenericColumnStatisticsBuilder::<Decimal128Type, DecimalAdapter>::create(inner_type)
149+
create_builder_for_type!(inner_type, Decimal128, Decimal128Type, decimal)
71150
} else {
72-
GenericColumnStatisticsBuilder::<Decimal256Type, DecimalAdapter>::create(inner_type)
151+
create_builder_for_type!(inner_type, Decimal256, Decimal256Type, decimal)
73152
}
74153
}
75154
_ => unreachable!("Unsupported data type: {:?}", data_type),
76-
})
155+
}
77156
}
78157

79158
pub trait ColumnStatisticsAdapter<T: ValueType>: Send + Sync {
@@ -86,7 +165,7 @@ pub trait ColumnStatisticsAdapter<T: ValueType>: Send + Sync {
86165
fn update_value(value: &mut Self::Value, scalar: T::ScalarRef<'_>, ordering: Ordering);
87166
}
88167

89-
struct CommonAdapter;
168+
pub struct CommonAdapter;
90169

91170
impl<T> ColumnStatisticsAdapter<T> for CommonAdapter
92171
where
@@ -111,7 +190,7 @@ where
111190
}
112191
}
113192

114-
struct DecimalAdapter;
193+
pub struct DecimalAdapter;
115194

116195
impl<T> ColumnStatisticsAdapter<T> for DecimalAdapter
117196
where
@@ -137,7 +216,7 @@ where
137216
}
138217
}
139218

140-
struct GenericColumnStatisticsBuilder<T, A>
219+
pub struct GenericColumnStatisticsBuilder<T, A>
141220
where
142221
T: ValueType,
143222
A: ColumnStatisticsAdapter<T>,
@@ -158,15 +237,15 @@ where
158237
A: ColumnStatisticsAdapter<T> + 'static,
159238
for<'a, 'b> T::ScalarRef<'a>: PartialOrd<T::ScalarRef<'b>>,
160239
{
161-
fn create(data_type: DataType) -> Box<dyn ColumnStatisticsBuilder> {
162-
Box::new(Self {
240+
fn create(data_type: DataType) -> Self {
241+
Self {
163242
min: None,
164243
max: None,
165244
null_count: 0,
166245
in_memory_size: 0,
167246
data_type,
168247
_phantom: PhantomData,
169-
})
248+
}
170249
}
171250

172251
fn add_batch<'a, I>(&mut self, mut iter: I)
@@ -201,15 +280,7 @@ where
201280
self.max = Some(A::scalar_to_value(max));
202281
}
203282
}
204-
}
205283

206-
impl<T, A> ColumnStatisticsBuilder for GenericColumnStatisticsBuilder<T, A>
207-
where
208-
T: ValueType + Send + Sync,
209-
T::Scalar: Send + Sync,
210-
A: ColumnStatisticsAdapter<T> + 'static,
211-
for<'a, 'b> T::ScalarRef<'a>: PartialOrd<T::ScalarRef<'b>>,
212-
{
213284
fn update_column(&mut self, column: &Column) {
214285
self.in_memory_size += column.memory_size();
215286
if column.len() == 0 {
@@ -265,7 +336,7 @@ where
265336
self.add(val.clone(), val);
266337
}
267338

268-
fn finalize(self: Box<Self>) -> Result<ColumnStatistics> {
339+
fn finalize(self) -> Result<ColumnStatistics> {
269340
let min = if let Some(v) = self.min {
270341
let v = A::value_to_scalar(v);
271342
// safe upwrap.

src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ use crate::io::write::stream::create_column_ndv_estimator;
2626
use crate::io::write::stream::create_column_stats_builder;
2727
use crate::io::write::stream::ColumnNDVEstimator;
2828
use crate::io::write::stream::ColumnStatisticsBuilder;
29+
use crate::io::write::stream::ColumnStatsOps;
2930
use crate::statistics::traverse_values_dfs;
3031

3132
pub struct ColumnStatisticsState {
32-
col_stats: HashMap<ColumnId, Box<dyn ColumnStatisticsBuilder>>,
33+
col_stats: HashMap<ColumnId, ColumnStatisticsBuilder>,
3334
distinct_columns: HashMap<ColumnId, Box<dyn ColumnNDVEstimator>>,
3435
}
3536

0 commit comments

Comments
 (0)