
Commit 78d0de4

Authored by Liuqing Yue <dqhl76@gmail.com>
refactor(executor): add graph information to prepare adding points (#14681)
* chore: add graph information to prepare adding points
* fix: unit test
* chore: clean

Signed-off-by: Liuqing Yue <dqhl76@gmail.com>
1 parent b459400 · commit 78d0de4

File tree: 6 files changed, +143 −95 lines


โ€Žsrc/query/service/src/pipelines/executor/executor_graph.rs

Lines changed: 49 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,11 @@ impl ExecutingGraph {
     pub unsafe fn init_schedule_queue(
         locker: &StateLockGuard,
         capacity: usize,
+        graph: &Arc<RunningGraph>,
     ) -> Result<ScheduleQueue> {
         let mut schedule_queue = ScheduleQueue::with_capacity(capacity);
         for sink_index in locker.graph.externals(Direction::Outgoing) {
-            ExecutingGraph::schedule_queue(locker, sink_index, &mut schedule_queue)?;
+            ExecutingGraph::schedule_queue(locker, sink_index, &mut schedule_queue, graph)?;
         }

         Ok(schedule_queue)
@@ -244,6 +245,7 @@ impl ExecutingGraph {
         locker: &StateLockGuard,
         index: NodeIndex,
         schedule_queue: &mut ScheduleQueue,
+        graph: &Arc<RunningGraph>,
     ) -> Result<()> {
         let mut need_schedule_nodes = VecDeque::new();
         let mut need_schedule_edges = VecDeque::new();
@@ -304,11 +306,17 @@ impl ExecutingGraph {
                     }
                     Event::NeedData | Event::NeedConsume => State::Idle,
                     Event::Sync => {
-                        schedule_queue.push_sync(node.processor.clone());
+                        schedule_queue.push_sync(ProcessorWrapper {
+                            processor: node.processor.clone(),
+                            graph: graph.clone(),
+                        });
                         State::Processing
                     }
                     Event::Async => {
-                        schedule_queue.push_async(node.processor.clone());
+                        schedule_queue.push_async(ProcessorWrapper {
+                            processor: node.processor.clone(),
+                            graph: graph.clone(),
+                        });
                         State::Processing
                     }
                 };
@@ -322,9 +330,15 @@ impl ExecutingGraph {
         }
     }

+#[derive(Clone)]
+pub struct ProcessorWrapper {
+    pub processor: ProcessorPtr,
+    pub graph: Arc<RunningGraph>,
+}
+
 pub struct ScheduleQueue {
-    pub sync_queue: VecDeque<ProcessorPtr>,
-    pub async_queue: VecDeque<ProcessorPtr>,
+    pub sync_queue: VecDeque<ProcessorWrapper>,
+    pub async_queue: VecDeque<ProcessorWrapper>,
 }

 impl ScheduleQueue {
@@ -336,25 +350,15 @@ impl ScheduleQueue {
     }

     #[inline]
-    pub fn push_sync(&mut self, processor: ProcessorPtr) {
+    pub fn push_sync(&mut self, processor: ProcessorWrapper) {
         self.sync_queue.push_back(processor);
     }

     #[inline]
-    pub fn push_async(&mut self, processor: ProcessorPtr) {
+    pub fn push_async(&mut self, processor: ProcessorWrapper) {
         self.async_queue.push_back(processor);
     }

-    pub fn schedule_tail(mut self, global: &ExecutorTasksQueue, ctx: &mut ExecutorWorkerContext) {
-        let mut tasks = VecDeque::with_capacity(self.sync_queue.len());
-
-        while let Some(processor) = self.sync_queue.pop_front() {
-            tasks.push_back(ExecutorTask::Sync(processor));
-        }
-
-        global.push_tasks(ctx, tasks)
-    }
-
     pub fn schedule(
         mut self,
         global: &Arc<ExecutorTasksQueue>,
@@ -384,7 +388,7 @@ impl ScheduleQueue {
     }

     pub fn schedule_async_task(
-        proc: ProcessorPtr,
+        proc: ProcessorWrapper,
         query_id: Arc<String>,
         executor: &Arc<PipelineExecutor>,
         wakeup_worker_id: usize,
@@ -394,18 +398,20 @@ impl ScheduleQueue {
         unsafe {
             workers_condvar.inc_active_async_worker();
             let weak_executor = Arc::downgrade(executor);
-            let node_profile = executor.graph.get_node_profile(proc.id()).clone();
-            let process_future = proc.async_process();
+            let graph = proc.graph;
+            let node_profile = executor.graph.get_node_profile(proc.processor.id()).clone();
+            let process_future = proc.processor.async_process();
             executor.async_runtime.spawn(
                 query_id.as_ref().clone(),
                 TrackedFuture::create(ProcessorAsyncTask::create(
                     query_id,
                     wakeup_worker_id,
-                    proc.clone(),
+                    proc.processor.clone(),
                     global_queue,
                     workers_condvar,
                     weak_executor,
                     node_profile,
+                    graph,
                     process_future,
                 ))
                 .in_span(Span::enter_with_local_parent(std::any::type_name::<
@@ -420,36 +426,46 @@ impl ScheduleQueue {
             ctx.set_task(ExecutorTask::Sync(processor));
         }
     }
+
+    pub fn schedule_tail(mut self, global: &ExecutorTasksQueue, ctx: &mut ExecutorWorkerContext) {
+        let mut tasks = VecDeque::with_capacity(self.sync_queue.len());
+
+        while let Some(processor) = self.sync_queue.pop_front() {
+            tasks.push_back(ExecutorTask::Sync(processor));
+        }
+
+        global.push_tasks(ctx, tasks)
+    }
 }

 pub struct RunningGraph(ExecutingGraph);

 impl RunningGraph {
-    pub fn create(pipeline: Pipeline) -> Result<RunningGraph> {
+    pub fn create(pipeline: Pipeline) -> Result<Arc<RunningGraph>> {
         let graph_state = ExecutingGraph::create(pipeline)?;
         debug!("Create running graph:{:?}", graph_state);
-        Ok(RunningGraph(graph_state))
+        Ok(Arc::new(RunningGraph(graph_state)))
     }

-    pub fn from_pipelines(pipelines: Vec<Pipeline>) -> Result<RunningGraph> {
+    pub fn from_pipelines(pipelines: Vec<Pipeline>) -> Result<Arc<RunningGraph>> {
         let graph_state = ExecutingGraph::from_pipelines(pipelines)?;
         debug!("Create running graph:{:?}", graph_state);
-        Ok(RunningGraph(graph_state))
+        Ok(Arc::new(RunningGraph(graph_state)))
     }

     /// # Safety
     ///
     /// Method is thread unsafe and require thread safe call
-    pub unsafe fn init_schedule_queue(&self, capacity: usize) -> Result<ScheduleQueue> {
-        ExecutingGraph::init_schedule_queue(&self.0, capacity)
+    pub unsafe fn init_schedule_queue(self: Arc<Self>, capacity: usize) -> Result<ScheduleQueue> {
+        ExecutingGraph::init_schedule_queue(&self.0, capacity, &self)
     }

     /// # Safety
     ///
     /// Method is thread unsafe and require thread safe call
-    pub unsafe fn schedule_queue(&self, node_index: NodeIndex) -> Result<ScheduleQueue> {
+    pub unsafe fn schedule_queue(self: Arc<Self>, node_index: NodeIndex) -> Result<ScheduleQueue> {
         let mut schedule_queue = ScheduleQueue::with_capacity(0);
-        ExecutingGraph::schedule_queue(&self.0, node_index, &mut schedule_queue)?;
+        ExecutingGraph::schedule_queue(&self.0, node_index, &mut schedule_queue, &self)?;
         Ok(schedule_queue)
     }
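The receiver change above, from `&self` to `self: Arc<Self>`, is what lets `RunningGraph` hand a reference to its own `Arc` down into `ExecutingGraph::schedule_queue`, which then clones it into every queued task. A minimal standalone sketch of the receiver idiom, assuming simplified stand-in types (`Graph`/`Task` here are illustrative, not the real databend types):

use std::sync::Arc;

struct Graph {
    name: String,
}

struct Task {
    // Each task keeps a shared handle back to the graph that produced it.
    graph: Arc<Graph>,
}

impl Graph {
    // Taking `self: Arc<Self>` instead of `&self` lets the method clone
    // its own Arc and embed a handle in every task it creates.
    fn schedule(self: Arc<Self>, n: usize) -> Vec<Task> {
        (0..n).map(|_| Task { graph: self.clone() }).collect()
    }
}

fn main() {
    let graph = Arc::new(Graph { name: "g1".to_string() });
    let tasks = graph.clone().schedule(3);
    // Every task can reach its owning graph without any global state.
    for task in &tasks {
        println!("task belongs to graph {}", task.graph.name);
    }
}

The trade-off is that callers must already hold an `Arc` (a plain `RunningGraph` on the stack can no longer call these methods), which is why `create` and `from_pipelines` now return `Arc<RunningGraph>` directly.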

@@ -627,15 +643,15 @@ impl Debug for ScheduleQueue {

         for item in &self.sync_queue {
             sync_queue.push(QueueItem {
-                id: item.id().index(),
-                name: item.name().to_string(),
+                id: item.processor.id().index(),
+                name: item.processor.name().to_string(),
             })
         }

         for item in &self.async_queue {
             async_queue.push(QueueItem {
-                id: item.id().index(),
-                name: item.name().to_string(),
+                id: item.processor.id().index(),
+                name: item.processor.name().to_string(),
             })
         }
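Taken together, the executor_graph.rs changes replace bare `ProcessorPtr` queue entries with a `ProcessorWrapper` pairing each processor with the `Arc<RunningGraph>` it came from. A small self-contained sketch of that pairing, assuming simplified stand-ins for `RunningGraph` and `ProcessorPtr`:

use std::collections::VecDeque;
use std::sync::Arc;

struct Graph {
    name: String,
}

#[derive(Clone)]
struct Processor {
    id: usize,
}

// Each queued entry pairs a processor with a shared handle to the
// graph it belongs to, mirroring ProcessorWrapper in the diff.
#[derive(Clone)]
struct ProcessorWrapper {
    processor: Processor,
    graph: Arc<Graph>,
}

fn main() {
    let graph = Arc::new(Graph { name: "query-graph".to_string() });
    let mut sync_queue: VecDeque<ProcessorWrapper> = VecDeque::new();

    sync_queue.push_back(ProcessorWrapper {
        processor: Processor { id: 7 },
        graph: graph.clone(),
    });

    // Whoever pops the entry recovers both the work item and its graph.
    if let Some(task) = sync_queue.pop_front() {
        println!("run processor {} of graph {}", task.processor.id, task.graph.name);
    }
}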

โ€Žsrc/query/service/src/pipelines/executor/executor_tasks.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ use databend_common_exception::Result;
 use parking_lot::Mutex;
 use petgraph::prelude::NodeIndex;

+use crate::pipelines::executor::executor_graph::ProcessorWrapper;
 use crate::pipelines::executor::ExecutorTask;
 use crate::pipelines::executor::ExecutorWorkerContext;
+use crate::pipelines::executor::RunningGraph;
 use crate::pipelines::executor::WatchNotify;
 use crate::pipelines::executor::WorkersCondvar;
 use crate::pipelines::executor::WorkersWaitingStatus;
-use crate::pipelines::processors::ProcessorPtr;

 pub struct ExecutorTasksQueue {
     finished: Arc<AtomicBool>,
@@ -117,7 +118,7 @@ impl ExecutorTasksQueue {
         workers_condvar.wait(worker_id, self.finished.clone());
     }

-    pub fn init_sync_tasks(&self, tasks: VecDeque<ProcessorPtr>) {
+    pub fn init_sync_tasks(&self, tasks: VecDeque<ProcessorWrapper>) {
         let mut workers_tasks = self.workers_tasks.lock();

         let mut worker_id = 0;
@@ -197,18 +198,29 @@ pub struct CompletedAsyncTask {
     pub id: NodeIndex,
     pub worker_id: usize,
     pub res: Result<()>,
+    pub graph: Arc<RunningGraph>,
 }

 impl CompletedAsyncTask {
-    pub fn create(id: NodeIndex, worker_id: usize, res: Result<()>) -> Self {
-        CompletedAsyncTask { id, worker_id, res }
+    pub fn create(
+        id: NodeIndex,
+        worker_id: usize,
+        res: Result<()>,
+        graph: Arc<RunningGraph>,
+    ) -> Self {
+        CompletedAsyncTask {
+            id,
+            worker_id,
+            res,
+            graph,
+        }
     }
 }

 struct ExecutorTasks {
     tasks_size: usize,
     workers_waiting_status: WorkersWaitingStatus,
-    workers_sync_tasks: Vec<VecDeque<ProcessorPtr>>,
+    workers_sync_tasks: Vec<VecDeque<ProcessorWrapper>>,
     workers_completed_async_tasks: Vec<VecDeque<CompletedAsyncTask>>,
 }
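The executor_tasks.rs side mirrors this: a `CompletedAsyncTask` now records which graph spawned it, so draining a completion can reschedule against the correct graph rather than an executor-wide one. A simplified sketch of that record, with stand-in types rather than the real API:

use std::sync::Arc;

struct Graph;

impl Graph {
    fn schedule_successors(&self, node: usize) {
        println!("rescheduling successors of node {node}");
    }
}

// A completion record that remembers which graph produced the task.
struct CompletedTask {
    node: usize,
    result: Result<(), String>,
    graph: Arc<Graph>,
}

fn drain(task: CompletedTask) -> Result<(), String> {
    task.result?;
    // The right graph travels with the task; no executor-wide lookup needed.
    task.graph.schedule_successors(task.node);
    Ok(())
}

fn main() {
    let graph = Arc::new(Graph);
    let task = CompletedTask { node: 3, result: Ok(()), graph };
    drain(task).unwrap();
}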

โ€Žsrc/query/service/src/pipelines/executor/executor_worker_context.rs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@ use databend_common_pipeline_core::processors::Profile;
 use databend_common_pipeline_core::processors::ProfileStatisticsName;
 use petgraph::prelude::NodeIndex;

+use crate::pipelines::executor::executor_graph::ProcessorWrapper;
 use crate::pipelines::executor::CompletedAsyncTask;
+use crate::pipelines::executor::PipelineExecutor;
 use crate::pipelines::executor::RunningGraph;
 use crate::pipelines::executor::WorkersCondvar;
-use crate::pipelines::processors::ProcessorPtr;

 pub enum ExecutorTask {
     None,
-    Sync(ProcessorPtr),
+    Sync(ProcessorWrapper),
     AsyncCompleted(CompletedAsyncTask),
 }

@@ -73,12 +74,15 @@ impl ExecutorWorkerContext {
     }

     /// # Safety
-    pub unsafe fn execute_task(&mut self, graph: &RunningGraph) -> Result<NodeIndex> {
+    pub unsafe fn execute_task(
+        &mut self,
+        _: &Arc<PipelineExecutor>,
+    ) -> Result<Option<(NodeIndex, Arc<RunningGraph>)>> {
         match std::mem::replace(&mut self.task, ExecutorTask::None) {
             ExecutorTask::None => Err(ErrorCode::Internal("Execute none task.")),
-            ExecutorTask::Sync(processor) => self.execute_sync_task(processor, graph),
+            ExecutorTask::Sync(processor) => self.execute_sync_task(processor),
             ExecutorTask::AsyncCompleted(task) => match task.res {
-                Ok(_) => Ok(task.id),
+                Ok(_) => Ok(Some((task.id, task.graph))),
                 Err(cause) => Err(cause),
             },
         }
@@ -87,19 +91,18 @@ impl ExecutorWorkerContext {
     /// # Safety
     unsafe fn execute_sync_task(
         &mut self,
-        proc: ProcessorPtr,
-        graph: &RunningGraph,
-    ) -> Result<NodeIndex> {
-        Profile::track_profile(graph.get_node_profile(proc.id()));
+        proc: ProcessorWrapper,
+    ) -> Result<Option<(NodeIndex, Arc<RunningGraph>)>> {
+        Profile::track_profile(proc.graph.get_node_profile(proc.processor.id()));

         let instant = Instant::now();

-        proc.process()?;
+        proc.processor.process()?;

         let nanos = instant.elapsed().as_nanos();
         assume(nanos < 18446744073709551615_u128);
         Profile::record_usize_profile(ProfileStatisticsName::CpuTime, nanos as usize);
-        Ok(proc.id())
+        Ok(Some((proc.processor.id(), proc.graph)))
     }

     pub fn get_workers_condvar(&self) -> &Arc<WorkersCondvar> {
@@ -115,8 +118,8 @@ impl Debug for ExecutorTask {
             ExecutorTask::Sync(p) => write!(
                 f,
                 "ExecutorTask::Sync {{ id: {}, name: {}}}",
-                p.id().index(),
-                p.name()
+                p.processor.id().index(),
+                p.processor.name()
             ),
             ExecutorTask::AsyncCompleted(_) => write!(f, "ExecutorTask::CompletedAsync"),
         }
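The key signature change here is that `execute_task` now returns `Result<Option<(NodeIndex, Arc<RunningGraph>)>>` instead of `Result<NodeIndex>`: the caller learns both which node ran and which graph owns it, and the `Option` leaves room for task kinds that yield nothing to reschedule. A hedged sketch of that contract, using simplified stand-in types rather than the real executor API:

use std::sync::Arc;

type NodeIndex = usize;

struct Graph;

enum Task {
    None,
    Sync { node: NodeIndex, graph: Arc<Graph> },
}

// Returning `Option<(NodeIndex, Arc<Graph>)>` lets "nothing to reschedule"
// (None) flow through the same channel as "node N of graph G just ran".
fn execute_task(task: Task) -> Result<Option<(NodeIndex, Arc<Graph>)>, String> {
    match task {
        Task::None => Err("Execute none task.".to_string()),
        Task::Sync { node, graph } => Ok(Some((node, graph))),
    }
}

fn main() {
    let graph = Arc::new(Graph);
    let task = Task::Sync { node: 5, graph };
    if let Some((node, _graph)) = execute_task(task).unwrap() {
        println!("executed node {node}; caller can reschedule on its own graph");
    }
}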

โ€Žsrc/query/service/src/pipelines/executor/pipeline_executor.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ pub type FinishedCallback =

 pub struct PipelineExecutor {
     threads_num: usize,
-    pub(crate) graph: RunningGraph,
+    pub(crate) graph: Arc<RunningGraph>,
     workers_condvar: Arc<WorkersCondvar>,
     pub async_runtime: Arc<Runtime>,
     pub global_tasks_queue: Arc<ExecutorTasksQueue>,
@@ -173,7 +173,7 @@ impl PipelineExecutor {
     }

     fn try_create(
-        graph: RunningGraph,
+        graph: Arc<RunningGraph>,
         threads_num: usize,
         on_init_callback: Mutex<Option<InitCallback>>,
         on_finished_callback: Mutex<Option<FinishedCallback>>,
@@ -228,7 +228,7 @@ impl PipelineExecutor {

     #[minitrace::trace]
     pub fn execute(self: &Arc<Self>) -> Result<()> {
-        self.init()?;
+        self.init(self.graph.clone())?;

         self.start_executor_daemon()?;

@@ -265,7 +265,7 @@ impl PipelineExecutor {
         Ok(())
     }

-    fn init(self: &Arc<Self>) -> Result<()> {
+    fn init(self: &Arc<Self>, graph: Arc<RunningGraph>) -> Result<()> {
         unsafe {
             // TODO: the on init callback cannot be killed.
             {
@@ -285,7 +285,7 @@ impl PipelineExecutor {
             );
         }

-        let mut init_schedule_queue = self.graph.init_schedule_queue(self.threads_num)?;
+        let mut init_schedule_queue = graph.init_schedule_queue(self.threads_num)?;

         let mut wakeup_worker_id = 0;
         while let Some(proc) = init_schedule_queue.async_queue.pop_front() {
@@ -400,13 +400,13 @@ impl PipelineExecutor {
             }

             while !self.global_tasks_queue.is_finished() && context.has_task() {
-                let executed_pid = context.execute_task(&self.graph)?;
-
-                // Not scheduled graph if pipeline is finished.
-                if !self.global_tasks_queue.is_finished() {
-                    // We immediately schedule the processor again.
-                    let schedule_queue = self.graph.schedule_queue(executed_pid)?;
-                    schedule_queue.schedule(&self.global_tasks_queue, &mut context, self);
+                if let Some((executed_pid, graph)) = context.execute_task(self)? {
+                    // Not scheduled graph if pipeline is finished.
+                    if !self.global_tasks_queue.is_finished() {
+                        // We immediately schedule the processor again.
+                        let schedule_queue = graph.schedule_queue(executed_pid)?;
+                        schedule_queue.schedule(&self.global_tasks_queue, &mut context, self);
+                    }
                 }
             }
         }
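The net effect shows up in this worker loop: the graph used for rescheduling now travels with the task that just executed instead of being read from `self.graph`, which is what prepares one pool of workers to drive tasks from more than one graph. A simplified, standalone sketch of such a loop under those assumptions (stand-in types; the real executor dispatches through `ExecutorTasksQueue` and worker contexts):

use std::collections::VecDeque;
use std::sync::Arc;

type NodeIndex = usize;

struct Graph {
    id: u32,
}

impl Graph {
    // Decide what becomes runnable after `node` finished (stubbed out here).
    fn schedule_queue(self: Arc<Self>, node: NodeIndex) -> Vec<(NodeIndex, Arc<Graph>)> {
        if node < 2 {
            vec![(node + 1, self)]
        } else {
            vec![]
        }
    }
}

fn main() {
    // Tasks from different graphs could interleave in one queue.
    let g1 = Arc::new(Graph { id: 1 });
    let mut queue: VecDeque<(NodeIndex, Arc<Graph>)> = VecDeque::new();
    queue.push_back((0, g1));

    // The worker loop reschedules on the graph carried by each task,
    // never on a single executor-wide graph.
    while let Some((node, graph)) = queue.pop_front() {
        println!("execute node {} of graph {}", node, graph.id);
        for next in graph.schedule_queue(node) {
            queue.push_back(next);
        }
    }
}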
