
Commit 0cc75f6

add test
1 parent 133e3b5 commit 0cc75f6

File tree

5 files changed: +58 -16 lines changed

src/query/ee/src/hilbert_clustering/handler.rs

Lines changed: 19 additions & 3 deletions
@@ -63,7 +63,7 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler {
         let max_bytes_per_block = fuse_table.get_option(
             FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD,
             DEFAULT_BLOCK_BUFFER_SIZE,
-        );
+        ) * 2;
         let hilbert_min_bytes = std::cmp::max(
             hilbert_clustering_min_bytes,
             max_bytes_per_block * block_per_seg,
@@ -76,6 +76,7 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler {
         let mut checker = ReclusterChecker::new(
             cluster_key_id,
             hilbert_min_bytes,
+            block_per_seg,
             push_downs.as_ref().is_none_or(|v| v.filters.is_none()),
         );
         'FOR: for chunk in segment_locations.chunks(chunk_size) {
@@ -139,19 +140,29 @@ struct ReclusterChecker {
     hilbert_min_bytes: usize,
     total_bytes: usize,

+    hilbert_min_blocks: usize,
+    total_blocks: usize,
+
     finished: bool,
     // Whether the target segments is at the head of snapshot.
     head_of_snapshot: bool,
 }

 impl ReclusterChecker {
-    fn new(default_cluster_id: u32, hilbert_min_bytes: usize, head_of_snapshot: bool) -> Self {
+    fn new(
+        default_cluster_id: u32,
+        hilbert_min_bytes: usize,
+        hilbert_min_blocks: usize,
+        head_of_snapshot: bool,
+    ) -> Self {
         Self {
             segments: vec![],
             last_segment: None,
             default_cluster_id,
+            hilbert_min_blocks,
             hilbert_min_bytes,
             total_bytes: 0,
+            total_blocks: 0,
             finished: false,
             head_of_snapshot,
         }
@@ -164,10 +175,14 @@ impl ReclusterChecker {

         if segment_should_recluster || !self.head_of_snapshot {
             self.total_bytes += segment.summary.uncompressed_byte_size as usize;
+            self.total_blocks += segment.summary.block_count as usize;
             self.segments.push((location.clone(), segment.clone()));
         }

-        if !segment_should_recluster || self.total_bytes >= self.hilbert_min_bytes {
+        if !segment_should_recluster
+            || (self.total_bytes >= self.hilbert_min_bytes
+                && self.total_blocks >= self.hilbert_min_blocks)
+        {
             if self.check_for_recluster() {
                 self.finished = true;
                 return true;
@@ -208,6 +223,7 @@ impl ReclusterChecker {

     fn reset(&mut self) {
         self.total_bytes = 0;
+        self.total_blocks = 0;
         self.head_of_snapshot = false;
         self.segments.clear();
     }

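To illustrate the effect of this change, here is a minimal, self-contained sketch of the new gating rule. The types and threshold values (Seg, Accumulator, min_bytes, min_blocks) are made up for illustration and are not Databend's SegmentInfo or ReclusterChecker; only the rule itself is taken from the diff: a group of segments now qualifies for Hilbert reclustering only when both the accumulated uncompressed bytes and the accumulated block count reach their thresholds, instead of bytes alone.

    /// Illustrative only: mirrors the new threshold check in ReclusterChecker,
    /// using made-up types instead of Databend's segment metadata.
    struct Seg {
        uncompressed_bytes: usize,
        block_count: usize,
    }

    struct Accumulator {
        min_bytes: usize,
        min_blocks: usize,
        total_bytes: usize,
        total_blocks: usize,
    }

    impl Accumulator {
        /// Add a segment and report whether the group is large enough to flush.
        fn add(&mut self, seg: &Seg) -> bool {
            self.total_bytes += seg.uncompressed_bytes;
            self.total_blocks += seg.block_count;
            // Before this commit the check was `total_bytes >= min_bytes` only;
            // now both thresholds must be met.
            self.total_bytes >= self.min_bytes && self.total_blocks >= self.min_blocks
        }
    }

    fn main() {
        let mut acc = Accumulator { min_bytes: 1_000, min_blocks: 4, total_bytes: 0, total_blocks: 0 };
        // Plenty of bytes but too few blocks: not ready yet.
        assert!(!acc.add(&Seg { uncompressed_bytes: 2_000, block_count: 2 }));
        // Once the block count also reaches the threshold, the group is ready.
        assert!(acc.add(&Seg { uncompressed_bytes: 10, block_count: 2 }));
        println!("gating behaves as expected");
    }
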
src/query/expression/src/utils/block_thresholds.rs

Lines changed: 1 addition & 1 deletion
@@ -153,7 +153,7 @@ impl BlockThresholds {
         let bytes_per_block = total_bytes.div_ceil(block_num_by_compressed);
         // Adjust the number of blocks based on block size thresholds.
         let max_bytes_per_block = self.max_bytes_per_block.min(400 * 1024 * 1024);
-        let min_bytes_per_block = self.min_bytes_per_block.min(100 * 1024 * 1024);
+        let min_bytes_per_block = max_bytes_per_block / 2;
         let block_nums = if bytes_per_block > max_bytes_per_block {
             // Case 1: If the block size is too bigger.
             total_bytes.div_ceil(max_bytes_per_block)

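The one-line change above can be sketched in isolation as below. This is not the real BlockThresholds implementation; the only facts taken from the diff are the 400 MiB cap on the upper bound and the halving. The lower bound on bytes per block now tracks the configured upper bound rather than being capped independently at 100 MiB.

    /// Sketch of the threshold derivation shown in the diff above; the names
    /// mimic BlockThresholds but this is not the actual implementation.
    fn block_byte_bounds(max_bytes_per_block_setting: usize) -> (usize, usize) {
        // Upper bound is still capped at 400 MiB.
        let max_bytes_per_block = max_bytes_per_block_setting.min(400 * 1024 * 1024);
        // New: lower bound is half of the (capped) upper bound,
        // instead of `min_bytes_per_block.min(100 MiB)`.
        let min_bytes_per_block = max_bytes_per_block / 2;
        (min_bytes_per_block, max_bytes_per_block)
    }

    fn main() {
        let (min, max) = block_byte_bounds(1_000_000);
        assert_eq!((min, max), (500_000, 1_000_000));
        println!("min = {min}, max = {max}");
    }
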
src/query/expression/tests/it/block_thresholds.rs

Lines changed: 32 additions & 5 deletions
@@ -15,7 +15,7 @@
 use databend_common_expression::BlockThresholds;

 fn default_thresholds() -> BlockThresholds {
-    BlockThresholds::new(1000, 1_000_000, 100_000, 4)
+    BlockThresholds::new(1_000, 1_000_000, 100_000, 4)
 }

 #[test]
@@ -101,14 +101,41 @@ fn test_calc_rows_for_recluster() {
     );

     // Case 1: If the block size is too bigger.
-    let result = t.calc_rows_for_recluster(4_000, 30_000_000, 600_000);
-    assert_eq!(result, 400);
+    let result = t.calc_rows_for_recluster(4_500, 30_000_000, 600_000);
+    assert_eq!(result, 300);

     // Case 2: If the block size is too smaller.
-    let result = t.calc_rows_for_recluster(4_000, 2_000_000, 600_000);
-    assert_eq!(result, 800);
+    let result = t.calc_rows_for_recluster(4_000, 4_000_000, 600_000);
+    assert_eq!(result, 1000);

     // Case 3: use the compressed-based block count.
     let result = t.calc_rows_for_recluster(4_000, 10_000_000, 600_000);
     assert_eq!(result, 667);
 }
+
+#[test]
+fn test_calc_partitions_for_recluster() {
+    let t = default_thresholds();
+
+    // compact enough to skip further calculations
+    assert_eq!(t.calc_partitions_for_recluster(1000, 500_000, 100_000), 1);
+
+    // row-based block count exceeds compressed-based block count, use max rows per block.
+    assert_eq!(
+        t.calc_partitions_for_recluster(10_000, 2_000_000, 100_000),
+        10
+    );
+
+    // Case 1: If the block size is too bigger.
+    let result = t.calc_partitions_for_recluster(4_500, 30_000_000, 600_000);
+    assert_eq!(result, 15);
+
+    // Case 2: If the block size is too smaller.
+    let result = t.calc_partitions_for_recluster(4_000, 4_000_000, 600_000);
+    assert_eq!(result, 4);
+
+    // Case 3: use the compressed-based block count.
+    let result = t.calc_partitions_for_recluster(4_000, 10_000_000, 600_000);
+    assert_eq!(result, 6);
+}
+

src/query/service/src/pipelines/builders/builder_hilbert_partition.rs

Lines changed: 1 addition & 0 deletions
@@ -78,6 +78,7 @@ impl PipelineBuilder {
                     partition.range_width,
                     window_spill_settings.clone(),
                     disk_spill.clone(),
+                    partition.rows_per_block,
                     partition.bytes_per_block,
                 )?,
             )))

src/query/service/src/pipelines/processors/transforms/window/partition/transform_hilbert_collect.rs

Lines changed: 5 additions & 7 deletions
@@ -73,6 +73,7 @@ impl TransformHilbertCollect {
         num_partitions: usize,
         memory_settings: MemorySettings,
         disk_spill: Option<SpillerDiskConfig>,
+        max_block_rows: usize,
         max_block_size: usize,
     ) -> Result<Self> {
         // Calculate the partition ids collected by the processor.
@@ -99,13 +100,10 @@ impl TransformHilbertCollect {
         let spiller = Spiller::create(ctx, operator, spill_config)?;

         // Create the window partition buffer.
-        let sort_block_size = settings.get_window_partition_sort_block_size()? as usize;
-        let buffer = WindowPartitionBuffer::new(
-            spiller,
-            partitions.len(),
-            sort_block_size,
-            memory_settings,
-        )?;
+        let max_block_rows =
+            max_block_rows.min(settings.get_window_partition_sort_block_size()? as usize);
+        let buffer =
+            WindowPartitionBuffer::new(spiller, partitions.len(), max_block_rows, memory_settings)?;

         Ok(Self {
             input,

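A small sketch of the new row cap introduced above, with illustrative names only (this is not the Databend API): the block row count handed to the window partition buffer is now the smaller of the partition plan's rows-per-block and the window_partition_sort_block_size setting, rather than the setting alone.

    /// Illustrative helper: choose the block row count for the partition buffer.
    fn buffer_block_rows(rows_per_block: usize, window_sort_block_size: usize) -> usize {
        rows_per_block.min(window_sort_block_size)
    }

    fn main() {
        // A small recluster block target now wins over a large sort-block setting...
        assert_eq!(buffer_block_rows(800, 65_536), 800);
        // ...while the setting still caps overly large block targets.
        assert_eq!(buffer_block_rows(100_000, 65_536), 65_536);
        println!("row cap works");
    }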