Skip to content

Commit 8dde650

Browse files
authored
chore: remove old runtime filter (#13896)
1 parent 7963416 commit 8dde650

40 files changed

+9
-2580
lines changed

src/query/service/src/interpreters/interpreter_merge_into_static_filter.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,6 @@ impl MergeIntoInterpreter {
293293
RelOperator::Exchange(_) => {}
294294
RelOperator::UnionAll(_) => {}
295295
RelOperator::DummyTableScan(_) => {}
296-
RelOperator::RuntimeFilterSource(_) => {}
297296
RelOperator::Window(_) => {}
298297
RelOperator::ProjectSet(_) => {}
299298
RelOperator::MaterializedCte(_) => {}

src/query/service/src/pipelines/builders/builder_join.rs

Lines changed: 4 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,14 @@ use std::sync::Arc;
1616

1717
use common_base::base::tokio::sync::Barrier;
1818
use common_exception::Result;
19-
use common_pipeline_core::processors::InputPort;
2019
use common_pipeline_core::processors::ProcessorPtr;
21-
use common_pipeline_core::Pipe;
22-
use common_pipeline_core::PipeItem;
2320
use common_pipeline_sinks::Sinker;
2421
use common_pipeline_transforms::processors::ProcessorProfileWrapper;
2522
use common_pipeline_transforms::processors::ProfileStub;
2623
use common_pipeline_transforms::processors::Transformer;
2724
use common_sql::executor::physical_plans::HashJoin;
2825
use common_sql::executor::physical_plans::MaterializedCte;
2926
use common_sql::executor::physical_plans::RangeJoin;
30-
use common_sql::executor::physical_plans::RuntimeFilterSource;
3127
use common_sql::executor::PhysicalPlan;
3228
use common_sql::ColumnBinding;
3329
use common_sql::IndexType;
@@ -42,13 +38,10 @@ use crate::pipelines::processors::transforms::HashJoinProbeState;
4238
use crate::pipelines::processors::transforms::MaterializedCteSink;
4339
use crate::pipelines::processors::transforms::MaterializedCteState;
4440
use crate::pipelines::processors::transforms::ProbeSpillState;
45-
use crate::pipelines::processors::transforms::RuntimeFilterState;
4641
use crate::pipelines::processors::transforms::TransformHashJoinBuild;
4742
use crate::pipelines::processors::transforms::TransformHashJoinProbe;
4843
use crate::pipelines::processors::HashJoinDesc;
4944
use crate::pipelines::processors::HashJoinState;
50-
use crate::pipelines::processors::SinkRuntimeFilterSource;
51-
use crate::pipelines::processors::TransformRuntimeFilter;
5245
use crate::pipelines::PipelineBuilder;
5346
use crate::sessions::QueryContext;
5447

@@ -202,17 +195,11 @@ impl PipelineBuilder {
202195
Ok(ProcessorPtr::create(transform))
203196
}
204197
};
205-
if hash_join_plan.contain_runtime_filter {
206-
build_res.main_pipeline.duplicate(false)?;
207-
self.join_state = Some(build_state.clone());
208-
self.index = Some(self.pipelines.len());
209-
} else {
210-
// for merge into
211-
if hash_join_plan.need_hold_hash_table {
212-
self.join_state = Some(build_state.clone())
213-
}
214-
build_res.main_pipeline.add_sink(create_sink_processor)?;
198+
// for merge into
199+
if hash_join_plan.need_hold_hash_table {
200+
self.join_state = Some(build_state.clone())
215201
}
202+
build_res.main_pipeline.add_sink(create_sink_processor)?;
216203

217204
self.pipelines.push(build_res.main_pipeline.finalize());
218205
self.pipelines.extend(build_res.sources_pipelines);
@@ -301,83 +288,6 @@ impl PipelineBuilder {
301288
Ok(())
302289
}
303290

304-
pub fn build_runtime_filter_source(
305-
&mut self,
306-
runtime_filter_source: &RuntimeFilterSource,
307-
) -> Result<()> {
308-
let state = self.build_runtime_filter_state(self.ctx.clone(), runtime_filter_source)?;
309-
self.expand_runtime_filter_source(&runtime_filter_source.right_side, state.clone())?;
310-
self.build_runtime_filter(&runtime_filter_source.left_side, state)?;
311-
Ok(())
312-
}
313-
314-
fn expand_runtime_filter_source(
315-
&mut self,
316-
_right_side: &PhysicalPlan,
317-
state: Arc<RuntimeFilterState>,
318-
) -> Result<()> {
319-
let pipeline = &mut self.pipelines[self.index.unwrap()];
320-
let output_size = pipeline.output_len();
321-
debug_assert!(output_size % 2 == 0);
322-
323-
let mut items = Vec::with_capacity(output_size);
324-
// Join
325-
// / \
326-
// / \
327-
// RFSource \
328-
// / \ \
329-
// / \ \
330-
// scan t1 scan t2
331-
for _ in 0..output_size / 2 {
332-
let input = InputPort::create();
333-
items.push(PipeItem::create(
334-
ProcessorPtr::create(TransformHashJoinBuild::try_create(
335-
input.clone(),
336-
self.join_state.as_ref().unwrap().clone(),
337-
None,
338-
)?),
339-
vec![input],
340-
vec![],
341-
));
342-
let input = InputPort::create();
343-
items.push(PipeItem::create(
344-
ProcessorPtr::create(Sinker::<SinkRuntimeFilterSource>::create(
345-
input.clone(),
346-
SinkRuntimeFilterSource::new(state.clone()),
347-
)),
348-
vec![input],
349-
vec![],
350-
));
351-
}
352-
pipeline.add_pipe(Pipe::create(output_size, 0, items));
353-
Ok(())
354-
}
355-
356-
fn build_runtime_filter(
357-
&mut self,
358-
left_side: &PhysicalPlan,
359-
state: Arc<RuntimeFilterState>,
360-
) -> Result<()> {
361-
self.build_pipeline(left_side)?;
362-
self.main_pipeline.add_transform(|input, output| {
363-
let processor = TransformRuntimeFilter::create(input, output, state.clone());
364-
Ok(ProcessorPtr::create(processor))
365-
})?;
366-
Ok(())
367-
}
368-
369-
fn build_runtime_filter_state(
370-
&self,
371-
ctx: Arc<QueryContext>,
372-
runtime_filter_source: &RuntimeFilterSource,
373-
) -> Result<Arc<RuntimeFilterState>> {
374-
Ok(Arc::new(RuntimeFilterState::new(
375-
ctx,
376-
runtime_filter_source.left_runtime_filters.clone(),
377-
runtime_filter_source.right_runtime_filters.clone(),
378-
)))
379-
}
380-
381291
pub(crate) fn build_materialized_cte(
382292
&mut self,
383293
materialized_cte: &MaterializedCte,

src/query/service/src/pipelines/pipeline_builder.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,7 @@ pub struct PipelineBuilder {
4444

4545
// probe data_fields for merge into
4646
pub probe_data_fields: Option<Vec<DataField>>,
47-
// Used in runtime filter source
4847
pub join_state: Option<Arc<HashJoinBuildState>>,
49-
// record the index of join build side pipeline in `pipelines`
50-
pub index: Option<usize>,
5148

5249
// Cte -> state, each cte has it's own state
5350
pub cte_state: HashMap<IndexType, Arc<MaterializedCteState>>,
@@ -72,13 +69,12 @@ impl PipelineBuilder {
7269
func_ctx,
7370
settings,
7471
pipelines: vec![],
75-
join_state: None,
7672
main_pipeline: Pipeline::with_scopes(scopes),
7773
proc_profs: prof_span_set,
7874
exchange_injector: DefaultExchangeInjector::create(),
79-
index: None,
8075
cte_state: HashMap::new(),
8176
probe_data_fields: None,
77+
join_state: None,
8278
}
8379
}
8480

@@ -134,9 +130,6 @@ impl PipelineBuilder {
134130
PhysicalPlan::Exchange(_) => Err(ErrorCode::Internal(
135131
"Invalid physical plan with PhysicalPlan::Exchange",
136132
)),
137-
PhysicalPlan::RuntimeFilterSource(runtime_filter_source) => {
138-
self.build_runtime_filter_source(runtime_filter_source)
139-
}
140133
PhysicalPlan::RangeJoin(range_join) => self.build_range_join(range_join),
141134
PhysicalPlan::MaterializedCte(materialized_cte) => {
142135
self.build_materialized_cte(materialized_cte)

src/query/service/src/pipelines/processors/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@ pub(crate) mod transforms;
1818
pub use transforms::DeduplicateRowNumber;
1919
pub use transforms::HashJoinDesc;
2020
pub use transforms::HashJoinState;
21-
pub use transforms::SinkRuntimeFilterSource;
2221
pub use transforms::TransformAddStreamColumns;
2322
pub use transforms::TransformCastSchema;
2423
pub use transforms::TransformCreateSets;
2524
pub use transforms::TransformLimit;
2625
pub use transforms::TransformResortAddOn;
2726
pub use transforms::TransformResortAddOnWithoutSourceSchema;
28-
pub use transforms::TransformRuntimeFilter;
2927
pub use transforms::TransformWindow;

src/query/service/src/pipelines/processors/transforms/mod.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ mod processor_accumulate_row_number;
1919
mod processor_deduplicate_row_number;
2020
mod processor_extract_hash_table_by_row_number;
2121
pub(crate) mod range_join;
22-
mod runtime_filter;
2322
mod transform_add_computed_columns;
2423
mod transform_add_const_columns;
2524
mod transform_add_stream_columns;
@@ -31,7 +30,6 @@ mod transform_merge_block;
3130
mod transform_resort_addon;
3231
mod transform_resort_addon_without_source_schema;
3332
mod transform_runtime_cast_schema;
34-
mod transform_runtime_filter;
3533
mod transform_srf;
3634
mod transform_udf;
3735
mod window;
@@ -41,7 +39,6 @@ pub use processor_accumulate_row_number::AccumulateRowNumber;
4139
pub use processor_deduplicate_row_number::DeduplicateRowNumber;
4240
pub use processor_extract_hash_table_by_row_number::ExtractHashTableByRowNumber;
4341
pub use range_join::RangeJoinState;
44-
pub use runtime_filter::RuntimeFilterState;
4542
pub use transform_add_computed_columns::TransformAddComputedColumns;
4643
pub use transform_add_const_columns::TransformAddConstColumns;
4744
pub use transform_add_stream_columns::TransformAddStreamColumns;
@@ -56,8 +53,6 @@ pub use transform_merge_block::TransformMergeBlock;
5653
pub use transform_resort_addon::TransformResortAddOn;
5754
pub use transform_resort_addon_without_source_schema::TransformResortAddOnWithoutSourceSchema;
5855
pub use transform_runtime_cast_schema::TransformRuntimeCastSchema;
59-
pub use transform_runtime_filter::SinkRuntimeFilterSource;
60-
pub use transform_runtime_filter::TransformRuntimeFilter;
6156
pub use transform_srf::TransformSRF;
6257
pub use transform_udf::TransformUdf;
6358
pub use window::FrameBound;

src/query/service/src/pipelines/processors/transforms/runtime_filter/mod.rs

Lines changed: 0 additions & 19 deletions
This file was deleted.

src/query/service/src/pipelines/processors/transforms/runtime_filter/runtime_filter_connector.rs

Lines changed: 0 additions & 35 deletions
This file was deleted.

0 commit comments

Comments
 (0)