Skip to content

Commit cadcb72

Browse files
committed
sweepbatcher: fix reorg detection
The reorg channel is now passed to RegisterSpendNtfn, and waiting for spend notifications remains active even after the transaction receives its first confirmation. The dedicated goroutine previously used to wait for the spend is no longer needed, as we now handle both spend and potential reorg events in the main event loop while the batch is running. Following this change, RegisterConfirmationsNtfn runs without a reorg channel, as it would only detect deep reorgs that undo the final confirmation - something we can't handle anyway. We can't track fully confirmed swaps indefinitely to guard against such rare reorgs; instead, we can mitigate the risk by increasing the required confirmation depth.
1 parent 1260607 commit cadcb72

File tree

1 file changed

+30
-62
lines changed

1 file changed

+30
-62
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 30 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,10 @@ type batch struct {
232232
// spendChan is the channel over which spend notifications are received.
233233
spendChan chan *chainntnfs.SpendDetail
234234

235+
// spendErrChan is the channel over which spend notifier errors are
236+
// received.
237+
spendErrChan chan error
238+
235239
// confChan is the channel over which confirmation notifications are
236240
// received.
237241
confChan chan *chainntnfs.TxConfirmation
@@ -378,9 +382,7 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
378382
id: -1,
379383
state: Open,
380384
sweeps: make(map[wire.OutPoint]sweep),
381-
spendChan: make(chan *chainntnfs.SpendDetail),
382385
confChan: make(chan *chainntnfs.TxConfirmation, 1),
383-
reorgChan: make(chan struct{}, 1),
384386
testReqs: make(chan *testRequest),
385387
errChan: make(chan error, 1),
386388
callEnter: make(chan struct{}),
@@ -423,9 +425,7 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
423425
state: bk.state,
424426
primarySweepID: bk.primaryID,
425427
sweeps: bk.sweeps,
426-
spendChan: make(chan *chainntnfs.SpendDetail),
427428
confChan: make(chan *chainntnfs.TxConfirmation, 1),
428-
reorgChan: make(chan struct{}, 1),
429429
testReqs: make(chan *testRequest),
430430
errChan: make(chan error, 1),
431431
callEnter: make(chan struct{}),
@@ -979,23 +979,26 @@ func (b *batch) Run(ctx context.Context) error {
979979
return fmt.Errorf("handleSpend error: %w", err)
980980
}
981981

982+
case err := <-b.spendErrChan:
983+
b.writeToSpendErrChan(ctx, err)
984+
985+
return fmt.Errorf("spend notifier failed: %w", err)
986+
982987
case conf := <-b.confChan:
983988
if err := b.handleConf(runCtx, conf); err != nil {
984989
return fmt.Errorf("handleConf error: %w", err)
985990
}
986991

987992
return nil
988993

994+
// A re-org has been detected. We set the batch state back to
995+
// open since our batch transaction is no longer present in any
996+
// block. We can accept more sweeps and try to publish.
989997
case <-b.reorgChan:
990998
b.state = Open
991999
b.Warnf("reorg detected, batch is able to " +
9921000
"accept new sweeps")
9931001

994-
err := b.monitorSpend(ctx, b.sweeps[b.primarySweepID])
995-
if err != nil {
996-
return fmt.Errorf("monitorSpend error: %w", err)
997-
}
998-
9991002
case testReq := <-b.testReqs:
10001003
testReq.handler()
10011004
close(testReq.quit)
@@ -1812,44 +1815,31 @@ func (b *batch) updateRbfRate(ctx context.Context) error {
18121815
// of the batch transaction gets confirmed, due to the uncertainty of RBF
18131816
// replacements and network propagation, we can always detect the transaction.
18141817
func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error {
1815-
spendCtx, cancel := context.WithCancel(ctx)
1818+
if b.spendChan != nil || b.spendErrChan != nil || b.reorgChan != nil {
1819+
return fmt.Errorf("an attempt to run monitorSpend multiple " +
1820+
"times per batch")
1821+
}
18161822

1817-
spendChan, spendErr, err := b.chainNotifier.RegisterSpendNtfn(
1818-
spendCtx, &primarySweep.outpoint, primarySweep.htlc.PkScript,
1823+
reorgChan := make(chan struct{}, 1)
1824+
1825+
spendChan, spendErrChan, err := b.chainNotifier.RegisterSpendNtfn(
1826+
ctx, &primarySweep.outpoint, primarySweep.htlc.PkScript,
18191827
primarySweep.initiationHeight,
1828+
lndclient.WithReOrgChan(reorgChan),
18201829
)
18211830
if err != nil {
1822-
cancel()
1823-
1824-
return err
1831+
return fmt.Errorf("failed to register spend notifier for "+
1832+
"primary sweep %v, pkscript %x, height %d: %w",
1833+
primarySweep.outpoint, primarySweep.htlc.PkScript,
1834+
primarySweep.initiationHeight, err)
18251835
}
18261836

1827-
b.wg.Add(1)
1828-
go func() {
1829-
defer cancel()
1830-
defer b.wg.Done()
1837+
b.Infof("monitoring spend for outpoint %s",
1838+
primarySweep.outpoint.String())
18311839

1832-
b.Infof("monitoring spend for outpoint %s",
1833-
primarySweep.outpoint.String())
1834-
1835-
select {
1836-
case spend := <-spendChan:
1837-
select {
1838-
case b.spendChan <- spend:
1839-
1840-
case <-ctx.Done():
1841-
}
1842-
1843-
case err := <-spendErr:
1844-
b.writeToSpendErrChan(ctx, err)
1845-
1846-
b.writeToErrChan(
1847-
fmt.Errorf("spend error: %w", err),
1848-
)
1849-
1850-
case <-ctx.Done():
1851-
}
1852-
}()
1840+
b.spendChan = spendChan
1841+
b.spendErrChan = spendErrChan
1842+
b.reorgChan = reorgChan
18531843

18541844
return nil
18551845
}
@@ -1862,14 +1852,11 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
18621852
return fmt.Errorf("can't find primarySweep")
18631853
}
18641854

1865-
reorgChan := make(chan struct{})
1866-
18671855
confCtx, cancel := context.WithCancel(ctx)
18681856

18691857
confChan, errChan, err := b.chainNotifier.RegisterConfirmationsNtfn(
18701858
confCtx, b.batchTxid, b.batchPkScript, batchConfHeight,
18711859
primarySweep.initiationHeight,
1872-
lndclient.WithReOrgChan(reorgChan),
18731860
)
18741861
if err != nil {
18751862
cancel()
@@ -1895,18 +1882,6 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
18951882
b.writeToErrChan(fmt.Errorf("confirmations "+
18961883
"monitoring error: %w", err))
18971884

1898-
case <-reorgChan:
1899-
// A re-org has been detected. We set the batch
1900-
// state back to open since our batch
1901-
// transaction is no longer present in any
1902-
// block. We can accept more sweeps and try to
1903-
// publish new transactions, at this point we
1904-
// need to monitor again for a new spend.
1905-
select {
1906-
case b.reorgChan <- struct{}{}:
1907-
case <-ctx.Done():
1908-
}
1909-
19101885
case <-ctx.Done():
19111886
}
19121887
}()
@@ -2395,12 +2370,6 @@ func (b *batch) writeToErrChan(err error) {
23952370

23962371
// writeToSpendErrChan sends an error to spend error channels of all the sweeps.
23972372
func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) {
2398-
done, err := b.scheduleNextCall()
2399-
if err != nil {
2400-
done()
2401-
2402-
return
2403-
}
24042373
notifiers := make([]*SpendNotifier, 0, len(b.sweeps))
24052374
for _, s := range b.sweeps {
24062375
// If the sweep's notifier is empty then this means that a swap
@@ -2412,7 +2381,6 @@ func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) {
24122381

24132382
notifiers = append(notifiers, s.notifier)
24142383
}
2415-
done()
24162384

24172385
for _, notifier := range notifiers {
24182386
select {

0 commit comments

Comments
 (0)