Skip to content

Commit a18d1f0

Browse files
authored
refactor(query): use graphs to build pipelines (#18184)
* refactor(query): use graphs to build pipelines * refactor(query): use graphs to build pipelines * refactor(query): use graphs to build pipelines * refactor(query): use graphs to build pipelines * refactor(query): use graphs to build pipelines * refactor(query): use graphs to build pipelines
1 parent 28aa33b commit a18d1f0

File tree

18 files changed

+834
-781
lines changed

18 files changed

+834
-781
lines changed

src/query/pipeline/core/src/pipeline.rs

Lines changed: 178 additions & 245 deletions
Large diffs are not rendered by default.

src/query/pipeline/core/src/pipeline_display.rs

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
use std::fmt::Display;
1616
use std::fmt::Formatter;
1717

18-
use crate::pipe::Pipe;
18+
use petgraph::dot::Dot;
19+
1920
use crate::Pipeline;
2021

2122
impl Pipeline {
@@ -28,35 +29,17 @@ struct PipelineIndentDisplayWrapper<'a> {
2829
pipeline: &'a Pipeline,
2930
}
3031

31-
impl PipelineIndentDisplayWrapper<'_> {
32-
fn pipe_name(pipe: &Pipe) -> String {
33-
unsafe { pipe.items[0].processor.name() }
34-
}
35-
}
36-
3732
impl Display for PipelineIndentDisplayWrapper<'_> {
3833
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
39-
let pipes = &self.pipeline.pipes;
40-
for (index, pipe) in pipes.iter().rev().enumerate() {
41-
if index > 0 {
42-
writeln!(f)?;
43-
}
44-
45-
for _ in 0..index {
46-
write!(f, " ")?;
47-
}
48-
49-
let pipe_name = Self::pipe_name(pipe);
50-
if pipe.input_length == pipe.output_length
51-
|| pipe.input_length == 0
52-
|| pipe.output_length == 0
53-
{
54-
write!(f, "{} × {}", Self::pipe_name(pipe), pipe.items.len(),)?;
55-
} else {
56-
write!(f, "Merge to {pipe_name} × {}", pipe.output_length,)?;
57-
}
58-
}
59-
60-
Ok(())
34+
write!(
35+
f,
36+
"{:?}",
37+
Dot::with_attr_getters(
38+
&self.pipeline.graph,
39+
&[],
40+
&|_, _| String::new(),
41+
&|_, (_, _)| String::new(),
42+
)
43+
)
6144
}
6245
}

src/query/pipeline/core/src/processors/profile.rs

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,12 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::cell::RefCell;
1516
use std::collections::btree_map::Entry;
1617
use std::collections::BTreeMap;
17-
use std::sync::atomic::AtomicUsize;
1818
use std::sync::atomic::Ordering;
1919
use std::sync::Arc;
2020

21-
use databend_common_base::runtime::drop_guard;
2221
use databend_common_base::runtime::error_info::NodeErrorType;
2322
use databend_common_base::runtime::metrics::MetricSample;
2423
use databend_common_base::runtime::metrics::ScopedRegistry;
@@ -29,25 +28,18 @@ use databend_common_exception::ErrorCode;
2928
use databend_common_exception::StackTrace;
3029

3130
pub struct PlanScopeGuard {
32-
idx: usize,
33-
scope_size: Arc<AtomicUsize>,
31+
save: Option<Arc<PlanScope>>,
3432
}
3533

