Skip to content

Commit 3932276

Browse files
committed
update
1 parent 1404cc7 commit 3932276

File tree

10 files changed

+192
-98
lines changed

10 files changed

+192
-98
lines changed

src/query/settings/src/settings_default.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1312,13 +1312,6 @@ impl DefaultSettings {
13121312
scope: SettingScope::Both,
13131313
range: None,
13141314
}),
1315-
("enable_block_stream_write", DefaultSettingValue {
1316-
value: UserSettingValue::UInt64(0),
1317-
desc: "Enables block stream write",
1318-
mode: SettingMode::Both,
1319-
scope: SettingScope::Both,
1320-
range: Some(SettingRange::Numeric(0..=1)),
1321-
}),
13221315
("trace_sample_rate", DefaultSettingValue {
13231316
value: UserSettingValue::UInt64(1),
13241317
desc: "Setting the trace sample rate. The value should be between '0' and '100'",

src/query/settings/src/settings_getter_setter.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -974,10 +974,6 @@ impl Settings {
974974
self.set_setting("optimizer_skip_list".to_string(), v)
975975
}
976976

977-
pub fn get_enable_block_stream_write(&self) -> Result<bool> {
978-
Ok(self.try_get_u64("enable_block_stream_write")? == 1)
979-
}
980-
981977
pub fn get_statement_queue_ttl_in_seconds(&self) -> Result<u64> {
982978
self.try_get_u64("statement_queue_ttl_in_seconds")
983979
}

src/query/storages/common/index/src/bloom_index.rs

Lines changed: 74 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::collections::BTreeMap;
1616
use std::collections::HashMap;
17+
use std::hash::DefaultHasher;
1718
use std::hash::Hasher;
1819
use std::ops::ControlFlow;
1920
use std::ops::Deref;
@@ -35,12 +36,18 @@ use databend_common_expression::types::BinaryType;
3536
use databend_common_expression::types::Bitmap;
3637
use databend_common_expression::types::Buffer;
3738
use databend_common_expression::types::DataType;
39+
use databend_common_expression::types::DateType;
3840
use databend_common_expression::types::MapType;
3941
use databend_common_expression::types::NullableType;
4042
use databend_common_expression::types::Number;
4143
use databend_common_expression::types::NumberDataType;
44+
use databend_common_expression::types::NumberType;
45+
use databend_common_expression::types::StringType;
46+
use databend_common_expression::types::TimestampType;
4247
use databend_common_expression::types::UInt64Type;
48+
use databend_common_expression::types::ValueType;
4349
use databend_common_expression::visit_expr;
50+
use databend_common_expression::with_number_mapped_type;
4451
use databend_common_expression::BlockEntry;
4552
use databend_common_expression::Column;
4653
use databend_common_expression::ColumnBuilder;
@@ -349,6 +356,71 @@ impl BloomIndex {
349356
Ok(column)
350357
}
351358

359+
pub fn calculate_digest_by_type(data_type: &DataType, column: &Column) -> Result<Vec<u64>> {
360+
let inner_type = data_type.remove_nullable();
361+
with_number_mapped_type!(|NUM_TYPE| match inner_type {
362+
DataType::Number(NumberDataType::NUM_TYPE) => {
363+
Self::calculate_nullable_column_digests::<NumberType<NUM_TYPE>>(column)
364+
}
365+
DataType::String => {
366+
Self::calculate_nullable_column_digests::<StringType>(column)
367+
}
368+
DataType::Date => {
369+
Self::calculate_nullable_column_digests::<DateType>(column)
370+
}
371+
DataType::Timestamp => {
372+
Self::calculate_nullable_column_digests::<TimestampType>(column)
373+
}
374+
_ => Err(ErrorCode::Internal(format!(
375+
"Unsupported data type: {:?}",
376+
data_type
377+
))),
378+
})
379+
}
380+
381+
#[inline(always)]
382+
fn hash_one<T: DFHash>(v: &T) -> u64 {
383+
let mut hasher = DefaultHasher::default();
384+
DFHash::hash(v, &mut hasher);
385+
hasher.finish()
386+
}
387+
388+
fn calculate_nullable_column_digests<T: ValueType>(column: &Column) -> Result<Vec<u64>>
389+
where for<'a> T::ScalarRef<'a>: DFHash {
390+
let (column, validity) = if let Column::Nullable(box inner) = column {
391+
let validity = if inner.validity.null_count() == 0 {
392+
None
393+
} else {
394+
Some(&inner.validity)
395+
};
396+
(&inner.column, validity)
397+
} else {
398+
(column, None)
399+
};
400+
401+
let capacity = validity.map_or(column.len(), |v| v.true_count() + 1);
402+
let mut result = Vec::with_capacity(capacity);
403+
if validity.is_some() {
404+
result.push(0);
405+
}
406+
let column = T::try_downcast_column(column).unwrap();
407+
if let Some(validity) = validity {
408+
let column_iter = T::iter_column(&column);
409+
let value_iter = column_iter
410+
.zip(validity.iter())
411+
.filter(|(_, v)| *v)
412+
.map(|(v, _)| v);
413+
for value in value_iter {
414+
result.push(Self::hash_one(&value));
415+
}
416+
} else {
417+
for value in T::iter_column(&column) {
418+
result.push(Self::hash_one(&value));
419+
}
420+
}
421+
Ok(result)
422+
}
423+
352424
/// calculate digest for column that may have null values
353425
///
354426
/// returns (column, validity) where column is the digest of the column
@@ -734,24 +806,8 @@ impl BloomIndexBuilder {
734806
}
735807
};
736808

737-
let (column, validity) =
738-
BloomIndex::calculate_nullable_column_digest(&self.func_ctx, &column, &data_type)?;
739-
// create filter per column
740-
if validity.as_ref().map(|v| v.null_count()).unwrap_or(0) > 0 {
741-
let validity = validity.unwrap();
742-
let it = column.deref().iter().zip(validity.iter()).map(
743-
|(v, b)| {
744-
if !b {
745-
&0
746-
} else {
747-
v
748-
}
749-
},
750-
);
751-
index_column.builder.add_digests(it);
752-
} else {
753-
index_column.builder.add_digests(column.deref());
754-
}
809+
let column = BloomIndex::calculate_digest_by_type(&data_type, &column)?;
810+
index_column.builder.add_digests(column.deref());
755811
}
756812
for index_column in self.ngram_columns.iter_mut() {
757813
let field_type = &block.data_type(index_column.index);

src/query/storages/fuse/src/fuse_table.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -743,6 +743,13 @@ impl FuseTable {
743743
)
744744
})
745745
}
746+
747+
pub fn enable_stream_block_write(&self) -> bool {
748+
matches!(self.storage_format, FuseStorageFormat::Parquet)
749+
&& self
750+
.cluster_type()
751+
.is_none_or(|v| matches!(v, ClusterType::Hilbert))
752+
}
746753
}
747754

