Skip to content

Commit 2c82187

Browse files
committed
fix
Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent 3906b41 commit 2c82187

File tree

3 files changed

+33
-30
lines changed

3 files changed

+33
-30
lines changed

src/query/service/src/pipelines/processors/transforms/sort/bounds.rs

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,11 @@ mod tests {
209209

210210
use super::*;
211211

212+
fn int32_columns<T>(data: T) -> Vec<Column>
213+
where T: IntoIterator<Item = Vec<i32>> {
214+
data.into_iter().map(Int32Type::from_data).collect()
215+
}
216+
212217
#[test]
213218
fn test_merge() -> Result<()> {
214219
{
@@ -231,17 +236,12 @@ mod tests {
231236

232237
assert_eq!(
233238
bounds,
234-
Bounds(vec![
235-
Int32Type::from_data(vec![6, 7]),
236-
Int32Type::from_data(vec![2, 6, 6]),
237-
Int32Type::from_data(vec![0, 0, 1]),
238-
])
239+
Bounds(int32_columns([vec![6, 7], vec![2, 6, 6], vec![0, 0, 1]]))
239240
);
240241
}
241242

242243
{
243-
let data = vec![vec![77, -2, 7], vec![3, 8, 6, 1, 1], vec![2]];
244-
244+
let data = [vec![77, -2, 7], vec![3, 8, 6, 1, 1], vec![2]];
245245
let data = data
246246
.into_iter()
247247
.map(|v| Bounds::from_column::<SimpleRowsDesc<Int32Type>>(Int32Type::from_data(v)))
@@ -250,13 +250,13 @@ mod tests {
250250

251251
assert_eq!(
252252
bounds,
253-
Bounds(vec![
254-
Int32Type::from_data(vec![-2]),
255-
Int32Type::from_data(vec![1, 1]),
256-
Int32Type::from_data(vec![3, 2]),
257-
Int32Type::from_data(vec![7, 6]),
258-
Int32Type::from_data(vec![77, 8]),
259-
])
253+
Bounds(int32_columns([
254+
vec![-2],
255+
vec![1, 1],
256+
vec![3, 2],
257+
vec![7, 6],
258+
vec![77, 8]
259+
]))
260260
);
261261
}
262262

@@ -274,16 +274,16 @@ mod tests {
274274
let bounds = Bounds::merge::<SimpleRowsDesc<Int32Type>>(data, 2)?;
275275

276276
let got = bounds.reduce(4, Int32Type::data_type()).unwrap();
277-
assert_eq!(got, Bounds(vec![Int32Type::from_data(vec![8, 6, 2, 1])])); // 77 _8 7 _6 3 _2 1 _1 -2
277+
assert_eq!(got, Bounds(int32_columns([vec![8, 6, 2, 1]]))); // 77 _8 7 _6 3 _2 1 _1 -2
278278

279279
let got = bounds.reduce(3, Int32Type::data_type()).unwrap();
280-
assert_eq!(got, Bounds(vec![Int32Type::from_data(vec![8, 3, 1])])); // 77 _8 7 6 _3 2 1 _1 -2
280+
assert_eq!(got, Bounds(int32_columns([vec![8, 3, 1]]))); // 77 _8 7 6 _3 2 1 _1 -2
281281

282282
let got = bounds.reduce(2, Int32Type::data_type()).unwrap();
283-
assert_eq!(got, Bounds(vec![Int32Type::from_data(vec![7, 1])])); // 77 8 _7 6 3 2 _1 1 -2
283+
assert_eq!(got, Bounds(int32_columns([vec![7, 1]]))); // 77 8 _7 6 3 2 _1 1 -2
284284

285285
let got = bounds.reduce(1, Int32Type::data_type()).unwrap();
286-
assert_eq!(got, Bounds(vec![Int32Type::from_data(vec![3])])); // 77 8 7 6 _3 2 1 1 -2
286+
assert_eq!(got, Bounds(int32_columns([vec![3]]))); // 77 8 7 6 _3 2 1 1 -2
287287

288288
Ok(())
289289
}
@@ -293,28 +293,23 @@ mod tests {
293293
let column = Int32Type::from_data(vec![1, 2, 2, 3, 3, 3, 4, 5, 5]);
294294
let bounds = Bounds::new_unchecked(column);
295295
let reduced = bounds.dedup_reduce::<SimpleRowsAsc<Int32Type>>(3);
296-
assert_eq!(reduced, Bounds(vec![Int32Type::from_data(vec![2, 3, 5])]));
296+
assert_eq!(reduced, Bounds(int32_columns([vec![2, 3, 5]])));
297297

298298
let column = Int32Type::from_data(vec![5, 5, 4, 3, 3, 3, 2, 2, 1]);
299299
let bounds = Bounds::new_unchecked(column);
300300
let reduced = bounds.dedup_reduce::<SimpleRowsDesc<Int32Type>>(3);
301-
assert_eq!(reduced, Bounds(vec![Int32Type::from_data(vec![4, 3, 1])]));
301+
assert_eq!(reduced, Bounds(int32_columns([vec![4, 3, 1]])));
302302

303-
let bounds_vec = [vec![5, 6, 7, 7], vec![3, 3, 4, 5], vec![1, 2, 2, 3]]
304-
.into_iter()
305-
.map(|v| Int32Type::from_data(v))
306-
.collect::<Vec<_>>();
307-
let bounds = Bounds(bounds_vec);
303+
let bounds = Bounds(int32_columns([vec![5, 6, 7, 7], vec![3, 3, 4, 5], vec![
304+
1, 2, 2, 3,
305+
]]));
308306
let reduced = bounds.dedup_reduce::<SimpleRowsAsc<Int32Type>>(5);
309-
assert_eq!(
310-
reduced,
311-
Bounds(vec![Int32Type::from_data(vec![2, 3, 4, 6, 7])])
312-
);
307+
assert_eq!(reduced, Bounds(int32_columns([vec![2, 3, 4, 6, 7]])));
313308

314309
let column = Int32Type::from_data(vec![1, 1, 1, 1, 1]);
315310
let bounds = Bounds(vec![column]);
316311
let reduced = bounds.dedup_reduce::<SimpleRowsAsc<Int32Type>>(3);
317-
assert_eq!(reduced, Bounds(vec![Int32Type::from_data(vec![1])]));
312+
assert_eq!(reduced, Bounds(int32_columns([vec![1]])));
318313

319314
Ok(())
320315
}

src/query/service/src/pipelines/processors/transforms/sort/merge_sort.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,10 @@ where
195195
let unit_size = self.memory_settings.spill_unit_size;
196196
let num_merge = bytes.div_ceil(unit_size).max(2);
197197
let batch_rows = rows.div_ceil(num_merge);
198+
199+
/// The memory will be doubled during merging.
200+
const MERGE_RATIO: usize = 2;
201+
let num_merge = num_merge.div_ceil(MERGE_RATIO).max(2);
198202
log::info!("determine sort spill params, buffer_bytes: {bytes}, buffer_rows: {rows}, spill_unit_size: {unit_size}, batch_rows: {batch_rows}, batch_num_merge {num_merge}");
199203
SortSpillParams {
200204
batch_rows,

src/query/service/src/pipelines/processors/transforms/sort/sort_collect.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ where
163163
let unit_size = self.memory_settings.spill_unit_size;
164164
let num_merge = bytes.div_ceil(unit_size).max(2);
165165
let batch_rows = rows.div_ceil(num_merge);
166+
167+
/// The memory will be doubled during merging.
168+
const MERGE_RATIO: usize = 2;
169+
let num_merge = num_merge.div_ceil(MERGE_RATIO).max(2);
166170
log::info!("determine sort spill params, buffer_bytes: {bytes}, buffer_rows: {rows}, spill_unit_size: {unit_size}, batch_rows: {batch_rows}, batch_num_merge {num_merge}");
167171
SortSpillParams {
168172
batch_rows,

0 commit comments

Comments
 (0)