Skip to content

feat(storage): hilbert recluster support stream block writer #17904

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 36 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
58690aa
hilbert recluster support block stream write
zhyass Apr 30, 2025
8c77f35
fix exchange
zhyass May 3, 2025
67bd532
add transform hilbert collect
zhyass May 8, 2025
3cf0b6f
partial restore
zhyass May 8, 2025
82a5457
format
zhyass May 8, 2025
ac6ca41
add test
zhyass May 9, 2025
f3dbc57
fix test
zhyass May 9, 2025
22f2d3a
format
zhyass May 9, 2025
f5e0491
add compact strategy
zhyass May 13, 2025
504e60f
fix
zhyass May 14, 2025
0798c0f
fix test
zhyass May 14, 2025
98279ec
fix test
zhyass May 15, 2025
d150dc0
fix test
zhyass May 15, 2025
797177f
spill block build and block write
zhyass May 19, 2025
62f2093
fix test
zhyass May 20, 2025
e2b02f7
add noise for hilbert recluster
zhyass May 20, 2025
81e8dba
update
zhyass Jun 4, 2025
e59fe4d
linear recluster support block stream writer
zhyass Jun 6, 2025
8811b22
fix
zhyass Jun 7, 2025
e395e10
fix
zhyass Jun 8, 2025
e6a2a25
fix test
zhyass Jun 9, 2025
fa6f023
fix
zhyass Jun 9, 2025
42ebaba
fix test
zhyass Jun 9, 2025
bd996f2
fix test
zhyass Jun 9, 2025
ad818da
improve recluster partition
zhyass Jun 12, 2025
22c37b3
for test
zhyass Jun 13, 2025
27fa7db
for test
zhyass Jun 13, 2025
8875f53
for test
zhyass Jun 13, 2025
c807783
fix
zhyass Jun 13, 2025
cb50347
for test
zhyass Jun 15, 2025
35bfee3
fix
zhyass Jun 15, 2025
0eb6279
add column ndv estimator
zhyass Jun 17, 2025
f0b0d93
add column min max state
zhyass Jun 18, 2025
ca55812
fix
zhyass Jun 19, 2025
e7483b5
remove unused codes
zhyass Jun 19, 2025
7d140eb
add cluster sample
zhyass Jun 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/common/base/src/base/watch_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ impl WatchNotify {
/// Sets the shared flag to `true`, making the notification observable by
/// receivers (see `is_notified`), and waking any tasks awaiting a change.
pub fn notify_waiters(&self) {
// `send_replace` returns the previous flag value, which is irrelevant here.
let _ = self.tx.send_replace(true);
}

/// Reports whether a notification has already been issued, i.e. whether
/// `notify_waiters` has set the shared flag to `true`. Non-blocking.
pub fn is_notified(&self) -> bool {
    let flag = self.rx.borrow();
    *flag
}
}

