Skip to content

Commit 3906b41

Browse files
committed
update
Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent 0ee6154 commit 3906b41

File tree

5 files changed

+122
-9
lines changed

5 files changed

+122
-9
lines changed

src/query/service/src/pipelines/processors/transforms/sort/bounds.rs

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ pub struct Bounds(
3131
);
3232

3333
impl Bounds {
34+
pub fn new_unchecked(column: Column) -> Bounds {
35+
Bounds(vec![column])
36+
}
37+
3438
pub fn from_column<R: Rows>(column: Column) -> Result<Bounds> {
3539
let block = DataBlock::sort(
3640
&DataBlock::new_from_columns(vec![column]),
@@ -100,12 +104,12 @@ impl Bounds {
100104
if n == 0 {
101105
return Some(Self::default());
102106
}
103-
let count = self.len();
104-
if n >= count {
107+
let total = self.len();
108+
if n >= total {
105109
return None;
106110
}
107111

108-
let step = count / n;
112+
let step = total / n;
109113
let offset = step / 2;
110114
let indices = self
111115
.0
@@ -131,6 +135,56 @@ impl Bounds {
131135
indices.len(),
132136
)]))
133137
}
138+
139+
pub fn dedup_reduce<R: Rows>(&self, n: usize) -> Self {
140+
if n == 0 {
141+
return Self::default();
142+
}
143+
let total = self.len();
144+
let mut step = total as f64 / n as f64;
145+
let mut target = step / 2.0;
146+
let mut indices = Vec::with_capacity(n);
147+
let mut last: Option<(R, _)> = None;
148+
for (i, (b_idx, r_idx)) in self
149+
.0
150+
.iter()
151+
.enumerate()
152+
.rev()
153+
.flat_map(|(b_idx, col)| std::iter::repeat_n(b_idx, col.len()).zip(0..col.len()))
154+
.enumerate()
155+
{
156+
if indices.len() >= n {
157+
break;
158+
}
159+
if (i as f64) < target {
160+
continue;
161+
}
162+
163+
let cur_rows = R::from_column(&self.0[b_idx]).unwrap();
164+
if last
165+
.as_ref()
166+
.map(|(last_rows, last_idx)| cur_rows.row(r_idx) == last_rows.row(*last_idx))
167+
.unwrap_or_default()
168+
{
169+
continue;
170+
}
171+
172+
indices.push((b_idx as u32, r_idx as u32, 1));
173+
target += step;
174+
if (i as f64) > target && indices.len() < n {
175+
step = (total - i) as f64 / (n - indices.len()) as f64;
176+
target = i as f64 + step / 2.0;
177+
}
178+
last = Some((cur_rows, r_idx));
179+
}
180+
181+
Bounds(vec![Column::take_column_indices(
182+
&self.0,
183+
R::data_type(),
184+
&indices,
185+
indices.len(),
186+
)])
187+
}
134188
}
135189

136190
impl SortedStream for Bounds {
@@ -233,4 +287,35 @@ mod tests {
233287

234288
Ok(())
235289
}
290+
291+
#[test]
292+
fn test_dedup_reduce() -> Result<()> {
293+
let column = Int32Type::from_data(vec![1, 2, 2, 3, 3, 3, 4, 5, 5]);
294+
let bounds = Bounds::new_unchecked(column);
295+
let reduced = bounds.dedup_reduce::<SimpleRowsAsc<Int32Type>>(3);
296+
assert_eq!(reduced, Bounds(vec![Int32Type::from_data(vec![2, 3, 5])]));
297+
298+
let column = Int32Type::from_data(vec![5, 5, 4, 3, 3, 3, 2, 2, 1]);
299+
let bounds = Bounds::new_unchecked(column);
300+
let reduced = bounds.dedup_reduce::<SimpleRowsDesc<Int32Type>>(3);
301+
assert_eq!(reduced, Bounds(vec![Int32Type::from_data(vec![4, 3, 1])]));
302+
303+
let bounds_vec = [vec![5, 6, 7, 7], vec![3, 3, 4, 5], vec![1, 2, 2, 3]]
304+
.into_iter()
305+
.map(|v| Int32Type::from_data(v))
306+
.collect::<Vec<_>>();
307+
let bounds = Bounds(bounds_vec);
308+
let reduced = bounds.dedup_reduce::<SimpleRowsAsc<Int32Type>>(5);
309+
assert_eq!(
310+
reduced,
311+
Bounds(vec![Int32Type::from_data(vec![2, 3, 4, 6, 7])])
312+
);
313+
314+
let column = Int32Type::from_data(vec![1, 1, 1, 1, 1]);
315+
let bounds = Bounds(vec![column]);
316+
let reduced = bounds.dedup_reduce::<SimpleRowsAsc<Int32Type>>(3);
317+
assert_eq!(reduced, Bounds(vec![Int32Type::from_data(vec![1])]));
318+
319+
Ok(())
320+
}
236321
}

