Skip to content

Commit 43aa566

Browse files
committed
notifications: implement incremental backoff for invalid L402 tokens
1 parent 7cfceb7 commit 43aa566

File tree

2 files changed

+230
-20
lines changed

2 files changed

+230
-20
lines changed

notifications/manager.go

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@ const (
2626
NotificationTypeStaticLoopInSweepRequest
2727
)
2828

29+
const (
30+
// defaultMinAliveConnTime is the default minimum time that the
31+
// connection to the server needs to be alive before we consider it a
32+
// successful connection.
33+
defaultMinAliveConnTime = time.Minute
34+
)
35+
2936
// Client is the interface that the notification manager needs to implement in
3037
// order to be able to subscribe to notifications.
3138
type Client interface {
@@ -45,6 +52,10 @@ type Config struct {
4552
// CurrentToken returns the token that is currently contained in the
4653
// store or an l402.ErrNoToken error if there is none.
4754
CurrentToken func() (*l402.Token, error)
55+
56+
// MinAliveConnTime is the minimum time that the connection to the
57+
// server needs to be alive before we consider it a successful connection.
58+
MinAliveConnTime time.Duration
4859
}
4960

5061
// Manager is a manager for notifications that the swap server sends to the
@@ -60,6 +71,11 @@ type Manager struct {
6071

6172
// NewManager creates a new notification manager.
6273
func NewManager(cfg *Config) *Manager {
74+
// Set the default minimum alive connection time if it's not set.
75+
if cfg.MinAliveConnTime == 0 {
76+
cfg.MinAliveConnTime = defaultMinAliveConnTime
77+
}
78+
6379
return &Manager{
6480
cfg: cfg,
6581
subscribers: make(map[NotificationType][]subscriber),
@@ -128,13 +144,18 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
128144
// close the readyChan to signal that the manager is ready.
129145
func (m *Manager) Run(ctx context.Context) error {
130146
// Initially we want to immediately try to connect to the server.
131-
waitTime := time.Duration(0)
147+
var (
148+
waitTime time.Duration
149+
backoff time.Duration
150+
connAttempts int
151+
)
132152

133153
// Start the notification runloop.
134154
for {
135-
timer := time.NewTimer(waitTime)
136155
// Increase the wait time for the next iteration.
137-
waitTime += time.Second * 1
156+
backoff = waitTime + time.Duration(connAttempts)*time.Second
157+
waitTime = 0
158+
timer := time.NewTimer(backoff)
138159

139160
// Return if the context has been canceled.
140161
select {
@@ -145,37 +166,55 @@ func (m *Manager) Run(ctx context.Context) error {
145166
}
146167

147168
// In order to create a valid l402 we first are going to call
148-
// the FetchL402 method. As a client might not have outbound capacity
149-
// yet, we'll retry until we get a valid response.
169+
// the FetchL402 method. As a client might not have outbound
170+
// capacity yet, we'll retry until we get a valid response.
150171
if !m.hasL402 {
151172
_, err := m.cfg.CurrentToken()
152173
if err != nil {
153-
// We only log the error if it's not the case that we
154-
// don't have a token yet to avoid spamming the logs.
174+
// We only log the error if it's not the case
175+
// that we don't have a token yet to avoid
176+
// spamming the logs.
155177
if err != l402.ErrNoToken {
156-
log.Errorf("Error getting L402 from store: %v", err)
178+
log.Errorf("Error getting L402 from "+
179+
"the store: %v", err)
157180
}
158181
continue
159182
}
160183
m.hasL402 = true
161184
}
162185

163-
connectedFunc := func() {
164-
// Reset the wait time to 10 seconds.
165-
waitTime = time.Second * 10
166-
}
167-
168-
err := m.subscribeNotifications(ctx, connectedFunc)
186+
connectAttempted := time.Now()
187+
err := m.subscribeNotifications(ctx)
169188
if err != nil {
170-
log.Errorf("Error subscribing to notifications: %v", err)
189+
log.Errorf("Error subscribing to notifications: %v",
190+
err)
191+
}
192+
connectionAliveTime := time.Since(connectAttempted)
193+
194+
// Note that we may be able to connect to the stream but not
195+
// able to use it if the client is unable to pay for their
196+
// L402. In this case the subscription will fail on the first
197+
// read immediately after connecting. We'll therefore only
198+
// consider the connection successful if we were able to use
199+
// the stream for at least the minimum alive connection time
200+
// (which defaults to 1 minute).
201+
if connectionAliveTime > m.cfg.MinAliveConnTime {
202+
// Reset the backoff to 10 seconds and the connect
203+
// attempts to zero if we were really connected for a
204+
// considerable amount of time (1 minute).
205+
waitTime = time.Second * 10
206+
connAttempts = 0
207+
} else {
208+
// We either failed to connect or the stream
209+
// disconnected immediately, so we just increase the
210+
// backoff.
211+
connAttempts++
171212
}
172213
}
173214
}
174215

175216
// subscribeNotifications subscribes to the notifications from the server.
176-
func (m *Manager) subscribeNotifications(ctx context.Context,
177-
connectedFunc func()) error {
178-
217+
func (m *Manager) subscribeNotifications(ctx context.Context) error {
179218
callCtx, cancel := context.WithCancel(ctx)
180219
defer cancel()
181220

@@ -186,8 +225,6 @@ func (m *Manager) subscribeNotifications(ctx context.Context,
186225
return err
187226
}
188227

189-
// Signal that we're connected to the server.
190-
connectedFunc()
191228
log.Debugf("Successfully subscribed to server notifications")
192229

193230
for {

notifications/manager_test.go

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package notifications
22

33
import (
44
"context"
5+
"errors"
56
"io"
67
"sync"
78
"testing"
@@ -23,6 +24,7 @@ var (
2324
type mockNotificationsClient struct {
2425
mockStream swapserverrpc.SwapServer_SubscribeNotificationsClient
2526
subscribeErr error
27+
attemptTimes []time.Time
2628
timesCalled int
2729
sync.Mutex
2830
}
@@ -36,6 +38,7 @@ func (m *mockNotificationsClient) SubscribeNotifications(ctx context.Context,
3638
defer m.Unlock()
3739

3840
m.timesCalled++
41+
m.attemptTimes = append(m.attemptTimes, time.Now())
3942
if m.subscribeErr != nil {
4043
return nil, m.subscribeErr
4144
}
@@ -87,7 +90,11 @@ func (m *mockSubscribeNotificationsClient) RecvMsg(interface{}) error {
8790
return nil
8891
}
8992

93+
// TestManager_ReservationNotification tests that the Manager correctly
94+
// forwards reservation notifications to subscribers.
9095
func TestManager_ReservationNotification(t *testing.T) {
96+
t.Parallel()
97+
9198
// Create a mock notification client
9299
recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse, 1)
93100
errChan := make(chan error, 1)
@@ -172,3 +179,169 @@ func getTestNotification(resId []byte) *swapserverrpc.SubscribeNotificationsResp
172179
},
173180
}
174181
}
182+
183+
// TestManager_Backoff verifies that repeated failures in
184+
// subscribeNotifications cause the Manager to space out subscription attempts
185+
// via a predictable incremental backoff.
186+
func TestManager_Backoff(t *testing.T) {
187+
t.Parallel()
188+
189+
// We'll tolerate a bit of jitter in the timing checks.
190+
const tolerance = 300 * time.Millisecond
191+
192+
recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse)
193+
recvErrChan := make(chan error)
194+
195+
mockStream := &mockSubscribeNotificationsClient{
196+
recvChan: recvChan,
197+
recvErrChan: recvErrChan,
198+
}
199+
200+
// Create a new mock client that will fail to subscribe.
201+
mockClient := &mockNotificationsClient{
202+
mockStream: mockStream,
203+
subscribeErr: errors.New("failing on purpose"),
204+
}
205+
206+
// Manager with a successful CurrentToken so that it always tries
207+
// to subscribe.
208+
mgr := NewManager(&Config{
209+
Client: mockClient,
210+
CurrentToken: func() (*l402.Token, error) {
211+
return &l402.Token{}, nil
212+
},
213+
})
214+
215+
// Run the manager in a background goroutine.
216+
ctx, cancel := context.WithCancel(context.Background())
217+
defer cancel()
218+
219+
var wg sync.WaitGroup
220+
wg.Add(1)
221+
go func() {
222+
defer wg.Done()
223+
// We ignore the returned error because the Manager returns
224+
// nil on context cancel.
225+
_ = mgr.Run(ctx)
226+
}()
227+
228+
// Wait long enough to see at least 3 subscription attempts using
229+
// the Manager's default pattern.
230+
// We'll wait ~5 seconds total so we capture at least 3 attempts:
231+
// - Attempt #1: immediate
232+
// - Attempt #2: ~1 second
233+
// - Attempt #3: ~2 seconds after that (~3 seconds in total), etc.
234+
time.Sleep(5 * time.Second)
235+
236+
// Cancel the context to stop the manager.
237+
cancel()
238+
wg.Wait()
239+
240+
// Check how many attempts we made.
241+
require.GreaterOrEqual(t, len(mockClient.attemptTimes), 3,
242+
"expected at least 3 attempts within 5 seconds",
243+
)
244+
245+
expectedDelay := time.Second
246+
for i := 1; i < len(mockClient.attemptTimes); i++ {
247+
// The expected delay for the i-th gap (comparing attempt i to
248+
// attempt i-1) is i seconds (because the manager increments
249+
// the backoff by 1 second each time).
250+
actualDelay := mockClient.attemptTimes[i].Sub(
251+
mockClient.attemptTimes[i-1],
252+
)
253+
254+
require.InDeltaf(
255+
t, expectedDelay, actualDelay, float64(tolerance),
256+
"Attempt %d -> Attempt %d delay should be ~%v, got %v",
257+
i, i+1, expectedDelay, actualDelay,
258+
)
259+
260+
expectedDelay += time.Second
261+
}
262+
}
263+
264+
// TestManager_MinAliveConnTime verifies that the Manager enforces the minimum
265+
// alive connection time before considering a subscription successful.
266+
func TestManager_MinAliveConnTime(t *testing.T) {
267+
t.Parallel()
268+
269+
// Tolerance to allow for scheduling jitter.
270+
const tolerance = 300 * time.Millisecond
271+
272+
// Set a small MinAliveConnTime so the test doesn't run too long.
273+
// Once a subscription stays alive longer than 1s (minAlive), the manager resets
274+
// its backoff to 10s on the next loop iteration.
275+
const minAlive = 1 * time.Second
276+
277+
// We'll provide a channel for incoming notifications
278+
// and another for forcing errors to close the subscription.
279+
recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse)
280+
recvErrChan := make(chan error)
281+
282+
mockStream := &mockSubscribeNotificationsClient{
283+
recvChan: recvChan,
284+
recvErrChan: recvErrChan,
285+
}
286+
287+
// No immediate error from SubscribeNotifications, so it "succeeds".
288+
// We trigger subscription closure by sending an error to recvErrChan.
289+
mockClient := &mockNotificationsClient{
290+
mockStream: mockStream,
291+
// subscribeErr stays nil => success on each call.
292+
}
293+
294+
// Create a Manager that uses our mock client and enforces
295+
// MinAliveConnTime=1s.
296+
mgr := NewManager(&Config{
297+
Client: mockClient,
298+
MinAliveConnTime: minAlive,
299+
CurrentToken: func() (*l402.Token, error) {
300+
return &l402.Token{}, nil
301+
},
302+
})
303+
304+
ctx, cancel := context.WithCancel(context.Background())
305+
var wg sync.WaitGroup
306+
wg.Add(1)
307+
go func() {
308+
defer wg.Done()
309+
_ = mgr.Run(ctx)
310+
}()
311+
312+
// Let the subscription stay alive for 2s, which is >1s (minAlive).
313+
// Then force an error to end the subscription. The manager sees
314+
// it stayed connected ~2s and resets its backoff to 10s.
315+
go func() {
316+
time.Sleep(2 * time.Second)
317+
recvErrChan <- errors.New("mock subscription closed")
318+
}()
319+
320+
// Wait enough time (~13s) to see:
321+
// - First subscription (2s)
322+
// - Manager resets to 10s
323+
// - Second subscription attempt starts ~10s later.
324+
time.Sleep(13 * time.Second)
325+
326+
// Signal EOF so the subscription stops.
327+
close(recvChan)
328+
329+
// Stop the manager and wait for cleanup.
330+
cancel()
331+
wg.Wait()
332+
333+
// Expect at least 2 attempts in attemptTimes:
334+
// 1) The one that stayed alive for 2s,
335+
// 2) The next attempt ~10s after that.
336+
require.GreaterOrEqual(
337+
t, len(mockClient.attemptTimes), 2,
338+
"expected at least 2 attempts with a successful subscription",
339+
)
340+
341+
require.InDeltaf(
342+
t, 12*time.Second,
343+
mockClient.attemptTimes[1].Sub(mockClient.attemptTimes[0]),
344+
float64(tolerance),
345+
"Second attempt should occur ~12s after the first",
346+
)
347+
}

0 commit comments

Comments
 (0)