package gbn

import (
	"sync"
	"time"

	"github.com/btcsuite/btclog"
)

const (
	// awaitingTimeoutMultiplier defines the multiplier we use when
	// multiplying the resend timeout, resulting in the duration we wait
	// for the sync to be complete before timing out.
	// We set this to 3X the resend timeout, as that is the maximum time
	// we expect correct behavior to take:
	// * 1X the resendTimeout for the other party to respond with an ACK
	// for the last packet in the resend queue, i.e. the expectedACK.
	// * 1X the resendTimeout spent waiting in proceedAfterTime before
	// completing the sync.
	// * 1X extra resendTimeout as a buffer, to ensure that we have
	// enough time to process the ACKs/NACKs from the other party, plus
	// some extra margin.
	awaitingTimeoutMultiplier = 3
)
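
// As a quick illustration of the timing above: with a resend timeout of
// (say) one second, which is an example value rather than a default defined
// by this package, waitForSync would give up and consider the sync complete
// after 3 * time.Second.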

// syncState is an enum defining the current resend-sync state of the syncer.
type syncState uint8

const (
	// syncStateIdle is the state representing that the syncer is idle
	// and has not yet initiated a resend sync.
	syncStateIdle syncState = iota

	// syncStateResending is the state representing that the syncer has
	// initiated a resend sync, and is awaiting the completion of that
	// sync.
	syncStateResending
)

// syncer is used to ensure that both the sender and the receiver are in sync
// before the waitForSync function completes. This is done by waiting until
// we receive either the expected ACK or NACK after resending the queue.
//
// We need to wait for the expected ACK/NACK after resending the queue to
// ensure that we don't end up in a situation where we resend the queue over
// and over again due to latency and delayed NACKs by the other party.
//
// Consider the following scenario:
// 1.
// Alice sends packets 1, 2, 3 & 4 to Bob.
// 2.
// Bob receives packets 1, 2, 3 & 4, and sends back the respective ACKs.
// 3.
// Alice receives ACKs for packets 1 & 2, but due to latency the ACKs for
// packets 3 & 4 are delayed and aren't received until Alice's resend timeout
// has passed, which leads to Alice resending packets 3 & 4. Alice will after
// that receive the delayed ACKs for packets 3 & 4, but will consider them to
// be the ACKs for the resent packets, and not for the original packets they
// were actually sent in response to. If we didn't wait after resending the
// queue, Alice would then proceed to send more packets (5 & 6).
// 4.
// When Bob receives the resent packets 3 & 4, Bob will respond with NACK 5.
// Due to latency, the packets 5 & 6 that Alice sent in step (3) above will
// then be received by Bob, and be processed as the correct response to the
// NACK 5. Bob will after that await packet 7.
// 5.
// Alice will receive the NACK 5, and now resend packets 5 & 6. But as Bob is
// now awaiting packet 7, this send will lead to a NACK 7. But due to
// latency, if Alice doesn't wait after resending the queue, Alice will
// proceed to send new packet(s) before receiving the NACK 7.
// 6.
// This resend loop would continue indefinitely, so we need to ensure that
// Alice waits after she has resent the queue, to ensure that she doesn't
// proceed to send new packets before she is sure that both parties are in
// sync.
//
// To ensure that we are in sync, after we have resent the queue, we will
// await that we either:
// 1. Receive a NACK for the sequence number succeeding the last packet in
// the resent queue, i.e. in step (3) above, that would be NACK 5.
// OR
// 2. Receive an ACK for the last packet in the resent queue, i.e. in step
// (3) above, that would be ACK 4. After we receive the expected ACK, we will
// then wait for the duration of the resend timeout before continuing. The
// reason we wait for the resend timeout before continuing is that the ACKs
// we are getting after a resend could be delayed ACKs for the original
// packets we sent, and not ACKs for the resent packets. In step (3) above,
// the ACKs for packets 3 & 4 that Alice received were delayed ACKs for the
// original packets. If Alice had immediately continued to send new packets
// (5 & 6) after receiving the ACK 4, she would then have received the NACK 5
// from Bob, which was the actual response to the resent queue. But as Alice
// had already continued to send packets 5 & 6 when receiving the NACK 5, the
// resend of the queue in response to that NACK would cause the resend loop
// to continue indefinitely.
// OR
// 3. If neither condition 1 nor 2 above is met within 3X the resend timeout,
// we will time out and consider the sync to be completed. See the docs for
// awaitingTimeoutMultiplier for more details on why we wait 3X the resend
// timeout.
//
// When any of the 3 conditions above is met, we will consider both parties
// to be in sync.
type syncer struct {
	// s is the size of the sequence number space, i.e. the modulus at
	// which sequence numbers wrap around.
	s uint8

	log     btclog.Logger
	timeout time.Duration

	state syncState

	// expectedACK defines the sequence number for the last packet in the
	// resend queue. If we receive an ACK for this sequence number while
	// waiting to sync, we wait for the duration of the resend timeout,
	// and then proceed to send new packets, unless we receive the
	// expectedNACK during the wait time. If that happens, we will proceed
	// to send new packets as soon as we have processed the NACK.
	expectedACK uint8

	// expectedNACK is set to the sequence number that follows the last
	// item in the resend queue when a sync is initiated. In case we get a
	// NACK with this sequence number while waiting to sync, we consider
	// the sync to be completed and can proceed to send new packets.
	expectedNACK uint8

	// cancel is used to mark that the sync has been completed.
	cancel chan struct{}

	quit chan struct{}
	mu   sync.Mutex
}

// newSyncer creates a new syncer instance.
func newSyncer(s uint8, prefixLogger btclog.Logger, timeout time.Duration,
	quit chan struct{}) *syncer {

	if prefixLogger == nil {
		prefixLogger = log
	}

	return &syncer{
		s:       s,
		log:     prefixLogger,
		timeout: timeout,
		state:   syncStateIdle,
		cancel:  make(chan struct{}),
		quit:    quit,
	}
}
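
// As an example, a caller could construct a syncer like this (the concrete
// values below are illustrative only, not defaults defined by this package):
//
//	c := newSyncer(128, nil, time.Second, make(chan struct{}))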

// reset resets the syncer state to idle and marks the sync as completed.
func (c *syncer) reset() {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.resetUnsafe()
}

// resetUnsafe resets the syncer state to idle and marks the sync as
// completed.
//
// NOTE: when calling this function, the caller must hold the syncer mutex.
func (c *syncer) resetUnsafe() {
	c.state = syncStateIdle

	// Cancel any pending sync by signaling over the cancel channel. The
	// send is non-blocking, so that we don't block in case no goroutine
	// is currently listening on the channel.
	select {
	case c.cancel <- struct{}{}:
	default:
	}
}

// initResendUpTo initializes the syncer to the resending state. After this
// call, the syncer is ready to wait for the sync to complete through the
// waitForSync function.
// The top argument defines the sequence number of the next packet to be sent
// after resending the queue.
func (c *syncer) initResendUpTo(top uint8) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.state = syncStateResending

	// Drain the cancel channel, to reinitialize it for the new sync.
	select {
	case <-c.cancel:
	default:
	}
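
	// The last packet in the resend queue has sequence number top-1,
	// wrapping around at the sequence space size s. We add s before
	// subtracting to avoid underflow. For example, with s = 8 and
	// top = 0, the expectedACK is (8 + 0 - 1) % 8 = 7.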
	c.expectedACK = (c.s + top - 1) % c.s
	c.expectedNACK = top

	c.log.Tracef("Set expectedACK to %d & expectedNACK to %d",
		c.expectedACK, c.expectedNACK)
}

// getState returns the current state of the syncer.
func (c *syncer) getState() syncState {
	c.mu.Lock()
	defer c.mu.Unlock()

	return c.state
}

// waitForSync waits for the sync to complete. The sync is completed either
// when we receive the expectedNACK, when the resend timeout has passed after
// receiving the expectedACK, or when we time out.
func (c *syncer) waitForSync() {
	c.log.Tracef("Awaiting sync after resending the queue")

	select {
	case <-c.quit:
		return

	case <-c.cancel:
		c.log.Tracef("sync canceled or reset")

	case <-time.After(c.timeout * awaitingTimeoutMultiplier):
		c.log.Tracef("Timed out while waiting for sync")
	}

	c.reset()
}

// processACK marks the sync as completed if the passed sequence number
// matches the expectedACK, after the resend timeout has passed.
// If we are not resending or waiting after a resend, this is a no-op.
func (c *syncer) processACK(seq uint8) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// If we are not resending or waiting after a resend, just swallow the
	// ACK.
	if c.state != syncStateResending {
		return
	}

	// Else, if we are waiting but this is not the ACK we are waiting for,
	// just swallow it.
	if seq != c.expectedACK {
		return
	}

	c.log.Tracef("Got expected ACK")

	// We start the proceedAfterTime function in a goroutine, as we don't
	// want to block the processing of other NACKs/ACKs while we're
	// waiting for the resend timeout to expire.
	go c.proceedAfterTime()
}

// processNACK marks the sync as completed if the passed sequence number
// matches the expectedNACK.
// If we are not resending or waiting after a resend, this is a no-op.
func (c *syncer) processNACK(seq uint8) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// If we are not resending or waiting after a resend, just swallow the
	// NACK.
	if c.state != syncStateResending {
		return
	}

	// Else, if we are waiting but this is not the NACK we are waiting
	// for, just swallow it.
	if seq != c.expectedNACK {
		return
	}

	c.log.Tracef("Got expected NACK")

	c.resetUnsafe()
}

// proceedAfterTime will wait for the resendTimeout and then complete the
// sync, if we haven't already completed it by receiving the expectedNACK.
func (c *syncer) proceedAfterTime() {
	// We wait for the duration of the resendTimeout before completing
	// the sync, as that's the time we'd expect it to take for the other
	// party to respond with a NACK, if resending the last packet in the
	// queue would lead to a NACK. If we receive the expectedNACK before
	// the timeout, a signal is sent over the cancel channel, and we can
	// stop the execution early.
	select {
	case <-c.quit:
		return

	case <-c.cancel:
		c.log.Tracef("sync succeeded or was reset")

		// As we can't be sure that the waitForSync cancel listener
		// was triggered before this one, we send over the cancel
		// channel again, to make sure that both listeners are
		// triggered.
		c.reset()

		return

	case <-time.After(c.timeout):
		c.mu.Lock()
		defer c.mu.Unlock()

		if c.state != syncStateResending {
			return
		}

		c.log.Tracef("Completing sync after expectedACK timeout")

		c.resetUnsafe()
	}
}
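
// The sketch below illustrates how a sender might drive the syncer when
// resending its queue. It is illustrative only and not part of the package
// API: the queue slice and the resendPacket callback are hypothetical
// stand-ins for the real send queue, and the actual wiring in this package
// may differ.
//
//	func resendQueue(c *syncer, queue []uint8, top uint8,
//		resendPacket func(seq uint8)) {
//
//		// Enter the resending state, recording the expected ACK
//		// (top-1) and the expected NACK (top).
//		c.initResendUpTo(top)
//
//		// Resend every packet currently in the queue.
//		for _, seq := range queue {
//			resendPacket(seq)
//		}
//
//		// Block until we receive the expected ACK/NACK, the syncer
//		// is reset, or we time out after 3X the resend timeout.
//		c.waitForSync()
//	}
//
// A receive loop running concurrently would feed incoming ACKs and NACKs
// into c.processACK(seq) and c.processNACK(seq) respectively, which is what
// eventually completes the sync and unblocks waitForSync.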