feat: enable block stream write #18285


Draft: wants to merge 12 commits into main
2 changes: 2 additions & 0 deletions Cargo.lock

Generated file; diff not rendered.

7 changes: 0 additions & 7 deletions src/query/settings/src/settings_default.rs
@@ -1319,13 +1319,6 @@ impl DefaultSettings {
scope: SettingScope::Both,
range: None,
}),
("enable_block_stream_write", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Enables block stream write",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("trace_sample_rate", DefaultSettingValue {
value: UserSettingValue::UInt64(1),
desc: "Setting the trace sample rate. The value should be between '0' and '100'",
4 changes: 0 additions & 4 deletions src/query/settings/src/settings_getter_setter.rs
@@ -978,10 +978,6 @@ impl Settings {
self.set_setting("optimizer_skip_list".to_string(), v)
}

pub fn get_enable_block_stream_write(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_block_stream_write")? == 1)
}

pub fn get_statement_queue_ttl_in_seconds(&self) -> Result<u64> {
self.try_get_u64("statement_queue_ttl_in_seconds")
}
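With the session setting and its getter removed, stream block write is no longer a per-session toggle: the decision moves to the table itself via FuseTable::enable_stream_block_write later in this diff. A hedged sketch of the implied call-site migration (call sites are outside this excerpt, so the exact shape is an assumption):

// Before this PR (getter removed above):
// let enabled = ctx.get_settings().get_enable_block_stream_write()?;

// After this PR (hypothetical call site; see fuse_table.rs below):
// let enabled = fuse_table.enable_stream_block_write();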
92 changes: 74 additions & 18 deletions src/query/storages/common/index/src/bloom_index.rs
@@ -14,6 +14,7 @@

use std::collections::BTreeMap;
use std::collections::HashMap;
use std::hash::DefaultHasher;
use std::hash::Hasher;
use std::ops::ControlFlow;
use std::ops::Deref;
@@ -35,12 +36,18 @@ use databend_common_expression::types::BinaryType;
use databend_common_expression::types::Bitmap;
use databend_common_expression::types::Buffer;
use databend_common_expression::types::DataType;
use databend_common_expression::types::DateType;
use databend_common_expression::types::MapType;
use databend_common_expression::types::NullableType;
use databend_common_expression::types::Number;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::types::NumberType;
use databend_common_expression::types::StringType;
use databend_common_expression::types::TimestampType;
use databend_common_expression::types::UInt64Type;
use databend_common_expression::types::ValueType;
use databend_common_expression::visit_expr;
use databend_common_expression::with_number_mapped_type;
use databend_common_expression::BlockEntry;
use databend_common_expression::Column;
use databend_common_expression::ColumnBuilder;
@@ -349,6 +356,71 @@ impl BloomIndex {
Ok(column)
}

pub fn calculate_digest_by_type(data_type: &DataType, column: &Column) -> Result<Vec<u64>> {
let inner_type = data_type.remove_nullable();
with_number_mapped_type!(|NUM_TYPE| match inner_type {
DataType::Number(NumberDataType::NUM_TYPE) => {
Self::calculate_nullable_column_digests::<NumberType<NUM_TYPE>>(column)
}
DataType::String => {
Self::calculate_nullable_column_digests::<StringType>(column)
}
DataType::Date => {
Self::calculate_nullable_column_digests::<DateType>(column)
}
DataType::Timestamp => {
Self::calculate_nullable_column_digests::<TimestampType>(column)
}
_ => Err(ErrorCode::Internal(format!(
"Unsupported data type: {:?}",
data_type
))),
})
}

#[inline(always)]
fn hash_one<T: DFHash>(v: &T) -> u64 {
let mut hasher = DefaultHasher::default();
DFHash::hash(v, &mut hasher);
hasher.finish()
}

fn calculate_nullable_column_digests<T: ValueType>(column: &Column) -> Result<Vec<u64>>
where for<'a> T::ScalarRef<'a>: DFHash {
let (column, validity) = if let Column::Nullable(box inner) = column {
let validity = if inner.validity.null_count() == 0 {
None
} else {
Some(&inner.validity)
};
(&inner.column, validity)
} else {
(column, None)
};

let capacity = validity.map_or(column.len(), |v| v.true_count() + 1);
let mut result = Vec::with_capacity(capacity);
if validity.is_some() {
result.push(0);
}
let column = T::try_downcast_column(column).unwrap();
if let Some(validity) = validity {
let column_iter = T::iter_column(&column);
let value_iter = column_iter
.zip(validity.iter())
.filter(|(_, v)| *v)
.map(|(v, _)| v);
for value in value_iter {
result.push(Self::hash_one(&value));
}
} else {
for value in T::iter_column(&column) {
result.push(Self::hash_one(&value));
}
}
Ok(result)
}

/// calculate digest for column that may have null values
///
/// returns (column, validity) where column is the digest of the column
@@ -734,24 +806,8 @@ impl BloomIndexBuilder {
}
};

let (column, validity) =
BloomIndex::calculate_nullable_column_digest(&self.func_ctx, &column, &data_type)?;
// create filter per column
if validity.as_ref().map(|v| v.null_count()).unwrap_or(0) > 0 {
let validity = validity.unwrap();
let it = column.deref().iter().zip(validity.iter()).map(
|(v, b)| {
if !b {
&0
} else {
v
}
},
);
index_column.builder.add_digests(it);
} else {
index_column.builder.add_digests(column.deref());
}
let column = BloomIndex::calculate_digest_by_type(&data_type, &column)?;
index_column.builder.add_digests(column.deref());
}
for index_column in self.ngram_columns.iter_mut() {
let field_type = &block.data_type(index_column.index);
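The new digest path in calculate_digest_by_type dispatches on the column type via with_number_mapped_type! and funnels every supported type through one hashing loop. It also changes how nulls are encoded: the old BloomIndexBuilder code hashed every row and substituted 0 for null rows in place, while calculate_nullable_column_digests reserves slot 0 as a single null sentinel and then hashes only the non-null values. Since the digests feed a bloom filter, where duplicate insertions are harmless, collapsing all nulls into one sentinel preserves membership behavior. A minimal std-only sketch of the scheme, using std::hash::Hash as a stand-in for databend's DFHash (an assumption; DFHash may hash differently):

use std::hash::{DefaultHasher, Hash, Hasher};

// Mirrors BloomIndex::hash_one, built on std::hash.
fn hash_one<T: Hash>(v: &T) -> u64 {
    let mut hasher = DefaultHasher::default();
    v.hash(&mut hasher);
    hasher.finish()
}

// Digest layout of calculate_nullable_column_digests: if the column has
// any nulls, slot 0 holds a single 0 sentinel and only non-null values
// are hashed after it.
fn column_digests<T: Hash>(values: &[Option<T>]) -> Vec<u64> {
    let has_nulls = values.iter().any(|v| v.is_none());
    let mut result = Vec::with_capacity(values.len() + has_nulls as usize);
    if has_nulls {
        result.push(0); // sentinel standing in for every null row
    }
    result.extend(values.iter().flatten().map(hash_one));
    result
}

fn main() {
    let digests = column_digests(&[Some(1u64), None, Some(3)]);
    assert_eq!(digests[0], 0); // null sentinel comes first
    assert_eq!(digests.len(), 3); // sentinel + two value digests
}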
2 changes: 2 additions & 0 deletions src/query/storages/fuse/Cargo.toml
@@ -50,6 +50,7 @@ backoff = { workspace = true, features = ["futures", "tokio"] }
bytes = { workspace = true }
chrono = { workspace = true }
enum-as-inner = { workspace = true }
enum_dispatch = { workspace = true }
fastrace = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
@@ -61,6 +62,7 @@ match-template = { workspace = true }
opendal = { workspace = true }
parking_lot = { workspace = true }
parquet = { workspace = true }
paste = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
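Cargo.toml gains enum_dispatch and paste. Neither is used in the hunks shown here, so presumably the new stream-write pipeline code outside this excerpt uses them; that purpose is an inference. For context, a self-contained illustration of what enum_dispatch provides, with illustrative writer types that are not from this PR:

use enum_dispatch::enum_dispatch;

#[enum_dispatch]
trait WriteBlock {
    fn write(&mut self, rows: usize) -> usize;
}

struct ParquetWriter { written: usize }
struct NativeWriter { written: usize }

impl WriteBlock for ParquetWriter {
    fn write(&mut self, rows: usize) -> usize { self.written += rows; self.written }
}
impl WriteBlock for NativeWriter {
    fn write(&mut self, rows: usize) -> usize { self.written += rows; self.written }
}

// Static dispatch over a closed set of variants, with no Box<dyn WriteBlock>.
#[enum_dispatch(WriteBlock)]
enum BlockWriterKind {
    ParquetWriter,
    NativeWriter,
}

fn main() {
    let mut w: BlockWriterKind = ParquetWriter { written: 0 }.into();
    assert_eq!(w.write(10), 10);
}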
7 changes: 7 additions & 0 deletions src/query/storages/fuse/src/fuse_table.rs
@@ -745,6 +745,13 @@ impl FuseTable {
)
})
}

pub fn enable_stream_block_write(&self) -> bool {
matches!(self.storage_format, FuseStorageFormat::Parquet)
&& self
.cluster_type()
.is_none_or(|v| matches!(v, ClusterType::Hilbert))
}
}

#[async_trait::async_trait]
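enable_stream_block_write gates the feature on two conditions: Parquet storage format, and either no cluster key at all or Hilbert clustering (Option::is_none_or, stable since Rust 1.82, returns true for None, so non-clustered tables qualify). A std-only sketch of the predicate with stand-in enums (FuseStorageFormat and ClusterType are databend types, so these definitions are assumptions):

#[derive(Clone, Copy)]
enum StorageFormat { Parquet, Native } // stand-in for FuseStorageFormat
#[derive(Clone, Copy)]
enum ClusterType { Linear, Hilbert } // stand-in for databend's ClusterType

fn enable_stream_block_write(format: StorageFormat, cluster: Option<ClusterType>) -> bool {
    matches!(format, StorageFormat::Parquet)
        && cluster.is_none_or(|v| matches!(v, ClusterType::Hilbert))
}

fn main() {
    assert!(enable_stream_block_write(StorageFormat::Parquet, None));
    assert!(enable_stream_block_write(StorageFormat::Parquet, Some(ClusterType::Hilbert)));
    assert!(!enable_stream_block_write(StorageFormat::Parquet, Some(ClusterType::Linear)));
    assert!(!enable_stream_block_write(StorageFormat::Native, None));
}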
8 changes: 8 additions & 0 deletions src/query/storages/fuse/src/io/write/block_writer.rs
@@ -20,6 +20,8 @@ use std::time::Instant;
use chrono::Utc;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::local_block_meta_serde;
use databend_common_expression::BlockMetaInfo;
use databend_common_expression::Column;
use databend_common_expression::ColumnId;
use databend_common_expression::DataBlock;
@@ -124,6 +126,7 @@ pub async fn write_data(data: Vec<u8>, data_accessor: &Operator, location: &str)
Ok(())
}

#[derive(Debug)]
pub struct BlockSerialization {
pub block_raw_data: Vec<u8>,
pub block_meta: BlockMeta,
@@ -132,6 +135,11 @@ pub struct BlockSerialization {
pub virtual_column_state: Option<VirtualColumnState>,
}

local_block_meta_serde!(BlockSerialization);

#[typetag::serde(name = "block_serialization_meta")]
impl BlockMetaInfo for BlockSerialization {}

#[derive(Clone)]
pub struct BlockBuilder {
pub ctx: Arc<dyn TableContext>,
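Deriving Debug and implementing BlockMetaInfo lets a fully serialized block travel through the pipeline as block metadata. local_block_meta_serde! appears to supply the Serialize/Deserialize plumbing for a meta that only moves within one process (an inference from the macro name), while the typetag attribute registers the concrete type under a stable name. A self-contained sketch of the typetag pattern, with illustrative trait and type names rather than databend's:

use serde::{Deserialize, Serialize};

// A trait-object family whose concrete type survives (de)serialization
// through a name tag.
#[typetag::serde(tag = "type")]
trait MetaInfo: std::fmt::Debug {}

#[derive(Debug, Serialize, Deserialize)]
struct BlockStats {
    rows: usize,
}

// The name becomes the tag value, mirroring
// #[typetag::serde(name = "block_serialization_meta")] above.
#[typetag::serde(name = "block_stats")]
impl MetaInfo for BlockStats {}

fn main() -> serde_json::Result<()> {
    let meta: Box<dyn MetaInfo> = Box::new(BlockStats { rows: 42 });
    let json = serde_json::to_string(&meta)?;
    println!("{json}"); // e.g. {"type":"block_stats","rows":42}
    let roundtrip: Box<dyn MetaInfo> = serde_json::from_str(&json)?;
    println!("{roundtrip:?}");
    Ok(())
}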
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/io/write/bloom_index_writer.rs
@@ -40,6 +40,7 @@ use opendal::Operator;
use crate::io::BlockReader;
use crate::FuseStorageFormat;

#[derive(Debug)]
pub struct BloomIndexState {
pub(crate) data: Vec<u8>,
pub(crate) size: u64,
@@ -121,6 +121,7 @@ pub fn create_inverted_index_builders(table_meta: &TableMeta) -> Vec<InvertedInd
inverted_index_builders
}

#[derive(Debug)]
pub struct InvertedIndexState {
pub(crate) data: Vec<u8>,
pub(crate) size: u64,