7
7
8
8
"github.com/lightninglabs/aperture/l402"
9
9
"github.com/lightninglabs/loop/swapserverrpc"
10
+ "github.com/lightningnetwork/lnd/lntypes"
10
11
"google.golang.org/grpc"
11
12
)
12
13
@@ -26,6 +27,13 @@ const (
26
27
NotificationTypeStaticLoopInSweepRequest
27
28
)
28
29
30
+ const (
31
+ // defaultMinAliveConnTime is the default minimum time that the
32
+ // connection to the server needs to be alive before we consider it a
33
+ // successful connection.
34
+ defaultMinAliveConnTime = time .Minute
35
+ )
36
+
29
37
// Client is the interface that the notification manager needs to implement in
30
38
// order to be able to subscribe to notifications.
31
39
type Client interface {
@@ -45,6 +53,10 @@ type Config struct {
45
53
// CurrentToken returns the token that is currently contained in the
46
54
// store or an l402.ErrNoToken error if there is none.
47
55
CurrentToken func () (* l402.Token , error )
56
+
57
+ // MinAliveConnTime is the minimum time that the connection to the
58
+ // server needs to be alive before we consider it a successful.
59
+ MinAliveConnTime time.Duration
48
60
}
49
61
50
62
// Manager is a manager for notifications that the swap server sends to the
@@ -60,6 +72,11 @@ type Manager struct {
60
72
61
73
// NewManager creates a new notification manager.
62
74
func NewManager (cfg * Config ) * Manager {
75
+ // Set the default minimum alive connection time if it's not set.
76
+ if cfg .MinAliveConnTime == 0 {
77
+ cfg .MinAliveConnTime = defaultMinAliveConnTime
78
+ }
79
+
63
80
return & Manager {
64
81
cfg : cfg ,
65
82
subscribers : make (map [NotificationType ][]subscriber ),
@@ -128,13 +145,18 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
128
145
// close the readyChan to signal that the manager is ready.
129
146
func (m * Manager ) Run (ctx context.Context ) error {
130
147
// Initially we want to immediately try to connect to the server.
131
- waitTime := time .Duration (0 )
148
+ var (
149
+ waitTime time.Duration
150
+ backoff time.Duration
151
+ attempts int
152
+ )
132
153
133
154
// Start the notification runloop.
134
155
for {
135
- timer := time .NewTimer (waitTime )
136
156
// Increase the wait time for the next iteration.
137
- waitTime += time .Second * 1
157
+ backoff = waitTime + time .Duration (attempts )* time .Second
158
+ waitTime = 0
159
+ timer := time .NewTimer (backoff )
138
160
139
161
// Return if the context has been canceled.
140
162
select {
@@ -145,37 +167,66 @@ func (m *Manager) Run(ctx context.Context) error {
145
167
}
146
168
147
169
// In order to create a valid l402 we first are going to call
148
- // the FetchL402 method. As a client might not have outbound capacity
149
- // yet, we'll retry until we get a valid response.
170
+ // the FetchL402 method. As a client might not have outbound
171
+ // capacity yet, we'll retry until we get a valid response.
150
172
if ! m .hasL402 {
151
- _ , err := m .cfg .CurrentToken ()
173
+ token , err := m .cfg .CurrentToken ()
152
174
if err != nil {
153
- // We only log the error if it's not the case that we
154
- // don't have a token yet to avoid spamming the logs.
175
+ // We only log the error if it's not the case
176
+ // that we don't have a token yet to avoid
177
+ // spamming the logs.
155
178
if err != l402 .ErrNoToken {
156
- log .Errorf ("Error getting L402 from store: %v" , err )
179
+ log .Errorf ("Error getting L402 from " +
180
+ "the store: %v" , err )
157
181
}
158
182
continue
159
183
}
160
- m .hasL402 = true
161
- }
162
184
163
- connectedFunc := func () {
164
- // Reset the wait time to 10 seconds.
165
- waitTime = time .Second * 10
185
+ // If the preimage is empty, we don't have a valid L402
186
+ // yet so we'll continue to retry with the incremental
187
+ // backoff.
188
+ emptyPreimage := lntypes.Preimage {}
189
+ if token .Preimage == emptyPreimage {
190
+ attempts ++
191
+ continue
192
+ }
193
+
194
+ attempts = 0
195
+ m .hasL402 = true
166
196
}
167
197
168
- err := m .subscribeNotifications (ctx , connectedFunc )
198
+ connectAttempted := time .Now ()
199
+ err := m .subscribeNotifications (ctx )
169
200
if err != nil {
170
- log .Errorf ("Error subscribing to notifications: %v" , err )
201
+ log .Errorf ("Error subscribing to notifications: %v" ,
202
+ err )
203
+ }
204
+ connectionAliveTime := time .Since (connectAttempted )
205
+
206
+ // Note that we may be able to connet to the stream but not
207
+ // able to use it if the client is unable to pay for their
208
+ // L402. In this case the subscription will fail on the first
209
+ // read immediately after connecting. We'll therefore only
210
+ // consider the connection successful if we were able to use
211
+ // the stream for at least the minimum alive connection time
212
+ // (which defaults to 1 minute).
213
+ if connectionAliveTime > m .cfg .MinAliveConnTime {
214
+ // Reset the backoff to 10 seconds and the connect
215
+ // attempts to zero if we were really connected for a
216
+ // considerable amount of time (1 minute).
217
+ waitTime = time .Second * 10
218
+ attempts = 0
219
+ } else {
220
+ // We either failed to connect or the stream
221
+ // disconnected immediately, so we just increase the
222
+ // backoff.
223
+ attempts ++
171
224
}
172
225
}
173
226
}
174
227
175
228
// subscribeNotifications subscribes to the notifications from the server.
176
- func (m * Manager ) subscribeNotifications (ctx context.Context ,
177
- connectedFunc func ()) error {
178
-
229
+ func (m * Manager ) subscribeNotifications (ctx context.Context ) error {
179
230
callCtx , cancel := context .WithCancel (ctx )
180
231
defer cancel ()
181
232
@@ -186,8 +237,6 @@ func (m *Manager) subscribeNotifications(ctx context.Context,
186
237
return err
187
238
}
188
239
189
- // Signal that we're connected to the server.
190
- connectedFunc ()
191
240
log .Debugf ("Successfully subscribed to server notifications" )
192
241
193
242
for {
0 commit comments