Skip to content

Commit 768ef7a

Browse files
gbn: use the syncer in the gbn queue
1 parent 3b5967f commit 768ef7a

File tree

3 files changed

+183
-4
lines changed

3 files changed

+183
-4
lines changed

gbn/gbn_conn.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,8 @@ func (g *GoBackNConn) Close() error {
335335
// initialisation.
336336
g.cancel()
337337

338+
g.sendQueue.stop()
339+
338340
g.wg.Wait()
339341

340342
if g.pingTicker != nil {
@@ -374,7 +376,28 @@ func (g *GoBackNConn) sendPacket(ctx context.Context, msg Message) error {
374376
func (g *GoBackNConn) sendPacketsForever() error {
375377
// resendQueue re-sends the current contents of the queue.
376378
resendQueue := func() error {
377-
return g.sendQueue.resend()
379+
err := g.sendQueue.resend()
380+
if err != nil {
381+
return err
382+
}
383+
384+
// After resending the queue, we reset the resend ticker.
385+
// This is so that we don't immediately resend the queue again,
386+
// if the sendQueue.resend call above took a long time to
387+
// execute. That can happen if the function was awaiting the
388+
// expected ACK for a long time, or times out while awaiting the
389+
// catch up.
390+
g.resendTicker.Reset(g.cfg.resendTimeout)
391+
392+
// Also drain the resend signal channel, as resendTicker.Reset
393+
// doesn't drain the channel if the ticker ticked during the
394+
// sendQueue.resend() call above.
395+
select {
396+
case <-g.resendTicker.C:
397+
default:
398+
}
399+
400+
return nil
378401
}
379402

380403
for {

gbn/queue.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ type queue struct {
5151
// topMtx is used to guard sequenceTop.
5252
topMtx sync.RWMutex
5353

54+
syncer *syncer
55+
5456
lastResend time.Time
57+
58+
quit chan struct{}
5559
}
5660

5761
// newQueue creates a new queue.
@@ -60,10 +64,19 @@ func newQueue(cfg *queueCfg) *queue {
6064
cfg.log = log
6165
}
6266

63-
return &queue{
67+
q := &queue{
6468
cfg: cfg,
6569
content: make([]*PacketData, cfg.s),
70+
quit: make(chan struct{}),
6671
}
72+
73+
q.syncer = newSyncer(cfg.s, cfg.log, cfg.timeout, q.quit)
74+
75+
return q
76+
}
77+
78+
func (q *queue) stop() {
79+
close(q.quit)
6780
}
6881

6982
// size is used to calculate the current sender queueSize.
@@ -91,7 +104,9 @@ func (q *queue) addPacket(packet *PacketData) {
91104
q.sequenceTop = (q.sequenceTop + 1) % q.cfg.s
92105
}
93106

94-
// resend invokes the callback for each packet that needs to be re-sent.
107+
// resend resends the current contents of the queue. It allows some time for the
108+
// two parties to be seen as synced; this may fail in which case the caller is
109+
// expected to call resend again.
95110
func (q *queue) resend() error {
96111
if time.Since(q.lastResend) < q.cfg.timeout {
97112
q.cfg.log.Tracef("Resent the queue recently.")
@@ -117,6 +132,9 @@ func (q *queue) resend() error {
117132
return nil
118133
}
119134

135+
// Prepare the queue for awaiting the resend catch up.
136+
q.syncer.initResendUpTo(top)
137+
120138
q.cfg.log.Tracef("Resending the queue")
121139

122140
for base != top {
@@ -125,19 +143,22 @@ func (q *queue) resend() error {
125143
if err := q.cfg.sendPkt(packet); err != nil {
126144
return err
127145
}
146+
128147
base = (base + 1) % q.cfg.s
129148

130149
q.cfg.log.Tracef("Resent %d", packet.Seq)
131150
}
132151

152+
// Then wait until we know that both parties are in sync.
153+
q.syncer.waitForSync()
154+
133155
return nil
134156
}
135157

136158
// processACK processes an incoming ACK of a given sequence number. The function
137159
// returns true if the passed seq is an ACK for a packet we have sent but not
138160
// yet received an ACK for.
139161
func (q *queue) processACK(seq uint8) bool {
140-
141162
// If our queue is empty, an ACK should not have any effect.
142163
if q.size() == 0 {
143164
q.cfg.log.Tracef("Received ack %d, but queue is empty. "+
@@ -146,6 +167,8 @@ func (q *queue) processACK(seq uint8) bool {
146167
return false
147168
}
148169

170+
q.syncer.processACK(seq)
171+
149172
q.baseMtx.Lock()
150173
defer q.baseMtx.Unlock()
151174

@@ -205,6 +228,8 @@ func (q *queue) processNACK(seq uint8) (bool, bool) {
205228

206229
q.cfg.log.Tracef("Received NACK %d", seq)
207230

231+
q.syncer.processNACK(seq)
232+
208233
// If the NACK is the same as sequenceTop, it probably means that queue
209234
// was sent successfully, but due to latency we timed out and resent the
210235
// queue before we received the ACKs for the sent packages.

gbn/queue_test.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package gbn
22

33
import (
4+
"sync"
45
"testing"
6+
"time"
57

8+
"github.com/lightningnetwork/lnd/lntest/wait"
69
"github.com/stretchr/testify/require"
710
)
811

@@ -19,3 +22,131 @@ func TestQueueSize(t *testing.T) {
1922
q.sequenceTop = 2
2023
require.Equal(t, uint8(3), q.size())
2124
}
25+
26+
// TestQueueResend tests that the queue resend functionality works as expected.
27+
// It specifically tests that we actually resend packets, and await the expected
28+
// durations for cases when we resend and 1) don't receive the expected
29+
// ACK/NACK, 2) receive the expected ACK, and 3) receive the expected NACK.
30+
func TestQueueResend(t *testing.T) {
31+
t.Parallel()
32+
33+
resentPackets := make(map[uint8]struct{})
34+
queueTimeout := time.Second * 1
35+
36+
cfg := &queueCfg{
37+
s: 5,
38+
timeout: queueTimeout,
39+
sendPkt: func(packet *PacketData) error {
40+
resentPackets[packet.Seq] = struct{}{}
41+
42+
return nil
43+
},
44+
}
45+
q := newQueue(cfg)
46+
47+
pkt1 := &PacketData{Seq: 1}
48+
pkt2 := &PacketData{Seq: 2}
49+
pkt3 := &PacketData{Seq: 3}
50+
51+
q.addPacket(pkt1)
52+
53+
// First test that we shouldn't resend if the timeout hasn't passed.
54+
q.lastResend = time.Now()
55+
56+
err := q.resend()
57+
require.NoError(t, err)
58+
59+
require.Empty(t, resentPackets)
60+
61+
// Secondly, let's test that we do resend if the timeout has passed, and
62+
// that we then start a sync once we've resent the packet.
63+
q.lastResend = time.Now().Add(-queueTimeout * 2)
64+
65+
// Let's first test the syncing scenario where we don't receive
66+
// the expected ACK/NACK for the resent packet. This should trigger a
67+
// timeout of the syncing, which should be the
68+
// queueTimeout * awaitingTimeoutMultiplier.
69+
startTime := time.Now()
70+
71+
var wg sync.WaitGroup
72+
resend(t, q, &wg)
73+
74+
wg.Wait()
75+
76+
// Check that the resend took at least the
77+
// queueTimeout * awaitingTimeoutMultiplier for the syncing to
78+
// complete, and that we actually resent the packet.
79+
require.GreaterOrEqual(
80+
t, time.Since(startTime),
81+
queueTimeout*awaitingTimeoutMultiplier,
82+
)
83+
require.Contains(t, resentPackets, pkt1.Seq)
84+
85+
// Now let's test the syncing scenario where we do receive the
86+
// expected ACK for the resent packet. This should trigger a
87+
// queue.proceedAfterTime call, which should finish the syncing
88+
// after the queueTimeout.
89+
q.lastResend = time.Now().Add(-queueTimeout * 2)
90+
91+
q.addPacket(pkt2)
92+
93+
startTime = time.Now()
94+
95+
resend(t, q, &wg)
96+
97+
// Simulate that we receive the expected ACK for the resent packet.
98+
q.processACK(pkt2.Seq)
99+
100+
wg.Wait()
101+
102+
// Now check that the resend took at least the queueTimeout for the
103+
// syncing to complete, and that we actually resent the packet.
104+
require.GreaterOrEqual(t, time.Since(startTime), queueTimeout)
105+
require.LessOrEqual(t, time.Since(startTime), queueTimeout*2)
106+
require.Contains(t, resentPackets, pkt2.Seq)
107+
108+
// Finally, let's test the syncing scenario where we do receive the
109+
// expected NACK for the resent packet. This make the syncing
110+
// complete immediately.
111+
q.lastResend = time.Now().Add(-queueTimeout * 2)
112+
113+
q.addPacket(pkt3)
114+
115+
startTime = time.Now()
116+
resend(t, q, &wg)
117+
118+
// Simulate that we receive the expected NACK for the resent packet.
119+
q.processNACK(pkt3.Seq + 1)
120+
121+
wg.Wait()
122+
123+
// Finally let's check that we didn't await any timeout now, and that
124+
// we actually resent the packet.
125+
require.Less(t, time.Since(startTime), queueTimeout)
126+
require.Contains(t, resentPackets, pkt3.Seq)
127+
}
128+
129+
// resend is a helper function that resends packets in a goroutine, and notifies
130+
// the WaitGroup when the resend + syncing has completed.
131+
// The function will block until the resend has actually started.
132+
func resend(t *testing.T, q *queue, wg *sync.WaitGroup) {
133+
t.Helper()
134+
135+
wg.Add(1)
136+
137+
// This will trigger a sync, so we launch the resend in a
138+
// goroutine.
139+
go func() {
140+
err := q.resend()
141+
require.NoError(t, err)
142+
143+
wg.Done()
144+
}()
145+
146+
// We also ensure that the above goroutine is has started the resend
147+
// before this function returns.
148+
err := wait.Predicate(func() bool {
149+
return q.syncer.getState() == syncStateResending
150+
}, time.Second)
151+
require.NoError(t, err)
152+
}

0 commit comments

Comments
 (0)