Skip to content

Commit 0944c51

Browse files
committed
bound_index
1 parent 9cbe207 commit 0944c51

File tree

7 files changed

+38
-51
lines changed

7 files changed

+38
-51
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/service/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,6 @@ tokio = { workspace = true }
171171
tokio-stream = { workspace = true, features = ["net"] }
172172
toml = { workspace = true, features = ["parse"] }
173173
tonic = { workspace = true }
174-
twox-hash = { workspace = true }
175174
typetag = { workspace = true }
176175
url = { workspace = true }
177176
uuid = { workspace = true }

src/query/service/src/pipelines/processors/transforms/sort/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use databend_common_expression::BlockMetaInfo;
2020
use databend_common_expression::BlockMetaInfoDowncast;
2121
use databend_common_expression::DataBlock;
2222
use databend_common_expression::DataSchemaRef;
23-
use databend_common_expression::Scalar;
2423
use databend_common_pipeline_transforms::SortSpillParams;
2524
use sort_spill::SpillableBlock;
2625

@@ -107,7 +106,14 @@ impl BlockMetaInfo for SortExchangeMeta {
107106

108107
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
109108
pub struct SortBound {
110-
bound: Option<Scalar>,
109+
bound_index: u32,
110+
}
111+
112+
impl SortBound {
113+
fn create(bound_index: u32) -> Box<dyn BlockMetaInfo> {
114+
debug_assert!(bound_index != u32::MAX);
115+
SortBound { bound_index }.boxed()
116+
}
111117
}
112118

113119
#[typetag::serde(name = "sort_bound")]

src/query/service/src/pipelines/processors/transforms/sort/sort_exchange_injector.rs

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,13 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::hash::Hash;
16-
use std::hash::Hasher;
1715
use std::sync::Arc;
1816

1917
use databend_common_exception::Result;
2018
use databend_common_expression::BlockMetaInfoDowncast;
2119
use databend_common_expression::DataBlock;
22-
use databend_common_expression::Scalar;
2320
use databend_common_pipeline_core::Pipeline;
2421
use databend_common_settings::FlightCompression;
25-
use twox_hash::XxHash64;
2622

2723
use crate::pipelines::processors::transforms::SortBound;
2824
use crate::servers::flight::v1::exchange::DataExchange;
@@ -88,7 +84,7 @@ impl ExchangeInjector for SortInjector {
8884
}
8985

9086
pub struct SortBoundScatter {
91-
partitions: u64,
87+
partitions: u32,
9288
}
9389

9490
impl FlightScatter for SortBoundScatter {
@@ -101,23 +97,13 @@ impl FlightScatter for SortBoundScatter {
10197
}
10298
}
10399

104-
pub(super) fn bound_scatter(data_block: DataBlock, n: u64) -> Result<Vec<DataBlock>> {
100+
pub(super) fn bound_scatter(data_block: DataBlock, n: u32) -> Result<Vec<DataBlock>> {
105101
let meta = data_block
106102
.get_meta()
107103
.and_then(SortBound::downcast_ref_from)
108104
.unwrap();
109105

110-
let bound = match &meta.bound {
111-
Some(bound) => {
112-
debug_assert!(!bound.is_null());
113-
bound
114-
}
115-
None => &Scalar::Null,
116-
};
117-
118-
let mut hasher = XxHash64::default();
119-
bound.hash(&mut hasher);
120-
let index = hasher.finish() % n;
106+
let index = meta.bound_index % n;
121107

122108
Ok(std::iter::repeat_n(DataBlock::empty(), index as _)
123109
.chain(Some(data_block))

src/query/service/src/pipelines/processors/transforms/sort/sort_merge_stream.rs

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414

1515
use std::any::Any;
16-
use std::cmp::Ordering;
1716
use std::marker::PhantomData;
1817
use std::sync::Arc;
1918

@@ -22,7 +21,6 @@ use databend_common_expression::BlockMetaInfoDowncast;
2221
use databend_common_expression::Column;
2322
use databend_common_expression::DataBlock;
2423
use databend_common_expression::DataSchemaRef;
25-
use databend_common_expression::Scalar;
2624
use databend_common_pipeline_core::processors::Event;
2725
use databend_common_pipeline_core::processors::InputPort;
2826
use databend_common_pipeline_core::processors::OutputPort;
@@ -46,7 +44,7 @@ where A: SortAlgorithm
4644
limit: Option<usize>,
4745

4846
output_data: Option<DataBlock>,
49-
bound: Option<Scalar>,
47+
bound_index: u32,
5048
inner: std::result::Result<Merger<A, Stream<A>>, Vec<Stream<A>>>,
5149
}
5250

@@ -67,7 +65,7 @@ where A: SortAlgorithm
6765
data: None,
6866
input: input.clone(),
6967
remove_order_col,
70-
bound: None,
68+
bound_index: None,
7169
sort_row_offset: schema.fields().len() - 1,
7270
_r: PhantomData,
7371
})
@@ -80,7 +78,7 @@ where A: SortAlgorithm
8078
block_size,
8179
limit,
8280
output_data: None,
83-
bound: None,
81+
bound_index: u32::MAX,
8482
inner: Err(streams),
8583
})
8684
}
@@ -119,11 +117,7 @@ where A: SortAlgorithm + 'static
119117

