feat(query): range shuffle sort for standalone mode #17853

Draft: wants to merge 62 commits into base: main

Changes from all commits (62 commits)

4da8c95
reservoir_sampling
forsaken628 Nov 8, 2024
667bef3
Simpler
forsaken628 Nov 8, 2024
48eafc8
TransformSortSimple
forsaken628 Nov 12, 2024
6e5889a
rename
forsaken628 Apr 15, 2025
e23904e
fix
forsaken628 Apr 18, 2025
0bf671f
trait Spill
forsaken628 Apr 21, 2025
4094992
execute
forsaken628 Apr 21, 2025
ea6c9d9
move
forsaken628 Apr 21, 2025
b891e6f
update
forsaken628 Apr 22, 2025
a58f3d5
bounds
forsaken628 Apr 22, 2025
fc04da7
wait
forsaken628 Apr 22, 2025
40f7392
remove
forsaken628 Apr 23, 2025
7715bb4
scalar
forsaken628 Apr 23, 2025
9089940
exchange
forsaken628 Apr 23, 2025
129b789
update
forsaken628 Apr 23, 2025
470e973
test
forsaken628 Apr 23, 2025
c48f563
x
forsaken628 Apr 24, 2025
5aecc29
route
forsaken628 Apr 24, 2025
63f743d
builder
forsaken628 Apr 24, 2025
1e0bf24
update
forsaken628 Apr 24, 2025
8f1ba99
build
forsaken628 Apr 24, 2025
3575be8
rename
forsaken628 Apr 24, 2025
c150c34
fix
forsaken628 Apr 25, 2025
2777a28
fix
forsaken628 Apr 25, 2025
ffc8881
fix
forsaken628 Apr 25, 2025
bd484ce
fix
forsaken628 Apr 25, 2025
a879854
fix
forsaken628 Apr 25, 2025
1c98224
update
forsaken628 Apr 25, 2025
885fe7b
fix
forsaken628 Apr 26, 2025
f7a0373
fix
forsaken628 Apr 27, 2025
eb83630
fix
forsaken628 Apr 27, 2025
88b8290
Merge remote-tracking branch 'up/main' into xxx
forsaken628 Jul 7, 2025
5330273
x
forsaken628 Jun 30, 2025
0eef223
logical plan
forsaken628 Jul 1, 2025
d80e025
x
forsaken628 Jul 1, 2025
cfa2ede
x
forsaken628 Jul 1, 2025
e0b17b5
x
forsaken628 Jul 2, 2025
6fb293d
x
forsaken628 Jul 3, 2025
182cdde
x
forsaken628 Jul 4, 2025
79194ca
x
forsaken628 Jul 4, 2025
8f9e75d
x
forsaken628 Jul 4, 2025
d6ad39a
x
forsaken628 Jul 4, 2025
9a036b9
x
forsaken628 Jul 7, 2025
14f7281
x
forsaken628 Jul 7, 2025
c567982
x
forsaken628 Jul 7, 2025
881e07f
format
forsaken628 Jul 7, 2025
d791ccb
x
forsaken628 Jul 7, 2025
932600b
x
forsaken628 Jul 7, 2025
56ae36e
allow_adjust_parallelism
forsaken628 Jul 8, 2025
c672b69
InputBoundStream
forsaken628 Jul 8, 2025
c5db85b
BoundedMultiSortMergeProcessor
forsaken628 Jul 8, 2025
9cbe207
build_bounded_merge_sort
forsaken628 Jul 9, 2025
0944c51
bound_index
forsaken628 Jul 9, 2025
43a11e9
SortBoundEdge
forsaken628 Jul 9, 2025
a4fc585
route
forsaken628 Jul 10, 2025
75ac006
clean up
forsaken628 Jul 10, 2025
fdac251
enable
forsaken628 Jul 10, 2025
89dd885
fix
forsaken628 Jul 10, 2025
d8c64bb
fix
forsaken628 Jul 11, 2025
90636a5
fix
forsaken628 Jul 11, 2025
87ffa7d
test
forsaken628 Jul 11, 2025
3670ee1
fix
forsaken628 Jul 11, 2025

23 changes: 20 additions & 3 deletions src/common/base/src/base/watch_notify.rs
@@ -41,6 +41,16 @@ impl WatchNotify {
let _ = rx.changed().await;
}

pub fn has_notified(&self) -> bool {
match self.rx.has_changed() {
Ok(b) => b,
Err(_) => {
// the sender is held by `self`, so the channel can never be closed here
unreachable!()
}
}
}

pub fn notify_waiters(&self) {
let _ = self.tx.send_replace(true);
}
@@ -61,11 +71,18 @@ mod tests {
#[tokio::test]
async fn test_notify_waiters_ahead() {
let notify = WatchNotify::new();
assert!(!notify.has_notified());
let notified1 = notify.notified();
assert!(!notify.has_notified());

// notify_waiters ahead of notified being instantiated and awaited
notify.notify_waiters();

assert!(notify.has_notified());
// this should not await indefinitely
let notified = notify.notified();
notified.await;
let notified2 = notify.notified();
notified2.await;

notified1.await;
assert!(notify.has_notified());
}
}
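
A minimal usage sketch of the new has_notified check, illustrative only and not part of this diff: it reads the watch channel's changed flag, so callers can poll the notify state without registering a waiter or awaiting a future.

    // Illustrative sketch: non-blocking check of the notify state.
    let notify = WatchNotify::new();
    assert!(!notify.has_notified());

    notify.notify_waiters();

    // Cheap poll, e.g. from a processor's event loop, with nothing to await.
    if notify.has_notified() {
        // the signal was already sent; proceed without waiting
    }
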
39 changes: 34 additions & 5 deletions src/query/expression/src/block.rs
@@ -332,10 +332,34 @@ pub trait BlockMetaInfoDowncast: Sized + BlockMetaInfo {
let boxed = boxed.as_ref() as &dyn Any;
boxed.downcast_ref()
}

fn downcast_from_err(boxed: BlockMetaInfoPtr) -> std::result::Result<Self, BlockMetaInfoPtr> {
if (boxed.as_ref() as &dyn Any).is::<Self>() {
Ok(*(boxed as Box<dyn Any>).downcast().unwrap())
} else {
Err(boxed)
}
}

fn downcast_mut(boxed: &mut BlockMetaInfoPtr) -> Option<&mut Self> {
let boxed = boxed.as_mut() as &mut dyn Any;
boxed.downcast_mut()
}
}

impl<T: BlockMetaInfo> BlockMetaInfoDowncast for T {}

#[typetag::serde(name = "empty")]
impl BlockMetaInfo for () {
fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
<() as BlockMetaInfoDowncast>::downcast_ref_from(info).is_some()
}

fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
Box::new(())
}
}

impl DataBlock {
#[inline]
pub fn new(entries: Vec<BlockEntry>, num_rows: usize) -> Self {
@@ -431,11 +455,6 @@ impl DataBlock {
DataBlock::new_with_meta(vec![], 0, Some(meta))
}

#[inline]
pub fn take_meta(&mut self) -> Option<BlockMetaInfoPtr> {
self.meta.take()
}

#[inline]
pub fn columns(&self) -> &[BlockEntry] {
&self.entries
@@ -702,6 +721,16 @@ impl DataBlock {
})
}

#[inline]
pub fn take_meta(&mut self) -> Option<BlockMetaInfoPtr> {
self.meta.take()
}

#[inline]
pub fn mut_meta(&mut self) -> Option<&mut BlockMetaInfoPtr> {
self.meta.as_mut()
}

#[inline]
pub fn replace_meta(&mut self, meta: BlockMetaInfoPtr) {
self.meta.replace(meta);
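
An illustrative sketch (not part of this diff) of how the new meta helpers compose; SortBound and handle_bound are hypothetical names used only for the example. downcast_from_err returns the original boxed meta on a type mismatch instead of dropping it, and mut_meta exposes the attached meta for in-place mutation.

    // Assumes `block: &mut DataBlock`; SortBound and handle_bound are hypothetical.
    if let Some(meta) = block.take_meta() {
        match SortBound::downcast_from_err(meta) {
            Ok(bound) => handle_bound(bound),      // owned value on success
            Err(meta) => block.replace_meta(meta), // mismatch: put the meta back untouched
        }
    }

    if let Some(meta) = block.mut_meta() {
        if let Some(bound) = SortBound::downcast_mut(meta) {
            // mutate the attached meta in place without cloning the block
        }
    }
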
2 changes: 1 addition & 1 deletion src/query/expression/src/types/array.rs
@@ -54,7 +54,7 @@ impl<T: AccessType> AccessType for ArrayType<T> {
scalar.clone()
}

fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
match scalar {
ScalarRef::Array(array) => T::try_downcast_column(array),
_ => None,
2 changes: 1 addition & 1 deletion src/query/expression/src/types/boolean.rs
@@ -50,7 +50,7 @@ impl AccessType for BooleanType {
*scalar
}

fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
match scalar {
ScalarRef::Boolean(scalar) => Some(*scalar),
_ => None,
2 changes: 1 addition & 1 deletion src/query/expression/src/types/map.rs
@@ -451,7 +451,7 @@ impl<K: AccessType, V: AccessType> AccessType for MapType<K, V> {
MapInternal::<K, V>::to_scalar_ref(scalar)
}

fn try_downcast_scalar<'a>(scalar: &'a ScalarRef) -> Option<Self::ScalarRef<'a>> {
fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Option<Self::ScalarRef<'a>> {
match scalar {
ScalarRef::Map(array) => KvPair::<K, V>::try_downcast_column(array),
_ => None,
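
The three signature changes above are the same adjustment: the returned value now borrows from the scalar's underlying data (the 'a inside ScalarRef<'a>) rather than from the reference passed in. A minimal sketch of why that matters, with bound_item as a hypothetical helper, not code from this PR:

    // Hypothetical helper for illustration only.
    fn bound_item<'a, T: AccessType>(s: &'a Scalar) -> T::ScalarRef<'a> {
        let r = s.as_ref(); // temporary ScalarRef<'a> borrowing from `s`
        // With `&ScalarRef<'a>` the result is tied to 'a (the data behind `s`),
        // so it can be returned even though `r` is a local temporary.
        T::try_downcast_scalar(&r).unwrap()
    }
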
25 changes: 20 additions & 5 deletions src/query/pipeline/core/src/pipeline.rs
@@ -458,15 +458,29 @@ impl Pipeline {
self.sinks = new_sinks;
}

pub fn exchange<T: Exchange>(&mut self, n: usize, exchange: Arc<T>) {
pub fn exchange<T: Exchange>(&mut self, n: usize, exchange: Arc<T>) -> Result<()> {
self.exchange_with_merge(n, exchange.clone(), |inputs, output| {
Ok(MergePartitionProcessor::create(
inputs,
output,
exchange.clone(),
))
})
}

pub fn exchange_with_merge<T, F>(&mut self, n: usize, exchange: Arc<T>, f: F) -> Result<()>
where
T: Exchange,
F: Fn(Vec<Arc<InputPort>>, Arc<OutputPort>) -> Result<ProcessorPtr>,
{
if self.sinks.is_empty() {
return;
return Ok(());
}

let input_len = self.sinks.len();
let mut items = Vec::with_capacity(input_len);

for _index in 0..input_len {
for _ in 0..input_len {
let input = InputPort::create();
let outputs: Vec<_> = (0..n).map(|_| OutputPort::create()).collect();
items.push(PipeItem::create(
@@ -491,14 +505,15 @@ impl Pipeline {
let output = OutputPort::create();
let inputs: Vec<_> = (0..input_len).map(|_| InputPort::create()).collect();
items.push(PipeItem::create(
MergePartitionProcessor::create(inputs.clone(), output.clone(), exchange.clone()),
f(inputs.clone(), output.clone())?,
inputs,
vec![output],
));
}

// merge partition
self.add_pipe(Pipe::create(input_len * n, n, items))
self.add_pipe(Pipe::create(input_len * n, n, items));
Ok(())
}

#[track_caller]
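
An illustrative sketch (not part of this diff) of the updated call sites: exchange now returns a Result that callers propagate, and exchange_with_merge lets a caller supply its own merge processor per output instead of the default MergePartitionProcessor. MyMergeProcessor and the surrounding variables are hypothetical.

    // Default merge: same behaviour as before, but the Result must be handled.
    pipeline.exchange(n, exchange.clone())?;

    // Custom merge: build a dedicated processor for each output port.
    pipeline.exchange_with_merge(n, exchange.clone(), |inputs, output| {
        Ok(MyMergeProcessor::create(inputs, output))
    })?;
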
@@ -18,10 +18,12 @@ mod transform_accumulating;
mod transform_accumulating_async;
mod transform_async;
mod transform_blocking;
mod transform_blocking_async;
mod transform_compact_block;
mod transform_compact_builder;
mod transform_compact_no_split_builder;
mod transform_dummy;
mod transform_hook;
mod transform_k_way_merge_sort;
mod transform_multi_sort_merge;
mod transform_pipeline_helper;
@@ -36,12 +38,14 @@ pub use transform_accumulating::*;
pub use transform_accumulating_async::*;
pub use transform_async::*;
pub use transform_blocking::*;
pub use transform_blocking_async::*;
pub use transform_compact_block::*;
pub use transform_compact_builder::*;
pub use transform_compact_no_split_builder::*;
pub use transform_dummy::*;
pub use transform_hook::*;
pub use transform_k_way_merge_sort::*;
pub use transform_multi_sort_merge::try_add_multi_sort_merge;
pub use transform_multi_sort_merge::*;
pub use transform_pipeline_helper::TransformPipelineHelper;
pub use transform_retry_async::*;
pub use transform_sort_merge::sort_merge;
@@ -58,6 +58,17 @@ impl Rows for BinaryColumn {
fn slice(&self, range: Range<usize>) -> Self {
self.slice(range)
}

fn scalar_as_item<'a>(s: &'a Scalar) -> Self::Item<'a> {
match s {
Scalar::Binary(s) => s,
_ => unreachable!(),
}
}

fn owned_item(item: Self::Item<'_>) -> Scalar {
Scalar::Binary(Vec::from(item))
}
}

impl RowConverter<BinaryColumn> for CommonRowConverter {
@@ -17,6 +17,7 @@ mod simple;
mod utils;

use std::fmt::Debug;
use std::ops::Range;

pub use common::*;
use databend_common_exception::ErrorCode;
@@ -25,7 +26,9 @@ use databend_common_expression::types::ArgType;
use databend_common_expression::types::DataType;
use databend_common_expression::BlockEntry;
use databend_common_expression::Column;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::Scalar;
use databend_common_expression::SortColumnDescription;
pub use simple::*;
pub use utils::*;
@@ -39,6 +42,18 @@ where Self: Sized + Debug
output_schema: DataSchemaRef,
) -> Result<Self>;
fn convert(&self, columns: &[BlockEntry], num_rows: usize) -> Result<T>;

fn convert_data_block(
&self,
sort_desc: &[SortColumnDescription],
data_block: &DataBlock,
) -> Result<T> {
let order_by_cols = sort_desc
.iter()
.map(|desc| data_block.get_by_offset(desc.offset).clone())
.collect::<Vec<_>>();
self.convert(&order_by_cols, data_block.num_rows())
}
}

/// Rows can be compared.
@@ -82,5 +97,9 @@ where Self: Sized + Clone + Debug + Send
self.row(self.len() - 1)
}

fn slice(&self, range: std::ops::Range<usize>) -> Self;
fn slice(&self, range: Range<usize>) -> Self;

fn scalar_as_item<'a>(s: &'a Scalar) -> Self::Item<'a>;

fn owned_item(item: Self::Item<'_>) -> Scalar;
}
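
An illustrative sketch (not part of this diff) of the new trait surface: convert_data_block builds Rows straight from a DataBlock and the sort description, while scalar_as_item and owned_item round-trip a single sort key through Scalar, which is how a range bound can be carried around as plain data. R, converter, sort_desc and block are assumed to be in scope, with R: Rows.

    // Build the rows for a block directly from the sort description.
    let rows: R = converter.convert_data_block(&sort_desc, &block)?;

    // Take the last key as an owned, transferable bound value ...
    let bound: Scalar = R::owned_item(rows.last());

    // ... and later borrow it back as an item for comparisons against other rows.
    let item = R::scalar_as_item(&bound);
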
@@ -23,6 +23,7 @@ use databend_common_expression::types::ValueType;
use databend_common_expression::BlockEntry;
use databend_common_expression::Column;
use databend_common_expression::DataSchemaRef;
use databend_common_expression::Scalar;
use databend_common_expression::SortColumnDescription;

use super::RowConverter;
@@ -68,6 +69,15 @@ where
inner: T::slice_column(&self.inner, range),
}
}

fn scalar_as_item<'a>(s: &'a Scalar) -> Self::Item<'a> {
let s = &s.as_ref();
T::try_downcast_scalar(s).unwrap()
}

fn owned_item(item: Self::Item<'_>) -> Scalar {
T::upcast_scalar(T::to_owned_scalar(item))
}
}

/// Rows structure for single simple types. (numbers, date, timestamp)
@@ -111,6 +121,15 @@ where
inner: T::slice_column(&self.inner, range),
}
}

fn scalar_as_item<'a>(s: &'a Scalar) -> Self::Item<'a> {
let s = &s.as_ref();
Reverse(T::try_downcast_scalar(s).unwrap())
}

fn owned_item(item: Self::Item<'_>) -> Scalar {
T::upcast_scalar(T::to_owned_scalar(item.0))
}
}

/// If there is only one sort field and its type is a primitive type,