Skip to content

Commit 8bd6549

Browse files
authored
chore(query): tracking query log for mysql handler (#15495)
* chore(query): tracking query log for mysql handler * feat(query): tracking query log for mysql handler
1 parent 943f593 commit 8bd6549

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+235
-300
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/base/src/runtime/runtime.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub trait TrySpawn {
4343
///
4444
/// It allows to return an error before spawning the task.
4545
#[track_caller]
46-
fn try_spawn<T>(&self, id: impl Into<String>, task: T) -> Result<JoinHandle<T::Output>>
46+
fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
4747
where
4848
T: Future + Send + 'static,
4949
T::Output: Send + 'static;
@@ -52,32 +52,32 @@ pub trait TrySpawn {
5252
///
5353
/// A default impl of this method just calls `try_spawn` and just panics if there is an error.
5454
#[track_caller]
55-
fn spawn<T>(&self, id: impl Into<String>, task: T) -> JoinHandle<T::Output>
55+
fn spawn<T>(&self, task: T) -> JoinHandle<T::Output>
5656
where
5757
T: Future + Send + 'static,
5858
T::Output: Send + 'static,
5959
{
60-
self.try_spawn(id, task).unwrap()
60+
self.try_spawn(task).unwrap()
6161
}
6262
}
6363

6464
impl<S: TrySpawn> TrySpawn for Arc<S> {
6565
#[track_caller]
66-
fn try_spawn<T>(&self, id: impl Into<String>, task: T) -> Result<JoinHandle<T::Output>>
66+
fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
6767
where
6868
T: Future + Send + 'static,
6969
T::Output: Send + 'static,
7070
{
71-
self.as_ref().try_spawn(id, task)
71+
self.as_ref().try_spawn(task)
7272
}
7373

7474
#[track_caller]
75-
fn spawn<T>(&self, id: impl Into<String>, task: T) -> JoinHandle<T::Output>
75+
fn spawn<T>(&self, task: T) -> JoinHandle<T::Output>
7676
where
7777
T: Future + Send + 'static,
7878
T::Output: Send + 'static,
7979
{
80-
self.as_ref().spawn(id, task)
80+
self.as_ref().spawn(task)
8181
}
8282
}
8383

@@ -302,19 +302,20 @@ impl Runtime {
302302

303303
impl TrySpawn for Runtime {
304304
#[track_caller]
305-
fn try_spawn<T>(&self, id: impl Into<String>, task: T) -> Result<JoinHandle<T::Output>>
305+
fn try_spawn<T>(&self, task: T) -> Result<JoinHandle<T::Output>>
306306
where
307307
T: Future + Send + 'static,
308308
T::Output: Send + 'static,
309309
{
310310
let task = ThreadTracker::tracking_future(task);
311-
let id = id.into();
312-
let task = match id == GLOBAL_TASK {
313-
true => async_backtrace::location!(String::from(GLOBAL_TASK_DESC)).frame(task),
314-
false => {
315-
async_backtrace::location!(format!("Running query {} spawn task", id)).frame(task)
311+
let task = match ThreadTracker::query_id() {
312+
None => async_backtrace::location!(String::from(GLOBAL_TASK_DESC)).frame(task),
313+
Some(query_id) => {
314+
async_backtrace::location!(format!("Running query {} spawn task", query_id))
315+
.frame(task)
316316
}
317317
};
318+
318319
#[expect(clippy::disallowed_methods)]
319320
Ok(self.handle.spawn(task))
320321
}

src/common/base/tests/it/runtime.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use std::time::Instant;
2020

2121
use databend_common_base::runtime::Runtime;
2222
use databend_common_base::runtime::TrySpawn;
23-
use databend_common_base::GLOBAL_TASK;
2423
use databend_common_exception::Result;
2524
use rand::distributions::Distribution;
2625
use rand::distributions::Uniform;
@@ -33,16 +32,16 @@ async fn test_runtime() -> Result<()> {
3332

3433
let runtime = Runtime::with_default_worker_threads()?;
3534
let runtime_counter = Arc::clone(&counter);
36-
let runtime_header = runtime.spawn(GLOBAL_TASK, async move {
35+
let runtime_header = runtime.spawn(async move {
3736
let rt1 = Runtime::with_default_worker_threads().unwrap();
3837
let rt1_counter = Arc::clone(&runtime_counter);
39-
let rt1_header = rt1.spawn(GLOBAL_TASK, async move {
38+
let rt1_header = rt1.spawn(async move {
4039
let rt2 = Runtime::with_worker_threads(1, None).unwrap();
4140
let rt2_counter = Arc::clone(&rt1_counter);
42-
let rt2_header = rt2.spawn(GLOBAL_TASK, async move {
41+
let rt2_header = rt2.spawn(async move {
4342
let rt3 = Runtime::with_default_worker_threads().unwrap();
4443
let rt3_counter = Arc::clone(&rt2_counter);
45-
let rt3_header = rt3.spawn(GLOBAL_TASK, async move {
44+
let rt3_header = rt3.spawn(async move {
4645
let mut num = rt3_counter.lock().unwrap();
4746
*num += 1;
4847
});
@@ -73,7 +72,7 @@ async fn test_runtime() -> Result<()> {
7372
async fn test_shutdown_long_run_runtime() -> Result<()> {
7473
let runtime = Runtime::with_default_worker_threads()?;
7574

76-
runtime.spawn(GLOBAL_TASK, async move {
75+
runtime.spawn(async move {
7776
tokio::time::sleep(Duration::from_secs(6)).await;
7877
});
7978

src/common/storage/src/operator.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use anyhow::anyhow;
2424
use databend_common_base::base::GlobalInstance;
2525
use databend_common_base::runtime::GlobalIORuntime;
2626
use databend_common_base::runtime::TrySpawn;
27-
use databend_common_base::GLOBAL_TASK;
2827
use databend_common_exception::ErrorCode;
2928
use databend_common_meta_app::storage::StorageAzblobConfig;
3029
use databend_common_meta_app::storage::StorageCosConfig;
@@ -467,7 +466,7 @@ impl DataOperator {
467466
// IO hang on reuse connection.
468467
let op = operator.clone();
469468
if let Err(cause) = GlobalIORuntime::instance()
470-
.spawn(GLOBAL_TASK, async move {
469+
.spawn(async move {
471470
let res = op.stat("/").await;
472471
match res {
473472
Ok(_) => Ok(()),

src/common/storage/src/runtime_layer.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use std::sync::Arc;
1818

1919
use databend_common_base::runtime::Runtime;
2020
use databend_common_base::runtime::TrySpawn;
21-
use databend_common_base::GLOBAL_TASK;
2221
use futures::Future;
2322
use opendal::raw::oio;
2423
use opendal::raw::Access;
@@ -106,7 +105,7 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
106105
let op = self.inner.clone();
107106
let path = path.to_string();
108107
self.runtime
109-
.spawn(GLOBAL_TASK, async move { op.create_dir(&path, args).await })
108+
.spawn(async move { op.create_dir(&path, args).await })
110109
.await
111110
.expect("join must success")
112111
}
@@ -117,7 +116,7 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
117116
let path = path.to_string();
118117

119118
self.runtime
120-
.spawn(GLOBAL_TASK, async move { op.read(&path, args).await })
119+
.spawn(async move { op.read(&path, args).await })
121120
.await
122121
.expect("join must success")
123122
.map(|(rp, r)| {
@@ -131,7 +130,7 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
131130
let op = self.inner.clone();
132131
let path = path.to_string();
133132
self.runtime
134-
.spawn(GLOBAL_TASK, async move { op.write(&path, args).await })
133+
.spawn(async move { op.write(&path, args).await })
135134
.await
136135
.expect("join must success")
137136
}
@@ -141,7 +140,7 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
141140
let op = self.inner.clone();
142141
let path = path.to_string();
143142
self.runtime
144-
.spawn(GLOBAL_TASK, async move { op.stat(&path, args).await })
143+
.spawn(async move { op.stat(&path, args).await })
145144
.await
146145
.expect("join must success")
147146
}
@@ -151,7 +150,7 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
151150
let op = self.inner.clone();
152151
let path = path.to_string();
153152
self.runtime
154-
.spawn(GLOBAL_TASK, async move { op.delete(&path, args).await })
153+
.spawn(async move { op.delete(&path, args).await })
155154
.await
156155
.expect("join must success")
157156
}
@@ -161,7 +160,7 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
161160
let op = self.inner.clone();
162161
let path = path.to_string();
163162
self.runtime
164-
.spawn(GLOBAL_TASK, async move { op.list(&path, args).await })
163+
.spawn(async move { op.list(&path, args).await })
165164
.await
166165
.expect("join must success")
167166
}
@@ -204,7 +203,7 @@ impl<R: oio::Read> oio::Read for RuntimeIO<R> {
204203
let runtime = self.runtime.clone();
205204
async move {
206205
runtime
207-
.spawn(GLOBAL_TASK, async move { r.read_at(offset, limit).await })
206+
.spawn(async move { r.read_at(offset, limit).await })
208207
.await
209208
.expect("join must success")
210209
}

src/meta/client/src/grpc_client.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ use databend_common_base::runtime::ThreadTracker;
3939
use databend_common_base::runtime::TrackingPayload;
4040
use databend_common_base::runtime::TrySpawn;
4141
use databend_common_base::runtime::UnlimitedFuture;
42-
use databend_common_base::GLOBAL_TASK;
4342
use databend_common_grpc::ConnectionFactory;
4443
use databend_common_grpc::GrpcConnectionError;
4544
use databend_common_grpc::RpcClientConf;
@@ -440,14 +439,13 @@ impl MetaGrpcClient {
440439
rt: rt.clone(),
441440
});
442441

443-
rt.spawn(
444-
GLOBAL_TASK,
445-
UnlimitedFuture::create(Self::worker_loop(worker.clone(), rx)),
446-
);
447-
rt.spawn(
448-
GLOBAL_TASK,
449-
UnlimitedFuture::create(Self::auto_sync_endpoints(worker, one_tx)),
450-
);
442+
rt.spawn(UnlimitedFuture::create(Self::worker_loop(
443+
worker.clone(),
444+
rx,
445+
)));
446+
rt.spawn(UnlimitedFuture::create(Self::auto_sync_endpoints(
447+
worker, one_tx,
448+
)));
451449

452450
Ok(handle)
453451
}

src/query/pipeline/sinks/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ test = true
1313

1414
[dependencies]
1515
databend-common-base = { path = "../../../common/base" }
16-
databend-common-catalog = { path = "../../catalog" }
1716
databend-common-exception = { path = "../../../common/exception" }
1817
databend-common-expression = { path = "../../expression" }
1918
databend-common-pipeline-core = { path = "../core" }

src/query/pipeline/sinks/src/async_sink.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use async_trait::unboxed_simple;
2020
use databend_common_base::runtime::drop_guard;
2121
use databend_common_base::runtime::GlobalIORuntime;
2222
use databend_common_base::runtime::TrySpawn;
23-
use databend_common_catalog::table_context::TableContext;
2423
use databend_common_exception::Result;
2524
use databend_common_expression::DataBlock;
2625
use databend_common_pipeline_core::processors::Event;
@@ -53,21 +52,15 @@ pub struct AsyncSinker<T: AsyncSink + 'static> {
5352
inner: Option<T>,
5453
finished: bool,
5554
input: Arc<InputPort>,
56-
query_id: String,
5755
input_data: Option<DataBlock>,
5856
called_on_start: bool,
5957
called_on_finish: bool,
6058
}
6159

6260
impl<T: AsyncSink + 'static> AsyncSinker<T> {
63-
pub fn create(
64-
input: Arc<InputPort>,
65-
ctx: Arc<dyn TableContext>,
66-
inner: T,
67-
) -> Box<dyn Processor> {
61+
pub fn create(input: Arc<InputPort>, inner: T) -> Box<dyn Processor> {
6862
Box::new(AsyncSinker {
6963
input,
70-
query_id: ctx.get_id(),
7164
finished: false,
7265
input_data: None,
7366
inner: Some(inner),
@@ -82,7 +75,7 @@ impl<T: AsyncSink + 'static> Drop for AsyncSinker<T> {
8275
drop_guard(move || {
8376
if !self.called_on_start || !self.called_on_finish {
8477
if let Some(mut inner) = self.inner.take() {
85-
GlobalIORuntime::instance().spawn(self.query_id.clone(), {
78+
GlobalIORuntime::instance().spawn({
8679
let called_on_start = self.called_on_start;
8780
let called_on_finish = self.called_on_finish;
8881
async move {

src/query/pipeline/sinks/src/union_receive_sink.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::sync::Arc;
1717
use async_channel::Sender;
1818
use async_trait::async_trait;
1919
use async_trait::unboxed_simple;
20-
use databend_common_catalog::table_context::TableContext;
2120
use databend_common_exception::ErrorCode;
2221
use databend_common_exception::Result;
2322
use databend_common_expression::DataBlock;
@@ -32,12 +31,8 @@ pub struct UnionReceiveSink {
3231
}
3332

3433
impl UnionReceiveSink {
35-
pub fn create(
36-
sender: Option<Sender<DataBlock>>,
37-
input: Arc<InputPort>,
38-
ctx: Arc<dyn TableContext>,
39-
) -> Box<dyn Processor> {
40-
AsyncSinker::create(input, ctx, UnionReceiveSink { sender })
34+
pub fn create(tx: Option<Sender<DataBlock>>, input: Arc<InputPort>) -> Box<dyn Processor> {
35+
AsyncSinker::create(input, UnionReceiveSink { sender: tx })
4136
}
4237
}
4338

src/query/pipeline/sources/src/input_formats/input_pipeline.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ pub trait InputFormatPipe: Sized + Send + 'static {
117117
let (split_tx, split_rx) = async_channel::bounded(ctx.num_prefetch_splits()?);
118118
Self::build_pipeline_with_aligner(&ctx, split_rx, pipeline)?;
119119

120-
GlobalIORuntime::instance().spawn(ctx.table_context.get_id(), async move {
120+
GlobalIORuntime::instance().spawn(async move {
121121
let mut sender: Option<Sender<Result<Self::ReadBatch>>> = None;
122122
while let Some(batch_result) = input.recv().await {
123123
match batch_result {
@@ -161,7 +161,7 @@ pub trait InputFormatPipe: Sized + Send + 'static {
161161
Self::build_pipeline_with_aligner(&ctx, split_rx, pipeline)?;
162162

163163
let ctx_clone = ctx.clone();
164-
GlobalIORuntime::instance().spawn(ctx.table_context.get_id(), async move {
164+
GlobalIORuntime::instance().spawn(async move {
165165
debug!("start copy splits feeder");
166166
for s in &ctx_clone.splits {
167167
let (data_tx, data_rx) = tokio::sync::mpsc::channel(ctx.num_prefetch_per_split());

0 commit comments

Comments
 (0)