Skip to content

Commit 63ff86d

Browse files
authored
Merge pull request #9244 from sandflee/log2
chore(executor): add query id in slow log
2 parents 00f9f74 + e702220 commit 63ff86d

File tree

14 files changed

+41
-14
lines changed

14 files changed

+41
-14
lines changed

src/query/service/src/api/rpc/exchange/exchange_manager.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,9 @@ impl QueryCoordinator {
552552
}
553553
}
554554

555-
let executor_settings = ExecutorSettings::try_create(&info.query_ctx.get_settings())?;
555+
let query_id = info.query_ctx.get_id();
556+
let executor_settings =
557+
ExecutorSettings::try_create(&info.query_ctx.get_settings(), query_id)?;
556558

557559
let executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
558560

src/query/service/src/interpreters/interpreter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,9 @@ pub trait Interpreter: Sync + Send {
8585
});
8686

8787
let settings = ctx.get_settings();
88+
let query_id = ctx.get_id();
8889
build_res.set_max_threads(settings.get_max_threads()? as usize);
89-
let settings = ExecutorSettings::try_create(&settings)?;
90+
let settings = ExecutorSettings::try_create(&settings, query_id)?;
9091

9192
if build_res.main_pipeline.is_complete_pipeline()? {
9293
let mut pipelines = build_res.sources_pipelines;

src/query/service/src/interpreters/interpreter_delete.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ impl Interpreter for DeleteInterpreter {
7676
if !pipeline.pipes.is_empty() {
7777
let settings = self.ctx.get_settings();
7878
pipeline.set_max_threads(settings.get_max_threads()? as usize);
79-
let executor_settings = ExecutorSettings::try_create(&settings)?;
79+
let query_id = self.ctx.get_id();
80+
let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
8081
let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
8182

8283
self.ctx.set_executor(Arc::downgrade(&executor.get_inner()));

src/query/service/src/interpreters/interpreter_table_optimize.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ impl Interpreter for OptimizeTableInterpreter {
9393
{
9494
let settings = ctx.get_settings();
9595
pipeline.set_max_threads(settings.get_max_threads()? as usize);
96-
let executor_settings = ExecutorSettings::try_create(&settings)?;
96+
let query_id = ctx.get_id();
97+
let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
9798
let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
9899

99100
ctx.set_executor(Arc::downgrade(&executor.get_inner()));

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ impl Interpreter for ReclusterTableInterpreter {
8585

8686
pipeline.set_max_threads(settings.get_max_threads()? as usize);
8787

88-
let executor_settings = ExecutorSettings::try_create(&settings)?;
88+
let query_id = ctx.get_id();
89+
let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
8990
let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
9091

9192
ctx.set_executor(Arc::downgrade(&executor.get_inner()));

src/query/service/src/pipelines/executor/executor_settings.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,22 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::sync::Arc;
1516
use std::time::Duration;
1617

1718
use common_exception::Result;
1819
use common_settings::Settings;
1920

2021
pub struct ExecutorSettings {
22+
pub query_id: Arc<String>,
2123
pub max_execute_time: Duration,
2224
}
2325

2426
impl ExecutorSettings {
25-
pub fn try_create(settings: &Settings) -> Result<ExecutorSettings> {
27+
pub fn try_create(settings: &Settings, query_id: String) -> Result<ExecutorSettings> {
2628
let max_execute_time = settings.get_max_execute_time()?;
2729
Ok(ExecutorSettings {
30+
query_id: Arc::new(query_id),
2831
max_execute_time: Duration::from_millis(max_execute_time),
2932
})
3033
}

src/query/service/src/pipelines/executor/executor_worker_context.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,20 @@ pub enum ExecutorTask {
3838
}
3939

4040
pub struct ExecutorWorkerContext {
41+
query_id: Arc<String>,
4142
worker_num: usize,
4243
task: ExecutorTask,
4344
workers_condvar: Arc<WorkersCondvar>,
4445
}
4546

4647
impl ExecutorWorkerContext {
47-
pub fn create(worker_num: usize, workers_condvar: Arc<WorkersCondvar>) -> Self {
48+
pub fn create(
49+
worker_num: usize,
50+
workers_condvar: Arc<WorkersCondvar>,
51+
query_id: Arc<String>,
52+
) -> Self {
4853
ExecutorWorkerContext {
54+
query_id,
4955
worker_num,
5056
workers_condvar,
5157
task: ExecutorTask::None,
@@ -97,6 +103,7 @@ impl ExecutorWorkerContext {
97103
executor.async_runtime.spawn(TrackedFuture::create(
98104
ThreadTracker::fork(),
99105
ProcessorAsyncTask::create(
106+
self.query_id.clone(),
100107
worker_id,
101108
processor.clone(),
102109
tasks_queue,

src/query/service/src/pipelines/executor/pipeline_executor.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,11 @@ impl PipelineExecutor {
284284
/// Method is thread unsafe and require thread safe call
285285
pub unsafe fn execute_single_thread(&self, thread_num: usize) -> Result<()> {
286286
let workers_condvar = self.workers_condvar.clone();
287-
let mut context = ExecutorWorkerContext::create(thread_num, workers_condvar);
287+
let mut context = ExecutorWorkerContext::create(
288+
thread_num,
289+
workers_condvar,
290+
self.settings.query_id.clone(),
291+
);
288292

289293
while !self.global_tasks_queue.is_finished() {
290294
// When there are not enough tasks, the thread will be blocked, so we need loop check.

src/query/service/src/pipelines/executor/processor_async_task.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub struct ProcessorAsyncTask {
4242

4343
impl ProcessorAsyncTask {
4444
pub fn create<Inner: Future<Output = Result<()>> + Send + 'static>(
45+
query_id: Arc<String>,
4546
worker_id: usize,
4647
processor: ProcessorPtr,
4748
queue: Arc<ExecutorTasksQueue>,
@@ -73,7 +74,8 @@ impl ProcessorAsyncTask {
7374
Either::Left((_, right)) => {
7475
inner = right;
7576
tracing::warn!(
76-
"Very slow processor async task, processor id: {:?}, name: {:?}, elapsed: {:?}",
77+
"Very slow processor async task, query_id:{:?}, processor id: {:?}, name: {:?}, elapsed: {:?}",
78+
query_id,
7779
wraning_processor.id(),
7880
wraning_processor.name(),
7981
start.elapsed()

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,8 @@ impl HttpQueryHandle {
342342
});
343343

344344
let query_ctx = ctx.clone();
345-
let executor_settings = ExecutorSettings::try_create(&ctx.get_settings())?;
345+
let query_id = ctx.get_id();
346+
let executor_settings = ExecutorSettings::try_create(&ctx.get_settings(), query_id)?;
346347

347348
let run = move || -> Result<()> {
348349
let mut pipelines = build_res.sources_pipelines;

0 commit comments

Comments
 (0)