Skip to content

Commit afb47eb

Browse files
committed
fix
Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent f34ba1b commit afb47eb

File tree

2 files changed

+29
-30
lines changed

2 files changed

+29
-30
lines changed

src/query/service/src/pipelines/processors/transforms/sort/bounds.rs

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,11 @@ mod tests {
205205

206206
use super::*;
207207

208+
fn int32_columns<T>(data: T) -> Vec<Column>
209+
where T: IntoIterator<Item = Vec<i32>> {
210+
data.into_iter().map(Int32Type::from_data).collect()
211+
}
212+
208213
#[test]
209214
fn test_merge() -> Result<()> {
210215
{
@@ -227,17 +232,12 @@ mod tests {
227232

228233
assert_eq!(
229234
bounds,
230-
Bounds(vec![
231-
Int32Type::from_data(vec![6, 7]),
232-
Int32Type::from_data(vec![2, 6, 6]),
233-
Int32Type::from_data(vec![0, 0, 1]),
234-
])
235+
Bounds(int32_columns([vec![6, 7], vec![2, 6, 6], vec![0, 0, 1]]))
235236
);
236237
}
237238

238239
{
239-
let data = vec![vec![77, -2, 7], vec![3, 8, 6, 1, 1], vec![2]];
240-
240+
let data = [vec![77, -2, 7], vec![3, 8, 6, 1, 1], vec![2]];
241241
let data = data
242242
.into_iter()
243243
.map(|v| Bounds::from_column::<SimpleRowsDesc<Int32Type>>(Int32Type::from_data(v)))
@@ -246,13 +246,13 @@ mod tests {
246246

247247
assert_eq!(
248248
bounds,
249-
Bounds(vec![
250-
Int32Type::from_data(vec![-2]),
251-
Int32Type::from_data(vec![1, 1]),
252-
Int32Type::from_data(vec![3, 2]),
253-
Int32Type::from_data(vec![7, 6]),
254-
Int32Type::from_data(vec![77, 8]),
255-
])
249+
Bounds(int32_columns([
250+
vec![-2],
251+
vec![1, 1],
252+
vec![3, 2],
253+
vec![7, 6],
254+
vec![77, 8]
255+
]))
256256
);
257257
}
258258

@@ -270,16 +270,16 @@ mod tests {
270270
let bounds = Bounds::merge::<SimpleRowsDesc<Int32Type>>(data, 2)?;
271271

272272
let got = bounds.reduce(4).unwrap();
273-
assert_eq!(got, Bounds(vec![Int32Type::from_data(vec![8, 6, 2, 1])])); // 77 _8 7 _6 3 _2 1 _1 -2
273+
assert_eq!(got, Bounds(int32_columns([vec![8, 6, 2, 1]]))); // 77 _8 7 _6 3 _2 1 _1 -2
274274

275275
let got = bounds.reduce(3).unwrap();
276-
assert_eq!(got, Bounds(vec![Int32Type::from_data(vec![8, 3, 1])])); // 77 _8 7 6 _3 2 1 _1 -2
276+
assert_eq!(got, Bounds(int32_columns([vec![8, 3, 1]]))); // 77 _8 7 6 _3 2 1 _1 -2
277277

278278
let got = bounds.reduce(2).unwrap();
279-
assert_eq!(got, Bounds(vec![Int32Type::from_data(vec![7, 1])])); // 77 8 _7 6 3 2 _1 1 -2
279+
assert_eq!(got, Bounds(int32_columns([vec![7, 1]]))); // 77 8 _7 6 3 2 _1 1 -2
280280

281281
let got = bounds.reduce(1).unwrap();
282-
assert_eq!(got, Bounds(vec![Int32Type::from_data(vec![3])])); // 77 8 7 6 _3 2 1 1 -2
282+
assert_eq!(got, Bounds(int32_columns([vec![3]]))); // 77 8 7 6 _3 2 1 1 -2
283283

284284
Ok(())
285285
}
@@ -289,28 +289,23 @@ mod tests {
289289
let column = Int32Type::from_data(vec![1, 2, 2, 3, 3, 3, 4, 5, 5]);
290290
let bounds = Bounds::new_unchecked(column);
291291
let reduced = bounds.dedup_reduce::<SimpleRowsAsc<Int32Type>>(3);
292-
assert_eq!(reduced, Bounds(vec![Int32Type::from_data(vec![2, 3, 5])]));
292+
assert_eq!(reduced, Bounds(int32_columns([vec![2, 3, 5]])));
293293

294294
let column = Int32Type::from_data(vec![5, 5, 4, 3, 3, 3, 2, 2, 1]);
295295
let bounds = Bounds::new_unchecked(column);
296296
let reduced = bounds.dedup_reduce::<SimpleRowsDesc<Int32Type>>(3);
297-
assert_eq!(reduced, Bounds(vec![Int32Type::from_data(vec![4, 3, 1])]));
297+
assert_eq!(reduced, Bounds(int32_columns([vec![4, 3, 1]])));
298298

299-
let bounds_vec = [vec![5, 6, 7, 7], vec![3, 3, 4, 5], vec![1, 2, 2, 3]]
300-
.into_iter()
301-
.map(Int32Type::from_data)
302-
.collect::<Vec<_>>();
303-
let bounds = Bounds(bounds_vec);
299+
let bounds = Bounds(int32_columns([vec![5, 6, 7, 7], vec![3, 3, 4, 5], vec![
300+
1, 2, 2, 3,
301+
]]));
304302
let reduced = bounds.dedup_reduce::<SimpleRowsAsc<Int32Type>>(5);
305-
assert_eq!(
306-
reduced,
307-
Bounds(vec![Int32Type::from_data(vec![2, 3, 4, 6, 7])])
308-
);
303+
assert_eq!(reduced, Bounds(int32_columns([vec![2, 3, 4, 6, 7]])));
309304

310305
let column = Int32Type::from_data(vec![1, 1, 1, 1, 1]);
311306
let bounds = Bounds(vec![column]);
312307
let reduced = bounds.dedup_reduce::<SimpleRowsAsc<Int32Type>>(3);
313-
assert_eq!(reduced, Bounds(vec![Int32Type::from_data(vec![1])]));
308+
assert_eq!(reduced, Bounds(int32_columns([vec![1]])));
314309

315310
Ok(())
316311
}

src/query/service/src/pipelines/processors/transforms/sort/sort_collect.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ where
163163
let unit_size = self.memory_settings.spill_unit_size;
164164
let num_merge = bytes.div_ceil(unit_size).max(2);
165165
let batch_rows = rows.div_ceil(num_merge);
166+
167+
/// The memory will be doubled during merging.
168+
const MERGE_RATIO: usize = 2;
169+
let num_merge = num_merge.div_ceil(MERGE_RATIO).max(2);
166170
log::info!("determine sort spill params, buffer_bytes: {bytes}, buffer_rows: {rows}, spill_unit_size: {unit_size}, batch_rows: {batch_rows}, batch_num_merge {num_merge}");
167171
SortSpillParams {
168172
batch_rows,

0 commit comments

Comments
 (0)