3634
impl PlanScopeGuard {
37-
pub fn create(scope_size: Arc<AtomicUsize>, idx: usize) -> PlanScopeGuard {
38-
PlanScopeGuard { idx, scope_size }
35+
pub fn new(save: Option<Arc<PlanScope>>) -> PlanScopeGuard {
36+
PlanScopeGuard { save }
3937
}
4038
}
4139

4240
impl Drop for PlanScopeGuard {
4341
fn drop(&mut self) {
44-
drop_guard(move || {
45-
if self.scope_size.fetch_sub(1, Ordering::SeqCst) != self.idx + 1
46-
&& !std::thread::panicking()
47-
{
48-
panic!("Broken pipeline scope stack.");
49-
}
50-
})
42+
PLAN_SCOPE.replace(self.save.take());
5143
}
5244
}
5345

@@ -186,20 +178,34 @@ pub struct PlanScope {
186178
pub metrics_registry: Arc<ScopedRegistry>,
187179
}
188180

181+
thread_local! {
182+
static PLAN_SCOPE: RefCell<Option<Arc<PlanScope>>> = const { RefCell::new(None) };
183+
}
184+
189185
impl PlanScope {
190186
pub fn create(
191187
id: u32,
192188
name: String,
193189
title: Arc<String>,
194190
labels: Arc<Vec<ProfileLabel>>,
195-
) -> PlanScope {
196-
PlanScope {
191+
) -> Arc<PlanScope> {
192+
let parent_id = PlanScope::get_plan_scope().map(|x| x.id);
193+
Arc::new(PlanScope {
197194
id,
198195
labels,
199196
title,
200-
parent_id: None,
197+
parent_id,
201198
name,
202199
metrics_registry: ScopedRegistry::create(None),
203-
}
200+
})
201+
}
202+
203+
pub fn enter_scope_guard(self: Arc<Self>) -> PlanScopeGuard {
204+
let old = PLAN_SCOPE.replace(Some(self));
205+
PlanScopeGuard::new(old)
206+
}
207+
208+
pub fn get_plan_scope() -> Option<Arc<PlanScope>> {
209+
PLAN_SCOPE.with(|x| x.borrow().clone())
204210
}
205211
}

src/query/service/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#![feature(try_blocks)]
3737
#![feature(variant_count)]
3838
#![feature(duration_constructors)]
39+
#![feature(get_mut_unchecked)]
3940
#![allow(clippy::diverging_sub_expression)]
4041
#![allow(clippy::arc_with_non_send_sync)]
4142

src/query/service/src/pipelines/builders/builder_exchange.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
16+
1517
use databend_common_exception::Result;
18+
use databend_common_pipeline_core::PlanScope;
1619
use databend_common_sql::executor::physical_plans::ExchangeSink;
1720
use databend_common_sql::executor::physical_plans::ExchangeSource;
1821

@@ -27,8 +30,21 @@ impl PipelineBuilder {
2730
self.exchange_injector.clone(),
2831
)?;
2932

30-
// add profile
31-
build_res.main_pipeline.reset_scopes(&self.main_pipeline);
33+
let plan_scope = PlanScope::get_plan_scope();
34+
for node in build_res.main_pipeline.graph.node_weights_mut() {
35+
let Some(scope) = node.scope.as_mut() else {
36+
node.scope = plan_scope.clone();
37+
continue;
38+
};
39+
40+
if scope.parent_id.is_none() {
41+
unsafe {
42+
let scope = Arc::get_mut_unchecked(scope);
43+
scope.parent_id = plan_scope.as_ref().map(|x| x.id);
44+
}
45+
}
46+
}
47+
3248
// add sharing data
3349
self.join_state = build_res.builder_data.input_join_state;
3450
self.merge_into_probe_data_fields = build_res.builder_data.input_probe_schema;

src/query/service/src/pipelines/builders/builder_join.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,8 @@ impl PipelineBuilder {
3838
// Create a new pipeline builder with the same context as the current builder
3939
fn create_sub_pipeline_builder(&self) -> PipelineBuilder {
4040
let sub_context = QueryContext::create_from(self.ctx.as_ref());
41-
let mut sub_builder = PipelineBuilder::create(
42-
self.func_ctx.clone(),
43-
self.settings.clone(),
44-
sub_context,
45-
self.main_pipeline.get_scopes(),
46-
);
41+
let mut sub_builder =
42+
PipelineBuilder::create(self.func_ctx.clone(), self.settings.clone(), sub_context);
4743
sub_builder.hash_join_states = self.hash_join_states.clone();
4844
sub_builder
4945
}

src/query/service/src/pipelines/builders/builder_union_all.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,8 @@ impl PipelineBuilder {
7272

7373
fn expand_union_all(&mut self, input: &PhysicalPlan) -> Result<Receiver<DataBlock>> {
7474
let union_ctx = QueryContext::create_from(self.ctx.as_ref());
75-
let mut pipeline_builder = PipelineBuilder::create(
76-
self.func_ctx.clone(),
77-
self.settings.clone(),
78-
union_ctx,
79-
self.main_pipeline.get_scopes(),
80-
);
75+
let mut pipeline_builder =
76+
PipelineBuilder::create(self.func_ctx.clone(), self.settings.clone(), union_ctx);
8177
pipeline_builder.hash_join_states = self.hash_join_states.clone();
8278

8379
let mut build_res = pipeline_builder.finalize(input)?;

src/query/service/src/pipelines/executor/executor_graph.rs

Lines changed: 34 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ pub(crate) struct Node {
9696
impl Node {
9797
pub fn create(
9898
pid: usize,
99-
scope: Option<PlanScope>,
99+
scope: Option<Arc<PlanScope>>,
100100
processor: &ProcessorPtr,
101101
inputs_port: &[Arc<InputPort>],
102102
outputs_port: &[Arc<OutputPort>],
@@ -237,68 +237,32 @@ impl ExecutingGraph {
237237
graph: &mut StableGraph<Arc<Node>, EdgeInfo>,
238238
time_series_profile_builder: &mut QueryTimeSeriesProfileBuilder,
239239
) {
240-
#[derive(Debug)]
241-
struct Edge {
242-
source_port: usize,
243-
source_node: NodeIndex,
244-
target_port: usize,
245-
target_node: NodeIndex,
246-
}
247-
248-
let mut pipes_edges: Vec<Vec<Edge>> = Vec::new();
249-
for pipe in &pipeline.pipes {
250-
assert_eq!(
251-
pipe.input_length,
252-
pipes_edges.last().map(|x| x.len()).unwrap_or_default()
253-
);
254-
255-
let mut edge_index = 0;
256-
let mut pipe_edges = Vec::with_capacity(pipe.output_length);
257-
258-
for item in &pipe.items {
259-
let pid = graph.node_count();
260-
let time_series_profile = if let Some(scope) = pipe.scope.as_ref() {
261-
let plan_id = scope.id;
262-
let time_series_profile =
263-
time_series_profile_builder.register_time_series_profile(plan_id);
264-
Some(time_series_profile)
265-
} else {
266-
None
267-
};
268-
let node = Node::create(
269-
pid,
270-
pipe.scope.clone(),
271-
&item.processor,
272-
&item.inputs_port,
273-
&item.outputs_port,
274-
time_series_profile,
275-
);
276-
277-
let graph_node_index = graph.add_node(node.clone());
278-
unsafe {
279-
item.processor.set_id(graph_node_index);
280-
}
281-
282-
for offset in 0..item.inputs_port.len() {
283-
let last_edges = pipes_edges.last_mut().unwrap();
240+
let offset = graph.node_count();
241+
for node in pipeline.graph.node_weights() {
242+
let pid = graph.node_count();
243+
let mut time_series_profile = None;
244+
245+
if let Some(scope) = node.scope.as_ref() {
246+
let plan_id = scope.id;
247+
time_series_profile =
248+
Some(time_series_profile_builder.register_time_series_profile(plan_id));
249+
}
284250

285-
last_edges[edge_index].target_port = offset;
286-
last_edges[edge_index].target_node = graph_node_index;
287-
edge_index += 1;
288-
}
251+
let graph_node_index = graph.add_node(Node::create(
252+
pid,
253+
node.scope.clone(),
254+
&node.proc,
255+
&node.inputs,
256+
&node.outputs,
257+
time_series_profile,
258+
));
289259

290-
for offset in 0..item.outputs_port.len() {
291-
pipe_edges.push(Edge {
292-
source_port: offset,
293-
source_node: graph_node_index,
294-
target_port: 0,
295-
target_node: Default::default(),
296-
});
297-
}
260+
unsafe {
261+
node.proc.set_id(graph_node_index);
298262
}
299-
300-
pipes_edges.push(pipe_edges);
301263
}
264+
265+
// FIXME:
302266
let query_time_series = Arc::new(time_series_profile_builder.build());
303267
let node_indices: Vec<_> = graph.node_indices().collect();
304268
for node_index in node_indices {
@@ -313,23 +277,24 @@ impl ExecutingGraph {
313277
}
314278
}
315279

316-
// The last pipe cannot contain any output edge.
317-
assert!(pipes_edges.last().map(|x| x.is_empty()).unwrap_or_default());
318-
pipes_edges.pop();
280+
for edge in pipeline.graph.edge_indices() {
281+
let index = EdgeIndex::new(edge.index());
282+
if let Some((source, target)) = pipeline.graph.edge_endpoints(index) {
283+
let source = NodeIndex::new(offset + source.index());
284+
let target = NodeIndex::new(offset + target.index());
285+
let edge_weight = pipeline.graph.edge_weight(index).unwrap();
319286

320-
for pipe_edges in &pipes_edges {
321-
for edge in pipe_edges {
322-
let edge_index = graph.add_edge(edge.source_node, edge.target_node, EdgeInfo {
323-
input_index: edge.target_port,
324-
output_index: edge.source_port,
287+
let edge_index = graph.add_edge(source, target, EdgeInfo {
288+
input_index: edge_weight.input_index,
289+
output_index: edge_weight.output_index,
325290
});
326291

327292
unsafe {
328-
let (target_node, target_port) = (edge.target_node, edge.target_port);
293+
let (target_node, target_port) = (target, edge_weight.input_index);
329294
let input_trigger = graph[target_node].create_trigger(edge_index);
330295
graph[target_node].inputs_port[target_port].set_trigger(input_trigger);
331296

332-
let (source_node, source_port) = (edge.source_node, edge.source_port);
297+
let (source_node, source_port) = (source, edge_weight.output_index);
333298
let output_trigger = graph[source_node].create_trigger(edge_index);
334299
graph[source_node].outputs_port[source_port].set_trigger(output_trigger);
335300

src/query/service/src/pipelines/executor/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ mod global_queries_executor;
2222
mod pipeline_complete_executor;
2323
mod pipeline_executor;
2424
mod pipeline_pulling_executor;
25-
mod pipeline_pushing_executor;
2625
mod processor_async_task;
2726
mod queries_executor_tasks;
2827
mod queries_pipeline_executor;
@@ -40,7 +39,6 @@ pub use global_queries_executor::GlobalQueriesExecutor;
4039
pub use pipeline_complete_executor::PipelineCompleteExecutor;
4140
pub use pipeline_executor::PipelineExecutor;
4241
pub use pipeline_pulling_executor::PipelinePullingExecutor;
43-
pub use pipeline_pushing_executor::PipelinePushingExecutor;
4442
pub use processor_async_task::ProcessorAsyncTask;
4543
pub use queries_executor_tasks::QueriesExecutorTasksQueue;
4644
pub use queries_pipeline_executor::QueriesPipelineExecutor;

src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ pub struct PipelinePullingExecutor {
102102

103103
impl PipelinePullingExecutor {
104104
fn wrap_pipeline(pipeline: &mut Pipeline, tx: SyncSender<DataBlock>) -> Result<()> {
105-
if pipeline.is_pushing_pipeline()? || !pipeline.is_pulling_pipeline()? {
105+
if !pipeline.is_pulling_pipeline()? {
106106
return Err(ErrorCode::Internal(
107107
"Logical error, PipelinePullingExecutor can only work on pulling pipeline.",
108108
));

0 commit comments

Comments
 (0)