Commit 3f111dd

chore: optimize building bloom index for null (#17625)
* optimize building bloom index for null: calc scalar memory size
* fix
* fix
1 parent b83cb6e commit 3f111dd

File tree: 15 files changed, +243 −96 lines changed

src/query/expression/src/block.rs

Lines changed: 20 additions & 0 deletions

```diff
@@ -606,6 +606,26 @@ impl DataBlock {
         }
         self.columns[col].value.index(row)
     }
+
+    /// Calculates the memory size of a `DataBlock` for writing purposes.
+    /// This function is used to estimate the memory footprint of a `DataBlock` when writing it to storage.
+    pub fn estimate_block_size(&self) -> usize {
+        let num_rows = self.num_rows();
+        self.columns()
+            .iter()
+            .map(|entry| match &entry.value {
+                Value::Column(Column::Nullable(col)) if col.validity.true_count() == 0 => {
+                    // For `Nullable` columns with no valid values,
+                    // only the size of the validity bitmap is counted.
+                    col.validity.as_slice().0.len()
+                }
+                Value::Scalar(v) => v
+                    .as_ref()
+                    .estimated_scalar_repeat_size(num_rows, &entry.data_type),
+                _ => entry.memory_size(),
+            })
+            .sum()
+    }
 }
 
 impl BlockEntry {
```
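To make the all-null shortcut concrete: a validity bitmap stores one bit per row, so a fully-null `Nullable` column costs `(num_rows + 7) / 8` bytes regardless of how wide its payload type is. A minimal standalone sketch of that arithmetic (plain Rust with illustrative names, not the Databend types):

```rust
/// Illustrative model of the all-null branch of `estimate_block_size`:
/// the byte slice backing a validity bitmap holds one bit per row.
fn validity_bitmap_bytes(num_rows: usize) -> usize {
    (num_rows + 7) / 8 // round up to whole bytes
}

fn main() {
    // One million all-null rows are estimated at 125,000 bytes of bitmap,
    // rather than the bitmap-plus-payload size `entry.memory_size()` reports.
    assert_eq!(validity_bitmap_bytes(1_000_000), 125_000);
}
```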

src/query/expression/src/values.rs

Lines changed: 52 additions & 3 deletions

```diff
@@ -756,6 +756,56 @@ impl ScalarRef<'_> {
             },
         }
     }
+
+    /// Estimates the memory size of a scalar value if it were repeated `n` times,
+    /// without actually converting it into a column. This avoids unnecessary allocations
+    /// and provides a direct calculation based on the scalar type and its associated DataType.
+    ///
+    /// # Parameters:
+    /// - `scalar`: The scalar value to estimate memory size for.
+    /// - `n`: The number of times the scalar is hypothetically repeated.
+    /// - `data_type`: The data type of the scalar, used for correct size calculations.
+    ///
+    /// # Returns:
+    /// The estimated memory size (in bytes) that `n` repetitions of `scalar` would occupy.
+    pub fn estimated_scalar_repeat_size(&self, n: usize, data_type: &DataType) -> usize {
+        if let DataType::Nullable(ty) = data_type {
+            let mut memory_size = (n + 7) / 8;
+            if !self.is_null() {
+                memory_size += self.estimated_scalar_repeat_size(n, ty);
+            }
+            return memory_size;
+        }
+
+        match self {
+            ScalarRef::Null => std::mem::size_of::<usize>(),
+            ScalarRef::EmptyArray | ScalarRef::EmptyMap => std::mem::size_of::<usize>(),
+            ScalarRef::Number(_) => n * self.memory_size(),
+            ScalarRef::Decimal(_) => n * self.memory_size(),
+            ScalarRef::Boolean(_) => (n + 7) / 8,
+            ScalarRef::Binary(s) => s.len() * n + (n + 1) * 8,
+            ScalarRef::String(s) => s.len() * n + n * 12,
+            ScalarRef::Timestamp(_) => n * 8,
+            ScalarRef::Date(_) => n * 4,
+            ScalarRef::Interval(_) => n * 16,
+            ScalarRef::Array(col) => col.memory_size() * n + (n + 1) * 8,
+            ScalarRef::Map(col) => col.memory_size() * n + (n + 1) * 8,
+            ScalarRef::Bitmap(b) => b.len() * n + (n + 1) * 8,
+            ScalarRef::Tuple(fields) => {
+                let DataType::Tuple(fields_ty) = data_type else {
+                    unreachable!()
+                };
+                fields
+                    .iter()
+                    .zip(fields_ty.iter())
+                    .map(|(v, ty)| v.estimated_scalar_repeat_size(n, ty))
+                    .sum()
+            }
+            ScalarRef::Variant(s) => s.len() * n + (n + 1) * 8,
+            ScalarRef::Geometry(s) => s.len() * n + (n + 1) * 8,
+            ScalarRef::Geography(s) => s.0.len() * n + (n + 1) * 8,
+        }
+    }
 }
 
 impl PartialOrd for Scalar {
@@ -1708,9 +1758,8 @@ impl ColumnBuilder {
             ScalarRef::Map(col) => ColumnBuilder::Map(Box::new(ArrayColumnBuilder::repeat(col, n))),
             ScalarRef::Bitmap(b) => ColumnBuilder::Bitmap(BinaryColumnBuilder::repeat(b, n)),
             ScalarRef::Tuple(fields) => {
-                let fields_ty = match data_type {
-                    DataType::Tuple(fields_ty) => fields_ty,
-                    _ => unreachable!(),
+                let DataType::Tuple(fields_ty) = data_type else {
+                    unreachable!()
                 };
                 ColumnBuilder::Tuple(
                     fields
```
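A few of these arms can be spot-checked with plain arithmetic. The sketch below copies three formulas out of the match as free functions so it compiles standalone; the per-row constants (12 bytes per string view, 8-byte offsets with `n + 1` entries) are taken from the diff above, not measured from the real column types:

```rust
// All-null nullable scalar: only the validity bitmap, one bit per row.
fn nullable_null_size(n: usize) -> usize {
    (n + 7) / 8
}

// String: payload bytes plus a fixed 12-byte per-row overhead.
fn string_repeat_size(s: &str, n: usize) -> usize {
    s.len() * n + n * 12
}

// Binary: payload bytes plus (n + 1) eight-byte offsets.
fn binary_repeat_size(b: &[u8], n: usize) -> usize {
    b.len() * n + (n + 1) * 8
}

fn main() {
    let n = 108; // same row count the tests below use
    assert_eq!(nullable_null_size(n), 14); // 108 bits round up to 14 bytes
    assert_eq!(string_repeat_size("abc", n), 3 * 108 + 12 * 108); // 1620
    assert_eq!(binary_repeat_size(&[1, 2, 3, 4], n), 4 * 108 + 109 * 8); // 1304
}
```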

src/query/expression/tests/it/kernel.rs

Lines changed: 116 additions & 0 deletions

```diff
@@ -14,11 +14,17 @@
 
 use core::ops::Range;
 
+use databend_common_base::base::OrderedFloat;
 use databend_common_column::bitmap::Bitmap;
 use databend_common_expression::block_debug::assert_block_value_eq;
 use databend_common_expression::types::number::*;
 use databend_common_expression::types::AnyType;
 use databend_common_expression::types::DataType;
+use databend_common_expression::types::DecimalDataType;
+use databend_common_expression::types::DecimalScalar;
+use databend_common_expression::types::DecimalSize;
+use databend_common_expression::types::IntervalType;
+use databend_common_expression::types::NullableType;
 use databend_common_expression::types::NumberDataType;
 use databend_common_expression::types::StringType;
 use databend_common_expression::types::ValueType;
@@ -31,6 +37,7 @@ use databend_common_expression::FilterVisitor;
 use databend_common_expression::FromData;
 use databend_common_expression::IterationStrategy;
 use databend_common_expression::Scalar;
+use databend_common_expression::ScalarRef;
 use databend_common_expression::Value;
 use goldenfile::Mint;
 
@@ -636,3 +643,112 @@ fn test_builder() {
 
     assert_eq!(r1, r2)
 }
+
+fn assert_estimated_scalar_repeat_size(scalar: ScalarRef, num_rows: usize, ty: DataType) {
+    let builder = ColumnBuilder::repeat(&scalar, num_rows, &ty);
+    let col = builder.build();
+    assert_eq!(
+        scalar.estimated_scalar_repeat_size(num_rows, &ty),
+        col.memory_size()
+    );
+}
+
+#[test]
+fn test_estimated_scalar_repeat_size() {
+    let num_rows = 108;
+
+    // null
+    {
+        let scalar = ScalarRef::Null;
+        let ty = DataType::Null;
+        assert_estimated_scalar_repeat_size(scalar, num_rows, ty);
+    }
+
+    // nullable
+    {
+        let scalar = ScalarRef::Null;
+        let ty = DataType::Nullable(Box::new(DataType::Interval));
+        let builder = ColumnBuilder::repeat(&scalar, num_rows, &ty);
+        let col = builder.build();
+        let col = NullableType::<IntervalType>::try_downcast_column(&col).unwrap();
+        assert_eq!(
+            scalar.estimated_scalar_repeat_size(num_rows, &ty),
+            col.validity.as_slice().0.len()
+        );
+    }
+
+    // nullable(float32)
+    {
+        let scalar = ScalarRef::Number(NumberScalar::Float32(OrderedFloat(2.33)));
+        let ty = DataType::Nullable(Box::new(DataType::Number(NumberDataType::Float32)));
+        assert_estimated_scalar_repeat_size(scalar, num_rows, ty);
+    }
+
+    // decimal
+    {
+        let scalar = ScalarRef::Decimal(DecimalScalar::Decimal128(233, DecimalSize {
+            precision: 46,
+            scale: 6,
+        }));
+        let ty = DataType::Decimal(DecimalDataType::Decimal256(DecimalSize {
+            precision: 46,
+            scale: 6,
+        }));
+        assert_estimated_scalar_repeat_size(scalar, num_rows, ty);
+    }
+
+    // string
+    {
+        let scalar = ScalarRef::String("abc");
+        let ty = DataType::String;
+        assert_estimated_scalar_repeat_size(scalar, num_rows, ty);
+    }
+
+    // string
+    {
+        let scalar = ScalarRef::String("abcdefghijklmn123");
+        let ty = DataType::String;
+        assert_estimated_scalar_repeat_size(scalar, num_rows, ty);
+    }
+
+    // binary
+    {
+        let scalar = ScalarRef::Binary(&[1, 133, 244, 123]);
+        let ty = DataType::Binary;
+        assert_estimated_scalar_repeat_size(scalar, num_rows, ty);
+    }
+
+    // boolean
+    {
+        let scalar = ScalarRef::Boolean(true);
+        let ty = DataType::Boolean;
+        assert_estimated_scalar_repeat_size(scalar, num_rows, ty);
+    }
+
+    // bitmap
+    {
+        let scalar = ScalarRef::Bitmap(&[1, 133, 244, 123]);
+        let ty = DataType::Bitmap;
+        assert_estimated_scalar_repeat_size(scalar, num_rows, ty);
+    }
+
+    // array
+    {
+        let scalar = ScalarRef::Array(StringType::from_data(vec!["abc", "abcdefghijklmn123"]));
+        let ty = DataType::Array(Box::new(DataType::String));
+        assert_estimated_scalar_repeat_size(scalar, num_rows, ty);
+    }
+
+    // map
+    {
+        let scalar = ScalarRef::Map(Column::Tuple(vec![
+            UInt8Type::from_data(vec![1, 2]),
+            StringType::from_data(vec!["a", "b"]),
+        ]));
+        let ty = DataType::Map(Box::new(DataType::Tuple(vec![
+            DataType::Number(NumberDataType::UInt8),
+            DataType::String,
+        ])));
+        assert_estimated_scalar_repeat_size(scalar, num_rows, ty);
+    }
+}
```
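The harness treats `ColumnBuilder::repeat(...).build().memory_size()` as ground truth, so each arm of `estimated_scalar_repeat_size` is checked against the size of a genuinely materialized column; only the all-null nullable case is asserted separately, against the raw validity-bitmap length, matching the shortcut taken in `estimate_block_size`.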

src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs

Lines changed: 1 addition & 46 deletions

```diff
@@ -12,58 +12,28 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::sync::Arc;
-
 use databend_common_exception::Result;
 use databend_common_expression::BlockThresholds;
-use databend_common_expression::Column;
 use databend_common_expression::DataBlock;
-use databend_common_expression::Value;
-use databend_common_pipeline_core::processors::InputPort;
-use databend_common_pipeline_core::processors::OutputPort;
-use databend_common_pipeline_core::processors::ProcessorPtr;
 use databend_common_pipeline_core::Pipeline;
 
 use crate::processors::AccumulatingTransform;
 use crate::processors::BlockCompactMeta;
 use crate::processors::TransformCompactBlock;
 use crate::processors::TransformPipelineHelper;
-use crate::Transform;
-use crate::Transformer;
 
 pub fn build_compact_block_pipeline(
     pipeline: &mut Pipeline,
     thresholds: BlockThresholds,
 ) -> Result<()> {
     let output_len = pipeline.output_len();
-    pipeline.add_transform(ConvertToFullTransform::create)?;
     pipeline.try_resize(1)?;
     pipeline.add_accumulating_transformer(|| BlockCompactBuilder::new(thresholds));
     pipeline.try_resize(output_len)?;
     pipeline.add_block_meta_transformer(TransformCompactBlock::default);
     Ok(())
 }
 
-pub(crate) struct ConvertToFullTransform;
-
-impl ConvertToFullTransform {
-    pub(crate) fn create(input: Arc<InputPort>, output: Arc<OutputPort>) -> Result<ProcessorPtr> {
-        Ok(ProcessorPtr::create(Transformer::create(
-            input,
-            output,
-            ConvertToFullTransform {},
-        )))
-    }
-}
-
-impl Transform for ConvertToFullTransform {
-    const NAME: &'static str = "ConvertToFullTransform";
-
-    fn transform(&mut self, data: DataBlock) -> Result<DataBlock> {
-        Ok(data.consume_convert_to_full())
-    }
-}
-
 pub struct BlockCompactBuilder {
     thresholds: BlockThresholds,
     // Holds blocks that are partially accumulated but haven't reached the threshold.
@@ -93,7 +63,7 @@ impl AccumulatingTransform for BlockCompactBuilder {
 
     fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
         let num_rows = data.num_rows();
-        let num_bytes = memory_size(&data);
+        let num_bytes = data.estimate_block_size();
 
         if !self.thresholds.check_for_compact(num_rows, num_bytes) {
             // holding slices of blocks to merge later may lead to oom, so
@@ -142,18 +112,3 @@
         }
     }
 }
-
-pub fn memory_size(data_block: &DataBlock) -> usize {
-    data_block
-        .columns()
-        .iter()
-        .map(|entry| match &entry.value {
-            Value::Column(Column::Nullable(col)) if col.validity.true_count() == 0 => {
-                // For `Nullable` columns with no valid values,
-                // only the size of the validity bitmap is counted.
-                col.validity.as_slice().0.len()
-            }
-            _ => entry.memory_size(),
-        })
-        .sum()
-}
```
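With `estimate_block_size` available, the pipeline no longer needs the `ConvertToFullTransform` stage, whose only job was to expand constant (scalar) columns into full columns so their size could be measured. A standalone sketch of the trade-off (illustrative plain Rust, not the Databend API):

```rust
// Old shape of the problem: materialize a repeated value just to measure it.
fn materialized_u64_size(n: usize) -> usize {
    let col = vec![0u64; n]; // allocates n * 8 bytes
    col.len() * std::mem::size_of::<u64>()
}

// New shape: compute the same number arithmetically, with no allocation.
fn estimated_u64_size(n: usize) -> usize {
    n * std::mem::size_of::<u64>()
}

fn main() {
    // Both agree on the size; only one of them touched the allocator.
    assert_eq!(materialized_u64_size(1_000_000), estimated_u64_size(1_000_000));
}
```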

src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs

Lines changed: 1 addition & 1 deletion

```diff
@@ -79,7 +79,7 @@ impl AccumulatingTransform for BlockCompactNoSplitBuilder {
 
     fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
         self.accumulated_rows += data.num_rows();
-        self.accumulated_bytes += crate::processors::memory_size(&data);
+        self.accumulated_bytes += data.estimate_block_size();
         if !self
             .thresholds
             .check_large_enough(self.accumulated_rows, self.accumulated_bytes)
```

src/query/service/src/pipelines/processors/transforms/window/partition/data_processor_strategy.rs

Lines changed: 1 addition & 2 deletions

```diff
@@ -17,7 +17,6 @@ use databend_common_expression::BlockThresholds;
 use databend_common_expression::DataBlock;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::SortColumnDescription;
-use databend_common_pipeline_transforms::memory_size;
 use databend_common_pipeline_transforms::sort_merge;
 use databend_common_settings::Settings;
 
@@ -56,7 +55,7 @@ impl DataProcessorStrategy for CompactStrategy {
         let mut result = Vec::with_capacity(blocks_num);
         for block in data_blocks {
             accumulated_rows += block.num_rows();
-            accumulated_bytes += memory_size(&block);
+            accumulated_bytes += block.estimate_block_size();
             if !self
                 .thresholds
                 .check_large_enough(accumulated_rows, accumulated_bytes)
```

src/query/service/src/test_kits/fixture.rs

Lines changed: 1 addition & 1 deletion

```diff
@@ -563,7 +563,7 @@ impl TestFixture {
         num_of_blocks: usize,
         start: i32,
     ) -> (TableSchemaRef, Vec<Result<DataBlock>>) {
-        Self::gen_sample_blocks_ex(num_of_blocks, 3, start)
+        Self::gen_sample_blocks_ex(num_of_blocks, 2, start)
     }
 
     pub fn gen_sample_blocks_ex(
```

src/query/service/src/test_kits/fuse.rs

Lines changed: 1 addition & 1 deletion

```diff
@@ -177,7 +177,7 @@ async fn generate_blocks(
     let mut block_metas = vec![];
 
     // does not matter in this suite
-    let rows_per_block = 1;
+    let rows_per_block = 2;
     let value_start_from = 1;
 
     let stream =
```
