Skip to content

Commit b07f698

Browse files
committed
utils: add utility subscribers
1 parent 33fa191 commit b07f698

File tree

2 files changed

+283
-0
lines changed

2 files changed

+283
-0
lines changed

utils/listeners.go

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
package utils
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/btcsuite/btcd/chaincfg/chainhash"
8+
"github.com/lightninglabs/lndclient"
9+
"github.com/lightningnetwork/lnd/chainntnfs"
10+
"github.com/lightningnetwork/lnd/lntypes"
11+
)
12+
13+
// ExpiryManager is a manager for block height expiry events.
14+
type ExpiryManager struct {
15+
chainNotifier lndclient.ChainNotifierClient
16+
17+
expiryHeightMap map[[32]byte]int32
18+
expiryFuncMap map[[32]byte]func()
19+
20+
currentBlockHeight int32
21+
22+
sync.Mutex
23+
}
24+
25+
// NewExpiryManager creates a new expiry manager.
26+
func NewExpiryManager(
27+
chainNotifier lndclient.ChainNotifierClient) *ExpiryManager {
28+
29+
return &ExpiryManager{
30+
chainNotifier: chainNotifier,
31+
expiryHeightMap: make(map[[32]byte]int32),
32+
expiryFuncMap: make(map[[32]byte]func()),
33+
}
34+
}
35+
36+
// Start starts the expiry manager and listens for block height notifications.
37+
func (e *ExpiryManager) Start(ctx context.Context, startingBlockHeight int32,
38+
) error {
39+
40+
e.Lock()
41+
e.currentBlockHeight = startingBlockHeight
42+
e.Unlock()
43+
44+
log.Debugf("Starting expiry manager at height %d", startingBlockHeight)
45+
defer log.Debugf("Expiry manager stopped")
46+
47+
blockHeightChan, errChan, err := e.chainNotifier.RegisterBlockEpochNtfn(
48+
ctx,
49+
)
50+
if err != nil {
51+
return err
52+
}
53+
54+
for {
55+
select {
56+
case blockHeight := <-blockHeightChan:
57+
58+
log.Debugf("Received block height %d", blockHeight)
59+
60+
e.Lock()
61+
e.currentBlockHeight = blockHeight
62+
e.Unlock()
63+
64+
e.checkExpiry(blockHeight)
65+
66+
case err := <-errChan:
67+
log.Debugf("Expiry manager error")
68+
return err
69+
70+
case <-ctx.Done():
71+
log.Debugf("Expiry manager stopped")
72+
return nil
73+
}
74+
}
75+
}
76+
77+
// GetBlockHeight returns the current block height.
78+
func (e *ExpiryManager) GetBlockHeight() int32 {
79+
e.Lock()
80+
defer e.Unlock()
81+
82+
return e.currentBlockHeight
83+
}
84+
85+
// checkExpiry checks if any swaps have expired and calls the expiry function if
86+
// they have.
87+
func (e *ExpiryManager) checkExpiry(blockHeight int32) {
88+
e.Lock()
89+
defer e.Unlock()
90+
91+
for swapHash, expiryHeight := range e.expiryHeightMap {
92+
if blockHeight >= expiryHeight {
93+
expiryFunc := e.expiryFuncMap[swapHash]
94+
go expiryFunc()
95+
96+
delete(e.expiryHeightMap, swapHash)
97+
delete(e.expiryFuncMap, swapHash)
98+
}
99+
}
100+
}
101+
102+
// SubscribeExpiry subscribes to an expiry event for a swap. If the expiry height
103+
// has already been reached, the expiryFunc is not called and the function
104+
// returns true. Otherwise, the expiryFunc is called when the expiry height is
105+
// reached and the function returns false.
106+
func (e *ExpiryManager) SubscribeExpiry(swapHash [32]byte,
107+
expiryHeight int32, expiryFunc func()) bool {
108+
109+
e.Lock()
110+
defer e.Unlock()
111+
112+
if e.currentBlockHeight >= expiryHeight {
113+
return true
114+
}
115+
116+
log.Debugf("Subscribing to expiry for swap %x at height %d",
117+
swapHash, expiryHeight)
118+
119+
e.expiryHeightMap[swapHash] = expiryHeight
120+
e.expiryFuncMap[swapHash] = expiryFunc
121+
122+
return false
123+
}
124+
125+
// SubscribeInvoiceManager is a manager for invoice subscription events.
126+
type SubscribeInvoiceManager struct {
127+
invoicesClient lndclient.InvoicesClient
128+
129+
subscribers map[[32]byte]struct{}
130+
131+
sync.Mutex
132+
}
133+
134+
// NewSubscribeInvoiceManager creates a new subscribe invoice manager.
135+
func NewSubscribeInvoiceManager(
136+
invoicesClient lndclient.InvoicesClient) *SubscribeInvoiceManager {
137+
138+
return &SubscribeInvoiceManager{
139+
invoicesClient: invoicesClient,
140+
subscribers: make(map[[32]byte]struct{}),
141+
}
142+
}
143+
144+
// SubscribeInvoice subscribes to invoice events for a swap hash. The update
145+
// callback is called when the invoice is updated and the error callback is
146+
// called when an error occurs.
147+
func (s *SubscribeInvoiceManager) SubscribeInvoice(ctx context.Context,
148+
invoiceHash lntypes.Hash, callback func(lndclient.InvoiceUpdate, error),
149+
) error {
150+
151+
s.Lock()
152+
defer s.Unlock()
153+
// If we already have a subscriber for this swap hash, return early.
154+
if _, ok := s.subscribers[invoiceHash]; ok {
155+
return nil
156+
}
157+
158+
log.Debugf("Subscribing to invoice %v", invoiceHash)
159+
160+
updateChan, errChan, err := s.invoicesClient.SubscribeSingleInvoice(
161+
ctx, invoiceHash,
162+
)
163+
if err != nil {
164+
return err
165+
}
166+
167+
s.subscribers[invoiceHash] = struct{}{}
168+
169+
go func() {
170+
for {
171+
select {
172+
case update := <-updateChan:
173+
callback(update, nil)
174+
175+
case err := <-errChan:
176+
callback(lndclient.InvoiceUpdate{}, err)
177+
delete(s.subscribers, invoiceHash)
178+
return
179+
180+
case <-ctx.Done():
181+
delete(s.subscribers, invoiceHash)
182+
return
183+
}
184+
}
185+
}()
186+
187+
return nil
188+
}
189+
190+
// TxSubscribeConfirmationManager is a manager for transaction confirmation
191+
// subscription events.
192+
type TxSubscribeConfirmationManager struct {
193+
chainNotifier lndclient.ChainNotifierClient
194+
195+
subscribers map[[32]byte]struct{}
196+
197+
sync.Mutex
198+
}
199+
200+
// NewTxSubscribeConfirmationManager creates a new transaction confirmation
201+
// subscription manager.
202+
func NewTxSubscribeConfirmationManager(chainNtfn lndclient.ChainNotifierClient,
203+
) *TxSubscribeConfirmationManager {
204+
205+
return &TxSubscribeConfirmationManager{
206+
chainNotifier: chainNtfn,
207+
subscribers: make(map[[32]byte]struct{}),
208+
}
209+
}
210+
211+
// SubscribeTxConfirmation subscribes to transaction confirmation events for a
212+
// swap hash. The callback is called when the transaction is confirmed or an
213+
// error occurs.
214+
func (t *TxSubscribeConfirmationManager) SubscribeTxConfirmation(
215+
ctx context.Context, swapHash lntypes.Hash, txid *chainhash.Hash,
216+
pkscript []byte, numConfs int32, heightHint int32,
217+
cb func(*chainntnfs.TxConfirmation, error)) error {
218+
219+
t.Lock()
220+
defer t.Unlock()
221+
222+
// If we already have a subscriber for this swap hash, return early.
223+
if _, ok := t.subscribers[swapHash]; ok {
224+
return nil
225+
}
226+
227+
log.Debugf("Subscribing to tx confirmation for swap %v", swapHash)
228+
229+
confChan, errChan, err := t.chainNotifier.RegisterConfirmationsNtfn(
230+
ctx, txid, pkscript, numConfs, heightHint,
231+
)
232+
if err != nil {
233+
return err
234+
}
235+
236+
t.subscribers[swapHash] = struct{}{}
237+
238+
go func() {
239+
for {
240+
select {
241+
case conf := <-confChan:
242+
cb(conf, nil)
243+
244+
case err := <-errChan:
245+
cb(nil, err)
246+
delete(t.subscribers, swapHash)
247+
return
248+
249+
case <-ctx.Done():
250+
delete(t.subscribers, swapHash)
251+
return
252+
}
253+
}
254+
}()
255+
256+
return nil
257+
}

utils/log.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package utils
2+
3+
import (
4+
"github.com/btcsuite/btclog/v2"
5+
"github.com/lightningnetwork/lnd/build"
6+
)
7+
8+
// Subsystem defines the sub system name of this package.
9+
const Subsystem = "UTILS"
10+
11+
// log is a logger that is initialized with no output filters. This
12+
// means the package will not perform any logging by default until the caller
13+
// requests it.
14+
var log btclog.Logger
15+
16+
// The default amount of logging is none.
17+
func init() {
18+
UseLogger(build.NewSubLogger(Subsystem, nil))
19+
}
20+
21+
// UseLogger uses a specified Logger to output package logging info.
22+
// This should be used in preference to SetLogWriter if the caller is also
23+
// using btclog.
24+
func UseLogger(logger btclog.Logger) {
25+
log = logger
26+
}

0 commit comments

Comments
 (0)