@@ -89,13 +89,10 @@ func newGoBackNConn(ctx context.Context, cfg *config,
89
89
prefix := fmt .Sprintf ("(%s)" , loggerPrefix )
90
90
plog := build .NewPrefixLog (prefix , log )
91
91
92
- return & GoBackNConn {
93
- cfg : cfg ,
94
- recvDataChan : make (chan * PacketData , cfg .n ),
95
- sendDataChan : make (chan * PacketData ),
96
- sendQueue : newQueue (
97
- cfg .n + 1 , defaultHandshakeTimeout , plog ,
98
- ),
92
+ g := & GoBackNConn {
93
+ cfg : cfg ,
94
+ recvDataChan : make (chan * PacketData , cfg .n ),
95
+ sendDataChan : make (chan * PacketData ),
99
96
recvTimeout : DefaultRecvTimeout ,
100
97
sendTimeout : DefaultSendTimeout ,
101
98
receivedACKSignal : make (chan struct {}),
@@ -106,6 +103,17 @@ func newGoBackNConn(ctx context.Context, cfg *config,
106
103
log : plog ,
107
104
quit : make (chan struct {}),
108
105
}
106
+
107
+ g .sendQueue = newQueue (& queueCfg {
108
+ s : cfg .n + 1 ,
109
+ timeout : cfg .resendTimeout ,
110
+ log : plog ,
111
+ sendPkt : func (packet * PacketData ) error {
112
+ return g .sendPacket (g .ctx , packet )
113
+ },
114
+ })
115
+
116
+ return g
109
117
}
110
118
111
119
// setN sets the current N to use. This _must_ be set before the handshake is
@@ -114,7 +122,14 @@ func (g *GoBackNConn) setN(n uint8) {
114
122
g .cfg .n = n
115
123
g .cfg .s = n + 1
116
124
g .recvDataChan = make (chan * PacketData , n )
117
- g .sendQueue = newQueue (n + 1 , defaultHandshakeTimeout , g .log )
125
+ g .sendQueue = newQueue (& queueCfg {
126
+ s : n + 1 ,
127
+ timeout : g .cfg .resendTimeout ,
128
+ log : g .log ,
129
+ sendPkt : func (packet * PacketData ) error {
130
+ return g .sendPacket (g .ctx , packet )
131
+ },
132
+ })
118
133
}
119
134
120
135
// SetSendTimeout sets the timeout used in the Send function.
@@ -320,6 +335,8 @@ func (g *GoBackNConn) Close() error {
320
335
// initialisation.
321
336
g .cancel ()
322
337
338
+ g .sendQueue .stop ()
339
+
323
340
g .wg .Wait ()
324
341
325
342
if g .pingTicker != nil {
@@ -359,9 +376,28 @@ func (g *GoBackNConn) sendPacket(ctx context.Context, msg Message) error {
359
376
func (g * GoBackNConn ) sendPacketsForever () error {
360
377
// resendQueue re-sends the current contents of the queue.
361
378
resendQueue := func () error {
362
- return g .sendQueue .resend (func (packet * PacketData ) error {
363
- return g .sendPacket (g .ctx , packet )
364
- })
379
+ err := g .sendQueue .resend ()
380
+ if err != nil {
381
+ return err
382
+ }
383
+
384
+ // After resending the queue, we reset the resend ticker.
385
+ // This is so that we don't immediately resend the queue again,
386
+ // if the sendQueue.resend call above took a long time to
387
+ // execute. That can happen if the function was awaiting the
388
+ // expected ACK for a long time, or times out while awaiting the
389
+ // catch up.
390
+ g .resendTicker .Reset (g .cfg .resendTimeout )
391
+
392
+ // Also drain the resend signal channel, as resendTicker.Reset
393
+ // doesn't drain the channel if the ticker ticked during the
394
+ // sendQueue.resend() call above.
395
+ select {
396
+ case <- g .resendTicker .C :
397
+ default :
398
+ }
399
+
400
+ return nil
365
401
}
366
402
367
403
for {
@@ -478,6 +514,8 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
478
514
g .pongTicker .Pause ()
479
515
}
480
516
517
+ g .resendTicker .Reset (g .cfg .resendTimeout )
518
+
481
519
switch m := msg .(type ) {
482
520
case * PacketData :
483
521
switch m .Seq == g .recvSeq {
@@ -526,9 +564,19 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
526
564
527
565
// If we recently sent a NACK for the same
528
566
// sequence number then back off.
529
- if lastNackSeq == g .recvSeq &&
530
- time .Since (lastNackTime ) <
531
- g .cfg .resendTimeout {
567
+ // We wait 2 times the resendTimeout before
568
+ // sending a new nack, as this case is likely
569
+ // hit if the sender is currently resending
570
+ // the queue, and therefore the threads that
571
+ // are resending the queue is likely busy with
572
+ // the resend, and therefore won't react to the
573
+ // NACK we send here in time.
574
+ sinceSent := time .Since (lastNackTime )
575
+ recentlySent := sinceSent <
576
+ g .cfg .resendTimeout * 2
577
+
578
+ if lastNackSeq == g .recvSeq && recentlySent {
579
+ g .log .Tracef ("Recently sent NACK" )
532
580
533
581
continue
534
582
}
@@ -552,8 +600,6 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
552
600
case * PacketACK :
553
601
gotValidACK := g .sendQueue .processACK (m .Seq )
554
602
if gotValidACK {
555
- g .resendTicker .Reset (g .cfg .resendTimeout )
556
-
557
603
// Send a signal to indicate that new
558
604
// ACKs have been received.
559
605
select {
@@ -569,15 +615,12 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
569
615
// sent was dropped, or maybe we sent a duplicate
570
616
// message. The NACK message contains the sequence
571
617
// number that the receiver was expecting.
572
- inQueue , bumped := g .sendQueue .processNACK (m .Seq )
573
-
574
- // If the NACK sequence number is not in our queue
575
- // then we ignore it. We must have received the ACK
576
- // for the sequence number in the meantime.
577
- if ! inQueue {
578
- g .log .Tracef ("NACK seq %d is not in the " +
579
- "queue. Ignoring" , m .Seq )
618
+ shouldResend , bumped := g .sendQueue .processNACK (m .Seq )
580
619
620
+ // If we don't need to resend the queue after processing
621
+ // the NACK, we can continue without sending the resend
622
+ // signal.
623
+ if ! shouldResend {
581
624
continue
582
625
}
583
626
@@ -606,6 +649,10 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
606
649
607
650
close (g .remoteClosed )
608
651
652
+ if g .cfg .onFIN != nil {
653
+ g .cfg .onFIN ()
654
+ }
655
+
609
656
return errTransportClosing
610
657
611
658
default :
0 commit comments