@@ -7,13 +7,7 @@ import (
7
7
"github.com/btcsuite/btclog"
8
8
)
9
9
10
- // queue is a fixed size queue with a sliding window that has a base and a top
11
- // modulo s.
12
- type queue struct {
13
- // content is the current content of the queue. This is always a slice
14
- // of length s but can contain nil elements if the queue isn't full.
15
- content []* PacketData
16
-
10
+ type queueCfg struct {
17
11
// s is the maximum sequence number used to label packets. Packets
18
12
// are labelled with incrementing sequence numbers modulo s.
19
13
// s must be strictly larger than the window size, n. This
@@ -23,6 +17,22 @@ type queue struct {
23
17
// no way to tell.
24
18
s uint8
25
19
20
+ timeout time.Duration
21
+
22
+ log btclog.Logger
23
+
24
+ sendPkt func (packet * PacketData ) error
25
+ }
26
+
27
+ // queue is a fixed size queue with a sliding window that has a base and a top
28
+ // modulo s.
29
+ type queue struct {
30
+ cfg * queueCfg
31
+
32
+ // content is the current content of the queue. This is always a slice
33
+ // of length s but can contain nil elements if the queue isn't full.
34
+ content []* PacketData
35
+
26
36
// sequenceBase keeps track of the base of the send window and so
27
37
// represents the next ack that we expect from the receiver. The
28
38
// maximum value of sequenceBase is s.
@@ -41,26 +51,18 @@ type queue struct {
41
51
// topMtx is used to guard sequenceTop.
42
52
topMtx sync.RWMutex
43
53
44
- lastResend time.Time
45
- handshakeTimeout time.Duration
46
-
47
- log btclog.Logger
54
+ lastResend time.Time
48
55
}
49
56
50
57
// newQueue creates a new queue.
51
- func newQueue (s uint8 , handshakeTimeout time.Duration ,
52
- logger btclog.Logger ) * queue {
53
-
54
- log := log
55
- if logger != nil {
56
- log = logger
58
+ func newQueue (cfg * queueCfg ) * queue {
59
+ if cfg .log == nil {
60
+ cfg .log = log
57
61
}
58
62
59
63
return & queue {
60
- content : make ([]* PacketData , s ),
61
- s : s ,
62
- handshakeTimeout : handshakeTimeout ,
63
- log : log ,
64
+ cfg : cfg ,
65
+ content : make ([]* PacketData , cfg .s ),
64
66
}
65
67
}
66
68
@@ -76,7 +78,7 @@ func (q *queue) size() uint8 {
76
78
return q .sequenceTop - q .sequenceBase
77
79
}
78
80
79
- return q .sequenceTop + (q .s - q .sequenceBase )
81
+ return q .sequenceTop + (q .cfg . s - q .sequenceBase )
80
82
}
81
83
82
84
// addPacket adds a new packet to the queue.
@@ -86,13 +88,13 @@ func (q *queue) addPacket(packet *PacketData) {
86
88
87
89
packet .Seq = q .sequenceTop
88
90
q .content [q .sequenceTop ] = packet
89
- q .sequenceTop = (q .sequenceTop + 1 ) % q .s
91
+ q .sequenceTop = (q .sequenceTop + 1 ) % q .cfg . s
90
92
}
91
93
92
94
// resend invokes the callback for each packet that needs to be re-sent.
93
- func (q * queue ) resend (cb func ( packet * PacketData ) error ) error {
94
- if time .Since (q .lastResend ) < q .handshakeTimeout {
95
- q .log .Tracef ("Resent the queue recently." )
95
+ func (q * queue ) resend () error {
96
+ if time .Since (q .lastResend ) < q .cfg . timeout {
97
+ q .cfg . log .Tracef ("Resent the queue recently." )
96
98
97
99
return nil
98
100
}
@@ -115,17 +117,17 @@ func (q *queue) resend(cb func(packet *PacketData) error) error {
115
117
return nil
116
118
}
117
119
118
- q .log .Tracef ("Resending the queue" )
120
+ q .cfg . log .Tracef ("Resending the queue" )
119
121
120
122
for base != top {
121
123
packet := q .content [base ]
122
124
123
- if err := cb (packet ); err != nil {
125
+ if err := q . cfg . sendPkt (packet ); err != nil {
124
126
return err
125
127
}
126
- base = (base + 1 ) % q .s
128
+ base = (base + 1 ) % q .cfg . s
127
129
128
- q .log .Tracef ("Resent %d" , packet .Seq )
130
+ q .cfg . log .Tracef ("Resent %d" , packet .Seq )
129
131
}
130
132
131
133
return nil
@@ -136,8 +138,8 @@ func (q *queue) processACK(seq uint8) bool {
136
138
137
139
// If our queue is empty, an ACK should not have any effect.
138
140
if q .size () == 0 {
139
- q .log .Tracef ("Received ack %d, but queue is empty. Ignoring." ,
140
- seq )
141
+ q .cfg . log .Tracef ("Received ack %d, but queue is empty. " +
142
+ "Ignoring." , seq )
141
143
142
144
return false
143
145
}
@@ -150,9 +152,9 @@ func (q *queue) processACK(seq uint8) bool {
150
152
// equal to the one we were expecting. So we increase our base
151
153
// accordingly and send a signal to indicate that the queue size
152
154
// has decreased.
153
- q .log .Tracef ("Received correct ack %d" , seq )
155
+ q .cfg . log .Tracef ("Received correct ack %d" , seq )
154
156
155
- q .sequenceBase = (q .sequenceBase + 1 ) % q .s
157
+ q .sequenceBase = (q .sequenceBase + 1 ) % q .cfg . s
156
158
157
159
// We did receive an ACK.
158
160
return true
@@ -162,7 +164,8 @@ func (q *queue) processACK(seq uint8) bool {
162
164
// This could be a duplicate ACK before or it could be that we just
163
165
// missed the ACK for the current base and this is actually an ACK for
164
166
// another packet in the queue.
165
- q .log .Tracef ("Received wrong ack %d, expected %d" , seq , q .sequenceBase )
167
+ q .cfg .log .Tracef ("Received wrong ack %d, expected %d" , seq ,
168
+ q .sequenceBase )
166
169
167
170
q .topMtx .RLock ()
168
171
defer q .topMtx .RUnlock ()
@@ -171,9 +174,10 @@ func (q *queue) processACK(seq uint8) bool {
171
174
// just missed a previous ACK. We can bump the base to be equal to this
172
175
// sequence number.
173
176
if containsSequence (q .sequenceBase , q .sequenceTop , seq ) {
174
- q .log .Tracef ("Sequence %d is in the queue. Bump the base." , seq )
177
+ q .cfg .log .Tracef ("Sequence %d is in the queue. Bump the base." ,
178
+ seq )
175
179
176
- q .sequenceBase = (seq + 1 ) % q .s
180
+ q .sequenceBase = (seq + 1 ) % q .cfg . s
177
181
178
182
// We did receive an ACK.
179
183
return true
@@ -191,7 +195,7 @@ func (q *queue) processNACK(seq uint8) (bool, bool) {
191
195
q .topMtx .RLock ()
192
196
defer q .topMtx .RUnlock ()
193
197
194
- q .log .Tracef ("Received NACK %d" , seq )
198
+ q .cfg . log .Tracef ("Received NACK %d" , seq )
195
199
196
200
// If the NACK is the same as sequenceTop, it probably means that queue
197
201
// was sent successfully, but we just missed the necessary ACKs. So we
0 commit comments