748755
#[async_trait::async_trait]

src/query/storages/fuse/src/io/write/block_writer.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use std::time::Instant;
2020
use chrono::Utc;
2121
use databend_common_catalog::table_context::TableContext;
2222
use databend_common_exception::Result;
23+
use databend_common_expression::local_block_meta_serde;
24+
use databend_common_expression::BlockMetaInfo;
2325
use databend_common_expression::Column;
2426
use databend_common_expression::ColumnId;
2527
use databend_common_expression::DataBlock;
@@ -124,6 +126,7 @@ pub async fn write_data(data: Vec<u8>, data_accessor: &Operator, location: &str)
124126
Ok(())
125127
}
126128

129+
#[derive(Debug)]
127130
pub struct BlockSerialization {
128131
pub block_raw_data: Vec<u8>,
129132
pub block_meta: BlockMeta,
@@ -132,6 +135,11 @@ pub struct BlockSerialization {
132135
pub virtual_column_state: Option<VirtualColumnState>,
133136
}
134137

138+
local_block_meta_serde!(BlockSerialization);
139+
140+
#[typetag::serde(name = "block_serialization_meta")]
141+
impl BlockMetaInfo for BlockSerialization {}
142+
135143
#[derive(Clone)]
136144
pub struct BlockBuilder {
137145
pub ctx: Arc<dyn TableContext>,

src/query/storages/fuse/src/io/write/bloom_index_writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use opendal::Operator;
4040
use crate::io::BlockReader;
4141
use crate::FuseStorageFormat;
4242

43+
#[derive(Debug)]
4344
pub struct BloomIndexState {
4445
pub(crate) data: Vec<u8>,
4546
pub(crate) size: u64,

src/query/storages/fuse/src/io/write/inverted_index_writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ pub fn create_inverted_index_builders(table_meta: &TableMeta) -> Vec<InvertedInd
121121
inverted_index_builders
122122
}
123123

124+
#[derive(Debug)]
124125
pub struct InvertedIndexState {
125126
pub(crate) data: Vec<u8>,
126127
pub(crate) size: u64,

src/query/storages/fuse/src/operations/append.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,11 @@ use databend_common_sql::executor::physical_plans::MutationKind;
3737
use databend_storages_common_table_meta::meta::TableMetaTimestamps;
3838
use databend_storages_common_table_meta::table::ClusterType;
3939

40+
use crate::io::StreamBlockProperties;
41+
use crate::operations::TransformBlockBuilder;
4042
use crate::operations::TransformBlockWriter;
4143
use crate::operations::TransformSerializeBlock;
4244
use crate::statistics::ClusterStatsGenerator;
43-
use crate::FuseStorageFormat;
4445
use crate::FuseTable;
4546

4647
impl FuseTable {
@@ -50,19 +51,22 @@ impl FuseTable {
5051
pipeline: &mut Pipeline,
5152
table_meta_timestamps: TableMetaTimestamps,
5253
) -> Result<()> {
53-
let enable_stream_block_write = ctx.get_settings().get_enable_block_stream_write()?
54-
&& matches!(self.storage_format, FuseStorageFormat::Parquet);
54+
let enable_stream_block_write = self.enable_stream_block_write();
5555
if enable_stream_block_write {
56+
let properties = StreamBlockProperties::try_create(
57+
ctx.clone(),
58+
self,
59+
MutationKind::Insert,
60+
table_meta_timestamps,
61+
)?;
62+
5663
pipeline.add_transform(|input, output| {
57-
TransformBlockWriter::try_create(
58-
ctx.clone(),
59-
input,
60-
output,
61-
self,
62-
table_meta_timestamps,
63-
false,
64-
)
64+
TransformBlockBuilder::try_create(input, output, properties.clone())
6565
})?;
66+
67+
pipeline.add_async_accumulating_transformer(|| {
68+
TransformBlockWriter::create(ctx.clone(), MutationKind::Insert, self, false)
69+
});
6670
} else {
6771
let block_thresholds = self.get_block_thresholds();
6872
build_compact_block_pipeline(pipeline, block_thresholds)?;

src/query/storages/fuse/src/operations/common/processors/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ mod transform_serialize_segment;
2222

2323
pub use multi_table_insert_commit::CommitMultiTableInsert;
2424
pub use sink_commit::CommitSink;
25+
pub use transform_block_writer::TransformBlockBuilder;
2526
pub use transform_block_writer::TransformBlockWriter;
2627
pub use transform_merge_commit_meta::TransformMergeCommitMeta;
2728
pub use transform_mutation_aggregator::TableMutationAggregator;

0 commit comments

Comments
 (0)