120118
fn process(&mut self) -> Result<()> {
121119
if let Some(block) = self.inner.as_mut().ok().unwrap().next_block()? {
122-
let meta = SortBound {
123-
bound: self.bound.clone(),
124-
}
125-
.boxed();
126-
self.output_data = Some(block.add_meta(Some(meta))?);
120+
self.output_data = Some(block.add_meta(Some(SortBound::create(self.bound_index)))?);
127121
};
128122
Ok(())
129123
}
@@ -158,20 +152,17 @@ where A: SortAlgorithm + 'static
158152
.get_meta()
159153
.and_then(SortBound::downcast_ref_from)
160154
.expect("require a SortBound");
161-
bounds.push(&meta.bound)
155+
bounds.push(meta.bound_index)
162156
}
163157

164-
self.bound = match bounds.iter().min_by(|a, b| match (a, b) {
165-
(None, None) => Ordering::Equal,
166-
(None, Some(_)) => Ordering::Greater,
167-
(Some(_), None) => Ordering::Less,
168-
(Some(a), Some(b)) => A::Rows::scalar_as_item(a).cmp(&A::Rows::scalar_as_item(b)),
169-
}) {
170-
Some(bound) => (*bound).clone(),
158+
let bound_index = match bounds.iter().min() {
159+
Some(index) => *index,
171160
None => return Ok(Event::Finished),
172161
};
162+
assert!(self.bound_index != u32::MAX || bound_index > self.bound_index);
163+
self.bound_index = bound_index;
173164
for stream in streams.iter_mut() {
174-
stream.bound = self.bound.clone();
165+
stream.bound_index = Some(self.bound_index);
175166
}
176167

177168
let streams = std::mem::take(streams);
@@ -189,7 +180,7 @@ struct BoundedInputStream<R: Rows> {
189180
data: Option<DataBlock>,
190181
input: Arc<InputPort>,
191182
remove_order_col: bool,
192-
bound: Option<Scalar>,
183+
bound_index: Option<u32>,
193184
sort_row_offset: usize,
194185
_r: PhantomData<R>,
195186
}
@@ -244,9 +235,9 @@ impl<R: Rows> BoundedInputStream<R> {
244235
.and_then(SortBound::downcast_ref_from)
245236
.expect("require a SortBound");
246237

247-
if meta.bound == self.bound {
238+
if meta.bound_index == self.bound_index.unwrap() {
248239
self.data.take().map(|mut data| {
249-
let _ = data.take_meta();
240+
data.take_meta().unwrap();
250241
data
251242
})
252243
} else {

src/query/service/src/pipelines/processors/transforms/sort/sort_restore.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,11 @@ where
115115

116116
let OutputData {
117117
block,
118-
bound,
118+
bound: (bound_index, _),
119119
finish,
120120
} = spill_sort.on_restore().await?;
121121
if let Some(block) = block {
122-
let mut block = block.add_meta(Some(SortBound { bound }.boxed()))?;
122+
let mut block = block.add_meta(Some(SortBound::create(bound_index)))?;
123123
if self.remove_order_col {
124124
block.pop_columns(1);
125125
}

src/query/service/src/pipelines/processors/transforms/sort/sort_spill.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ struct StepSort<A: SortAlgorithm> {
6868
/// Each boundary represents a cutoff point where data less than or equal to it belongs to one partition.
6969
bounds: Bounds,
7070
cur_bound: Option<Scalar>,
71+
bound_index: i32,
7172

7273
subsequent: Vec<BoundBlockStream<A::Rows, Arc<Spiller>>>,
7374
current: Vec<BoundBlockStream<A::Rows, Arc<Spiller>>>,
@@ -116,6 +117,7 @@ where A: SortAlgorithm
116117
params,
117118
bounds,
118119
cur_bound: None,
120+
bound_index: -1,
119121
subsequent,
120122
current: vec![],
121123
output_merger: None,
@@ -172,7 +174,7 @@ where A: SortAlgorithm
172174
sort.merge_current(&self.base).await?;
173175
Ok(OutputData {
174176
block: None,
175-
bound: None,
177+
bound: (u32::MAX, None),
176178
finish: false,
177179
})
178180
} else {
@@ -303,6 +305,7 @@ impl<A: SortAlgorithm> StepCollect<A> {
303305
Ok(StepSort {
304306
bounds,
305307
cur_bound: None,
308+
bound_index: -1,
306309
subsequent: std::mem::take(&mut self.streams),
307310
current: vec![],
308311
output_merger: None,
@@ -313,7 +316,7 @@ impl<A: SortAlgorithm> StepCollect<A> {
313316

314317
pub struct OutputData {
315318
pub block: Option<DataBlock>,
316-
pub bound: Option<Scalar>,
319+
pub bound: (u32, Option<Scalar>),
317320
pub finish: bool,
318321
}
319322

@@ -323,6 +326,7 @@ impl<A: SortAlgorithm> StepSort<A> {
323326
Some(bound) => self.cur_bound = Some(bound),
324327
None => self.cur_bound = None,
325328
}
329+
self.bound_index += 1;
326330
}
327331

328332
async fn merge_current(&mut self, base: &Base) -> Result<()> {
@@ -372,7 +376,8 @@ impl<A: SortAlgorithm> StepSort<A> {
372376
let mut s = self.current.pop().unwrap();
373377
s.restore_first().await?;
374378
let block = Some(s.take_next_bounded_block());
375-
let bound = s.bound.clone();
379+
assert!(self.bound_index >= 0);
380+
let bound = (self.bound_index as _, s.bound.clone());
376381

377382
if !s.is_empty() {
378383
if s.should_include_first() {
@@ -415,15 +420,16 @@ impl<A: SortAlgorithm> StepSort<A> {
415420

416421
return Ok(OutputData {
417422
block: None,
418-
bound: None,
423+
bound: (u32::MAX, None),
419424
finish: self.subsequent.is_empty(),
420425
});
421426
};
422427

423428
let mut sorted = base.new_stream([base.new_block(data)].into(), self.cur_bound.clone());
424429
let (block, bound) = if sorted.should_include_first() {
425430
let block = Some(sorted.take_next_bounded_block());
426-
let bound = sorted.bound.clone();
431+
debug_assert!(self.bound_index >= 0);
432+
let bound = (self.bound_index as _, sorted.bound.clone());
427433
if sorted.is_empty() {
428434
return Ok(OutputData {
429435
block,
@@ -433,7 +439,7 @@ impl<A: SortAlgorithm> StepSort<A> {
433439
}
434440
(block, bound)
435441
} else {
436-
(None, None)
442+
(None, (u32::MAX, None))
437443
};
438444

439445
while let Some(data) = merger.async_next_block().await? {

0 commit comments

Comments
 (0)