refactor(executor): refactor cancellation of processor async task #11954

Draft · wants to merge 6 commits into base: main
4 changes: 2 additions & 2 deletions src/query/service/src/pipelines/executor/executor_graph.rs
@@ -32,7 +32,7 @@ use crate::pipelines::executor::executor_condvar::WorkersCondvar;
use crate::pipelines::executor::executor_tasks::ExecutorTasksQueue;
use crate::pipelines::executor::executor_worker_context::ExecutorTask;
use crate::pipelines::executor::executor_worker_context::ExecutorWorkerContext;
use crate::pipelines::executor::processor_async_task::ProcessorAsyncTask;
use crate::pipelines::executor::processor_async_task::ProcessorAsyncFuture;
use crate::pipelines::executor::PipelineExecutor;
use crate::pipelines::pipeline::Pipeline;
use crate::pipelines::processors::connect;
@@ -337,7 +337,7 @@ impl ScheduleQueue {
let process_future = proc.async_process();
executor
.async_runtime
.spawn(TrackedFuture::create(ProcessorAsyncTask::create(
.spawn(TrackedFuture::create(ProcessorAsyncFuture::create(
query_id,
wakeup_worker_id,
proc.clone(),
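Note: the call-site change above is just the rename that follows from splitting the old all-in-one spawned future into two pieces: a thin `ProcessorAsyncFuture` that the runtime polls, and a shared `Arc<ProcessorAsyncTask>` handle that the task queue can reach to wake or cancel it. A minimal, generic sketch of that wrapper-plus-handle shape (names and the bare `AtomicBool` flag are illustrative, not the databend types):

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

// Shared handle: the executor side can observe and flag completion
// from outside the spawned future.
struct TaskHandle {
    finished: AtomicBool,
}

// The future that actually gets spawned: it owns the inner async work
// plus an Arc to the shared handle.
struct TaskFuture<F> {
    inner: Pin<Box<F>>,
    handle: Arc<TaskHandle>,
}

impl<F: Future<Output = ()>> Future for TaskFuture<F> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Bail out early if the task was finished/cancelled externally.
        if self.handle.finished.load(Ordering::SeqCst) {
            return Poll::Ready(());
        }
        match self.inner.as_mut().poll(cx) {
            Poll::Ready(()) => {
                self.handle.finished.store(true, Ordering::SeqCst);
                Poll::Ready(())
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
```

The real types carry the worker id, processor id, tasks queue, and workers condvar instead of a bare flag, as the diffs below show.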
36 changes: 34 additions & 2 deletions src/query/service/src/pipelines/executor/executor_tasks.rs
@@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Weak;

use common_base::base::tokio::sync::Notify;
use common_exception::Result;
@@ -26,6 +28,7 @@ use crate::pipelines::executor::executor_condvar::WorkersCondvar;
use crate::pipelines::executor::executor_condvar::WorkersWaitingStatus;
use crate::pipelines::executor::executor_worker_context::ExecutorTask;
use crate::pipelines::executor::executor_worker_context::ExecutorWorkerContext;
use crate::pipelines::executor::processor_async_task::ProcessorAsyncTask;
use crate::pipelines::processors::processor::ProcessorPtr;

pub struct ExecutorTasksQueue {
@@ -48,6 +51,7 @@ impl ExecutorTasksQueue {
self.finished_notify.notify_waiters();

let mut workers_tasks = self.workers_tasks.lock();

let mut wakeup_workers =
Vec::with_capacity(workers_tasks.workers_waiting_status.waiting_size());

@@ -56,6 +60,14 @@
wakeup_workers.push(worker_id);
}

for pending_sync_tasks in &mut workers_tasks.workers_pending_async_tasks {
for (_proc, async_task) in pending_sync_tasks.drain() {
if let Some(async_task) = Weak::upgrade(&async_task) {
async_task.wakeup();
}
}
}

drop(workers_tasks);
for wakeup_worker in wakeup_workers {
workers_condvar.wakeup(wakeup_worker);
@@ -163,6 +175,10 @@ impl ExecutorTasksQueue {
let mut workers_tasks = self.workers_tasks.lock();

let mut worker_id = task.worker_id;
if task.has_pending {
workers_tasks.workers_pending_async_tasks[worker_id].remove(&task.id.index());
}

workers_tasks.tasks_size += 1;
workers_tasks.workers_completed_async_tasks[worker_id].push_back(task);

@@ -182,6 +198,12 @@
}
}

pub fn pending_async_task(&self, task: &Arc<ProcessorAsyncTask>) {
let mut workers_tasks = self.workers_tasks.lock();
workers_tasks.workers_pending_async_tasks[task.worker_id]
.insert(task.processor_id.index(), Arc::downgrade(task));
}

pub fn get_finished_notify(&self) -> Arc<Notify> {
self.finished_notify.clone()
}
@@ -197,11 +219,17 @@ pub struct CompletedAsyncTask {
pub id: NodeIndex,
pub worker_id: usize,
pub res: Result<()>,
pub has_pending: bool,
}

impl CompletedAsyncTask {
pub fn create(id: NodeIndex, worker_id: usize, res: Result<()>) -> Self {
CompletedAsyncTask { id, worker_id, res }
pub fn create(id: NodeIndex, worker_id: usize, pending: bool, res: Result<()>) -> Self {
CompletedAsyncTask {
id,
worker_id,
res,
has_pending: pending,
}
}
}

@@ -210,23 +238,27 @@ struct ExecutorTasks {
workers_waiting_status: WorkersWaitingStatus,
workers_sync_tasks: Vec<VecDeque<ProcessorPtr>>,
workers_completed_async_tasks: Vec<VecDeque<CompletedAsyncTask>>,
workers_pending_async_tasks: Vec<HashMap<usize, Weak<ProcessorAsyncTask>>>,
}

unsafe impl Send for ExecutorTasks {}

impl ExecutorTasks {
pub fn create(workers_size: usize) -> ExecutorTasks {
let mut workers_sync_tasks = Vec::with_capacity(workers_size);
let mut workers_pending_async_tasks = Vec::with_capacity(workers_size);
let mut workers_completed_async_tasks = Vec::with_capacity(workers_size);

for _index in 0..workers_size {
workers_sync_tasks.push(VecDeque::new());
workers_pending_async_tasks.push(HashMap::new());
workers_completed_async_tasks.push(VecDeque::new());
}

ExecutorTasks {
tasks_size: 0,
workers_sync_tasks,
workers_pending_async_tasks,
workers_completed_async_tasks,
workers_waiting_status: WorkersWaitingStatus::create(workers_size),
}
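The new `workers_pending_async_tasks` field gives `finish()` a way to reach futures that are parked inside the async runtime: each entry is a `Weak<ProcessorAsyncTask>` keyed by processor index, registered in `pending_async_task` and removed again in `completed_async_task` when `has_pending` is set. A simplified, self-contained sketch of that register/drain/wakeup flow (hypothetical names, std types only):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};

// Illustrative stand-in for ProcessorAsyncTask: anything that can be woken.
struct PendingTask {
    id: usize,
}

impl PendingTask {
    fn wakeup(&self) {
        println!("waking pending async task {}", self.id);
    }
}

// Per-worker registry of parked async tasks. Weak references keep the
// registry from extending a task's lifetime once its future is dropped.
struct PendingRegistry {
    per_worker: Mutex<Vec<HashMap<usize, Weak<PendingTask>>>>,
}

impl PendingRegistry {
    fn register(&self, worker_id: usize, task: &Arc<PendingTask>) {
        let mut workers = self.per_worker.lock().unwrap();
        workers[worker_id].insert(task.id, Arc::downgrade(task));
    }

    // On shutdown, drain every worker's map and wake whatever is still
    // alive, mirroring the loop added to ExecutorTasksQueue::finish above.
    fn finish(&self) {
        let mut workers = self.per_worker.lock().unwrap();
        for pending in workers.iter_mut() {
            for (_id, weak_task) in pending.drain() {
                if let Some(task) = weak_task.upgrade() {
                    task.wakeup();
                }
            }
        }
    }
}
```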
187 changes: 109 additions & 78 deletions src/query/service/src/pipelines/executor/processor_async_task.rs
@@ -13,123 +13,154 @@
// limitations under the License.

use std::future::Future;
use std::mem::ManuallyDrop;
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use std::task::Waker;

use common_base::base::tokio::time::sleep;
use common_base::runtime::catch_unwind;
use common_exception::ErrorCode;
use common_exception::Result;
use common_pipeline_core::processors::processor::ProcessorPtr;
use futures_util::future::BoxFuture;
use futures_util::future::Either;
use futures_util::FutureExt;
use petgraph::prelude::NodeIndex;

use crate::pipelines::executor::executor_condvar::WorkersCondvar;
use crate::pipelines::executor::executor_tasks::CompletedAsyncTask;
use crate::pipelines::executor::executor_tasks::ExecutorTasksQueue;

pub struct ProcessorAsyncTask {
worker_id: usize,
processor_id: NodeIndex,
pub worker_id: usize,
pub processor_id: NodeIndex,
queue: Arc<ExecutorTasksQueue>,
workers_condvar: Arc<WorkersCondvar>,
inner: BoxFuture<'static, Result<()>>,
pending_waker: AtomicPtr<Waker>,
}

impl ProcessorAsyncTask {
pub fn create<Inner: Future<Output = Result<()>> + Send + 'static>(
query_id: Arc<String>,
pub fn create(
worker_id: usize,
processor: ProcessorPtr,
processor_id: NodeIndex,
queue: Arc<ExecutorTasksQueue>,
workers_condvar: Arc<WorkersCondvar>,
inner: Inner,
) -> ProcessorAsyncTask {
let finished_notify = queue.get_finished_notify();

let inner = async move {
let left = Box::pin(inner);
let right = Box::pin(finished_notify.notified());
match futures::future::select(left, right).await {
Either::Left((res, _)) => res,
Either::Right((_, _)) => Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
)),
) -> Arc<ProcessorAsyncTask> {
Arc::new(ProcessorAsyncTask {
worker_id,
processor_id,
queue,
workers_condvar,
pending_waker: AtomicPtr::new(std::ptr::null_mut()),
})
}

#[inline(always)]
pub fn is_finished(&self) -> bool {
self.queue.is_finished()
}

#[inline(always)]
pub fn finish(self: Arc<Self>, res: Result<()>) -> Poll<()> {
self.queue.completed_async_task(
self.workers_condvar.clone(),
CompletedAsyncTask::create(
self.processor_id,
self.worker_id,
self.pending_waker.load(Ordering::SeqCst) != std::ptr::null_mut(),
res,
),
);

Poll::Ready(())
}

pub fn wakeup(&self) {
let waker = self
.pending_waker
.swap(std::ptr::null_mut(), Ordering::SeqCst);
if !waker.is_null() {
unsafe {
let waker = std::ptr::read(waker as *const Waker);
waker.wake();
}
};

let processor_id = unsafe { processor.id() };
let processor_name = unsafe { processor.name() };
let queue_clone = queue.clone();
let inner = async move {
let start = Instant::now();
let mut inner = inner.boxed();

loop {
let interval = Box::pin(sleep(Duration::from_secs(5)));
match futures::future::select(interval, inner).await {
Either::Left((_, right)) => {
inner = right;
let active_workers = queue_clone.active_workers();
tracing::warn!(
"Very slow processor async task, query_id:{:?}, processor id: {:?}, name: {:?}, elapsed: {:?}, active sync workers: {:?}",
query_id,
processor_id,
processor_name,
start.elapsed(),
active_workers,
);
}
}

#[inline(always)]
pub fn set_pending_watcher(self: Arc<Self>, waker: Waker) -> Poll<()> {
let mut expected = std::ptr::null_mut();
let desired = Box::into_raw(Box::new(waker));

loop {
match self.pending_waker.compare_exchange_weak(
expected,
desired,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Err(new_expected) => unsafe {
if !new_expected.is_null() && (&*new_expected).will_wake(&*desired) {
return Poll::Pending;
}
Either::Right((res, _)) => {
return res;

expected = new_expected;
},
Ok(old_value) => {
if !old_value.is_null() {
unsafe { drop(Box::from_raw(old_value)) };
}

self.queue.pending_async_task(&self);
return Poll::Pending;
}
}
};
}
}
}

ProcessorAsyncTask {
worker_id,
processor_id,
queue,
workers_condvar,
inner: inner.boxed(),
pub struct ProcessorAsyncFuture<Inner: Future<Output = Result<()>> + Send + 'static> {
inner: Inner,
task: Arc<ProcessorAsyncTask>,
}

impl<Inner: Future<Output = Result<()>> + Send + 'static> ProcessorAsyncFuture<Inner> {
pub fn create(
_query_id: Arc<String>,
worker_id: usize,
processor: ProcessorPtr,
queue: Arc<ExecutorTasksQueue>,
workers_condvar: Arc<WorkersCondvar>,
inner: Inner,
) -> ProcessorAsyncFuture<Inner> {
ProcessorAsyncFuture {
inner,
task: ProcessorAsyncTask::create(
worker_id,
unsafe { processor.id() },
queue,
workers_condvar,
),
}
}
}

impl Future for ProcessorAsyncTask {
impl<Inner: Future<Output = Result<()>> + Send + 'static> Future for ProcessorAsyncFuture<Inner> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.queue.is_finished() {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.task.is_finished() {
return Poll::Ready(());
}

let inner = self.inner.as_mut();

match catch_unwind(move || inner.poll(cx)) {
Ok(Poll::Pending) => Poll::Pending,
Ok(Poll::Ready(res)) => {
self.queue.completed_async_task(
self.workers_condvar.clone(),
CompletedAsyncTask::create(self.processor_id, self.worker_id, res),
);
Poll::Ready(())
}
Err(cause) => {
self.queue.completed_async_task(
self.workers_condvar.clone(),
CompletedAsyncTask::create(self.processor_id, self.worker_id, Err(cause)),
);
let task = self.task.clone();
let inner = unsafe { self.map_unchecked_mut(|x| &mut x.inner) };

Poll::Ready(())
}
match catch_unwind(move || (inner.poll(cx), cx)) {
Ok((Poll::Ready(res), _)) => task.finish(res),
Err(cause) => task.finish(Err(cause)),
Ok((Poll::Pending, cx)) => task.set_pending_watcher(cx.waker().clone()),
}
}
}
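The core of the refactor is how `poll` and `wakeup` hand a `Waker` back and forth through `pending_waker: AtomicPtr<Waker>`. Below is a simplified sketch of that single-slot parked-waker pattern using only std; the PR's `set_pending_watcher` additionally uses `compare_exchange_weak` and a `will_wake` check to avoid replacing an equivalent waker, which is omitted here:

```rust
use std::sync::atomic::{AtomicPtr, Ordering};
use std::task::Waker;

// A single-slot "parked waker": poll() parks the current waker here when the
// inner future returns Pending, and a cancellation path can take it back out
// and wake the task exactly once.
struct WakerSlot {
    slot: AtomicPtr<Waker>,
}

impl WakerSlot {
    fn new() -> Self {
        WakerSlot {
            slot: AtomicPtr::new(std::ptr::null_mut()),
        }
    }

    // Called from Future::poll on Poll::Pending: publish the current waker,
    // dropping any previously parked one.
    fn park(&self, waker: Waker) {
        let new = Box::into_raw(Box::new(waker));
        let old = self.slot.swap(new, Ordering::SeqCst);
        if !old.is_null() {
            unsafe { drop(Box::from_raw(old)) };
        }
    }

    // Called from the shutdown/cancel path: take the parked waker, if any,
    // and wake the task so its poll() can observe the finished state.
    fn wake(&self) {
        let old = self.slot.swap(std::ptr::null_mut(), Ordering::SeqCst);
        if !old.is_null() {
            let waker = unsafe { *Box::from_raw(old) };
            waker.wake();
        }
    }
}

impl Drop for WakerSlot {
    fn drop(&mut self) {
        // Free a waker that was parked but never woken.
        let old = self.slot.swap(std::ptr::null_mut(), Ordering::SeqCst);
        if !old.is_null() {
            unsafe { drop(Box::from_raw(old)) };
        }
    }
}
```

Waking the parked waker makes the runtime poll `ProcessorAsyncFuture` again; its `poll` then sees `task.is_finished()` and returns `Poll::Ready(())`, so shutdown can cancel in-flight async processors without the old `select` against `finished_notify`.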
1 change: 1 addition & 0 deletions src/query/service/src/table_functions/mod.rs
@@ -17,6 +17,7 @@ mod infer_schema;
mod list_stage;
mod numbers;
mod openai;
mod slow_async;
mod srf;
mod sync_crash_me;
mod table_function;