diff --git a/go.mod b/go.mod index a5f687bee..b9f2f2b3d 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/jessevdk/go-flags v1.4.0 github.com/lib/pq v1.10.9 github.com/lightninglabs/aperture v0.3.13-beta - github.com/lightninglabs/lndclient v0.19.0-7 + github.com/lightninglabs/lndclient v0.19.0-12 github.com/lightninglabs/loop/looprpc v1.0.7 github.com/lightninglabs/loop/swapserverrpc v1.0.14 github.com/lightninglabs/taproot-assets v0.6.0-rc2.0.20250526132410-324bce0a1a7b @@ -35,7 +35,6 @@ require ( github.com/stretchr/testify v1.10.0 github.com/urfave/cli v1.22.14 go.etcd.io/bbolt v1.3.11 - golang.org/x/net v0.38.0 golang.org/x/sync v0.12.0 google.golang.org/grpc v1.64.1 google.golang.org/protobuf v1.34.2 @@ -185,6 +184,7 @@ require ( golang.org/x/crypto v0.36.0 // indirect golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect golang.org/x/mod v0.21.0 // indirect + golang.org/x/net v0.38.0 // indirect golang.org/x/sys v0.31.0 // indirect golang.org/x/term v0.30.0 // indirect golang.org/x/text v0.23.0 // indirect diff --git a/go.sum b/go.sum index da5a4be71..0a589f67b 100644 --- a/go.sum +++ b/go.sum @@ -1108,8 +1108,8 @@ github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf h1:HZKvJUHlcXI github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk= github.com/lightninglabs/lightning-node-connect/hashmailrpc v1.0.3 h1:NuDp6Z+QNMSzZ/+RzWsjgAgQSr/REDxTiHmTczZxlXA= github.com/lightninglabs/lightning-node-connect/hashmailrpc v1.0.3/go.mod h1:bDnEKRN1u13NFBuy/C+bFLhxA5bfd3clT25y76QY0AM= -github.com/lightninglabs/lndclient v0.19.0-7 h1:8+wGQnO8KSUq9elzGLscBUGchID+bWvrpX2qCo+tU48= -github.com/lightninglabs/lndclient v0.19.0-7/go.mod h1:35d50tEMFxlJlKTZGYA6EdOllPsbxS4FUmEVbETUx+Q= +github.com/lightninglabs/lndclient v0.19.0-12 h1:aSIKfnvnHKiyFWppUGHJG5fn8VoF5WG5Lx958ksLmqs= +github.com/lightninglabs/lndclient v0.19.0-12/go.mod h1:cicoJY1AwZuRVXGD8Knp50TRT7TGBmw1k37uPQsGQiw= github.com/lightninglabs/migrate/v4 v4.18.2-9023d66a-fork-pr-2 h1:eFjp1dIB2BhhQp/THKrjLdlYuPugO9UU4kDqu91OX/Q= github.com/lightninglabs/migrate/v4 v4.18.2-9023d66a-fork-pr-2/go.mod h1:99BKpIi6ruaaXRM1A77eqZ+FWPQ3cfRa+ZVy5bmWMaY= github.com/lightninglabs/neutrino v0.16.1 h1:5Kz4ToxncEVkpKC6fwUjXKtFKJhuxlG3sBB3MdJTJjs= diff --git a/instantout/reservation/actions_test.go b/instantout/reservation/actions_test.go index ff4b6a664..a11cf09f0 100644 --- a/instantout/reservation/actions_test.go +++ b/instantout/reservation/actions_test.go @@ -187,8 +187,9 @@ func (m *MockChainNotifier) RegisterBlockEpochNtfn(ctx context.Context) ( } func (m *MockChainNotifier) RegisterSpendNtfn(ctx context.Context, - outpoint *wire.OutPoint, pkScript []byte, heightHint int32) ( - chan *chainntnfs.SpendDetail, chan error, error) { + outpoint *wire.OutPoint, pkScript []byte, heightHint int32, + options ...lndclient.NotifierOption) (chan *chainntnfs.SpendDetail, + chan error, error) { args := m.Called(ctx, pkScript, heightHint) return args.Get(0).(chan *chainntnfs.SpendDetail), args.Get(1).(chan error), args.Error(2) diff --git a/loopout.go b/loopout.go index 6fc740916..182f9a5a7 100644 --- a/loopout.go +++ b/loopout.go @@ -1246,7 +1246,11 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context, s.height = notification.(int32) timerChan = s.timerFactory(repushDelay) + s.log.Infof("Received block %d", s.height) + case <-timerChan: + s.log.Infof("Checking the sweep") + // canSweep will return false if the preimage is // not revealed yet but the conf target is closer than // 20 blocks. In this case to be sure we won't attempt @@ -1268,6 +1272,7 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context, } // Send the sweep to the sweeper. + s.log.Infof("(Re)adding the sweep to sweepbatcher") err := s.batcher.AddSweep(ctx, &sweepReq) if err != nil { return nil, err diff --git a/staticaddr/deposit/manager_test.go b/staticaddr/deposit/manager_test.go index 007414c2c..0c4c6ea23 100644 --- a/staticaddr/deposit/manager_test.go +++ b/staticaddr/deposit/manager_test.go @@ -199,8 +199,9 @@ func (m *MockChainNotifier) RegisterBlockEpochNtfn(ctx context.Context) ( } func (m *MockChainNotifier) RegisterSpendNtfn(ctx context.Context, - outpoint *wire.OutPoint, pkScript []byte, heightHint int32) ( - chan *chainntnfs.SpendDetail, chan error, error) { + outpoint *wire.OutPoint, pkScript []byte, heightHint int32, + _ ...lndclient.NotifierOption) (chan *chainntnfs.SpendDetail, + chan error, error) { args := m.Called(ctx, pkScript, heightHint) return args.Get(0).(chan *chainntnfs.SpendDetail), diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 933077fcf..f5ba4cf32 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -169,9 +169,10 @@ type batchConfig struct { // initial delay completion and publishing the batch transaction. batchPublishDelay time.Duration - // noBumping instructs sweepbatcher not to fee bump itself and rely on - // external source of fee rates (FeeRateProvider). - noBumping bool + // customFeeRate provides custom min fee rate per swap. The batch uses + // max of the fee rates of its swaps. In this mode confTarget is + // ignored and fee bumping by sweepbatcher is disabled. + customFeeRate FeeRateProvider // txLabeler is a function generating a transaction label. It is called // before publishing a batch transaction. Batch ID is passed to it. @@ -232,6 +233,10 @@ type batch struct { // spendChan is the channel over which spend notifications are received. spendChan chan *chainntnfs.SpendDetail + // spendErrChan is the channel over which spend notifier errors are + // received. + spendErrChan chan error + // confChan is the channel over which confirmation notifications are // received. confChan chan *chainntnfs.TxConfirmation @@ -378,9 +383,7 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch { id: -1, state: Open, sweeps: make(map[wire.OutPoint]sweep), - spendChan: make(chan *chainntnfs.SpendDetail), confChan: make(chan *chainntnfs.TxConfirmation, 1), - reorgChan: make(chan struct{}, 1), testReqs: make(chan *testRequest), errChan: make(chan error, 1), callEnter: make(chan struct{}), @@ -423,9 +426,7 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) { state: bk.state, primarySweepID: bk.primaryID, sweeps: bk.sweeps, - spendChan: make(chan *chainntnfs.SpendDetail), confChan: make(chan *chainntnfs.TxConfirmation, 1), - reorgChan: make(chan struct{}, 1), testReqs: make(chan *testRequest), errChan: make(chan error, 1), callEnter: make(chan struct{}), @@ -723,6 +724,9 @@ func (b *batch) addSweeps(ctx context.Context, sweeps []*sweep) (bool, error) { // lower that minFeeRate of other sweeps (so it is // applied). if b.rbfCache.FeeRate < s.minFeeRate { + b.Infof("Increasing feerate of the batch "+ + "from %v to %v", b.rbfCache.FeeRate, + s.minFeeRate) b.rbfCache.FeeRate = s.minFeeRate } } @@ -769,6 +773,9 @@ func (b *batch) addSweeps(ctx context.Context, sweeps []*sweep) (bool, error) { // Update FeeRate. Max(s.minFeeRate) for all the sweeps of // the batch is the basis for fee bumps. if b.rbfCache.FeeRate < s.minFeeRate { + b.Infof("Increasing feerate of the batch "+ + "from %v to %v", b.rbfCache.FeeRate, + s.minFeeRate) b.rbfCache.FeeRate = s.minFeeRate b.rbfCache.SkipNextBump = true } @@ -968,6 +975,45 @@ func (b *batch) Run(ctx context.Context) error { continue } + // Update feerate of sweeps. This is normally done by + // AddSweep, but it may not be called after the sweep + // is confirmed, but fresh feerate is still needed to + // keep publishing in case of reorg. + for outpoint, s := range b.sweeps { + minFeeRate, err := minimumSweepFeeRate( + ctx, b.cfg.customFeeRate, b.wallet, + s.swapHash, s.outpoint, s.confTarget, + ) + if err != nil { + b.Warnf("failed to determine feerate "+ + "for sweep %v of swap %x, "+ + "confTarget %d: %w", s.outpoint, + s.swapHash[:6], s.confTarget, + err) + continue + } + + if minFeeRate <= s.minFeeRate { + continue + } + + b.Infof("Increasing feerate of sweep %v of "+ + "swap %x from %v to %v", s.outpoint, + s.swapHash[:6], s.minFeeRate, + minFeeRate) + s.minFeeRate = minFeeRate + b.sweeps[outpoint] = s + + if s.minFeeRate <= b.rbfCache.FeeRate { + continue + } + + b.Infof("Increasing feerate of the batch "+ + "from %v to %v", b.rbfCache.FeeRate, + s.minFeeRate) + b.rbfCache.FeeRate = s.minFeeRate + } + err := b.publish(ctx) if err != nil { return fmt.Errorf("publish error: %w", err) @@ -979,6 +1025,11 @@ func (b *batch) Run(ctx context.Context) error { return fmt.Errorf("handleSpend error: %w", err) } + case err := <-b.spendErrChan: + b.writeToSpendErrChan(ctx, err) + + return fmt.Errorf("spend notifier failed: %w", err) + case conf := <-b.confChan: if err := b.handleConf(runCtx, conf); err != nil { return fmt.Errorf("handleConf error: %w", err) @@ -986,16 +1037,14 @@ func (b *batch) Run(ctx context.Context) error { return nil + // A re-org has been detected. We set the batch state back to + // open since our batch transaction is no longer present in any + // block. We can accept more sweeps and try to publish. case <-b.reorgChan: b.state = Open b.Warnf("reorg detected, batch is able to " + "accept new sweeps") - err := b.monitorSpend(ctx, b.sweeps[b.primarySweepID]) - if err != nil { - return fmt.Errorf("monitorSpend error: %w", err) - } - case testReq := <-b.testReqs: testReq.handler() close(testReq.quit) @@ -1790,7 +1839,7 @@ func (b *batch) updateRbfRate(ctx context.Context) error { // Set the initial value for our fee rate. b.rbfCache.FeeRate = rate - } else if !b.cfg.noBumping { + } else if noBumping := b.cfg.customFeeRate != nil; !noBumping { if b.rbfCache.SkipNextBump { // Skip fee bumping, unset the flag, to bump next time. b.rbfCache.SkipNextBump = false @@ -1812,44 +1861,31 @@ func (b *batch) updateRbfRate(ctx context.Context) error { // of the batch transaction gets confirmed, due to the uncertainty of RBF // replacements and network propagation, we can always detect the transaction. func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error { - spendCtx, cancel := context.WithCancel(ctx) + if b.spendChan != nil || b.spendErrChan != nil || b.reorgChan != nil { + return fmt.Errorf("an attempt to run monitorSpend multiple " + + "times per batch") + } - spendChan, spendErr, err := b.chainNotifier.RegisterSpendNtfn( - spendCtx, &primarySweep.outpoint, primarySweep.htlc.PkScript, + reorgChan := make(chan struct{}, 1) + + spendChan, spendErrChan, err := b.chainNotifier.RegisterSpendNtfn( + ctx, &primarySweep.outpoint, primarySweep.htlc.PkScript, primarySweep.initiationHeight, + lndclient.WithReOrgChan(reorgChan), ) if err != nil { - cancel() - - return err + return fmt.Errorf("failed to register spend notifier for "+ + "primary sweep %v, pkscript %x, height %d: %w", + primarySweep.outpoint, primarySweep.htlc.PkScript, + primarySweep.initiationHeight, err) } - b.wg.Add(1) - go func() { - defer cancel() - defer b.wg.Done() - - b.Infof("monitoring spend for outpoint %s", - primarySweep.outpoint.String()) - - select { - case spend := <-spendChan: - select { - case b.spendChan <- spend: - - case <-ctx.Done(): - } - - case err := <-spendErr: - b.writeToSpendErrChan(ctx, err) - - b.writeToErrChan( - fmt.Errorf("spend error: %w", err), - ) + b.Infof("monitoring spend for outpoint %s", + primarySweep.outpoint.String()) - case <-ctx.Done(): - } - }() + b.spendChan = spendChan + b.spendErrChan = spendErrChan + b.reorgChan = reorgChan return nil } @@ -1862,14 +1898,11 @@ func (b *batch) monitorConfirmations(ctx context.Context) error { return fmt.Errorf("can't find primarySweep") } - reorgChan := make(chan struct{}) - confCtx, cancel := context.WithCancel(ctx) confChan, errChan, err := b.chainNotifier.RegisterConfirmationsNtfn( confCtx, b.batchTxid, b.batchPkScript, batchConfHeight, primarySweep.initiationHeight, - lndclient.WithReOrgChan(reorgChan), ) if err != nil { cancel() @@ -1895,18 +1928,6 @@ func (b *batch) monitorConfirmations(ctx context.Context) error { b.writeToErrChan(fmt.Errorf("confirmations "+ "monitoring error: %w", err)) - case <-reorgChan: - // A re-org has been detected. We set the batch - // state back to open since our batch - // transaction is no longer present in any - // block. We can accept more sweeps and try to - // publish new transactions, at this point we - // need to monitor again for a new spend. - select { - case b.reorgChan <- struct{}{}: - case <-ctx.Done(): - } - case <-ctx.Done(): } }() @@ -2395,12 +2416,6 @@ func (b *batch) writeToErrChan(err error) { // writeToSpendErrChan sends an error to spend error channels of all the sweeps. func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) { - done, err := b.scheduleNextCall() - if err != nil { - done() - - return - } notifiers := make([]*SpendNotifier, 0, len(b.sweeps)) for _, s := range b.sweeps { // If the sweep's notifier is empty then this means that a swap @@ -2412,7 +2427,6 @@ func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) { notifiers = append(notifiers, s.notifier) } - done() for _, notifier := range notifiers { select { diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 6bab035e7..9967c1573 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -1522,28 +1522,12 @@ func (b *Batcher) loadSweep(ctx context.Context, swapHash lntypes.Hash, // Find minimum fee rate for the sweep. Use customFeeRate if it is // provided, otherwise use wallet's EstimateFeeRate. - var minFeeRate chainfee.SatPerKWeight - if b.customFeeRate != nil { - minFeeRate, err = b.customFeeRate(ctx, swapHash, outpoint) - if err != nil { - return nil, fmt.Errorf("failed to fetch min fee rate "+ - "for %x: %w", swapHash[:6], err) - } - if minFeeRate < chainfee.AbsoluteFeePerKwFloor { - return nil, fmt.Errorf("min fee rate too low (%v) for "+ - "%x", minFeeRate, swapHash[:6]) - } - } else { - if s.ConfTarget == 0 { - warnf("Fee estimation was requested for zero "+ - "confTarget for sweep %x.", swapHash[:6]) - } - minFeeRate, err = b.wallet.EstimateFeeRate(ctx, s.ConfTarget) - if err != nil { - return nil, fmt.Errorf("failed to estimate fee rate "+ - "for %x, confTarget=%d: %w", swapHash[:6], - s.ConfTarget, err) - } + minFeeRate, err := minimumSweepFeeRate( + ctx, b.customFeeRate, b.wallet, + swapHash, outpoint, s.ConfTarget, + ) + if err != nil { + return nil, err } return &sweep{ @@ -1567,11 +1551,53 @@ func (b *Batcher) loadSweep(ctx context.Context, swapHash lntypes.Hash, }, nil } +// feeRateEstimator determines feerate by confTarget. +type feeRateEstimator interface { + // EstimateFeeRate returns feerate corresponding to the confTarget. + EstimateFeeRate(ctx context.Context, + confTarget int32) (chainfee.SatPerKWeight, error) +} + +// minimumSweepFeeRate determines minimum feerate for a sweep. +func minimumSweepFeeRate(ctx context.Context, customFeeRate FeeRateProvider, + wallet feeRateEstimator, swapHash lntypes.Hash, outpoint wire.OutPoint, + sweepConfTarget int32) (chainfee.SatPerKWeight, error) { + + // Find minimum fee rate for the sweep. Use customFeeRate if it is + // provided, otherwise use wallet's EstimateFeeRate. + if customFeeRate != nil { + minFeeRate, err := customFeeRate(ctx, swapHash, outpoint) + if err != nil { + return 0, fmt.Errorf("failed to fetch min fee rate "+ + "for %x: %w", swapHash[:6], err) + } + if minFeeRate < chainfee.AbsoluteFeePerKwFloor { + return 0, fmt.Errorf("min fee rate too low (%v) for "+ + "%x", minFeeRate, swapHash[:6]) + } + + return minFeeRate, nil + } + + if sweepConfTarget == 0 { + warnf("Fee estimation was requested for zero "+ + "confTarget for sweep %x.", swapHash[:6]) + } + minFeeRate, err := wallet.EstimateFeeRate(ctx, sweepConfTarget) + if err != nil { + return 0, fmt.Errorf("failed to estimate fee rate "+ + "for %x, confTarget=%d: %w", swapHash[:6], + sweepConfTarget, err) + } + + return minFeeRate, nil +} + // newBatchConfig creates new batch config. func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig { return batchConfig{ maxTimeoutDistance: maxTimeoutDistance, - noBumping: b.customFeeRate != nil, + customFeeRate: b.customFeeRate, txLabeler: b.txLabeler, customMuSig2Signer: b.customMuSig2Signer, presignedHelper: b.presignedHelper, diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index 590d83ae4..126724435 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -2,6 +2,7 @@ package sweepbatcher import ( "context" + "database/sql" "errors" "fmt" "os" @@ -3997,6 +3998,9 @@ func testSweepBatcherCloseDuringAdding(t *testing.T, store testStore, if errors.Is(err, context.Canceled) { break } + if errors.Is(err, sql.ErrTxDone) { + break + } require.NoError(t, err) } }() @@ -4834,8 +4838,6 @@ func testFeeRateGrows(t *testing.T, store testStore, // Now update fee rate of second sweep (which is not primary) to // feeRateHigh. Fee rate of sweep 1 is still feeRateLow. setFeeRate(swapHash2, feeRateHigh) - require.NoError(t, batcher.AddSweep(ctx, &sweepReq1)) - require.NoError(t, batcher.AddSweep(ctx, &sweepReq2)) // Tick tock next block. err = lnd.NotifyHeight(603) diff --git a/test/chainnotifier_mock.go b/test/chainnotifier_mock.go index a4fae0e77..5d8cba1a0 100644 --- a/test/chainnotifier_mock.go +++ b/test/chainnotifier_mock.go @@ -2,6 +2,7 @@ package test import ( "bytes" + "context" "sync" "time" @@ -10,7 +11,6 @@ import ( "github.com/lightninglabs/lndclient" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/lnrpc/chainrpc" - "golang.org/x/net/context" ) type mockChainNotifier struct { @@ -51,8 +51,9 @@ type ConfRegistration struct { } func (c *mockChainNotifier) RegisterSpendNtfn(ctx context.Context, - outpoint *wire.OutPoint, pkScript []byte, heightHint int32) ( - chan *chainntnfs.SpendDetail, chan error, error) { + outpoint *wire.OutPoint, pkScript []byte, heightHint int32, + opts ...lndclient.NotifierOption) (chan *chainntnfs.SpendDetail, + chan error, error) { spendChan0 := make(chan *chainntnfs.SpendDetail) spendErrChan := make(chan error, 1) diff --git a/test/lightning_client_mock.go b/test/lightning_client_mock.go index 947c6ec6a..bcd38e7ba 100644 --- a/test/lightning_client_mock.go +++ b/test/lightning_client_mock.go @@ -1,6 +1,7 @@ package test import ( + "context" "crypto/rand" "encoding/hex" "fmt" @@ -17,7 +18,6 @@ import ( "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/zpay32" - "golang.org/x/net/context" ) type mockLightningClient struct { diff --git a/test/router_mock.go b/test/router_mock.go index e3e3e6603..9ef84947e 100644 --- a/test/router_mock.go +++ b/test/router_mock.go @@ -1,9 +1,10 @@ package test import ( + "context" + "github.com/lightninglabs/lndclient" "github.com/lightningnetwork/lnd/lntypes" - "golang.org/x/net/context" ) type mockRouter struct {