Skip to content

Commit f34ba1b

Browse files
committed
update
Signed-off-by: coldWater <forsaken628@gmail.com>
1 parent d7d3520 commit f34ba1b

File tree

5 files changed

+121
-9
lines changed

5 files changed

+121
-9
lines changed

src/query/service/src/pipelines/processors/transforms/sort/bounds.rs

Lines changed: 87 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414

1515
use databend_common_exception::Result;
16-
use databend_common_expression::types::DataType;
1716
use databend_common_expression::Column;
1817
use databend_common_expression::DataBlock;
1918
use databend_common_expression::DataField;
@@ -31,6 +30,10 @@ pub struct Bounds(
3130
);
3231

3332
impl Bounds {
33+
pub fn new_unchecked(column: Column) -> Bounds {
34+
Bounds(vec![column])
35+
}
36+
3437
pub fn from_column<R: Rows>(column: Column) -> Result<Bounds> {
3538
let block = DataBlock::sort(
3639
&DataBlock::new_from_columns(vec![column]),
@@ -100,12 +103,12 @@ impl Bounds {
100103
if n == 0 {
101104
return Some(Self::default());
102105
}
103-
let count = self.len();
104-
if n >= count {
106+
let total = self.len();
107+
if n >= total {
105108
return None;
106109
}
107110

108-
let step = count / n;
111+
let step = total / n;
109112
let offset = step / 2;
110113
let indices = self
111114
.0
@@ -130,6 +133,55 @@ impl Bounds {
130133
indices.len(),
131134
)]))
132135
}
136+
137+
pub fn dedup_reduce<R: Rows>(&self, n: usize) -> Self {
138+
if n == 0 {
139+
return Self::default();
140+
}
141+
let total = self.len();
142+
let mut step = total as f64 / n as f64;
143+
let mut target = step / 2.0;
144+
let mut indices = Vec::with_capacity(n);
145+
let mut last: Option<(R, _)> = None;
146+
for (i, (b_idx, r_idx)) in self
147+
.0
148+
.iter()
149+
.enumerate()
150+
.rev()
151+
.flat_map(|(b_idx, col)| std::iter::repeat_n(b_idx, col.len()).zip(0..col.len()))
152+
.enumerate()
153+
{
154+
if indices.len() >= n {
155+
break;
156+
}
157+
if (i as f64) < target {
158+
continue;
159+
}
160+
161+
let cur_rows = R::from_column(&self.0[b_idx]).unwrap();
162+
if last
163+
.as_ref()
164+
.map(|(last_rows, last_idx)| cur_rows.row(r_idx) == last_rows.row(*last_idx))
165+
.unwrap_or_default()
166+
{
167+
continue;
168+
}
169+
170+
indices.push((b_idx as u32, r_idx as u32, 1));
171+
target += step;
172+
if (i as f64) > target && indices.len() < n {
173+
step = (total - i) as f64 / (n - indices.len()) as f64;
174+
target = i as f64 + step / 2.0;
175+
}
176+
last = Some((cur_rows, r_idx));
177+
}
178+
179+
Bounds(vec![Column::take_column_indices(
180+
&self.0,
181+
&indices,
182+
indices.len(),
183+
)])
184+
}
133185
}
134186

