Skip to content

Commit acf01cc

Browse files
stariushieblmi
authored andcommitted
sweepbatcher: notify caller about confirmations
Add fields ConfChan and ConfErrChan to SpendNotifier type which is a part of SweepRequest passed to AddSweep method. This is needed to reuse confirmation notifications on the calling side the same way it is done for spending notifications.
1 parent a91f968 commit acf01cc

File tree

3 files changed

+272
-3
lines changed

3 files changed

+272
-3
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1882,6 +1882,8 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
18821882
}
18831883

18841884
case err := <-errChan:
1885+
b.writeToConfErrChan(ctx, err)
1886+
18851887
b.writeToErrChan(fmt.Errorf("confirmations "+
18861888
"monitoring error: %w", err))
18871889

@@ -2158,7 +2160,55 @@ func (b *batch) handleConf(ctx context.Context,
21582160
b.Infof("confirmed in txid %s", b.batchTxid)
21592161
b.state = Confirmed
21602162

2161-
return b.store.ConfirmBatch(ctx, b.id)
2163+
if err := b.store.ConfirmBatch(ctx, b.id); err != nil {
2164+
return fmt.Errorf("failed to store confirmed state: %w", err)
2165+
}
2166+
2167+
// Calculate the fee portion that each sweep should pay for the batch.
2168+
// TODO: make sure spendTx matches b.sweeps.
2169+
var totalSweptAmt btcutil.Amount
2170+
for _, s := range b.sweeps {
2171+
totalSweptAmt += s.value
2172+
}
2173+
feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep(
2174+
spendTx, len(b.sweeps), totalSweptAmt,
2175+
)
2176+
2177+
// Send the confirmation to all the notifiers.
2178+
for _, s := range b.sweeps {
2179+
// If the sweep's notifier is empty then this means that
2180+
// a swap is not waiting to read an update from it, so
2181+
// we can skip the notification part.
2182+
if s.notifier == nil || s.notifier.ConfChan == nil {
2183+
continue
2184+
}
2185+
2186+
confDetail := &ConfDetail{
2187+
TxConfirmation: conf,
2188+
OnChainFeePortion: getFeePortionPaidBySweep(
2189+
spendTx, feePortionPaidPerSweep,
2190+
roundingDifference, &s,
2191+
),
2192+
}
2193+
2194+
// Notify the caller in a goroutine to avoid possible dead-lock.
2195+
go func(notifier *SpendNotifier) {
2196+
// Note that we don't unblock on ctx, because it will
2197+
// expire soon, when batch.Run completes. The caller is
2198+
// responsible to consume ConfChan or close QuitChan.
2199+
select {
2200+
// Try to write the confirmation to the notification
2201+
// channel.
2202+
case notifier.ConfChan <- confDetail:
2203+
2204+
// If a quit signal was provided by the swap,
2205+
// continue.
2206+
case <-notifier.QuitChan:
2207+
}
2208+
}(s.notifier)
2209+
}
2210+
2211+
return nil
21622212
}
21632213

21642214
// isComplete returns true if the batch is completed. This method is used by the
@@ -2314,6 +2364,44 @@ func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) {
23142364
}
23152365
}
23162366

