
Commit e7483b5

remove unused codes

Parent: ca55812

28 files changed: +33 / -338 lines

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 9 additions & 35 deletions
@@ -28,7 +28,6 @@ use databend_common_catalog::table::TableExt;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::type_check::check_function;
-use databend_common_expression::types::NumberScalar;
 use databend_common_expression::DataBlock;
 use databend_common_expression::Scalar;
 use databend_common_functions::BUILTIN_FUNCTIONS;
@@ -53,8 +52,6 @@ use databend_common_sql::plans::plan_hilbert_sql;
 use databend_common_sql::plans::replace_with_constant;
 use databend_common_sql::plans::set_update_stream_columns;
 use databend_common_sql::plans::BoundColumnRef;
-use databend_common_sql::plans::ConstantExpr;
-use databend_common_sql::plans::FunctionCall;
 use databend_common_sql::plans::Plan;
 use databend_common_sql::plans::ReclusterPlan;
 use databend_common_sql::IdentifierNormalizer;
@@ -433,44 +430,22 @@ impl ReclusterTableInterpreter {
 
         // For distributed execution, add an exchange operator to distribute work
         if is_distributed {
-            let nodes_num = cluster.nodes.len() as u64;
-            let scalar_expr = ScalarExpr::FunctionCall(FunctionCall {
-                span: None,
-                func_name: "div".to_string(),
-                params: vec![],
-                arguments: vec![
-                    ScalarExpr::FunctionCall(FunctionCall {
-                        span: None,
-                        func_name: "multiply".to_string(),
-                        params: vec![],
-                        arguments: vec![
-                            ScalarExpr::BoundColumnRef(BoundColumnRef {
-                                span: None,
-                                column: bind_context.columns.last().unwrap().clone(),
-                            }),
-                            ScalarExpr::ConstantExpr(ConstantExpr {
-                                span: None,
-                                value: Scalar::Number(NumberScalar::UInt64(nodes_num)),
-                            }),
-                        ],
-                    }),
-                    ScalarExpr::ConstantExpr(ConstantExpr {
-                        span: None,
-                        value: Scalar::Number(NumberScalar::UInt64(total_partitions as u64)),
-                    }),
-                ],
-            });
-
             // Create an expression for the partition column,
             // i.e.`range_partition_id(hilbert_range_index({hilbert_keys_str}), [...]) AS _predicate`
-            let expr = scalar_expr_to_remote_expr(&scalar_expr, plan.output_schema()?.as_ref())?;
+            let expr = scalar_expr_to_remote_expr(
+                &ScalarExpr::BoundColumnRef(BoundColumnRef {
+                    span: None,
+                    column: bind_context.columns.last().unwrap().clone(),
+                }),
+                plan.output_schema()?.as_ref(),
+            )?;
 
             // Add exchange operator for data distribution,
             // shuffling data based on the hash of range partition IDs derived from the Hilbert index.
             plan = Box::new(PhysicalPlan::Exchange(Exchange {
                 plan_id: 0,
                 input: plan,
-                kind: FragmentKind::Modulo,
+                kind: FragmentKind::Normal,
                 keys: vec![expr],
                 allow_adjust_parallelism: true,
                 ignore_exchange: false,
@@ -487,8 +462,7 @@ impl ReclusterTableInterpreter {
                 plan_id: 0,
                 input: plan,
                 table_info: table_info.clone(),
-                range_start: 0,
-                range_width: total_partitions,
+                num_partitions: total_partitions,
                 table_meta_timestamps,
                 bytes_per_block,
                 rows_per_block,
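
For context, the deleted block built a ScalarExpr equivalent to div(multiply(range_partition_id, nodes_num), total_partitions) and sent it through a Modulo exchange; after this change the raw range-partition column goes through a Normal (hash) exchange, and the modulo reduction happens later in ReclusterPartitionExchange (see below). A minimal sketch of that arithmetic only, using hypothetical free functions rather than anything from the codebase:

    // Illustration only: the destination math before and after this commit.
    // `old_destination` mirrors the removed div(multiply(...)) expression;
    // `new_partition` mirrors the modulo reduction now done at exchange time.
    fn old_destination(range_id: u64, nodes_num: u64, total_partitions: u64) -> u64 {
        (range_id * nodes_num) / total_partitions
    }

    fn new_partition(range_id: u64, num_partitions: u64) -> u64 {
        range_id % num_partitions
    }

    fn main() {
        // With 64 total partitions and 4 nodes, range id 17 was mapped to node 1
        // by the old plan expression; now the plan ships the full partition id 17
        // and the exchange routes it itself.
        assert_eq!(old_destination(17, 4, 64), 1);
        assert_eq!(new_partition(17, 64), 17);
    }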

src/query/service/src/pipelines/builders/builder_hilbert_partition.rs

Lines changed: 3 additions & 3 deletions
@@ -51,7 +51,7 @@ impl PipelineBuilder {
 
         self.main_pipeline.exchange(
             num_processors,
-            ReclusterPartitionExchange::create(partition.range_start, partition.range_width),
+            ReclusterPartitionExchange::create(partition.num_partitions),
         );
 
         let settings = self.ctx.get_settings();
@@ -85,7 +85,7 @@ impl PipelineBuilder {
                 &settings,
                 processor_id.fetch_add(1, atomic::Ordering::AcqRel),
                 num_processors,
-                partition.range_width,
+                partition.num_partitions,
                 window_spill_settings.clone(),
                 disk_spill.clone(),
                 ReclusterPartitionStrategy::new(properties.clone()),
@@ -112,7 +112,7 @@ impl PipelineBuilder {
                 &settings,
                 processor_id.fetch_add(1, atomic::Ordering::AcqRel),
                 num_processors,
-                partition.range_width,
+                partition.num_partitions,
                 window_spill_settings.clone(),
                 disk_spill.clone(),
                 CompactPartitionStrategy::new(

src/query/service/src/pipelines/builders/builder_recluster.rs

Lines changed: 1 addition & 1 deletion
@@ -195,7 +195,7 @@ impl PipelineBuilder {
 
         self.main_pipeline.exchange(
             num_processors,
-            ReclusterPartitionExchange::create(0, partitions),
+            ReclusterPartitionExchange::create(partitions),
         );
         let processor_id = AtomicUsize::new(0);

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs

Lines changed: 0 additions & 1 deletion
@@ -233,7 +233,6 @@ impl ExchangeInjector for AggregateInjector {
         match exchange {
             DataExchange::Merge(_) => unreachable!(),
             DataExchange::Broadcast(_) => unreachable!(),
-            DataExchange::Modulo(_) => unreachable!(),
             DataExchange::ShuffleDataExchange(exchange) => {
                 Ok(Arc::new(Box::new(HashTableHashScatter {
                     buckets: exchange.destination_ids.len(),

src/query/service/src/pipelines/processors/transforms/recluster/recluster_partition_exchange.rs

Lines changed: 12 additions & 11 deletions
@@ -22,16 +22,14 @@ use databend_common_pipeline_core::processors::Exchange;
 use crate::pipelines::processors::transforms::WindowPartitionMeta;
 
 pub struct ReclusterPartitionExchange {
-    start: u64,
-    width: usize,
+    num_partitions: usize,
     start_time: Instant,
 }
 
 impl ReclusterPartitionExchange {
-    pub fn create(start: u64, width: usize) -> Arc<ReclusterPartitionExchange> {
+    pub fn create(num_partitions: usize) -> Arc<ReclusterPartitionExchange> {
         Arc::new(ReclusterPartitionExchange {
-            start,
-            width,
+            num_partitions,
             start_time: Instant::now(),
         })
     }
@@ -50,18 +48,21 @@ impl Exchange for ReclusterPartitionExchange {
         // Scatter the data block to different partitions.
         let indices = range_ids
             .iter()
-            .map(|&id| (id - self.start) as u16)
+            .map(|&id| (id % self.num_partitions as u64) as u16)
             .collect::<Vec<_>>();
         data_block.pop_columns(1);
-
-        let scatter_indices = DataBlock::divide_indices_by_scatter_size(&indices, self.width);
+        let scatter_indices =
+            DataBlock::divide_indices_by_scatter_size(&indices, self.num_partitions);
         // Partition the data blocks to different processors.
         let mut output_data_blocks = vec![vec![]; n];
-        for (partition_id, indices) in scatter_indices.into_iter().take(self.width).enumerate() {
+        for (partition_id, indices) in scatter_indices
+            .into_iter()
+            .take(self.num_partitions)
+            .enumerate()
+        {
             if !indices.is_empty() {
-                let target = (partition_id * n) / self.width;
                 let block = data_block.take_with_optimize_size(&indices)?;
-                output_data_blocks[target].push((partition_id, block));
+                output_data_blocks[partition_id % n].push((partition_id, block));
             }
         }
         log::info!("Recluster range exchange: {:?}", self.start_time.elapsed());

src/query/service/src/pipelines/processors/transforms/recluster/recluster_partition_strategy.rs

Lines changed: 0 additions & 22 deletions
@@ -37,17 +37,6 @@ impl ReclusterPartitionStrategy {
 impl PartitionProcessStrategy for ReclusterPartitionStrategy {
     const NAME: &'static str = "Recluster";
 
-    fn calc_partitions(
-        &self,
-        processor_id: usize,
-        num_processors: usize,
-        num_partitions: usize,
-    ) -> Vec<usize> {
-        (0..num_partitions)
-            .filter(|&partition| (partition * num_processors) / num_partitions == processor_id)
-            .collect()
-    }
-
     /// Stream write each block, and flush it conditionally based on builder status
     /// and input size estimation.
     fn process_data_blocks(&self, data_blocks: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
@@ -127,17 +116,6 @@ impl CompactPartitionStrategy {
 impl PartitionProcessStrategy for CompactPartitionStrategy {
     const NAME: &'static str = "Compact";
 
-    fn calc_partitions(
-        &self,
-        processor_id: usize,
-        num_processors: usize,
-        num_partitions: usize,
-    ) -> Vec<usize> {
-        (0..num_partitions)
-            .filter(|&partition| (partition * num_processors) / num_partitions == processor_id)
-            .collect()
-    }
-
     /// Collects blocks into batches and merges them via `concat` when size threshold is reached.
     fn process_data_blocks(&self, data_blocks: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
         let blocks_num = data_blocks.len();

src/query/service/src/pipelines/processors/transforms/window/partition/partition_process_strategy.rs

Lines changed: 0 additions & 19 deletions
@@ -22,14 +22,6 @@ use databend_common_settings::Settings;
 pub trait PartitionProcessStrategy: Send + Sync + 'static {
     const NAME: &'static str;
 
-    /// Partition assignment: map partition index to processor via proportional mapping.
-    fn calc_partitions(
-        &self,
-        processor_id: usize,
-        num_processors: usize,
-        num_partitions: usize,
-    ) -> Vec<usize>;
-
     fn process_data_blocks(&self, data_blocks: Vec<DataBlock>) -> Result<Vec<DataBlock>>;
 }
 
@@ -66,17 +58,6 @@ impl WindowPartitionStrategy {
 impl PartitionProcessStrategy for WindowPartitionStrategy {
     const NAME: &'static str = "Window";
 
-    fn calc_partitions(
-        &self,
-        processor_id: usize,
-        num_processors: usize,
-        num_partitions: usize,
-    ) -> Vec<usize> {
-        (0..num_partitions)
-            .filter(|&partition| partition % num_processors == processor_id)
-            .collect()
-    }
-
     fn process_data_blocks(&self, data_blocks: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
         let data_blocks = data_blocks
             .into_iter()

src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs

Lines changed: 3 additions & 1 deletion
@@ -92,7 +92,9 @@ impl<S: PartitionProcessStrategy> TransformPartitionCollect<S> {
         strategy: S,
     ) -> Result<Self> {
         // Calculate the partition ids collected by the processor.
-        let partitions = strategy.calc_partitions(processor_id, num_processors, num_partitions);
+        let partitions: Vec<usize> = (0..num_partitions)
+            .filter(|&partition| partition % num_processors == processor_id)
+            .collect();
 
         // Map each partition id to new partition id.
         let mut partition_id = vec![0; num_partitions];
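
For context, this inlines the modulo assignment previously provided by WindowPartitionStrategy::calc_partitions, while the Recluster and Compact impls used a proportional mapping (removed above). A minimal side-by-side sketch, written as standalone functions that are illustrative only and not codebase APIs:

    // Illustration only: which partition ids a given processor collects under each scheme.
    fn modulo_partitions(processor_id: usize, num_processors: usize, num_partitions: usize) -> Vec<usize> {
        (0..num_partitions)
            .filter(|&p| p % num_processors == processor_id)
            .collect()
    }

    fn proportional_partitions(processor_id: usize, num_processors: usize, num_partitions: usize) -> Vec<usize> {
        (0..num_partitions)
            .filter(|&p| (p * num_processors) / num_partitions == processor_id)
            .collect()
    }

    fn main() {
        // 8 partitions over 3 processors: modulo interleaves ids,
        // while the removed proportional mapping handed out contiguous ranges.
        assert_eq!(modulo_partitions(1, 3, 8), vec![1, 4, 7]);
        assert_eq!(proportional_partitions(1, 3, 8), vec![3, 4, 5]);
    }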

src/query/service/src/schedulers/fragments/fragmenter.rs

Lines changed: 0 additions & 23 deletions
@@ -15,7 +15,6 @@
 use std::sync::Arc;
 
 use databend_common_catalog::table_context::TableContext;
-use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_meta_types::NodeInfo;
 use databend_common_sql::executor::physical_plans::CompactSource;
@@ -27,7 +26,6 @@ use databend_common_sql::executor::physical_plans::ExchangeSink;
 use databend_common_sql::executor::physical_plans::ExchangeSource;
 use databend_common_sql::executor::physical_plans::FragmentKind;
 use databend_common_sql::executor::physical_plans::HashJoin;
-use databend_common_sql::executor::physical_plans::HilbertPartition;
 use databend_common_sql::executor::physical_plans::MutationSource;
 use databend_common_sql::executor::physical_plans::Recluster;
 use databend_common_sql::executor::physical_plans::ReplaceInto;
@@ -41,7 +39,6 @@ use crate::schedulers::PlanFragment;
 use crate::servers::flight::v1::exchange::BroadcastExchange;
 use crate::servers::flight::v1::exchange::DataExchange;
 use crate::servers::flight::v1::exchange::MergeExchange;
-use crate::servers::flight::v1::exchange::ModuloExchange;
 use crate::servers::flight::v1::exchange::ShuffleDataExchange;
 use crate::sessions::QueryContext;
 use crate::sql::executor::PhysicalPlan;
@@ -69,7 +66,6 @@ enum State {
     Compact,
     Recluster,
     Other,
-    HilbertRecluster,
 }
 
 impl Fragmenter {
@@ -118,15 +114,6 @@ impl Fragmenter {
             FragmentKind::Expansive => {
                 Ok(Some(BroadcastExchange::create(Self::get_executors(ctx))))
            }
-            FragmentKind::Modulo => {
-                if plan.keys.len() != 1 {
-                    return Err(ErrorCode::Internal("Modulo exchange require one key"));
-                }
-                Ok(Some(ModuloExchange::create(
-                    Self::get_executors(ctx),
-                    plan.keys[0].clone(),
-                )))
-            }
             _ => Ok(None),
         },
         _ => Ok(None),
@@ -213,15 +200,6 @@ impl PhysicalPlanReplacer for Fragmenter {
         Ok(PhysicalPlan::Recluster(Box::new(plan.clone())))
     }
 
-    fn replace_hilbert_serialize(&mut self, plan: &HilbertPartition) -> Result<PhysicalPlan> {
-        let input = self.replace(&plan.input)?;
-        self.state = State::HilbertRecluster;
-        Ok(PhysicalPlan::HilbertPartition(Box::new(HilbertPartition {
-            input: Box::new(input),
-            ..plan.clone()
-        })))
-    }
-
     fn replace_compact_source(&mut self, plan: &CompactSource) -> Result<PhysicalPlan> {
         self.state = State::Compact;
         Ok(PhysicalPlan::CompactSource(Box::new(plan.clone())))
@@ -323,7 +301,6 @@ impl PhysicalPlanReplacer for Fragmenter {
             State::ReplaceInto => FragmentType::ReplaceInto,
             State::Compact => FragmentType::Compact,
             State::Recluster => FragmentType::Recluster,
-            State::HilbertRecluster => FragmentType::HilbertRecluster,
         };
         self.state = State::Other;
         let exchange = Self::get_exchange(self.ctx.clone(), &plan)?;

0 commit comments