Skip to content

Commit 1ff87bb

Browse files
authored
chore(meta): tracking query id for meta log (#15497)
1 parent 6f6df79 commit 1ff87bb

File tree

3 files changed

+88
-34
lines changed

3 files changed

+88
-34
lines changed

src/common/base/src/runtime/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ pub use runtime::TrySpawn;
4949
pub use runtime::GLOBAL_TASK;
5050
pub use runtime_tracker::LimitMemGuard;
5151
pub use runtime_tracker::ThreadTracker;
52+
pub use runtime_tracker::TrackingGuard;
5253
pub use runtime_tracker::TrackingPayload;
5354
pub use runtime_tracker::UnlimitedFuture;
5455
pub use thread::Thread;

src/meta/client/src/grpc_client.rs

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::fmt::Debug;
1616
use std::fmt::Display;
1717
use std::fmt::Formatter;
18+
use std::str::FromStr;
1819
use std::sync::atomic::AtomicU64;
1920
use std::sync::atomic::Ordering;
2021
use std::sync::Arc;
@@ -34,6 +35,8 @@ use databend_common_base::containers::ItemManager;
3435
use databend_common_base::containers::Pool;
3536
use databend_common_base::future::TimingFutureExt;
3637
use databend_common_base::runtime::Runtime;
38+
use databend_common_base::runtime::ThreadTracker;
39+
use databend_common_base::runtime::TrackingPayload;
3740
use databend_common_base::runtime::TrySpawn;
3841
use databend_common_base::runtime::UnlimitedFuture;
3942
use databend_common_base::GLOBAL_TASK;
@@ -230,7 +233,7 @@ impl ItemManager for MetaChannelManager {
230233
/// The worker will be actually running in a dedicated runtime: `MetaGrpcClient.rt`.
231234
pub struct ClientHandle {
232235
/// For sending request to meta-client worker.
233-
pub(crate) req_tx: Sender<message::ClientWorkerRequest>,
236+
pub(crate) req_tx: Sender<(TrackingPayload, message::ClientWorkerRequest)>,
234237
/// Notify auto sync to stop.
235238
/// `oneshot::Receiver` impl `Drop` by sending a closed notification to the `Sender` half.
236239
#[allow(dead_code)]
@@ -268,12 +271,18 @@ impl ClientHandle {
268271

269272
grpc_metrics::incr_meta_grpc_client_request_inflight(1);
270273

271-
let res = self.req_tx.send(req).await.map_err(|e| {
272-
let cli_err = MetaClientError::ClientRuntimeError(
273-
AnyError::new(&e).add_context(|| "when sending req to MetaGrpcClient worker"),
274-
);
275-
cli_err.into()
276-
});
274+
let tracking_payload = ThreadTracker::new_tracking_payload();
275+
let res = self
276+
.req_tx
277+
.send((tracking_payload, req))
278+
.await
279+
.map_err(|e| {
280+
let cli_err = MetaClientError::ClientRuntimeError(
281+
AnyError::new(&e)
282+
.add_context(|| "when sending req to MetaGrpcClient worker"),
283+
);
284+
cli_err.into()
285+
});
277286

278287
if let Err(err) = res {
279288
grpc_metrics::incr_meta_grpc_client_request_inflight(-1);
@@ -445,19 +454,24 @@ impl MetaGrpcClient {
445454

446455
/// A worker runs a receive loop that accepts user requests for metasrv and handles each request in the dedicated runtime.
447456
#[minitrace::trace]
448-
async fn worker_loop(self: Arc<Self>, mut req_rx: Receiver<message::ClientWorkerRequest>) {
457+
async fn worker_loop(
458+
self: Arc<Self>,
459+
mut req_rx: Receiver<(TrackingPayload, message::ClientWorkerRequest)>,
460+
) {
449461
info!("MetaGrpcClient::worker spawned");
450462

451463
loop {
452464
let recv_res = req_rx.recv().await;
453-
let worker_request = match recv_res {
465+
let (tracking_payload, worker_request) = match recv_res {
454466
None => {
455467
warn!("MetaGrpcClient handle closed. worker quit");
456468
return;
457469
}
458470
Some(x) => x,
459471
};
460472

473+
let _guard = ThreadTracker::tracking(tracking_payload);
474+
461475
debug!(worker_request :? =(&worker_request); "MetaGrpcClient worker handle request");
462476

463477
let span = Span::enter_with_parent(full_name!(), &worker_request.span);
@@ -1083,8 +1097,7 @@ impl MetaGrpcClient {
10831097
"MetaGrpcClient::transaction request"
10841098
);
10851099

1086-
let req: Request<TxnRequest> = Request::new(txn.clone());
1087-
let req = databend_common_tracing::inject_span_to_tonic_request(req);
1100+
let req = traced_req(txn.clone());
10881101

10891102
let mut client = self.make_established_client().await?;
10901103
let result = client.transaction(req).await;
@@ -1095,8 +1108,7 @@ impl MetaGrpcClient {
10951108
if status_is_retryable(&s) {
10961109
self.choose_next_endpoint();
10971110
let mut client = self.make_established_client().await?;
1098-
let req: Request<TxnRequest> = Request::new(txn);
1099-
let req = databend_common_tracing::inject_span_to_tonic_request(req);
1111+
let req = traced_req(txn);
11001112
let ret = client.transaction(req).await?.into_inner();
11011113
return Ok(ret);
11021114
} else {
@@ -1129,7 +1141,18 @@ impl MetaGrpcClient {
11291141
/// Inject span into a tonic request, so that on the remote peer the tracing context can be restored.
11301142
fn traced_req<T>(t: T) -> Request<T> {
11311143
let req = Request::new(t);
1132-
databend_common_tracing::inject_span_to_tonic_request(req)
1144+
let mut req = databend_common_tracing::inject_span_to_tonic_request(req);
1145+
1146+
if let Some(query_id) = ThreadTracker::query_id() {
1147+
let key = tonic::metadata::AsciiMetadataKey::from_str("QueryID");
1148+
let value = tonic::metadata::AsciiMetadataValue::from_str(query_id);
1149+
1150+
if let Some((key, value)) = key.ok().zip(value.ok()) {
1151+
req.metadata_mut().insert(key, value);
1152+
}
1153+
}
1154+
1155+
req
11331156
}
11341157

11351158
fn status_is_retryable(status: &Status) -> bool {

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use std::sync::Arc;
1919
use databend_common_arrow::arrow_format::flight::data::BasicAuth;
2020
use databend_common_base::base::tokio::sync::mpsc;
2121
use databend_common_base::future::TimingFutureExt;
22+
use databend_common_base::runtime::ThreadTracker;
23+
use databend_common_base::runtime::TrackingGuard;
2224
use databend_common_grpc::GrpcClaim;
2325
use databend_common_grpc::GrpcToken;
2426
use databend_common_meta_client::MetaGrpcReadReq;
@@ -281,15 +283,20 @@ impl MetaService for MetaServiceImpl {
281283
async fn kv_api(&self, request: Request<RaftRequest>) -> Result<Response<RaftReply>, Status> {
282284
self.check_token(request.metadata())?;
283285

284-
network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64);
285-
let _guard = RequestInFlight::guard();
286+
let _guard = thread_tracking_guard(&request);
287+
ThreadTracker::tracking_future(async move {
288+
network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64);
289+
let _guard = RequestInFlight::guard();
286290

287-
let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);
288-
let reply = self.handle_kv_api(request).in_span(root).await?;
291+
let root =
292+
databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);
293+
let reply = self.handle_kv_api(request).in_span(root).await?;
289294

290-
network_metrics::incr_sent_bytes(reply.encoded_len() as u64);
295+
network_metrics::incr_sent_bytes(reply.encoded_len() as u64);
291296

292-
Ok(Response::new(reply))
297+
Ok(Response::new(reply))
298+
})
299+
.await
293300
}
294301

295302
type KvReadV1Stream = BoxStream<StreamItem>;
@@ -300,15 +307,20 @@ impl MetaService for MetaServiceImpl {
300307
) -> Result<Response<Self::KvReadV1Stream>, Status> {
301308
self.check_token(request.metadata())?;
302309

303-
network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64);
304-
let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);
310+
let _guard = thread_tracking_guard(&request);
311+
ThreadTracker::tracking_future(async move {
312+
network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64);
313+
let root =
314+
databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);
305315

306-
let (endpoint, strm) = self.handle_kv_read_v1(request).in_span(root).await?;
316+
let (endpoint, strm) = self.handle_kv_read_v1(request).in_span(root).await?;
307317

308-
let mut resp = Response::new(strm);
309-
GrpcHelper::add_response_meta_leader(&mut resp, endpoint.as_ref());
318+
let mut resp = Response::new(strm);
319+
GrpcHelper::add_response_meta_leader(&mut resp, endpoint.as_ref());
310320

311-
Ok(resp)
321+
Ok(resp)
322+
})
323+
.await
312324
}
313325

314326
async fn transaction(
@@ -317,18 +329,24 @@ impl MetaService for MetaServiceImpl {
317329
) -> Result<Response<TxnReply>, Status> {
318330
self.check_token(request.metadata())?;
319331

320-
network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64);
321-
let _guard = RequestInFlight::guard();
332+
let _guard = thread_tracking_guard(&request);
322333

323-
let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);
324-
let (endpoint, reply) = self.handle_txn(request).in_span(root).await?;
334+
ThreadTracker::tracking_future(async move {
335+
network_metrics::incr_recv_bytes(request.get_ref().encoded_len() as u64);
336+
let _guard = RequestInFlight::guard();
325337

326-
network_metrics::incr_sent_bytes(reply.encoded_len() as u64);
338+
let root =
339+
databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);
340+
let (endpoint, reply) = self.handle_txn(request).in_span(root).await?;
327341

328-
let mut resp = Response::new(reply);
329-
GrpcHelper::add_response_meta_leader(&mut resp, endpoint.as_ref());
342+
network_metrics::incr_sent_bytes(reply.encoded_len() as u64);
330343

331-
Ok(resp)
344+
let mut resp = Response::new(reply);
345+
GrpcHelper::add_response_meta_leader(&mut resp, endpoint.as_ref());
346+
347+
Ok(resp)
348+
})
349+
.await
332350
}
333351

334352
type ExportStream = Pin<Box<dyn Stream<Item = Result<ExportedChunk, Status>> + Send + 'static>>;
@@ -483,3 +501,15 @@ impl MetaService for MetaServiceImpl {
483501
Err(Status::unavailable("can not get client ip address"))
484502
}
485503
}
504+
505+
fn thread_tracking_guard<T>(req: &tonic::Request<T>) -> Option<TrackingGuard> {
506+
if let Some(value) = req.metadata().get("QueryID") {
507+
if let Ok(value) = value.to_str() {
508+
let mut tracking_payload = ThreadTracker::new_tracking_payload();
509+
tracking_payload.query_id = Some(value.to_string());
510+
return Some(ThreadTracker::tracking(tracking_payload));
511+
}
512+
}
513+
514+
None
515+
}

0 commit comments

Comments
 (0)