src/query/service/src/pipelines/processors/transforms/sort/sort_shuffle.rs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ impl<R: Rows + 'static> Processor for TransformSortShuffle<R> {
191191
async fn async_process(&mut self) -> Result<()> {
192192
let bounds = match &self.step {
193193
Step::None if self.input.is_finished() => Bounds::default(),
194-
Step::Meta(meta) => meta.bounds.clone(),
194+
Step::Meta(meta) => meta.generate_bounds(),
195195
_ => unreachable!(),
196196
};
197197
self.state.commit_sample::<R>(self.id, bounds)?;
@@ -201,6 +201,27 @@ impl<R: Rows + 'static> Processor for TransformSortShuffle<R> {
201201
}
202202
}
203203

204+
impl SortCollectedMeta {
205+
fn generate_bounds(&self) -> Bounds {
206+
if self.bounds.len() > 1 {
207+
return self.bounds.clone();
208+
}
209+
210+
let Some(blocks) = self.blocks.get(self.blocks.len() / 2) else {
211+
return Bounds::default();
212+
};
213+
214+
blocks
215+
.get(blocks.len() / 2)
216+
.map(|block| match block.domain.len() {
217+
0 => Bounds::default(),
218+
1 => Bounds::new_unchecked(block.domain.clone()),
219+
_ => Bounds::new_unchecked(block.domain.slice(0..1)),
220+
})
221+
.unwrap_or_default()
222+
}
223+
}
224+
204225
pub struct SortSampleState {
205226
inner: RwLock<StateInner>,
206227
pub(super) done: WatchNotify,
@@ -261,9 +282,13 @@ impl StateInner {
261282
fn determine_bounds<R: Rows>(&mut self) -> Result<()> {
262283
let v = self.partial.drain(..).map(Option::unwrap).collect();
263284
let bounds = Bounds::merge::<R>(v, self.batch_rows)?;
264-
let bounds = bounds
265-
.reduce(self.partitions - 1, R::data_type())
266-
.unwrap_or(bounds);
285+
286+
let n = self.partitions - 1;
287+
let bounds = if bounds.len() < n {
288+
bounds
289+
} else {
290+
bounds.dedup_reduce::<R>(n)
291+
};
267292
assert!(bounds.len() < self.partitions);
268293

269294
self.bounds = Some(bounds);

src/query/service/src/pipelines/processors/transforms/sort/sort_spill.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,7 @@ pub struct SpillableBlock {
596596
data: Option<DataBlock>,
597597
rows: usize,
598598
location: Option<Location>,
599-
domain: Column,
599+
pub(super) domain: Column,
600600
processed: usize,
601601
}
602602

tests/sqllogictests/suites/mode/standalone/explain/window.test

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ set sort_spilling_memory_ratio = 0;
4747
statement ok
4848
set enable_parallel_multi_merge_sort = 0;
4949

50+
statement ok
51+
set enable_range_shuffle_sort = 0;
52+
5053
query T
5154
explain pipeline SELECT depname, empno, salary, sum(salary) OVER (PARTITION BY depname ORDER BY empno) FROM empsalary ORDER BY depname, empno;
5255
----

tests/sqllogictests/suites/stage/formats/parquet/read_policy.test

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ select t, t:a from @data/parquet/tuple.parquet order by id desc limit 2;
135135

136136
# topk contains output
137137
query TT
138-
select id, t:b, t:a from @data/parquet/tuple.parquet order by t:a desc limit 2;
138+
select id, t:b, t:a from @data/parquet/tuple.parquet order by t:a desc, id desc limit 2;
139139
----
140140
3 c 3
141141
2 b 3

0 commit comments

Comments
 (0)