@@ -128,7 +128,10 @@ bool tx_svr::init()
128
128
if ( !tsvr_.init () ) {
129
129
return set_err_msg ( tsvr_.get_err_msg () );
130
130
}
131
- PC_LOG_INF (" listening" ).add (" port" ,tsvr_.get_port ()).end ();
131
+ PC_LOG_INF (" initialized" )
132
+ .add (" listen_port" ,tsvr_.get_port ())
133
+ .add (" rpc_host" , rhost )
134
+ .end ();
132
135
wait_conn_ = true ;
133
136
return true ;
134
137
}
@@ -241,15 +244,27 @@ void tx_svr::on_response( rpc::slot_subscribe *res )
241
244
avec_.clear ();
242
245
pub_key *pkey = nullptr ;
243
246
ip_addr iaddr;
244
- for ( uint64_t slot = slot_-1 ; slot <= slot_+4 ; ++slot ) {
247
+ uint64_t max_slot = std::min ( slot_+5 , lreq_->get_last_slot () );
248
+ for ( uint64_t slot = slot_-1 ; slot < max_slot; ++slot ) {
245
249
pub_key *ikey = lreq_->get_leader ( slot );
246
250
if ( ikey && ( !pkey || *ikey != *pkey) ) {
247
251
if ( creq_->get_ip_addr ( *ikey, iaddr ) ) {
248
252
add_addr ( iaddr );
249
- } else if ( creq_->get_is_recv () ) {
250
- PC_LOG_WRN ( " missing leader addr: get_cluster_nodes" )
251
- .add ( " slot" , slot ).end ();
252
- clnt_.send ( creq_ );
253
+ } else {
254
+ PC_LOG_WRN ( " missing leader addr" )
255
+ .add ( " leader" , *ikey )
256
+ .add ( " curr_slot" , slot )
257
+ .add ( " start_slot" , slot_-1 )
258
+ .add ( " end_slot" , slot_+4 )
259
+ .add ( " last_slot" , lreq_->get_last_slot () )
260
+ .end ();
261
+ if ( creq_->get_is_recv () ) {
262
+ clnt_.send ( creq_ );
263
+ }
264
+ if ( lreq_->get_is_recv () ) {
265
+ lreq_->set_slot ( slot_ - PC_LEADER_MIN );
266
+ clnt_.send ( lreq_ );
267
+ }
253
268
}
254
269
}
255
270
pkey = ikey;
@@ -265,19 +280,23 @@ void tx_svr::on_response( rpc::get_cluster_nodes *m )
265
280
if ( m->get_is_err () ) {
266
281
set_err_msg ( " failed to get cluster nodes["
267
282
+ m->get_err_msg () + " ]" );
283
+ m->reset_err ();
268
284
return ;
269
285
}
270
- PC_LOG_INF ( " received get_cluster_nodes" ).end ();
286
+ PC_LOG_DBG ( " received get_cluster_nodes" ).end ();
271
287
}
272
288
273
289
void tx_svr::on_response ( rpc::get_slot_leaders *m )
274
290
{
275
291
if ( m->get_is_err () ) {
276
292
set_err_msg ( " failed to get slot leaders ["
277
293
+ m->get_err_msg () + " ]" );
294
+ m->reset_err ();
278
295
return ;
279
296
}
280
- PC_LOG_DBG ( " received get_slot_leaders" ).end ();
297
+ PC_LOG_DBG ( " received get_slot_leaders" )
298
+ .add ( " curr_slot" , slot_ )
299
+ .add ( " last_slot" , m->get_last_slot () ).end ();
281
300
}
282
301
283
302
void tx_svr::on_response ( rpc::get_health *m )
@@ -286,6 +305,7 @@ void tx_svr::on_response( rpc::get_health *m )
286
305
PC_LOG_WRN ( " get_health error" )
287
306
.add ( " emsg" , m->get_err_msg () )
288
307
.end ();
308
+ m->reset_err ();
289
309
}
290
310
PC_LOG_DBG ( " health update" ).end ();
291
311
}
0 commit comments