@@ -18,6 +18,7 @@ use std::future::Future;
18
18
use std:: sync:: Arc ;
19
19
use std:: time:: Duration ;
20
20
21
+ use anyerror:: func_name;
21
22
use anyerror:: AnyError ;
22
23
use async_trait:: async_trait;
23
24
use backon:: BackoffBuilder ;
@@ -36,6 +37,7 @@ use databend_common_meta_types::protobuf::RaftRequest;
36
37
use databend_common_meta_types:: protobuf:: SnapshotChunkRequest ;
37
38
use databend_common_meta_types:: AppendEntriesRequest ;
38
39
use databend_common_meta_types:: AppendEntriesResponse ;
40
+ use databend_common_meta_types:: Endpoint ;
39
41
use databend_common_meta_types:: Fatal ;
40
42
use databend_common_meta_types:: GrpcConfig ;
41
43
use databend_common_meta_types:: GrpcHelper ;
@@ -191,29 +193,37 @@ pub struct NetworkConnection {
191
193
impl NetworkConnection {
192
194
#[ logcall:: logcall( err = "debug" ) ]
193
195
#[ minitrace:: trace]
194
- pub async fn make_client ( & self ) -> Result < RaftClient , Unreachable > {
196
+ pub async fn make_client ( & self ) -> Result < ( RaftClient , Endpoint ) , Unreachable > {
195
197
let target = self . target ;
196
198
197
199
let endpoint = self
198
200
. sto
199
201
. get_node_raft_endpoint ( & target)
200
202
. await
201
- . map_err ( |e| Unreachable :: new ( & e) ) ?;
203
+ . map_err ( |e| {
204
+ let any_err = AnyError :: new ( & e)
205
+ . add_context ( || format ! ( "{} target: {}" , func_name!( ) , self . target) ) ;
206
+ Unreachable :: new ( & any_err)
207
+ } ) ?;
202
208
203
209
let addr = format ! ( "http://{}" , endpoint) ;
204
210
205
211
debug ! ( id = self . id; "connect: target={}: {}" , target, addr) ;
206
212
207
213
match self . conn_pool . get ( & addr) . await {
208
214
Ok ( channel) => {
209
- let client = RaftClientApi :: new ( target, endpoint, channel) ;
215
+ let client = RaftClientApi :: new ( target, endpoint. clone ( ) , channel) ;
210
216
debug ! ( "connected: target={}: {}" , target, addr) ;
211
217
212
- Ok ( client)
218
+ Ok ( ( client, endpoint ) )
213
219
}
214
220
Err ( err) => {
215
221
raft_metrics:: network:: incr_connect_failure ( & target, & endpoint. to_string ( ) ) ;
216
- Err ( Unreachable :: new ( & err) )
222
+ let any_err = AnyError :: new ( & err) . add_context ( || {
223
+ format ! ( "{} target: {}, addr: {}" , func_name!( ) , self . target, addr)
224
+ } ) ;
225
+
226
+ Err ( Unreachable :: new ( & any_err) )
217
227
}
218
228
}
219
229
}
@@ -287,11 +297,22 @@ impl NetworkConnection {
287
297
}
288
298
289
299
/// Convert gRPC status to `RPCError`
290
- fn status_to_unreachable < E > ( & self , status : tonic:: Status ) -> RPCError < RaftError < E > >
291
- where E : std:: error:: Error {
292
- warn ! ( "target={}, gRPC error: {:?}" , self . target, status) ;
300
+ fn status_to_unreachable < E > (
301
+ & self ,
302
+ status : tonic:: Status ,
303
+ endpoint : Endpoint ,
304
+ ) -> RPCError < RaftError < E > >
305
+ where
306
+ E : std:: error:: Error ,
307
+ {
308
+ warn ! (
309
+ "target={}, endpoint={} gRPC error: {:?}" ,
310
+ self . target, endpoint, status
311
+ ) ;
293
312
294
- RPCError :: Unreachable ( Unreachable :: new ( & status) )
313
+ let any_err = AnyError :: new ( & status)
314
+ . add_context ( || format ! ( "gRPC target={}, endpoint={}" , self . target, endpoint) ) ;
315
+ RPCError :: Unreachable ( Unreachable :: new ( & any_err) )
295
316
}
296
317
}
297
318
@@ -310,7 +331,7 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
310
331
"send_append_entries" ,
311
332
) ;
312
333
313
- let mut client = self . make_client ( ) . await ?;
334
+ let ( mut client, endpoint ) = self . make_client ( ) . await ?;
314
335
315
336
let raft_req = self . new_append_entries_raft_req ( & rpc) ?;
316
337
let req = GrpcHelper :: traced_req ( raft_req) ;
@@ -327,7 +348,7 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
327
348
self . target, grpc_res
328
349
) ;
329
350
330
- let resp = grpc_res. map_err ( |e| self . status_to_unreachable ( e) ) ?;
351
+ let resp = grpc_res. map_err ( |e| self . status_to_unreachable ( e, endpoint ) ) ?;
331
352
332
353
let raft_res = GrpcHelper :: parse_raft_reply ( resp)
333
354
. map_err ( |serde_err| new_net_err ( & serde_err, || "parse append_entries reply" ) ) ?;
@@ -369,7 +390,7 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
369
390
370
391
let _g = snapshot_send_inflight ( self . target ) . counter_guard ( ) ;
371
392
372
- let mut client = self . make_client ( ) . await ?;
393
+ let ( mut client, endpoint ) = self . make_client ( ) . await ?;
373
394
374
395
let bytes = rpc. data . len ( ) as u64 ;
375
396
raft_metrics:: network:: incr_sendto_bytes ( & self . target , bytes) ;
@@ -422,7 +443,7 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
422
443
423
444
let resp = grpc_res. map_err ( |e| {
424
445
self . report_metrics_snapshot ( false ) ;
425
- self . status_to_unreachable ( e)
446
+ self . status_to_unreachable ( e, endpoint )
426
447
} ) ?;
427
448
428
449
let raft_res = GrpcHelper :: parse_raft_reply ( resp)
@@ -442,7 +463,7 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
442
463
) -> Result < VoteResponse , RPCError < RaftError > > {
443
464
info ! ( id = self . id, target = self . target, rpc = rpc. summary( ) ; "send_vote" ) ;
444
465
445
- let mut client = self . make_client ( ) . await ?;
466
+ let ( mut client, endpoint ) = self . make_client ( ) . await ?;
446
467
447
468
let raft_req = GrpcHelper :: encode_raft_request ( & rpc) . map_err ( |e| Unreachable :: new ( & e) ) ?;
448
469
@@ -454,7 +475,7 @@ impl RaftNetwork<TypeConfig> for NetworkConnection {
454
475
let grpc_res = client. vote ( req) . await ;
455
476
info ! ( "vote: resp from target={} {:?}" , self . target, grpc_res) ;
456
477
457
- let resp = grpc_res. map_err ( |e| self . status_to_unreachable ( e) ) ?;
478
+ let resp = grpc_res. map_err ( |e| self . status_to_unreachable ( e, endpoint ) ) ?;
458
479
459
480
let raft_res = GrpcHelper :: parse_raft_reply ( resp)
460
481
. map_err ( |serde_err| new_net_err ( & serde_err, || "parse vote reply" ) ) ?;
0 commit comments