#[cfg(test)]
Expand Down
20 changes: 18 additions & 2 deletions src/query/ee/src/hilbert_clustering/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler {
let mut checker = ReclusterChecker::new(
cluster_key_id,
hilbert_min_bytes,
block_per_seg,
push_downs.as_ref().is_none_or(|v| v.filters.is_none()),
);
'FOR: for chunk in segment_locations.chunks(chunk_size) {
Expand Down Expand Up @@ -139,19 +140,29 @@ struct ReclusterChecker {
hilbert_min_bytes: usize,
total_bytes: usize,

hilbert_min_blocks: usize,
total_blocks: usize,

finished: bool,
// Whether the target segments is at the head of snapshot.
head_of_snapshot: bool,
}

impl ReclusterChecker {
fn new(default_cluster_id: u32, hilbert_min_bytes: usize, head_of_snapshot: bool) -> Self {
fn new(
default_cluster_id: u32,
hilbert_min_bytes: usize,
hilbert_min_blocks: usize,
head_of_snapshot: bool,
) -> Self {
Self {
segments: vec![],
last_segment: None,
default_cluster_id,
hilbert_min_blocks,
hilbert_min_bytes,
total_bytes: 0,
total_blocks: 0,
finished: false,
head_of_snapshot,
}
Expand All @@ -164,10 +175,14 @@ impl ReclusterChecker {

if segment_should_recluster || !self.head_of_snapshot {
self.total_bytes += segment.summary.uncompressed_byte_size as usize;
self.total_blocks += segment.summary.block_count as usize;
self.segments.push((location.clone(), segment.clone()));
}

if !segment_should_recluster || self.total_bytes >= self.hilbert_min_bytes {
if !segment_should_recluster
|| (self.total_bytes >= self.hilbert_min_bytes
&& self.total_blocks >= self.hilbert_min_blocks)
{
if self.check_for_recluster() {
self.finished = true;
return true;
Expand Down Expand Up @@ -208,6 +223,7 @@ impl ReclusterChecker {

/// Clears all accumulated state so the checker can start evaluating the
/// next candidate run of segments from scratch.
fn reset(&mut self) {
    // These fields are independent; clear them in any order.
    self.segments.clear();
    self.head_of_snapshot = false;
    self.total_blocks = 0;
    self.total_bytes = 0;
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/expression/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,8 @@ impl DataBlock {
}

#[inline]
pub fn remove_column(&mut self, index: usize) {
self.entries.remove(index);
pub fn remove_column(&mut self, index: usize) -> BlockEntry {
self.entries.remove(index)
}

#[inline]
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/sampler/fixed_size_sampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ fn compact_indices(indices: &mut Vec<BlockRowIndex>, blocks: &mut Vec<DataBlock>
.collect();
}

mod reservoir_sampling {
pub mod reservoir_sampling {
use std::num::NonZeroUsize;

use rand::Rng;
Expand Down
1 change: 1 addition & 0 deletions src/query/expression/src/sampler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ mod fixed_rate_sampler;
mod fixed_size_sampler;

pub use fixed_rate_sampler::FixedRateSampler;
pub use fixed_size_sampler::reservoir_sampling::AlgoL;
pub use fixed_size_sampler::FixedSizeSampler;
57 changes: 53 additions & 4 deletions src/query/expression/src/utils/block_thresholds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Default for BlockThresholds {
max_bytes_per_block: DEFAULT_BLOCK_BUFFER_SIZE * 2,
min_bytes_per_block: (DEFAULT_BLOCK_BUFFER_SIZE * 4).div_ceil(5),
max_compressed_per_block: DEFAULT_BLOCK_COMPRESSED_SIZE,
min_compressed_per_block: (DEFAULT_BLOCK_COMPRESSED_SIZE * 4).div_ceil(5),
min_compressed_per_block: (DEFAULT_BLOCK_COMPRESSED_SIZE * 3).div_ceil(5),
block_per_segment: DEFAULT_BLOCK_PER_SEGMENT,
}
}
Expand All @@ -58,7 +58,7 @@ impl BlockThresholds {
max_bytes_per_block: bytes_per_block * 2,
min_bytes_per_block: (bytes_per_block * 4).div_ceil(5),
max_compressed_per_block,
min_compressed_per_block: (max_compressed_per_block * 4).div_ceil(5),
min_compressed_per_block: (max_compressed_per_block * 3).div_ceil(5),
block_per_segment,
}
}
Expand Down Expand Up @@ -152,8 +152,8 @@ impl BlockThresholds {

let bytes_per_block = total_bytes.div_ceil(block_num_by_compressed);
// Adjust the number of blocks based on block size thresholds.
let max_bytes_per_block = (4 * self.min_bytes_per_block).min(400 * 1024 * 1024);
let min_bytes_per_block = (self.min_bytes_per_block / 2).min(50 * 1024 * 1024);
let max_bytes_per_block = self.max_bytes_per_block.min(400 * 1024 * 1024);
let min_bytes_per_block = self.min_bytes_per_block.min(100 * 1024 * 1024);
let block_nums = if bytes_per_block > max_bytes_per_block {
// Case 1: The block size is too large.
total_bytes.div_ceil(max_bytes_per_block)
Expand All @@ -166,4 +166,53 @@ impl BlockThresholds {
};
total_rows.div_ceil(block_nums.max(1)).max(1)
}

/// Calculates the optimal number of partitions (blocks) based on total data size and row count.
///
/// # Parameters
/// - `total_rows`: The total number of rows in the data.
/// - `total_bytes`: The total uncompressed size of the data in bytes.
/// - `total_compressed`: The total compressed size of the data in bytes.
///
/// # Returns
/// - The calculated number of partitions (blocks) needed.
#[inline]
pub fn calc_partitions_for_recluster(
    &self,
    total_rows: usize,
    total_bytes: usize,
    total_compressed: usize,
) -> usize {
    // Data that is already compact enough collapses into a single partition.
    let already_compact = self.check_for_compact(total_rows, total_bytes)
        && total_compressed < 2 * self.min_compressed_per_block;
    if already_compact {
        return 1;
    }

    // Candidate block counts derived from the row limit and the compressed-size limit.
    let by_rows = (total_rows / self.max_rows_per_block).max(1);
    let by_compressed = total_compressed / self.max_compressed_per_block;
    // When the row-based estimate dominates, the max-rows-per-block limit decides.
    if by_rows >= by_compressed {
        return by_rows;
    }

    // Otherwise validate the compressed-based estimate against the uncompressed
    // byte-size thresholds, correcting it when the resulting blocks would be
    // too large or too small.
    let bytes_per_block = total_bytes.div_ceil(by_compressed);
    let max_bytes = self.max_bytes_per_block.min(400 * 1024 * 1024);
    let min_bytes = self.min_bytes_per_block.min(100 * 1024 * 1024);
    let total_partitions = if bytes_per_block > max_bytes {
        // Blocks would be oversized: split by the maximum byte size.
        total_bytes / max_bytes
    } else if bytes_per_block < min_bytes {
        // Blocks would be undersized: merge by the minimum byte size.
        total_bytes / min_bytes
    } else {
        // Block size is acceptable: keep the compressed-based estimate.
        by_compressed
    };

    total_partitions.max(1)
}
}
36 changes: 31 additions & 5 deletions src/query/expression/tests/it/block_thresholds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use databend_common_expression::BlockThresholds;

fn default_thresholds() -> BlockThresholds {
BlockThresholds::new(1000, 1_000_000, 100_000, 4)
BlockThresholds::new(1_000, 1_000_000, 100_000, 4)
}

#[test]
Expand Down Expand Up @@ -101,14 +101,40 @@ fn test_calc_rows_for_recluster() {
);

// Case 1: The block size is too large.
let result = t.calc_rows_for_recluster(4_000, 30_000_000, 600_000);
assert_eq!(result, 400);
let result = t.calc_rows_for_recluster(4_500, 30_000_000, 600_000);
assert_eq!(result, 300);

// Case 2: The block size is too small.
let result = t.calc_rows_for_recluster(4_000, 2_000_000, 600_000);
assert_eq!(result, 800);
let result = t.calc_rows_for_recluster(4_000, 1_600_000, 600_000);
assert_eq!(result, 2000);

// Case 3: use the compressed-based block count.
let result = t.calc_rows_for_recluster(4_000, 10_000_000, 600_000);
assert_eq!(result, 667);
}

#[test]
fn test_calc_partitions_for_recluster() {
    let thresholds = default_thresholds();

    // Already-compact data collapses into a single partition.
    assert_eq!(
        thresholds.calc_partitions_for_recluster(1000, 500_000, 100_000),
        1
    );

    // The row-based estimate dominates the compressed-based one,
    // so the max-rows-per-block limit decides the count.
    assert_eq!(
        thresholds.calc_partitions_for_recluster(10_000, 2_000_000, 100_000),
        10
    );

    // Case 1: blocks would be oversized, so split by the max byte size.
    assert_eq!(
        thresholds.calc_partitions_for_recluster(4_500, 30_000_000, 600_000),
        15
    );

    // Case 2: blocks would be undersized, so merge by the min byte size.
    assert_eq!(
        thresholds.calc_partitions_for_recluster(4_000, 1_600_000, 800_000),
        2
    );

    // Case 3: the compressed-based estimate is within bounds and is kept.
    assert_eq!(
        thresholds.calc_partitions_for_recluster(4_000, 10_000_000, 600_000),
        6
    );
}
4 changes: 1 addition & 3 deletions src/query/functions/src/aggregates/aggregate_range_bound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,7 @@ pub fn try_create_aggregate_range_bound_function(
/// For a column with values `(0, 1, 3, 6, 8)` and `partition_num = 3`, the function calculates the
/// partition boundaries based on the distribution of the data. The boundaries might be `[1, 6]`.
pub fn aggregate_range_bound_function_desc() -> AggregateFunctionDescription {
AggregateFunctionDescription::creator(Box::new(
crate::aggregates::try_create_aggregate_range_bound_function,
))
AggregateFunctionDescription::creator(Box::new(try_create_aggregate_range_bound_function))
}

fn get_partitions(
Expand Down
103 changes: 97 additions & 6 deletions src/query/functions/src/scalars/hilbert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,33 @@ use databend_common_expression::types::BinaryType;
use databend_common_expression::types::DataType;
use databend_common_expression::types::GenericType;
use databend_common_expression::types::NullableType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::types::NumberType;
use databend_common_expression::types::ReturnType;
use databend_common_expression::types::StringType;
use databend_common_expression::types::ValueType;
use databend_common_expression::types::ALL_NUMERICS_TYPES;
use databend_common_expression::vectorize_with_builder_1_arg;
use databend_common_expression::vectorize_with_builder_2_arg;
use databend_common_expression::with_number_mapped_type;
use databend_common_expression::Column;
use databend_common_expression::FixedLengthEncoding;
use databend_common_expression::Function;
use databend_common_expression::FunctionDomain;
use databend_common_expression::FunctionEval;
use databend_common_expression::FunctionFactory;
use databend_common_expression::FunctionProperty;
use databend_common_expression::FunctionRegistry;
use databend_common_expression::FunctionSignature;
use databend_common_expression::ScalarRef;
use databend_common_expression::Value;
use rand::rngs::SmallRng;
use rand::Rng;
use rand::SeedableRng;

/// Registers Hilbert curve related functions with the function registry.
pub fn register(registry: &mut FunctionRegistry) {
// Register the hilbert_range_index function that calculates Hilbert indices for multi-dimensional data
// Register the hilbert_range_index function that calculates Hilbert indices for multidimensional data
let factory = FunctionFactory::Closure(Box::new(|_, args_type: &[DataType]| {
let args_num = args_type.len();
// The function supports 2, 3, 4, or 5 dimensions (each dimension requires 2 arguments)
Expand Down Expand Up @@ -97,7 +106,7 @@ pub fn register(registry: &mut FunctionRegistry) {
points.push(key);
}

// Convert the multi-dimensional point to a Hilbert index
// Convert the multidimensional point to a Hilbert index
// This maps the n-dimensional point to a 1-dimensional value
let points = points
.iter()
Expand Down Expand Up @@ -153,6 +162,88 @@ pub fn register(registry: &mut FunctionRegistry) {
builder.push(id);
}),
);

// We use true randomness by appending a random u8 value at the end of the binary key.
// This introduces noise to break tie cases in clustering keys that are not uniformly distributed.
// Although this may slightly affect the accuracy of range_bound estimation,
// it ensures that Hilbert index + scatter will no longer suffer from data skew.
// Moreover, since the noise is added at the tail, the original order of the keys is preserved.
registry.properties.insert(
"add_noise".to_string(),
FunctionProperty::default().non_deterministic(),
);

registry.register_passthrough_nullable_1_arg::<StringType, BinaryType, _, _>(
"add_noise",
|_, _| FunctionDomain::Full,
vectorize_with_builder_1_arg::<StringType, BinaryType>(|val, builder, _| {
let mut bytes = val.as_bytes().to_vec();
let mut rng = SmallRng::from_entropy();
bytes.push(rng.gen::<u8>());
builder.put_slice(&bytes);
builder.commit_row();
}),
);

for ty in ALL_NUMERICS_TYPES {
with_number_mapped_type!(|NUM_TYPE| match ty {
NumberDataType::NUM_TYPE => {
registry
.register_passthrough_nullable_1_arg::<NumberType<NUM_TYPE>, BinaryType, _, _>(
"add_noise",
|_, _| FunctionDomain::Full,
vectorize_with_builder_1_arg::<NumberType<NUM_TYPE>, BinaryType>(
|val, builder, _| {
let mut encoded = val.encode().to_vec();
let mut rng = SmallRng::from_entropy();
encoded.push(rng.gen::<u8>());
builder.put_slice(&encoded);
builder.commit_row();
},
),
);
}
})
}

registry.register_passthrough_nullable_2_arg::<StringType, NumberType<u64>, BinaryType, _, _>(
"add_noise",
|_, _, _| FunctionDomain::Full,
vectorize_with_builder_2_arg::<StringType, NumberType<u64>, BinaryType>(
|val, level, builder, _| {
let mut bytes = val.as_bytes().to_vec();
let mut rng = SmallRng::from_entropy();
for _ in 0..level {
bytes.push(rng.gen::<u8>());
}
builder.put_slice(&bytes);
builder.commit_row();
},
),
);

for ty in ALL_NUMERICS_TYPES {
with_number_mapped_type!(|NUM_TYPE| match ty {
NumberDataType::NUM_TYPE => {
registry
.register_passthrough_nullable_2_arg::<NumberType<NUM_TYPE>, NumberType<u64>, BinaryType, _, _>(
"add_noise",
|_, _, _| FunctionDomain::Full,
vectorize_with_builder_2_arg::<NumberType<NUM_TYPE>, NumberType<u64>, BinaryType>(
|val, level, builder, _| {
let mut encoded = val.encode().to_vec();
let mut rng = SmallRng::from_entropy();
for _ in 0..level {
encoded.push(rng.gen::<u8>());
}
builder.put_slice(&encoded);
builder.commit_row();
},
),
);
}
})
}
}

/// Calculates the partition ID for a value based on range boundaries.
Expand All @@ -166,10 +257,10 @@ pub fn register(registry: &mut FunctionRegistry) {
///
/// # Example
/// For boundaries [10, 20, 30]:
/// - Values < 10 get partition ID 0
/// - Values >= 10 and < 20 get partition ID 1
/// - Values >= 20 and < 30 get partition ID 2
/// - Values >= 30 get partition ID 3
/// - Values <= 10 get partition ID 0
/// - Values > 10 and <= 20 get partition ID 1
/// - Values > 20 and <= 30 get partition ID 2
/// - Values > 30 get partition ID 3
fn calc_range_partition_id(val: ScalarRef, arr: &Column) -> u64 {
let mut low = 0;
let mut high = arr.len();
Expand Down
Loading
Loading