Skip to content

Commit a116b11

Browse files
committed
fix exchange
1 parent 6643b25 commit a116b11

File tree

23 files changed

+208
-7
lines changed

23 files changed

+208
-7
lines changed

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,7 @@ impl ReclusterTableInterpreter {
467467
plan = Box::new(PhysicalPlan::Exchange(Exchange {
468468
plan_id: 0,
469469
input: plan,
470-
kind: FragmentKind::Normal,
470+
kind: FragmentKind::Modulo,
471471
keys: vec![expr],
472472
allow_adjust_parallelism: true,
473473
ignore_exchange: false,

src/query/service/src/pipelines/builders/builder_hilbert_partition.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ impl PipelineBuilder {
9999
self.ctx.clone(),
100100
input,
101101
output,
102+
MutationKind::Recluster,
102103
table,
103104
partition.table_meta_timestamps,
104105
false,

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ impl ExchangeInjector for AggregateInjector {
233233
match exchange {
234234
DataExchange::Merge(_) => unreachable!(),
235235
DataExchange::Broadcast(_) => unreachable!(),
236+
DataExchange::Modulo(_) => unreachable!(),
236237
DataExchange::ShuffleDataExchange(exchange) => {
237238
Ok(Arc::new(Box::new(HashTableHashScatter {
238239
buckets: exchange.destination_ids.len(),

src/query/service/src/schedulers/fragments/fragmenter.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::sync::Arc;
1616

1717
use databend_common_catalog::table_context::TableContext;
18+
use databend_common_exception::ErrorCode;
1819
use databend_common_exception::Result;
1920
use databend_common_meta_types::NodeInfo;
2021
use databend_common_sql::executor::physical_plans::CompactSource;
@@ -40,6 +41,7 @@ use crate::schedulers::PlanFragment;
4041
use crate::servers::flight::v1::exchange::BroadcastExchange;
4142
use crate::servers::flight::v1::exchange::DataExchange;
4243
use crate::servers::flight::v1::exchange::MergeExchange;
44+
use crate::servers::flight::v1::exchange::ModuloExchange;
4345
use crate::servers::flight::v1::exchange::ShuffleDataExchange;
4446
use crate::sessions::QueryContext;
4547
use crate::sql::executor::PhysicalPlan;
@@ -116,6 +118,15 @@ impl Fragmenter {
116118
FragmentKind::Expansive => {
117119
Ok(Some(BroadcastExchange::create(Self::get_executors(ctx))))
118120
}
121+
FragmentKind::Modulo => {
122+
if plan.keys.len() != 1 {
123+
return Err(ErrorCode::Internal("Modulo exchange require one key"));
124+
}
125+
Ok(Some(ModuloExchange::create(
126+
Self::get_executors(ctx),
127+
plan.keys[0].clone(),
128+
)))
129+
}
119130
_ => Ok(None),
120131
},
121132
_ => Ok(None),
@@ -203,8 +214,12 @@ impl PhysicalPlanReplacer for Fragmenter {
203214
}
204215

205216
fn replace_hilbert_serialize(&mut self, plan: &HilbertPartition) -> Result<PhysicalPlan> {
217+
let input = self.replace(&plan.input)?;
206218
self.state = State::HilbertRecluster;
207-
Ok(PhysicalPlan::HilbertPartition(Box::new(plan.clone())))
219+
Ok(PhysicalPlan::HilbertPartition(Box::new(HilbertPartition {
220+
input: Box::new(input),
221+
..plan.clone()
222+
})))
208223
}
209224

210225
fn replace_compact_source(&mut self, plan: &CompactSource) -> Result<PhysicalPlan> {

src/query/service/src/schedulers/fragments/plan_fragment.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,9 @@ struct ReplaceHilbert {
602602

603603
impl PhysicalPlanReplacer for ReplaceHilbert {
604604
fn replace_hilbert_serialize(&mut self, plan: &HilbertPartition) -> Result<PhysicalPlan> {
605+
let input = self.replace(&plan.input)?;
605606
Ok(PhysicalPlan::HilbertPartition(Box::new(HilbertPartition {
607+
input: Box::new(input),
606608
range_width: self.range_width,
607609
range_start: self.range_start,
608610
..plan.clone()

src/query/service/src/schedulers/fragments/query_fragment_actions_display.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ impl Display for QueryFragmentActionsWrap<'_> {
7272
DataExchange::Merge(_) => writeln!(f, " DataExchange: Merge")?,
7373
DataExchange::Broadcast(_) => writeln!(f, " DataExchange: Broadcast")?,
7474
DataExchange::ShuffleDataExchange(_) => writeln!(f, " DataExchange: Shuffle")?,
75+
DataExchange::Modulo(_) => writeln!(f, " DataExchange: Modulo")?,
7576
}
7677
}
7778

src/query/service/src/servers/flight/v1/exchange/data_exchange.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub enum DataExchange {
1919
Merge(MergeExchange),
2020
Broadcast(BroadcastExchange),
2121
ShuffleDataExchange(ShuffleDataExchange),
22+
Modulo(ModuloExchange),
2223
}
2324

2425
impl DataExchange {
@@ -27,6 +28,7 @@ impl DataExchange {
2728
DataExchange::Merge(exchange) => vec![exchange.destination_id.clone()],
2829
DataExchange::Broadcast(exchange) => exchange.destination_ids.clone(),
2930
DataExchange::ShuffleDataExchange(exchange) => exchange.destination_ids.clone(),
31+
DataExchange::Modulo(exchange) => exchange.destination_ids.clone(),
3032
}
3133
}
3234
}
@@ -77,3 +79,18 @@ impl BroadcastExchange {
7779
DataExchange::Broadcast(BroadcastExchange { destination_ids })
7880
}
7981
}
82+
83+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
84+
pub struct ModuloExchange {
85+
pub destination_ids: Vec<String>,
86+
pub shuffle_key: RemoteExpr,
87+
}
88+
89+
impl ModuloExchange {
90+
pub fn create(destination_ids: Vec<String>, shuffle_key: RemoteExpr) -> DataExchange {
91+
DataExchange::Modulo(ModuloExchange {
92+
destination_ids,
93+
shuffle_key,
94+
})
95+
}
96+
}

src/query/service/src/servers/flight/v1/exchange/exchange_injector.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::servers::flight::v1::exchange::ShuffleExchangeParams;
2929
use crate::servers::flight::v1::scatter::BroadcastFlightScatter;
3030
use crate::servers::flight::v1::scatter::FlightScatter;
3131
use crate::servers::flight::v1::scatter::HashFlightScatter;
32+
use crate::servers::flight::v1::scatter::ModFlightScatter;
3233
use crate::sessions::QueryContext;
3334

3435
pub trait ExchangeInjector: Send + Sync + 'static {
@@ -100,6 +101,11 @@ impl ExchangeInjector for DefaultExchangeInjector {
100101
local_pos,
101102
)?
102103
}
104+
DataExchange::Modulo(exchange) => ModFlightScatter::try_create(
105+
ctx.get_function_context()?,
106+
&exchange.shuffle_key,
107+
exchange.destination_ids.len(),
108+
)?,
103109
}))
104110
}
105111

src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ impl DataExchangeManager {
303303
None,
304304
Some(config.query.to_rpc_client_tls_config()),
305305
)
306-
.await?,
306+
.await?,
307307
))),
308308
false => Ok(FlightClient::new(FlightServiceClient::new(
309309
ConnectionFactory::create_rpc_channel(address.to_owned(), None, None).await?,
@@ -1011,6 +1011,19 @@ impl FragmentCoordinator {
10111011
.flight_scatter(&info.query_ctx, data_exchange)?,
10121012
}),
10131013
)),
1014+
DataExchange::Modulo(exchange) => {
1015+
Ok(Some(ExchangeParams::ShuffleExchange(ShuffleExchangeParams {
1016+
exchange_injector: exchange_injector.clone(),
1017+
schema: self.physical_plan.output_schema()?,
1018+
fragment_id: self.fragment_id,
1019+
query_id: info.query_id.to_string(),
1020+
executor_id: info.current_executor.to_string(),
1021+
destination_ids: exchange.destination_ids.to_owned(),
1022+
shuffle_scatter: exchange_injector
1023+
.flight_scatter(&info.query_ctx, data_exchange)?,
1024+
})
1025+
))
1026+
}
10141027
}
10151028
}
10161029

src/query/service/src/servers/flight/v1/exchange/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub mod serde;
3232
pub use data_exchange::BroadcastExchange;
3333
pub use data_exchange::DataExchange;
3434
pub use data_exchange::MergeExchange;
35+
pub use data_exchange::ModuloExchange;
3536
pub use data_exchange::ShuffleDataExchange;
3637
pub use exchange_injector::DefaultExchangeInjector;
3738
pub use exchange_injector::ExchangeInjector;

0 commit comments

Comments
 (0)