Skip to content

Commit 50db412

Browse files
gbn: use timeout manager to handle timeouts
This commit moves the responsibility of managing the timeout values throughout the gbn package to the `TimeoutManager` struct. Even though the `TimeoutManager` is now used to manage timeouts, the resend timeout is not yet set dynamically for the connections of the gbn package. Support and usage of that functionality will added in the upcoming commits.
1 parent 6000041 commit 50db412

File tree

7 files changed

+88
-89
lines changed

7 files changed

+88
-89
lines changed

gbn/gbn_client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,11 @@ handshake:
128128
default:
129129
}
130130

131+
timeout := g.timeoutManager.GetHandshakeTimeout()
132+
131133
var b []byte
132134
select {
133-
case <-time.After(g.cfg.handshakeTimeout):
135+
case <-time.After(timeout):
134136
g.log.Debugf("SYN resendTimeout. Resending " +
135137
"SYN.")
136138

gbn/gbn_conn.go

Lines changed: 50 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"errors"
66
"fmt"
77
"io"
8-
"math"
98
"sync"
109
"time"
1110

@@ -41,10 +40,6 @@ type GoBackNConn struct {
4140
recvDataChan chan *PacketData
4241
sendDataChan chan *PacketData
4342

44-
sendTimeout time.Duration
45-
recvTimeout time.Duration
46-
timeoutsMu sync.RWMutex
47-
4843
log btclog.Logger
4944

5045
// receivedACKSignal channel is used to signal that the queue size has
@@ -65,6 +60,10 @@ type GoBackNConn struct {
6560
// remoteClosed is closed if the remote party initiated the FIN sequence.
6661
remoteClosed chan struct{}
6762

63+
// timeoutManager is used to manage all the timeouts used by the
64+
// GoBackNConn.
65+
timeoutManager *TimeoutManager
66+
6867
// quit is used to stop the normal operations of the connection.
6968
// Once closed, the send and receive streams will still be available
7069
// for the FIN sequence.
@@ -84,63 +83,62 @@ func newGoBackNConn(ctx context.Context, cfg *config,
8483
prefix := fmt.Sprintf("(%s)", loggerPrefix)
8584
plog := build.NewPrefixLog(prefix, log)
8685

86+
timeoutManager := NewTimeOutManager(plog)
87+
8788
g := &GoBackNConn{
8889
cfg: cfg,
8990
recvDataChan: make(chan *PacketData, cfg.n),
9091
sendDataChan: make(chan *PacketData),
91-
recvTimeout: DefaultRecvTimeout,
92-
sendTimeout: DefaultSendTimeout,
9392
receivedACKSignal: make(chan struct{}),
9493
resendSignal: make(chan struct{}, 1),
9594
remoteClosed: make(chan struct{}),
9695
ctx: ctxc,
9796
cancel: cancel,
9897
log: plog,
9998
quit: make(chan struct{}),
99+
timeoutManager: timeoutManager,
100100
}
101101

102-
g.sendQueue = newQueue(&queueCfg{
103-
s: cfg.n + 1,
104-
timeout: cfg.resendTimeout,
105-
log: plog,
106-
sendPkt: func(packet *PacketData) error {
107-
return g.sendPacket(g.ctx, packet)
102+
g.sendQueue = newQueue(
103+
&queueCfg{
104+
s: cfg.n + 1,
105+
log: plog,
106+
sendPkt: func(packet *PacketData) error {
107+
return g.sendPacket(g.ctx, packet)
108+
},
108109
},
109-
})
110+
timeoutManager,
111+
)
110112

111113
return g
112114
}
113115

114-
// setN sets the current N to use. This _must_ be set before the handshake is
115-
// completed.
116-
func (g *GoBackNConn) setN(n uint8) {
117-
g.cfg.n = n
118-
g.cfg.s = n + 1
119-
g.recvDataChan = make(chan *PacketData, n)
120-
g.sendQueue = newQueue(&queueCfg{
121-
s: n + 1,
122-
timeout: g.cfg.resendTimeout,
123-
log: g.log,
124-
sendPkt: func(packet *PacketData) error {
125-
return g.sendPacket(g.ctx, packet)
126-
},
127-
})
128-
}
129-
130116
// SetSendTimeout sets the timeout used in the Send function.
131117
func (g *GoBackNConn) SetSendTimeout(timeout time.Duration) {
132-
g.timeoutsMu.Lock()
133-
defer g.timeoutsMu.Unlock()
134-
135-
g.sendTimeout = timeout
118+
g.timeoutManager.SetSendTimeout(timeout)
136119
}
137120

138121
// SetRecvTimeout sets the timeout used in the Recv function.
139122
func (g *GoBackNConn) SetRecvTimeout(timeout time.Duration) {
140-
g.timeoutsMu.Lock()
141-
defer g.timeoutsMu.Unlock()
123+
g.timeoutManager.SetRecvTimeout(timeout)
124+
}
142125

143-
g.recvTimeout = timeout
126+
// setN sets the current N to use. This _must_ be set before the handshake is
127+
// completed.
128+
func (g *GoBackNConn) setN(n uint8) {
129+
g.cfg.n = n
130+
g.cfg.s = n + 1
131+
g.recvDataChan = make(chan *PacketData, n)
132+
g.sendQueue = newQueue(
133+
&queueCfg{
134+
s: n + 1,
135+
log: g.log,
136+
sendPkt: func(packet *PacketData) error {
137+
return g.sendPacket(g.ctx, packet)
138+
},
139+
},
140+
g.timeoutManager,
141+
)
144142
}
145143

146144
// Send blocks until an ack is received for the packet sent N packets before.
@@ -152,9 +150,7 @@ func (g *GoBackNConn) Send(data []byte) error {
152150
default:
153151
}
154152

155-
g.timeoutsMu.RLock()
156-
ticker := time.NewTimer(g.sendTimeout)
157-
g.timeoutsMu.RUnlock()
153+
ticker := time.NewTimer(g.timeoutManager.GetSendTimeout())
158154
defer ticker.Stop()
159155

160156
sendPacket := func(packet *PacketData) error {
@@ -216,9 +212,7 @@ func (g *GoBackNConn) Recv() ([]byte, error) {
216212
msg *PacketData
217213
)
218214

219-
g.timeoutsMu.RLock()
220-
ticker := time.NewTimer(g.recvTimeout)
221-
g.timeoutsMu.RUnlock()
215+
ticker := time.NewTimer(g.timeoutManager.GetRecvTimeout())
222216
defer ticker.Stop()
223217

224218
for {
@@ -245,22 +239,16 @@ func (g *GoBackNConn) Recv() ([]byte, error) {
245239
func (g *GoBackNConn) start() {
246240
g.log.Debugf("Starting")
247241

248-
pingTime := time.Duration(math.MaxInt64)
249-
if g.cfg.pingTime != 0 {
250-
pingTime = g.cfg.pingTime
251-
}
252-
253-
g.pingTicker = NewIntervalAwareForceTicker(pingTime)
242+
g.pingTicker = NewIntervalAwareForceTicker(
243+
g.timeoutManager.GetPingTime(),
244+
)
254245
g.pingTicker.Resume()
255246

256-
pongTime := time.Duration(math.MaxInt64)
257-
if g.cfg.pongTime != 0 {
258-
pongTime = g.cfg.pongTime
259-
}
260-
261-
g.pongTicker = NewIntervalAwareForceTicker(pongTime)
247+
g.pongTicker = NewIntervalAwareForceTicker(
248+
g.timeoutManager.GetPongTime(),
249+
)
262250

263-
g.resendTicker = time.NewTicker(g.cfg.resendTimeout)
251+
g.resendTicker = time.NewTicker(g.timeoutManager.GetResendTimeout())
264252

265253
g.wg.Add(1)
266254
go func() {
@@ -317,7 +305,7 @@ func (g *GoBackNConn) Close() error {
317305
g.log.Tracef("Try sending FIN")
318306

319307
ctxc, cancel := context.WithTimeout(
320-
g.ctx, defaultFinSendTimeout,
308+
g.ctx, g.timeoutManager.GetFinSendTimeout(),
321309
)
322310
defer cancel()
323311
if err := g.sendPacket(ctxc, &PacketFIN{}); err != nil {
@@ -382,7 +370,7 @@ func (g *GoBackNConn) sendPacketsForever() error {
382370
// execute. That can happen if the function was awaiting the
383371
// expected ACK for a long time, or times out while awaiting the
384372
// catch up.
385-
g.resendTicker.Reset(g.cfg.resendTimeout)
373+
g.resendTicker.Reset(g.timeoutManager.GetResendTimeout())
386374

387375
// Also drain the resend signal channel, as resendTicker.Reset
388376
// doesn't drain the channel if the ticker ticked during the
@@ -509,7 +497,7 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
509497
g.pongTicker.Pause()
510498
}
511499

512-
g.resendTicker.Reset(g.cfg.resendTimeout)
500+
g.resendTicker.Reset(g.timeoutManager.GetResendTimeout())
513501

514502
switch m := msg.(type) {
515503
case *PacketData:
@@ -567,8 +555,9 @@ func (g *GoBackNConn) receivePacketsForever() error { // nolint:gocyclo
567555
// the resend, and therefore won't react to the
568556
// NACK we send here in time.
569557
sinceSent := time.Since(lastNackTime)
570-
recentlySent := sinceSent <
571-
g.cfg.resendTimeout*2
558+
559+
timeout := g.timeoutManager.GetResendTimeout()
560+
recentlySent := sinceSent < timeout*2
572561

573562
if lastNackSeq == g.recvSeq && recentlySent {
574563
g.log.Tracef("Recently sent NACK")

gbn/gbn_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (g *GoBackNConn) serverHandshake() error { // nolint:gocyclo
143143
}
144144

145145
select {
146-
case <-time.After(g.cfg.handshakeTimeout):
146+
case <-time.After(g.timeoutManager.GetHandshakeTimeout()):
147147
g.log.Debugf("SYNCACK resendTimeout. Abort and wait " +
148148
"for client to re-initiate")
149149
continue

gbn/queue.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ type queueCfg struct {
1717
// no way to tell.
1818
s uint8
1919

20-
timeout time.Duration
21-
2220
log btclog.Logger
2321

2422
sendPkt func(packet *PacketData) error
@@ -29,6 +27,8 @@ type queueCfg struct {
2927
type queue struct {
3028
cfg *queueCfg
3129

30+
timeoutManager *TimeoutManager
31+
3232
// content is the current content of the queue. This is always a slice
3333
// of length s but can contain nil elements if the queue isn't full.
3434
content []*PacketData
@@ -59,18 +59,19 @@ type queue struct {
5959
}
6060

6161
// newQueue creates a new queue.
62-
func newQueue(cfg *queueCfg) *queue {
62+
func newQueue(cfg *queueCfg, timeoutManager *TimeoutManager) *queue {
6363
if cfg.log == nil {
6464
cfg.log = log
6565
}
6666

6767
q := &queue{
68-
cfg: cfg,
69-
content: make([]*PacketData, cfg.s),
70-
quit: make(chan struct{}),
68+
cfg: cfg,
69+
content: make([]*PacketData, cfg.s),
70+
quit: make(chan struct{}),
71+
timeoutManager: timeoutManager,
7172
}
7273

73-
q.syncer = newSyncer(cfg.s, cfg.log, cfg.timeout, q.quit)
74+
q.syncer = newSyncer(cfg.s, cfg.log, timeoutManager, q.quit)
7475

7576
return q
7677
}
@@ -108,7 +109,7 @@ func (q *queue) addPacket(packet *PacketData) {
108109
// two parties to be seen as synced; this may fail in which case the caller is
109110
// expected to call resend again.
110111
func (q *queue) resend() error {
111-
if time.Since(q.lastResend) < q.cfg.timeout {
112+
if time.Since(q.lastResend) < q.timeoutManager.GetHandshakeTimeout() {
112113
q.cfg.log.Tracef("Resent the queue recently.")
113114

114115
return nil

gbn/queue_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
)
1111

1212
func TestQueueSize(t *testing.T) {
13-
q := newQueue(&queueCfg{s: 4})
13+
q := newQueue(&queueCfg{s: 4}, NewTimeOutManager(nil))
1414

1515
require.Equal(t, uint8(0), q.size())
1616

@@ -33,16 +33,18 @@ func TestQueueResend(t *testing.T) {
3333
resentPackets := make(map[uint8]struct{})
3434
queueTimeout := time.Second * 1
3535

36+
tm := NewTimeOutManager(nil)
37+
tm.resendTimeout = queueTimeout
38+
3639
cfg := &queueCfg{
37-
s: 5,
38-
timeout: queueTimeout,
40+
s: 5,
3941
sendPkt: func(packet *PacketData) error {
4042
resentPackets[packet.Seq] = struct{}{}
4143

4244
return nil
4345
},
4446
}
45-
q := newQueue(cfg)
47+
q := newQueue(cfg, tm)
4648

4749
pkt1 := &PacketData{Seq: 1}
4850
pkt2 := &PacketData{Seq: 2}

gbn/syncer.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,9 @@ const (
9999
// When either of the 3 conditions above are met, we will consider both parties
100100
// to be in sync.
101101
type syncer struct {
102-
s uint8
103-
log btclog.Logger
104-
timeout time.Duration
102+
s uint8
103+
log btclog.Logger
104+
timeoutManager *TimeoutManager
105105

106106
state syncState
107107

@@ -127,20 +127,20 @@ type syncer struct {
127127
}
128128

129129
// newSyncer creates a new syncer instance.
130-
func newSyncer(s uint8, prefixLogger btclog.Logger, timeout time.Duration,
131-
quit chan struct{}) *syncer {
130+
func newSyncer(s uint8, prefixLogger btclog.Logger,
131+
timeoutManager *TimeoutManager, quit chan struct{}) *syncer {
132132

133133
if prefixLogger == nil {
134134
prefixLogger = log
135135
}
136136

137137
return &syncer{
138-
s: s,
139-
log: prefixLogger,
140-
timeout: timeout,
141-
state: syncStateIdle,
142-
cancel: make(chan struct{}),
143-
quit: quit,
138+
s: s,
139+
log: prefixLogger,
140+
timeoutManager: timeoutManager,
141+
state: syncStateIdle,
142+
cancel: make(chan struct{}),
143+
quit: quit,
144144
}
145145
}
146146

@@ -210,7 +210,9 @@ func (c *syncer) waitForSync() {
210210
case <-c.cancel:
211211
c.log.Tracef("sync canceled or reset")
212212

213-
case <-time.After(c.timeout * awaitingTimeoutMultiplier):
213+
case <-time.After(
214+
c.timeoutManager.GetResendTimeout() * awaitingTimeoutMultiplier,
215+
):
214216
c.log.Tracef("Timed out while waiting for sync")
215217
}
216218

@@ -291,7 +293,7 @@ func (c *syncer) proceedAfterTime() {
291293

292294
return
293295

294-
case <-time.After(c.timeout):
296+
case <-time.After(c.timeoutManager.GetResendTimeout()):
295297
c.mu.Lock()
296298
defer c.mu.Unlock()
297299

gbn/syncer_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ func TestSyncer(t *testing.T) {
1919
syncTimeout := time.Second * 1
2020
expectedNACK := uint8(3)
2121

22-
syncer := newSyncer(5, nil, syncTimeout, make(chan struct{}))
22+
tm := NewTimeOutManager(nil)
23+
tm.resendTimeout = syncTimeout
24+
25+
syncer := newSyncer(5, nil, tm, make(chan struct{}))
2326

2427
// Let's first test the scenario where we don't receive the expected
2528
// ACK/NACK after initiating the resend. This should trigger a timeout

0 commit comments

Comments
 (0)