Skip to content

Commit e1d7515

Browse files
committed
fix
1 parent 51676bd commit e1d7515

File tree

2 files changed

+15
-2
lines changed

2 files changed

+15
-2
lines changed

src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ async fn test_recluster_mutator_block_select() -> Result<()> {
156156
cluster_key_id,
157157
1,
158158
column_ids,
159+
1,
159160
);
160161
let (_, parts) = mutator
161162
.target_select(compact_segments, ReclusterMode::Recluster)
@@ -280,6 +281,7 @@ async fn test_safety_for_recluster() -> Result<()> {
280281
cluster_key_id,
281282
max_tasks,
282283
column_ids,
284+
500,
283285
));
284286
let (mode, selected_segs) = mutator.select_segments(&compact_segments, 8)?;
285287
// select the blocks with the highest depth.

src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ pub struct ReclusterMutator {
7272
pub(crate) max_tasks: usize,
7373
pub(crate) cluster_key_types: Vec<DataType>,
7474
pub(crate) column_ids: HashSet<u32>,
75+
76+
average_size: usize,
7577
}
7678

7779
impl ReclusterMutator {
@@ -102,6 +104,13 @@ impl ReclusterMutator {
102104
// NOTE: The snapshot schema does not contain the stream column.
103105
let column_ids = snapshot.schema.to_leaf_column_id_set();
104106

107+
let average_size = cmp::max(
108+
snapshot
109+
.summary
110+
.uncompressed_byte_size
111+
.div_ceil(snapshot.summary.block_count) as usize,
112+
block_thresholds.max_bytes_per_block / 2,
113+
);
105114
Ok(Self {
106115
ctx,
107116
schema,
@@ -111,6 +120,7 @@ impl ReclusterMutator {
111120
max_tasks,
112121
cluster_key_types,
113122
column_ids,
123+
average_size,
114124
})
115125
}
116126

@@ -125,6 +135,7 @@ impl ReclusterMutator {
125135
cluster_key_id: u32,
126136
max_tasks: usize,
127137
column_ids: HashSet<u32>,
138+
average_size: usize,
128139
) -> Self {
129140
Self {
130141
ctx,
@@ -135,6 +146,7 @@ impl ReclusterMutator {
135146
max_tasks,
136147
cluster_key_types,
137148
column_ids,
149+
average_size,
138150
}
139151
}
140152

@@ -196,8 +208,7 @@ impl ReclusterMutator {
196208
.get_recluster_block_size()?
197209
.min(avail_memory_usage * 30 / 100) as usize;
198210
// specify a rather small value, so that `recluster_block_size` might be tuned to lower value.
199-
let max_blocks_num =
200-
(memory_threshold / self.block_thresholds.max_bytes_per_block).max(2) * self.max_tasks;
211+
let max_blocks_num = (memory_threshold / self.average_size).max(2) * self.max_tasks;
201212
let block_per_seg = self.block_thresholds.block_per_segment;
202213

203214
// Prepare task generation parameters

0 commit comments

Comments
 (0)