Skip to content

sweepbatcher: fix reorg detection #975

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ func testLoopOutSuccess(ctx *testContext, amt btcutil.Amount, hash lntypes.Hash,

ctx.AssertRegisterConf(true, 3)

ctx.NotifyConf(sweepTx)

ctx.assertStatus(loopdb.StateSuccess)

ctx.assertStoreFinished(loopdb.StateSuccess)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/jessevdk/go-flags v1.4.0
github.com/lib/pq v1.10.9
github.com/lightninglabs/aperture v0.3.13-beta
github.com/lightninglabs/lndclient v0.19.0-7
github.com/lightninglabs/lndclient v0.19.0-12
github.com/lightninglabs/loop/looprpc v1.0.7
github.com/lightninglabs/loop/swapserverrpc v1.0.14
github.com/lightninglabs/taproot-assets v0.6.0-rc2.0.20250526132410-324bce0a1a7b
Expand All @@ -35,7 +35,6 @@ require (
github.com/stretchr/testify v1.10.0
github.com/urfave/cli v1.22.14
go.etcd.io/bbolt v1.3.11
golang.org/x/net v0.38.0
golang.org/x/sync v0.12.0
google.golang.org/grpc v1.64.1
google.golang.org/protobuf v1.34.2
Expand Down Expand Up @@ -185,6 +184,7 @@ require (
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/term v0.30.0 // indirect
golang.org/x/text v0.23.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1108,8 +1108,8 @@ github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf h1:HZKvJUHlcXI
github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk=
github.com/lightninglabs/lightning-node-connect/hashmailrpc v1.0.3 h1:NuDp6Z+QNMSzZ/+RzWsjgAgQSr/REDxTiHmTczZxlXA=
github.com/lightninglabs/lightning-node-connect/hashmailrpc v1.0.3/go.mod h1:bDnEKRN1u13NFBuy/C+bFLhxA5bfd3clT25y76QY0AM=
github.com/lightninglabs/lndclient v0.19.0-7 h1:8+wGQnO8KSUq9elzGLscBUGchID+bWvrpX2qCo+tU48=
github.com/lightninglabs/lndclient v0.19.0-7/go.mod h1:35d50tEMFxlJlKTZGYA6EdOllPsbxS4FUmEVbETUx+Q=
github.com/lightninglabs/lndclient v0.19.0-12 h1:aSIKfnvnHKiyFWppUGHJG5fn8VoF5WG5Lx958ksLmqs=
github.com/lightninglabs/lndclient v0.19.0-12/go.mod h1:cicoJY1AwZuRVXGD8Knp50TRT7TGBmw1k37uPQsGQiw=
github.com/lightninglabs/migrate/v4 v4.18.2-9023d66a-fork-pr-2 h1:eFjp1dIB2BhhQp/THKrjLdlYuPugO9UU4kDqu91OX/Q=
github.com/lightninglabs/migrate/v4 v4.18.2-9023d66a-fork-pr-2/go.mod h1:99BKpIi6ruaaXRM1A77eqZ+FWPQ3cfRa+ZVy5bmWMaY=
github.com/lightninglabs/neutrino v0.16.1 h1:5Kz4ToxncEVkpKC6fwUjXKtFKJhuxlG3sBB3MdJTJjs=
Expand Down
5 changes: 3 additions & 2 deletions instantout/reservation/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,9 @@ func (m *MockChainNotifier) RegisterBlockEpochNtfn(ctx context.Context) (
}

func (m *MockChainNotifier) RegisterSpendNtfn(ctx context.Context,
outpoint *wire.OutPoint, pkScript []byte, heightHint int32) (
chan *chainntnfs.SpendDetail, chan error, error) {
outpoint *wire.OutPoint, pkScript []byte, heightHint int32,
options ...lndclient.NotifierOption) (chan *chainntnfs.SpendDetail,
chan error, error) {

args := m.Called(ctx, pkScript, heightHint)
return args.Get(0).(chan *chainntnfs.SpendDetail), args.Get(1).(chan error), args.Error(2)
Expand Down
47 changes: 35 additions & 12 deletions loopout.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,33 +596,34 @@ func (s *loopOutSwap) executeSwap(globalCtx context.Context) error {
return nil
}

// Try to spend htlc and continue (rbf) until a spend has confirmed.
spend, err := s.waitForHtlcSpendConfirmedV2(
// Try to spend htlc and continue (rbf) until a spend has fully
// confirmed.
conf, err := s.waitForHtlcSpendConfirmedV2(
globalCtx, *htlcOutpoint, htlcValue,
)
if err != nil {
return err
}

// If spend details are nil, we resolved the swap without waiting for
// its spend, so we can exit.
if spend == nil {
// If conf details are nil, we resolved the swap without waiting for
// its confirmation, so we can exit.
if conf == nil {
return nil
}

// Inspect witness stack to see if it is a success transaction. We
// don't just try to match with the hash of our sweep tx, because it
// may be swept by a different (fee) sweep tx from a previous run.
htlcInput, err := swap.GetTxInputByOutpoint(
spend.Tx, htlcOutpoint,
conf.Tx, htlcOutpoint,
)
if err != nil {
return err
}

sweepSuccessful := s.htlc.IsSuccessWitness(htlcInput.Witness)
if sweepSuccessful {
s.cost.Onchain = spend.OnChainFeePortion
s.cost.Onchain = conf.OnChainFeePortion
s.state = loopdb.StateSuccess
} else {
s.state = loopdb.StateFailSweepTimeout
Expand Down Expand Up @@ -1140,10 +1141,12 @@ func (s *loopOutSwap) waitForConfirmedHtlc(globalCtx context.Context) (
// sweep or a server revocation tx.
func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context,
htlcOutpoint wire.OutPoint, htlcValue btcutil.Amount) (
*sweepbatcher.SpendDetail, error) {
*sweepbatcher.ConfDetail, error) {

spendChan := make(chan *sweepbatcher.SpendDetail)
spendErrChan := make(chan error, 1)
confChan := make(chan *sweepbatcher.ConfDetail, 1)
confErrChan := make(chan error, 1)
quitChan := make(chan bool, 1)

defer func() {
Expand All @@ -1153,6 +1156,8 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context,
notifier := sweepbatcher.SpendNotifier{
SpendChan: spendChan,
SpendErrChan: spendErrChan,
ConfChan: confChan,
ConfErrChan: confErrChan,
QuitChan: quitChan,
}

Expand Down Expand Up @@ -1192,15 +1197,28 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context,

for {
select {
// Htlc spend, break loop.
// Htlc spend, but waiting for more confirmations in case of
// a reorg.
case spend := <-spendChan:
s.log.Infof("Htlc spend by tx: %v", spend.Tx.TxHash())

return spend, nil

// Spend notification error.
case err := <-spendErrChan:
return nil, err
return nil, fmt.Errorf("spend notification error: %w",
err)

// Htlc was spent and the sweep transaction is fully confirmed.
// Break from loop.
case conf := <-confChan:
s.log.Infof("Sweep tx %v fully confirmed in block %d",
conf.Tx.TxHash(), conf.BlockHeight)

return conf, nil

// Conf notification error.
case err := <-confErrChan:
return nil, fmt.Errorf("conf notification error: %w",
err)

// Receive status updates for our payment so that we can detect
// whether we've successfully pushed our preimage.
Expand Down Expand Up @@ -1246,7 +1264,11 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context,
s.height = notification.(int32)
timerChan = s.timerFactory(repushDelay)

s.log.Infof("Received block %d", s.height)

case <-timerChan:
s.log.Infof("Checking the sweep")

// canSweep will return false if the preimage is
// not revealed yet but the conf target is closer than
// 20 blocks. In this case to be sure we won't attempt
Expand All @@ -1268,6 +1290,7 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context,
}

// Send the sweep to the sweeper.
s.log.Infof("(Re)adding the sweep to sweepbatcher")
err := s.batcher.AddSweep(ctx, &sweepReq)
if err != nil {
return nil, err
Expand Down
9 changes: 9 additions & 0 deletions loopout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,9 @@ func testCustomSweepConfTarget(t *testing.T) {
// confirmations.
ctx.AssertRegisterConf(true, 3)

// Notify the batch for the confirmation.
ctx.NotifyConf(sweepTx)

cfg.store.(*loopdb.StoreMock).AssertLoopOutState(loopdb.StateSuccess)
status = <-statusChan
require.Equal(t, loopdb.StateSuccess, status.State)
Expand Down Expand Up @@ -747,6 +750,9 @@ func testPreimagePush(t *testing.T) {
// confs.
ctx.AssertRegisterConf(true, 3)

// Notify the batch for the confirmation.
ctx.NotifyConf(sweepTx)

cfg.store.(*loopdb.StoreMock).AssertLoopOutState(loopdb.StateSuccess)
status := <-statusChan
require.Equal(t, loopdb.StateSuccess, status.State)
Expand Down Expand Up @@ -1105,6 +1111,9 @@ func TestLoopOutMuSig2Sweep(t *testing.T) {
// confs.
ctx.AssertRegisterConf(true, 3)

// Notify the batch for the confirmation.
ctx.NotifyConf(sweepTx)

cfg.store.(*loopdb.StoreMock).AssertLoopOutState(loopdb.StateSuccess)
status = <-statusChan
require.Equal(t, status.State, loopdb.StateSuccess)
Expand Down
5 changes: 3 additions & 2 deletions staticaddr/deposit/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,9 @@ func (m *MockChainNotifier) RegisterBlockEpochNtfn(ctx context.Context) (
}

func (m *MockChainNotifier) RegisterSpendNtfn(ctx context.Context,
outpoint *wire.OutPoint, pkScript []byte, heightHint int32) (
chan *chainntnfs.SpendDetail, chan error, error) {
outpoint *wire.OutPoint, pkScript []byte, heightHint int32,
_ ...lndclient.NotifierOption) (chan *chainntnfs.SpendDetail,
chan error, error) {

args := m.Called(ctx, pkScript, heightHint)
return args.Get(0).(chan *chainntnfs.SpendDetail),
Expand Down
92 changes: 30 additions & 62 deletions sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ type batch struct {
// spendChan is the channel over which spend notifications are received.
spendChan chan *chainntnfs.SpendDetail

// spendErrChan is the channel over which spend notifier errors are
// received.
spendErrChan chan error

// confChan is the channel over which confirmation notifications are
// received.
confChan chan *chainntnfs.TxConfirmation
Expand Down Expand Up @@ -378,9 +382,7 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
id: -1,
state: Open,
sweeps: make(map[wire.OutPoint]sweep),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
testReqs: make(chan *testRequest),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
Expand Down Expand Up @@ -423,9 +425,7 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
state: bk.state,
primarySweepID: bk.primaryID,
sweeps: bk.sweeps,
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
testReqs: make(chan *testRequest),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
Expand Down Expand Up @@ -979,23 +979,26 @@ func (b *batch) Run(ctx context.Context) error {
return fmt.Errorf("handleSpend error: %w", err)
}

case err := <-b.spendErrChan:
b.writeToSpendErrChan(ctx, err)

return fmt.Errorf("spend notifier failed: %w", err)

case conf := <-b.confChan:
if err := b.handleConf(runCtx, conf); err != nil {
return fmt.Errorf("handleConf error: %w", err)
}

return nil

// A re-org has been detected. We set the batch state back to
// open since our batch transaction is no longer present in any
// block. We can accept more sweeps and try to publish.
case <-b.reorgChan:
b.state = Open
b.Warnf("reorg detected, batch is able to " +
"accept new sweeps")

err := b.monitorSpend(ctx, b.sweeps[b.primarySweepID])
if err != nil {
return fmt.Errorf("monitorSpend error: %w", err)
}

case testReq := <-b.testReqs:
testReq.handler()
close(testReq.quit)
Expand Down Expand Up @@ -1812,44 +1815,31 @@ func (b *batch) updateRbfRate(ctx context.Context) error {
// of the batch transaction gets confirmed, due to the uncertainty of RBF
// replacements and network propagation, we can always detect the transaction.
func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error {
spendCtx, cancel := context.WithCancel(ctx)
if b.spendChan != nil || b.spendErrChan != nil || b.reorgChan != nil {
return fmt.Errorf("an attempt to run monitorSpend multiple " +
"times per batch")
}

spendChan, spendErr, err := b.chainNotifier.RegisterSpendNtfn(
spendCtx, &primarySweep.outpoint, primarySweep.htlc.PkScript,
reorgChan := make(chan struct{}, 1)

spendChan, spendErrChan, err := b.chainNotifier.RegisterSpendNtfn(
ctx, &primarySweep.outpoint, primarySweep.htlc.PkScript,
primarySweep.initiationHeight,
lndclient.WithReOrgChan(reorgChan),
)
if err != nil {
cancel()

return err
return fmt.Errorf("failed to register spend notifier for "+
"primary sweep %v, pkscript %x, height %d: %w",
primarySweep.outpoint, primarySweep.htlc.PkScript,
primarySweep.initiationHeight, err)
}

b.wg.Add(1)
go func() {
defer cancel()
defer b.wg.Done()
b.Infof("monitoring spend for outpoint %s",
primarySweep.outpoint.String())

b.Infof("monitoring spend for outpoint %s",
primarySweep.outpoint.String())

select {
case spend := <-spendChan:
select {
case b.spendChan <- spend:

case <-ctx.Done():
}

case err := <-spendErr:
b.writeToSpendErrChan(ctx, err)

b.writeToErrChan(
fmt.Errorf("spend error: %w", err),
)

case <-ctx.Done():
}
}()
b.spendChan = spendChan
b.spendErrChan = spendErrChan
b.reorgChan = reorgChan

return nil
}
Expand All @@ -1862,14 +1852,11 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
return fmt.Errorf("can't find primarySweep")
}

reorgChan := make(chan struct{})

confCtx, cancel := context.WithCancel(ctx)

confChan, errChan, err := b.chainNotifier.RegisterConfirmationsNtfn(
confCtx, b.batchTxid, b.batchPkScript, batchConfHeight,
primarySweep.initiationHeight,
lndclient.WithReOrgChan(reorgChan),
)
if err != nil {
cancel()
Expand All @@ -1895,18 +1882,6 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
b.writeToErrChan(fmt.Errorf("confirmations "+
"monitoring error: %w", err))

case <-reorgChan:
// A re-org has been detected. We set the batch
// state back to open since our batch
// transaction is no longer present in any
// block. We can accept more sweeps and try to
// publish new transactions, at this point we
// need to monitor again for a new spend.
select {
case b.reorgChan <- struct{}{}:
case <-ctx.Done():
}

case <-ctx.Done():
}
}()
Expand Down Expand Up @@ -2395,12 +2370,6 @@ func (b *batch) writeToErrChan(err error) {

// writeToSpendErrChan sends an error to spend error channels of all the sweeps.
func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) {
done, err := b.scheduleNextCall()
if err != nil {
done()

return
}
notifiers := make([]*SpendNotifier, 0, len(b.sweeps))
for _, s := range b.sweeps {
// If the sweep's notifier is empty then this means that a swap
Expand All @@ -2412,7 +2381,6 @@ func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) {

notifiers = append(notifiers, s.notifier)
}
done()

for _, notifier := range notifiers {
select {
Expand Down
Loading