Skip to content

Commit bce9b5d

Browse files
authored
Merge pull request #975 from starius/spend-reorgs
sweepbatcher: fix reorg detection
2 parents 2176e52 + 534c71b commit bce9b5d

File tree

8 files changed

+155
-95
lines changed

8 files changed

+155
-95
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ require (
3535
github.com/stretchr/testify v1.10.0
3636
github.com/urfave/cli v1.22.14
3737
go.etcd.io/bbolt v1.3.11
38-
golang.org/x/net v0.38.0
3938
golang.org/x/sync v0.12.0
4039
google.golang.org/grpc v1.64.1
4140
google.golang.org/protobuf v1.34.2
@@ -184,6 +183,7 @@ require (
184183
golang.org/x/crypto v0.36.0 // indirect
185184
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
186185
golang.org/x/mod v0.21.0 // indirect
186+
golang.org/x/net v0.38.0 // indirect
187187
golang.org/x/sys v0.31.0 // indirect
188188
golang.org/x/term v0.30.0 // indirect
189189
golang.org/x/text v0.23.0 // indirect

loopout.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1246,7 +1246,11 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context,
12461246
s.height = notification.(int32)
12471247
timerChan = s.timerFactory(repushDelay)
12481248

1249+
s.log.Infof("Received block %d", s.height)
1250+
12491251
case <-timerChan:
1252+
s.log.Infof("Checking the sweep")
1253+
12501254
// canSweep will return false if the preimage is
12511255
// not revealed yet but the conf target is closer than
12521256
// 20 blocks. In this case to be sure we won't attempt
@@ -1268,6 +1272,7 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context,
12681272
}
12691273

12701274
// Send the sweep to the sweeper.
1275+
s.log.Infof("(Re)adding the sweep to sweepbatcher")
12711276
err := s.batcher.AddSweep(ctx, &sweepReq)
12721277
if err != nil {
12731278
return nil, err

sweepbatcher/sweep_batch.go

Lines changed: 84 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,10 @@ type batchConfig struct {
169169
// initial delay completion and publishing the batch transaction.
170170
batchPublishDelay time.Duration
171171

172-
// noBumping instructs sweepbatcher not to fee bump itself and rely on
173-
// external source of fee rates (FeeRateProvider).
174-
noBumping bool
172+
// customFeeRate provides custom min fee rate per swap. The batch uses
173+
// max of the fee rates of its swaps. In this mode confTarget is
174+
// ignored and fee bumping by sweepbatcher is disabled.
175+
customFeeRate FeeRateProvider
175176

176177
// txLabeler is a function generating a transaction label. It is called
177178
// before publishing a batch transaction. Batch ID is passed to it.
@@ -232,6 +233,10 @@ type batch struct {
232233
// spendChan is the channel over which spend notifications are received.
233234
spendChan chan *chainntnfs.SpendDetail
234235

236+
// spendErrChan is the channel over which spend notifier errors are
237+
// received.
238+
spendErrChan chan error
239+
235240
// confChan is the channel over which confirmation notifications are
236241
// received.
237242
confChan chan *chainntnfs.TxConfirmation
@@ -378,9 +383,7 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
378383
id: -1,
379384
state: Open,
380385
sweeps: make(map[wire.OutPoint]sweep),
381-
spendChan: make(chan *chainntnfs.SpendDetail),
382386
confChan: make(chan *chainntnfs.TxConfirmation, 1),
383-
reorgChan: make(chan struct{}, 1),
384387
testReqs: make(chan *testRequest),
385388
errChan: make(chan error, 1),
386389
callEnter: make(chan struct{}),
@@ -423,9 +426,7 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
423426
state: bk.state,
424427
primarySweepID: bk.primaryID,
425428
sweeps: bk.sweeps,
426-
spendChan: make(chan *chainntnfs.SpendDetail),
427429
confChan: make(chan *chainntnfs.TxConfirmation, 1),
428-
reorgChan: make(chan struct{}, 1),
429430
testReqs: make(chan *testRequest),
430431
errChan: make(chan error, 1),
431432
callEnter: make(chan struct{}),
@@ -723,6 +724,9 @@ func (b *batch) addSweeps(ctx context.Context, sweeps []*sweep) (bool, error) {
723724
// lower that minFeeRate of other sweeps (so it is
724725
// applied).
725726
if b.rbfCache.FeeRate < s.minFeeRate {
727+
b.Infof("Increasing feerate of the batch "+
728+
"from %v to %v", b.rbfCache.FeeRate,
729+
s.minFeeRate)
726730
b.rbfCache.FeeRate = s.minFeeRate
727731
}
728732
}
@@ -769,6 +773,9 @@ func (b *batch) addSweeps(ctx context.Context, sweeps []*sweep) (bool, error) {
769773
// Update FeeRate. Max(s.minFeeRate) for all the sweeps of
770774
// the batch is the basis for fee bumps.
771775
if b.rbfCache.FeeRate < s.minFeeRate {
776+
b.Infof("Increasing feerate of the batch "+
777+
"from %v to %v", b.rbfCache.FeeRate,
778+
s.minFeeRate)
772779
b.rbfCache.FeeRate = s.minFeeRate
773780
b.rbfCache.SkipNextBump = true
774781
}
@@ -968,6 +975,12 @@ func (b *batch) Run(ctx context.Context) error {
968975
continue
969976
}
970977

978+
// Update feerate of sweeps. This is normally done by
979+
// AddSweep, but it may not be called after the sweep
980+
// is confirmed, but fresh feerate is still needed to
981+
// keep publishing in case of reorg.
982+
b.updateFeeRate(ctx)
983+
971984
err := b.publish(ctx)
972985
if err != nil {
973986
return fmt.Errorf("publish error: %w", err)
@@ -979,23 +992,26 @@ func (b *batch) Run(ctx context.Context) error {
979992
return fmt.Errorf("handleSpend error: %w", err)
980993
}
981994

995+
case err := <-b.spendErrChan:
996+
b.writeToSpendErrChan(ctx, err)
997+
998+
return fmt.Errorf("spend notifier failed: %w", err)
999+
9821000
case conf := <-b.confChan:
9831001
if err := b.handleConf(runCtx, conf); err != nil {
9841002
return fmt.Errorf("handleConf error: %w", err)
9851003
}
9861004

9871005
return nil
9881006

1007+
// A re-org has been detected. We set the batch state back to
1008+
// open since our batch transaction is no longer present in any
1009+
// block. We can accept more sweeps and try to publish.
9891010
case <-b.reorgChan:
9901011
b.state = Open
9911012
b.Warnf("reorg detected, batch is able to " +
9921013
"accept new sweeps")
9931014

994-
err := b.monitorSpend(ctx, b.sweeps[b.primarySweepID])
995-
if err != nil {
996-
return fmt.Errorf("monitorSpend error: %w", err)
997-
}
998-
9991015
case testReq := <-b.testReqs:
10001016
testReq.handler()
10011017
close(testReq.quit)
@@ -1013,6 +1029,41 @@ func (b *batch) Run(ctx context.Context) error {
10131029
}
10141030
}
10151031

1032+
// updateFeeRate gets fresh values of minFeeRate for sweeps and updates the
1033+
// feerate of the batch if needed. This method must be called from event loop.
1034+
func (b *batch) updateFeeRate(ctx context.Context) {
1035+
for outpoint, s := range b.sweeps {
1036+
minFeeRate, err := minimumSweepFeeRate(
1037+
ctx, b.cfg.customFeeRate, b.wallet,
1038+
s.swapHash, s.outpoint, s.confTarget,
1039+
)
1040+
if err != nil {
1041+
b.Warnf("failed to determine feerate for sweep %v of "+
1042+
"swap %x, confTarget %d: %w", s.outpoint,
1043+
s.swapHash[:6], s.confTarget, err)
1044+
continue
1045+
}
1046+
1047+
if minFeeRate <= s.minFeeRate {
1048+
continue
1049+
}
1050+
1051+
b.Infof("Increasing feerate of sweep %v of swap %x from %v "+
1052+
"to %v", s.outpoint, s.swapHash[:6], s.minFeeRate,
1053+
minFeeRate)
1054+
s.minFeeRate = minFeeRate
1055+
b.sweeps[outpoint] = s
1056+
1057+
if s.minFeeRate <= b.rbfCache.FeeRate {
1058+
continue
1059+
}
1060+
1061+
b.Infof("Increasing feerate of the batch from %v to %v",
1062+
b.rbfCache.FeeRate, s.minFeeRate)
1063+
b.rbfCache.FeeRate = s.minFeeRate
1064+
}
1065+
}
1066+
10161067
// testRunInEventLoop runs a function in the event loop blocking until
10171068
// the function returns. For unit tests only!
10181069
func (b *batch) testRunInEventLoop(ctx context.Context, handler func()) {
@@ -1790,7 +1841,7 @@ func (b *batch) updateRbfRate(ctx context.Context) error {
17901841

17911842
// Set the initial value for our fee rate.
17921843
b.rbfCache.FeeRate = rate
1793-
} else if !b.cfg.noBumping {
1844+
} else if noBumping := b.cfg.customFeeRate != nil; !noBumping {
17941845
if b.rbfCache.SkipNextBump {
17951846
// Skip fee bumping, unset the flag, to bump next time.
17961847
b.rbfCache.SkipNextBump = false
@@ -1812,44 +1863,33 @@ func (b *batch) updateRbfRate(ctx context.Context) error {
18121863
// of the batch transaction gets confirmed, due to the uncertainty of RBF
18131864
// replacements and network propagation, we can always detect the transaction.
18141865
func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error {
1815-
spendCtx, cancel := context.WithCancel(ctx)
1866+
if b.spendChan != nil || b.spendErrChan != nil || b.reorgChan != nil {
1867+
return fmt.Errorf("an attempt to run monitorSpend multiple " +
1868+
"times per batch")
1869+
}
1870+
1871+
reorgChan := make(chan struct{}, 1)
18161872

1817-
spendChan, spendErr, err := b.chainNotifier.RegisterSpendNtfn(
1818-
spendCtx, &primarySweep.outpoint, primarySweep.htlc.PkScript,
1873+
spendChan, spendErrChan, err := b.chainNotifier.RegisterSpendNtfn(
1874+
ctx, &primarySweep.outpoint, primarySweep.htlc.PkScript,
18191875
primarySweep.initiationHeight,
1876+
lndclient.WithReOrgChan(reorgChan),
18201877
)
18211878
if err != nil {
1822-
cancel()
1823-
1824-
return err
1879+
return fmt.Errorf("failed to register spend notifier for "+
1880+
"primary sweep %v, pkscript %x, height %d: %w",
1881+
primarySweep.outpoint, primarySweep.htlc.PkScript,
1882+
primarySweep.initiationHeight, err)
18251883
}
18261884

1827-
b.wg.Add(1)
1828-
go func() {
1829-
defer cancel()
1830-
defer b.wg.Done()
1831-
1832-
b.Infof("monitoring spend for outpoint %s",
1833-
primarySweep.outpoint.String())
1885+
b.Infof("monitoring spend for outpoint %s",
1886+
primarySweep.outpoint.String())
18341887

1835-
select {
1836-
case spend := <-spendChan:
1837-
select {
1838-
case b.spendChan <- spend:
1839-
1840-
case <-ctx.Done():
1841-
}
1842-
1843-
case err := <-spendErr:
1844-
b.writeToSpendErrChan(ctx, err)
1845-
1846-
b.writeToErrChan(
1847-
fmt.Errorf("spend error: %w", err),
1848-
)
1849-
1850-
case <-ctx.Done():
1851-
}
1852-
}()
1888+
// This is safe to do as we always call monitorSpend from the event
1889+
// loop's goroutine.
1890+
b.spendChan = spendChan
1891+
b.spendErrChan = spendErrChan
1892+
b.reorgChan = reorgChan
18531893

18541894
return nil
18551895
}
@@ -1862,14 +1902,11 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
18621902
return fmt.Errorf("can't find primarySweep")
18631903
}
18641904

1865-
reorgChan := make(chan struct{})
1866-
18671905
confCtx, cancel := context.WithCancel(ctx)
18681906

18691907
confChan, errChan, err := b.chainNotifier.RegisterConfirmationsNtfn(
18701908
confCtx, b.batchTxid, b.batchPkScript, batchConfHeight,
18711909
primarySweep.initiationHeight,
1872-
lndclient.WithReOrgChan(reorgChan),
18731910
)
18741911
if err != nil {
18751912
cancel()
@@ -1895,18 +1932,6 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
18951932
b.writeToErrChan(fmt.Errorf("confirmations "+
18961933
"monitoring error: %w", err))
18971934

1898-
case <-reorgChan:
1899-
// A re-org has been detected. We set the batch
1900-
// state back to open since our batch
1901-
// transaction is no longer present in any
1902-
// block. We can accept more sweeps and try to
1903-
// publish new transactions, at this point we
1904-
// need to monitor again for a new spend.
1905-
select {
1906-
case b.reorgChan <- struct{}{}:
1907-
case <-ctx.Done():
1908-
}
1909-
19101935
case <-ctx.Done():
19111936
}
19121937
}()
@@ -2395,12 +2420,6 @@ func (b *batch) writeToErrChan(err error) {
23952420

23962421
// writeToSpendErrChan sends an error to spend error channels of all the sweeps.
23972422
func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) {
2398-
done, err := b.scheduleNextCall()
2399-
if err != nil {
2400-
done()
2401-
2402-
return
2403-
}
24042423
notifiers := make([]*SpendNotifier, 0, len(b.sweeps))
24052424
for _, s := range b.sweeps {
24062425
// If the sweep's notifier is empty then this means that a swap
@@ -2412,7 +2431,6 @@ func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) {
24122431

24132432
notifiers = append(notifiers, s.notifier)
24142433
}
2415-
done()
24162434

24172435
for _, notifier := range notifiers {
24182436
select {

0 commit comments

Comments
 (0)