2367+
// writeToConfErrChan sends an error to confirmation error channels of all the
2368+
// sweeps.
2369+
func (b *batch) writeToConfErrChan(ctx context.Context, confErr error) {
2370+
done, err := b.scheduleNextCall()
2371+
if err != nil {
2372+
done()
2373+
2374+
return
2375+
}
2376+
notifiers := make([]*SpendNotifier, 0, len(b.sweeps))
2377+
for _, s := range b.sweeps {
2378+
// If the sweep's notifier is empty then this means that a swap
2379+
// is not waiting to read an update from it, so we can skip
2380+
// the notification part.
2381+
if s.notifier == nil || s.notifier.ConfErrChan == nil {
2382+
continue
2383+
}
2384+
2385+
notifiers = append(notifiers, s.notifier)
2386+
}
2387+
done()
2388+
2389+
for _, notifier := range notifiers {
2390+
select {
2391+
// Try to write the error to the notification
2392+
// channel.
2393+
case notifier.ConfErrChan <- confErr:
2394+
2395+
// If a quit signal was provided by the swap,
2396+
// continue.
2397+
case <-notifier.QuitChan:
2398+
2399+
// If the context was canceled, stop.
2400+
case <-ctx.Done():
2401+
}
2402+
}
2403+
}
2404+
23172405
func (b *batch) persistSweep(ctx context.Context, sweep sweep,
23182406
completed bool) error {
23192407

sweepbatcher/sweep_batcher.go

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/lightninglabs/loop/loopdb"
2121
"github.com/lightninglabs/loop/swap"
2222
"github.com/lightninglabs/loop/utils"
23+
"github.com/lightningnetwork/lnd/chainntnfs"
2324
"github.com/lightningnetwork/lnd/clock"
2425
"github.com/lightningnetwork/lnd/input"
2526
"github.com/lightningnetwork/lnd/lntypes"
@@ -280,6 +281,8 @@ type addSweepsRequest struct {
280281
parentBatch *dbBatch
281282
}
282283

284+
// SpendDetail is a notification that is send to the user of sweepbatcher when
285+
// a batch gets the first confirmation.
283286
type SpendDetail struct {
284287
// Tx is the transaction that spent the outpoint.
285288
Tx *wire.MsgTx
@@ -291,6 +294,19 @@ type SpendDetail struct {
291294
OnChainFeePortion btcutil.Amount
292295
}
293296

297+
// ConfDetail is a notification that is send to the user of sweepbatcher when
298+
// a batch is fully confirmed, i.e. gets batchConfHeight confirmations.
299+
type ConfDetail struct {
300+
// TxConfirmation has data about the confirmation of the transaction.
301+
*chainntnfs.TxConfirmation
302+
303+
// OnChainFeePortion is the fee portion that was paid to get this sweep
304+
// confirmed on chain. This is the difference between the value of the
305+
// outpoint and the value of all sweeps that were included in the batch
306+
// divided by the number of sweeps.
307+
OnChainFeePortion btcutil.Amount
308+
}
309+
294310
// SpendNotifier is a notifier that is used to notify the requester of a sweep
295311
// that the sweep was successful.
296312
type SpendNotifier struct {
@@ -300,6 +316,14 @@ type SpendNotifier struct {
300316
// SpendErrChan is a channel where spend errors are received.
301317
SpendErrChan chan<- error
302318

319+
// ConfChan is a channel where the confirmation details are received.
320+
// This channel is optional.
321+
ConfChan chan<- *ConfDetail
322+
323+
// ConfErrChan is a channel where confirmation errors are received.
324+
// This channel is optional.
325+
ConfErrChan chan<- error
326+
303327
// QuitChan is a channel that can be closed to stop the notifier.
304328
QuitChan <-chan bool
305329
}
@@ -1114,7 +1138,9 @@ func (b *Batcher) FetchUnconfirmedBatches(ctx context.Context) ([]*batch,
11141138
}
11151139

11161140
// monitorSpendAndNotify monitors the spend of a specific outpoint and writes
1117-
// the response back to the response channel.
1141+
// the response back to the response channel. It is called if the batch is fully
1142+
// confirmed and we just need to deliver the data back to the caller though
1143+
// SpendNotifier.
11181144
func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
11191145
parentBatchID int32, notifier *SpendNotifier) error {
11201146

@@ -1172,6 +1198,16 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
11721198
select {
11731199
// Try to write the update to the notification channel.
11741200
case notifier.SpendChan <- spendDetail:
1201+
err := b.monitorConfAndNotify(
1202+
ctx, sweep, notifier, spendTx,
1203+
onChainFeePortion,
1204+
)
1205+
if err != nil {
1206+
b.writeToErrChan(
1207+
ctx, fmt.Errorf("monitor conf "+
1208+
"failed: %w", err),
1209+
)
1210+
}
11751211

11761212
// If a quit signal was provided by the swap, continue.
11771213
case <-notifier.QuitChan:
@@ -1215,6 +1251,84 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweep *sweep,
12151251
return nil
12161252
}
12171253

1254+
// monitorConfAndNotify monitors the confirmation of a specific transaction and
1255+
// writes the response back to the response channel. It is called if the batch
1256+
// is fully confirmed and we just need to deliver the data back to the caller
1257+
// though SpendNotifier.
1258+
func (b *Batcher) monitorConfAndNotify(ctx context.Context, sweep *sweep,
1259+
notifier *SpendNotifier, spendTx *wire.MsgTx,
1260+
onChainFeePortion btcutil.Amount) error {
1261+
1262+
// If confirmation notifications were not requested, stop.
1263+
if notifier.ConfChan == nil && notifier.ConfErrChan == nil {
1264+
return nil
1265+
}
1266+
1267+
batchTxid := spendTx.TxHash()
1268+
1269+
if len(spendTx.TxOut) != 1 {
1270+
return fmt.Errorf("unexpected number of outputs in batch: %d, "+
1271+
"want %d", len(spendTx.TxOut), 1)
1272+
}
1273+
batchPkScript := spendTx.TxOut[0].PkScript
1274+
1275+
reorgChan := make(chan struct{})
1276+
1277+
confCtx, cancel := context.WithCancel(ctx)
1278+
1279+
confChan, errChan, err := b.chainNotifier.RegisterConfirmationsNtfn(
1280+
confCtx, &batchTxid, batchPkScript, batchConfHeight,
1281+
sweep.initiationHeight, lndclient.WithReOrgChan(reorgChan),
1282+
)
1283+
if err != nil {
1284+
cancel()
1285+
return err
1286+
}
1287+
1288+
b.wg.Add(1)
1289+
go func() {
1290+
defer cancel()
1291+
defer b.wg.Done()
1292+
1293+
select {
1294+
case conf := <-confChan:
1295+
if notifier.ConfChan != nil {
1296+
confDetail := &ConfDetail{
1297+
TxConfirmation: conf,
1298+
OnChainFeePortion: onChainFeePortion,
1299+
}
1300+
1301+
select {
1302+
case notifier.ConfChan <- confDetail:
1303+
case <-notifier.QuitChan:
1304+
case <-ctx.Done():
1305+
}
1306+
}
1307+
1308+
case err := <-errChan:
1309+
if notifier.ConfErrChan != nil {
1310+
select {
1311+
case notifier.ConfErrChan <- err:
1312+
case <-notifier.QuitChan:
1313+
case <-ctx.Done():
1314+
}
1315+
}
1316+
1317+
b.writeToErrChan(ctx, fmt.Errorf("confirmations "+
1318+
"monitoring error: %w", err))
1319+
1320+
case <-reorgChan:
1321+
// A re-org has been detected, but the batch is fully
1322+
// confirmed and this is unexpected. Crash the batcher.
1323+
b.writeToErrChan(ctx, fmt.Errorf("unexpected reorg"))
1324+
1325+
case <-ctx.Done():
1326+
}
1327+
}()
1328+
1329+
return nil
1330+
}
1331+
12181332
func (b *Batcher) writeToErrChan(ctx context.Context, err error) {
12191333
select {
12201334
case b.errChan <- err:

0 commit comments

Comments
 (0)