@@ -51,6 +51,7 @@ struct Client::Data {
51
51
State state = State::kNone ;
52
52
53
53
int cb_level = 0 ; // ! 回调层级
54
+ int reconnect_wait_remain_sec = 0 ; // ! 重连等待剩余时长
54
55
55
56
std::thread *sp_thread = nullptr ;
56
57
@@ -205,8 +206,9 @@ void Client::cleanup()
205
206
206
207
bool Client::start ()
207
208
{
208
- if (d_->state != State::kInited ) {
209
- LogWarn (" state != kInited" );
209
+ if (d_->state != State::kInited &&
210
+ d_->state != State::kDisconnected ) {
211
+ LogWarn (" state is not kInited or kDisconnected" );
210
212
return false ;
211
213
}
212
214
@@ -312,7 +314,8 @@ bool Client::start()
312
314
313
315
void Client::stop ()
314
316
{
315
- if (d_->state <= State::kInited )
317
+ if (d_->state <= State::kInited ||
318
+ d_->state == State::kDisconnected )
316
319
return ;
317
320
318
321
RECORD_SCOPE ();
@@ -400,14 +403,14 @@ void Client::onTimerTick()
400
403
mosquitto_loop_misc (d_->sp_mosq );
401
404
402
405
if (mosquitto_socket (d_->sp_mosq ) < 0 ) {
403
- LogInfo (" disconnected with broker, retry." );
404
- d_->state = State::kConnecting ;
406
+ LogNotice (" mosquitto_socket() < 0" );
407
+ handleDisconnectEvent ();
408
+
405
409
} else {
406
410
enableSocketWriteIfNeed ();
407
411
}
408
- }
409
412
410
- if (d_->state == State::kConnecting ) {
413
+ } else if (d_->state == State::kConnecting ) {
411
414
if (d_->sp_thread == nullptr ) {
412
415
auto is_alive = d_->alive_tag .get (); // ! 原理见Q1
413
416
d_->sp_thread = new thread (
@@ -425,6 +428,16 @@ void Client::onTimerTick()
425
428
}
426
429
);
427
430
}
431
+
432
+ } else if (d_->state == State::kReconnWaiting ) {
433
+ auto &remain_sec = d_->reconnect_wait_remain_sec ;
434
+ if (remain_sec > 0 ) {
435
+ --remain_sec;
436
+ if (remain_sec == 0 ) {
437
+ d_->state = State::kConnecting ;
438
+ LogDbg (" wait timeout, reconnect now" );
439
+ }
440
+ }
428
441
}
429
442
}
430
443
@@ -504,21 +517,8 @@ void Client::onConnected(int rc)
504
517
void Client::onDisconnected (int rc)
505
518
{
506
519
RECORD_SCOPE ();
507
- disableSocketRead ();
508
- disableSocketWrite ();
509
-
510
- LogInfo (" disconnected" );
511
-
512
- if (d_->state >= State::kTcpConnected ) {
513
- if (d_->state == State::kMqttConnected ) {
514
- ++d_->cb_level ;
515
- if (d_->callbacks .disconnected )
516
- d_->callbacks .disconnected ();
517
- --d_->cb_level ;
518
- }
519
- d_->state = State::kConnecting ;
520
- }
521
- (void )rc;
520
+ LogNotice (" disconnected, rc:%d" , rc);
521
+ handleDisconnectEvent ();
522
522
}
523
523
524
524
void Client::onPublish (int mid)
@@ -663,5 +663,39 @@ void Client::disableTimer()
663
663
d_->sp_timer_ev ->disable ();
664
664
}
665
665
666
+ void Client::handleDisconnectEvent ()
667
+ {
668
+ disableSocketRead ();
669
+ disableSocketWrite ();
670
+
671
+ if (d_->state >= State::kTcpConnected ) {
672
+ if (d_->state == State::kMqttConnected ) {
673
+ ++d_->cb_level ;
674
+ if (d_->callbacks .disconnected )
675
+ d_->callbacks .disconnected ();
676
+ --d_->cb_level ;
677
+ }
678
+
679
+ // ! 如果开启了自动重连
680
+ if (d_->config .auto_reconnect_enabled ) {
681
+ if (d_->config .auto_reconnect_wait_sec > 0 ) {
682
+ LogDbg (" reconnect after %d sec" , d_->reconnect_wait_remain_sec );
683
+ d_->reconnect_wait_remain_sec = d_->config .auto_reconnect_wait_sec ;
684
+ d_->state = State::kReconnWaiting ;
685
+
686
+ } else {
687
+ LogDbg (" reconnect now" );
688
+ d_->reconnect_wait_remain_sec = 0 ;
689
+ d_->state = State::kConnecting ;
690
+ }
691
+
692
+ } else { // ! 如果不需要自动重连
693
+ LogDbg (" no need reconnect" );
694
+ d_->state = State::kDisconnected ;
695
+ disableTimer ();
696
+ }
697
+ }
698
+ }
699
+
666
700
}
667
701
}
0 commit comments