135187
impl SortedStream for Bounds {
@@ -146,7 +198,6 @@ impl SortedStream for Bounds {
146198

147199
#[cfg(test)]
148200
mod tests {
149-
use databend_common_expression::types::ArgType;
150201
use databend_common_expression::types::Int32Type;
151202
use databend_common_expression::FromData;
152203
use databend_common_pipeline_transforms::sort::SimpleRowsAsc;
@@ -232,4 +283,35 @@ mod tests {
232283

233284
Ok(())
234285
}
286+
287+
#[test]
288+
fn test_dedup_reduce() -> Result<()> {
289+
let column = Int32Type::from_data(vec![1, 2, 2, 3, 3, 3, 4, 5, 5]);
290+
let bounds = Bounds::new_unchecked(column);
291+
let reduced = bounds.dedup_reduce::<SimpleRowsAsc<Int32Type>>(3);
292+
assert_eq!(reduced, Bounds(vec![Int32Type::from_data(vec![2, 3, 5])]));
293+
294+
let column = Int32Type::from_data(vec![5, 5, 4, 3, 3, 3, 2, 2, 1]);
295+
let bounds = Bounds::new_unchecked(column);
296+
let reduced = bounds.dedup_reduce::<SimpleRowsDesc<Int32Type>>(3);
297+
assert_eq!(reduced, Bounds(vec![Int32Type::from_data(vec![4, 3, 1])]));
298+
299+
let bounds_vec = [vec![5, 6, 7, 7], vec![3, 3, 4, 5], vec![1, 2, 2, 3]]
300+
.into_iter()
301+
.map(Int32Type::from_data)
302+
.collect::<Vec<_>>();
303+
let bounds = Bounds(bounds_vec);
304+
let reduced = bounds.dedup_reduce::<SimpleRowsAsc<Int32Type>>(5);
305+
assert_eq!(
306+
reduced,
307+
Bounds(vec![Int32Type::from_data(vec![2, 3, 4, 6, 7])])
308+
);
309+
310+
let column = Int32Type::from_data(vec![1, 1, 1, 1, 1]);
311+
let bounds = Bounds(vec![column]);
312+
let reduced = bounds.dedup_reduce::<SimpleRowsAsc<Int32Type>>(3);
313+
assert_eq!(reduced, Bounds(vec![Int32Type::from_data(vec![1])]));
314+
315+
Ok(())
316+
}
235317
}

src/query/service/src/pipelines/processors/transforms/sort/sort_shuffle.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ impl<R: Rows + 'static> Processor for TransformSortShuffle<R> {
191191
async fn async_process(&mut self) -> Result<()> {
192192
let bounds = match &self.step {
193193
Step::None if self.input.is_finished() => Bounds::default(),
194-
Step::Meta(meta) => meta.bounds.clone(),
194+
Step::Meta(meta) => meta.generate_bounds(),
195195
_ => unreachable!(),
196196
};
197197
self.state.commit_sample::<R>(self.id, bounds)?;
@@ -201,6 +201,27 @@ impl<R: Rows + 'static> Processor for TransformSortShuffle<R> {
201201
}
202202
}
203203

204+
impl SortCollectedMeta {
205+
fn generate_bounds(&self) -> Bounds {
206+
if self.bounds.len() > 1 {
207+
return self.bounds.clone();
208+
}
209+
210+
let Some(blocks) = self.blocks.get(self.blocks.len() / 2) else {
211+
return Bounds::default();
212+
};
213+
214+
blocks
215+
.get(blocks.len() / 2)
216+
.map(|block| match block.domain.len() {
217+
0 => Bounds::default(),
218+
1 => Bounds::new_unchecked(block.domain.clone()),
219+
_ => Bounds::new_unchecked(block.domain.slice(0..1)),
220+
})
221+
.unwrap_or_default()
222+
}
223+
}
224+
204225
pub struct SortSampleState {
205226
inner: RwLock<StateInner>,
206227
pub(super) done: WatchNotify,
@@ -261,7 +282,13 @@ impl StateInner {
261282
fn determine_bounds<R: Rows>(&mut self) -> Result<()> {
262283
let v = self.partial.drain(..).map(Option::unwrap).collect();
263284
let bounds = Bounds::merge::<R>(v, self.batch_rows)?;
264-
let bounds = bounds.reduce(self.partitions - 1).unwrap_or(bounds);
285+
286+
let n = self.partitions - 1;
287+
let bounds = if bounds.len() < n {
288+
bounds
289+
} else {
290+
bounds.dedup_reduce::<R>(n)
291+
};
265292
assert!(bounds.len() < self.partitions);
266293

267294
self.bounds = Some(bounds);

src/query/service/src/pipelines/processors/transforms/sort/sort_spill.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,7 @@ pub struct SpillableBlock {
602602
data: Option<DataBlock>,
603603
rows: usize,
604604
location: Option<Location>,
605-
domain: Column,
605+
pub(super) domain: Column,
606606
processed: usize,
607607
}
608608

tests/sqllogictests/suites/mode/standalone/explain/window.test

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ set sort_spilling_memory_ratio = 0;
4747
statement ok
4848
set enable_parallel_multi_merge_sort = 0;
4949

50+
statement ok
51+
set enable_range_shuffle_sort = 0;
52+
5053
query T
5154
explain pipeline SELECT depname, empno, salary, sum(salary) OVER (PARTITION BY depname ORDER BY empno) FROM empsalary ORDER BY depname, empno;
5255
----

tests/sqllogictests/suites/stage/formats/parquet/read_policy.test

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ select t, t:a from @data/parquet/tuple.parquet order by id desc limit 2;
135135

136136
# topk contains output
137137
query TT
138-
select id, t:b, t:a from @data/parquet/tuple.parquet order by t:a desc limit 2;
138+
select id, t:b, t:a from @data/parquet/tuple.parquet order by t:a desc, id desc limit 2;
139139
----
140140
3 c 3
141141
2 b 3

0 commit comments

Comments
 (0)