Skip to content

Commit 44df0b0

Browse files
committed
Merge remote-tracking branch 'up/main' into range-shuffle
Signed-off-by: coldWater <forsaken628@gmail.com>
2 parents cef81b4 + 33dc48c commit 44df0b0

File tree

10 files changed

+166
-26
lines changed

10 files changed

+166
-26
lines changed

src/query/service/src/pipelines/executor/processor_async_task.rs

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use databend_common_pipeline_core::processors::ProcessorPtr;
3232
use futures_util::future::BoxFuture;
3333
use futures_util::future::Either;
3434
use futures_util::FutureExt;
35-
use log::error;
3635
use log::warn;
3736
use petgraph::prelude::NodeIndex;
3837

@@ -125,11 +124,9 @@ impl ProcessorAsyncTask {
125124
let processor_id = unsafe { processor.id() };
126125
let processor_name = unsafe { processor.name() };
127126
let queue_clone = queue.clone();
128-
let graph_clone = graph.clone();
129127
let inner = async move {
130128
let start = Instant::now();
131129
let mut inner = inner.boxed();
132-
let mut log_graph = false;
133130

134131
loop {
135132
let interval = Box::pin(sleep(Duration::from_secs(5)));
@@ -138,29 +135,10 @@ impl ProcessorAsyncTask {
138135
inner = right;
139136
let elapsed = start.elapsed();
140137
let active_workers = queue_clone.active_workers();
141-
match elapsed >= Duration::from_secs(200)
142-
&& active_workers == 0
143-
&& !log_graph
144-
{
145-
false => {
146-
warn!(
147-
"Very slow processor async task, query_id:{:?}, processor id: {:?}, name: {:?}, elapsed: {:?}, active sync workers: {:?}",
148-
query_id, processor_id, processor_name, elapsed, active_workers
149-
);
150-
}
151-
true => {
152-
log_graph = true;
153-
error!(
154-
"Very slow processor async task, query_id:{:?}, processor id: {:?}, name: {:?}, elapsed: {:?}, active sync workers: {:?}, {}",
155-
query_id,
156-
processor_id,
157-
processor_name,
158-
elapsed,
159-
active_workers,
160-
graph_clone.format_graph_nodes(false)
161-
);
162-
}
163-
};
138+
warn!(
139+
"Very slow processor async task, query_id:{:?}, processor id: {:?}, name: {:?}, elapsed: {:?}, active sync workers: {:?}",
140+
query_id, processor_id, processor_name, elapsed, active_workers
141+
);
164142
}
165143
Either::Right((res, _)) => {
166144
return res;

src/query/service/src/servers/admin/admin_service.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ impl AdminService {
7979
"v1/queries/:query_id/profiling",
8080
get(super::v1::query_profiling::query_profiling_handler),
8181
)
82+
.at(
83+
"v1/queries/:query_id/graph_dump",
84+
get(super::v1::query_dump::running_query_dump),
85+
)
8286
.at("/debug/home", get(debug_home_handler))
8387
.at("/debug/pprof/profile", get(debug_pprof_handler))
8488
.at("/debug/async_tasks/dump", get(debug_dump_stack));

src/query/service/src/servers/admin/v1/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub mod cluster;
1616
pub mod config;
1717
pub mod instance_status;
1818
pub mod processes;
19+
pub mod query_dump;
1920
pub mod query_profiling;
2021
pub mod settings;
2122
pub mod stream_status;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashMap;
16+
17+
use databend_common_config::GlobalConfig;
18+
use databend_common_exception::Result;
19+
use http::StatusCode;
20+
use poem::web::Json;
21+
use poem::web::Path;
22+
use poem::IntoResponse;
23+
24+
use crate::clusters::ClusterDiscovery;
25+
use crate::clusters::ClusterHelper;
26+
use crate::clusters::FlightParams;
27+
use crate::servers::flight::v1::actions::GET_RUNNING_QUERY_DUMP;
28+
29+
#[poem::handler]
30+
#[async_backtrace::framed]
31+
pub async fn running_query_dump(Path(query_id): Path<String>) -> poem::Result<impl IntoResponse> {
32+
#[derive(serde::Serialize)]
33+
struct QueryRunningGraphDump {
34+
query_id: String,
35+
graph_dump: HashMap<String, String>,
36+
}
37+
38+
let graph_dump = match get_running_query_dump(&query_id).await {
39+
Ok(graph_dump) => graph_dump,
40+
Err(cause) => {
41+
return Err(poem::Error::from_string(
42+
format!("Failed to fetch executor dump. cause: {cause}"),
43+
StatusCode::INTERNAL_SERVER_ERROR,
44+
))
45+
}
46+
};
47+
48+
Ok(Json(QueryRunningGraphDump {
49+
graph_dump,
50+
query_id: query_id.clone(),
51+
}))
52+
}
53+
54+
async fn get_running_query_dump(query_id: &str) -> Result<HashMap<String, String>> {
55+
let config = GlobalConfig::instance();
56+
let cluster = ClusterDiscovery::instance().discover(&config).await?;
57+
58+
let mut message = HashMap::with_capacity(cluster.nodes.len());
59+
60+
for node_info in &cluster.nodes {
61+
message.insert(node_info.id.clone(), query_id.to_owned());
62+
}
63+
64+
let flight_params = FlightParams {
65+
timeout: 60,
66+
retry_times: 3,
67+
retry_interval: 3,
68+
};
69+
cluster
70+
.do_action::<_, String>(GET_RUNNING_QUERY_DUMP, message, flight_params)
71+
.await
72+
}

src/query/service/src/servers/flight/v1/actions/flight_actions.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use serde::Deserialize;
2626
use serde::Serialize;
2727

2828
use crate::servers::flight::v1::actions::get_profile::get_profile;
29+
use crate::servers::flight::v1::actions::get_running_query_dump::get_running_query_dump;
2930
use crate::servers::flight::v1::actions::init_query_env::init_query_env;
3031
use crate::servers::flight::v1::actions::init_query_env::INIT_QUERY_ENV;
3132
use crate::servers::flight::v1::actions::init_query_fragments::init_query_fragments;
@@ -37,6 +38,7 @@ use crate::servers::flight::v1::actions::system_action::system_action;
3738
use crate::servers::flight::v1::actions::truncate_table::truncate_table;
3839
use crate::servers::flight::v1::actions::truncate_table::TRUNCATE_TABLE;
3940
use crate::servers::flight::v1::actions::GET_PROFILE;
41+
use crate::servers::flight::v1::actions::GET_RUNNING_QUERY_DUMP;
4042
use crate::servers::flight::v1::actions::INIT_QUERY_FRAGMENTS;
4143
use crate::servers::flight::v1::actions::KILL_QUERY;
4244
use crate::servers::flight::v1::actions::START_PREPARED_QUERY;
@@ -135,4 +137,5 @@ pub fn flight_actions() -> FlightActions {
135137
.action(SET_PRIORITY, set_priority)
136138
.action(SYSTEM_ACTION, system_action)
137139
.action(GET_PROFILE, get_profile)
140+
.action(GET_RUNNING_QUERY_DUMP, get_running_query_dump)
138141
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_exception::ErrorCode;
16+
use databend_common_exception::Result;
17+
18+
use crate::servers::flight::v1::exchange::DataExchangeManager;
19+
use crate::sessions::SessionManager;
20+
21+
pub static GET_RUNNING_QUERY_DUMP: &str = "/actions/get_running_query_dump";
22+
23+
pub async fn get_running_query_dump(query_id: String) -> Result<String> {
24+
match SessionManager::instance().get_running_graph_dump(&query_id) {
25+
Ok(running_graph_dump) => Ok(running_graph_dump),
26+
Err(cause) => match cause.code() == ErrorCode::UNKNOWN_QUERY {
27+
false => Err(cause),
28+
true => DataExchangeManager::instance().get_running_query_graph_dump(&query_id),
29+
},
30+
}
31+
}

src/query/service/src/servers/flight/v1/actions/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
mod flight_actions;
1616
mod get_profile;
17+
mod get_running_query_dump;
1718
mod init_query_env;
1819
mod init_query_fragments;
1920
mod kill_query;
@@ -30,6 +31,7 @@ use databend_common_settings::Settings;
3031
pub use flight_actions::flight_actions;
3132
pub use flight_actions::FlightActions;
3233
pub use get_profile::GET_PROFILE;
34+
pub use get_running_query_dump::GET_RUNNING_QUERY_DUMP;
3335
pub use init_query_env::INIT_QUERY_ENV;
3436
pub use init_query_fragments::init_query_fragments;
3537
pub use init_query_fragments::INIT_QUERY_FRAGMENTS;

src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,27 @@ impl DataExchangeManager {
9393
GlobalInstance::get()
9494
}
9595

96+
pub fn get_running_query_graph_dump(&self, query_id: &str) -> Result<String> {
97+
let running_executor = {
98+
let queries_coordinator_guard = self.queries_coordinator.lock();
99+
let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() };
100+
let Some(coordinator) = queries_coordinator.get(query_id) else {
101+
return Ok(format!("Unknown query {}", query_id));
102+
};
103+
104+
let Some(info) = &coordinator.info else {
105+
return Ok(format!("Unknown running query {}", query_id));
106+
};
107+
108+
info.query_executor.clone()
109+
};
110+
111+
Ok(match running_executor {
112+
None => format!("Unknown running query {}", query_id),
113+
Some(executor) => executor.get_inner().format_graph_nodes(),
114+
})
115+
}
116+
96117
pub fn get_query_ctx(&self, query_id: &str) -> Result<Arc<QueryContext>> {
97118
let queries_coordinator_guard = self.queries_coordinator.lock();
98119
let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() };

src/query/service/src/sessions/query_ctx_shared.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,13 @@ impl QueryContextShared {
678678
}
679679
}
680680

681+
pub fn get_executor_graph_dump(&self) -> String {
682+
match self.executor.read().upgrade() {
683+
None => String::new(),
684+
Some(executor) => executor.format_graph_nodes(),
685+
}
686+
}
687+
681688
pub fn get_query_profiles(&self) -> Vec<PlanProfile> {
682689
if let Some(executor) = self.executor.read().upgrade() {
683690
self.add_query_profiles(&executor.fetch_profiling(false));

src/query/service/src/sessions/session_mgr.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,27 @@ impl SessionManager {
373373
)))
374374
}
375375

376+
pub fn get_running_graph_dump(&self, query_id: &str) -> Result<String> {
377+
for weak_ptr in self.active_sessions_snapshot() {
378+
let Some(arc_session) = weak_ptr.upgrade() else {
379+
continue;
380+
};
381+
382+
let session_ctx = arc_session.session_ctx.as_ref();
383+
384+
if let Some(context_shared) = session_ctx.get_query_context_shared() {
385+
if query_id == *context_shared.init_query_id.as_ref().read() {
386+
return Ok(context_shared.get_executor_graph_dump());
387+
}
388+
}
389+
}
390+
391+
Err(ErrorCode::UnknownQuery(format!(
392+
"Unknown query {}",
393+
query_id
394+
)))
395+
}
396+
376397
fn active_sessions_snapshot(&self) -> Vec<Weak<Session>> {
377398
// Here the situation is the same of method `graceful_shutdown`:
378399
//

0 commit comments

Comments
 (0)