Skip to content

Commit bb837a4

Browse files
committed
sweepbatcher: load sweeps in AddSweep
This is needed to run additional checks in AddSweep in the next commit.
1 parent 78ca6bd commit bb837a4

File tree

4 files changed

+98
-65
lines changed

4 files changed

+98
-65
lines changed

loopout.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1269,7 +1269,7 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context,
12691269
}
12701270

12711271
// Send the sweep to the sweeper.
1272-
err := s.batcher.AddSweep(&sweepReq)
1272+
err := s.batcher.AddSweep(ctx, &sweepReq)
12731273
if err != nil {
12741274
return nil, err
12751275
}

sweepbatcher/sweep_batch.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ type batch struct {
306306
// Purger is a function that takes a sweep request and feeds it back to the
307307
// batcher main entry point. The name is inspired by its purpose, which is to
308308
// purge the batch from sweeps that didn't make it to the confirmed tx.
309-
type Purger func(sweepReq *SweepRequest) error
309+
type Purger func(ctx context.Context, sweepReq *SweepRequest) error
310310

311311
// batchKit is a kit of dependencies that are used to initialize a batch. This
312312
// struct is only used as a wrapper for the arguments that are required to
@@ -1895,10 +1895,14 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
18951895
// for re-entry. This batch doesn't care for the outcome of this
18961896
// operation so we don't wait for it.
18971897
go func() {
1898+
// Make sure this context doesn't expire so we successfully
1899+
// add the sweeps to the batcher.
1900+
ctx := context.WithoutCancel(ctx)
1901+
18981902
// Iterate over the purge list and feed the sweeps back to the
18991903
// batcher.
19001904
for _, sweep := range purgeList {
1901-
err := b.purger(&sweep)
1905+
err := b.purger(ctx, &sweep)
19021906
if err != nil {
19031907
b.Errorf("unable to purge sweep %x: %v",
19041908
sweep.SwapHash[:6], err)

sweepbatcher/sweep_batcher.go

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,18 @@ type SweepRequest struct {
222222
Notifier *SpendNotifier
223223
}
224224

225+
// addSweepsRequest is a request to sweep an outpoint or a group of outpoints
226+
// that is used internally by the batcher (between AddSweep and handleSweeps).
227+
type addSweepsRequest struct {
228+
// sweeps is the list of sweeps already loaded from DB and fee rate
229+
// source.
230+
sweeps []*sweep
231+
232+
// Notifier is a notifier that is used to notify the requester of this
233+
// sweep that the sweep was successful.
234+
notifier *SpendNotifier
235+
}
236+
225237
type SpendDetail struct {
226238
// Tx is the transaction that spent the outpoint.
227239
Tx *wire.MsgTx
@@ -266,8 +278,8 @@ type Batcher struct {
266278
// batches is a map of batch IDs to the currently active batches.
267279
batches map[int32]*batch
268280

269-
// sweepReqs is a channel where sweep requests are received.
270-
sweepReqs chan SweepRequest
281+
// addSweepsChan is a channel where sweep requests are received.
282+
addSweepsChan chan *addSweepsRequest
271283

272284
// testReqs is a channel where test requests are received.
273285
// This is used only in unit tests! The reason to have this is to
@@ -501,7 +513,7 @@ func NewBatcher(wallet lndclient.WalletKitClient,
501513

502514
return &Batcher{
503515
batches: make(map[int32]*batch),
504-
sweepReqs: make(chan SweepRequest),
516+
addSweepsChan: make(chan *addSweepsRequest),
505517
testReqs: make(chan *testRequest),
506518
errChan: make(chan error, 1),
507519
quit: make(chan struct{}),
@@ -557,15 +569,8 @@ func (b *Batcher) Run(ctx context.Context) error {
557569

558570
for {
559571
select {
560-
case sweepReq := <-b.sweepReqs:
561-
sweeps, err := b.fetchSweeps(runCtx, sweepReq)
562-
if err != nil {
563-
warnf("fetchSweeps failed: %v.", err)
564-
565-
return err
566-
}
567-
568-
err = b.handleSweeps(runCtx, sweeps, sweepReq.Notifier)
572+
case req := <-b.addSweepsChan:
573+
err = b.handleSweeps(runCtx, req.sweeps, req.notifier)
569574
if err != nil {
570575
warnf("handleSweeps failed: %v.", err)
571576

@@ -589,11 +594,32 @@ func (b *Batcher) Run(ctx context.Context) error {
589594
}
590595
}
591596

592-
// AddSweep adds a sweep request to the batcher for handling. This will either
593-
// place the sweep in an existing batch or create a new one.
594-
func (b *Batcher) AddSweep(sweepReq *SweepRequest) error {
597+
// AddSweep loads information about sweeps from the store and fee rate source,
598+
// and adds them to the batcher for handling. This will either place the sweep
599+
// in an existing batch or create a new one. The method can be called multiple
600+
// times, but the sweeps (including the order of them) must be the same. If
601+
// notifier is provided, the batcher sends back sweeping results through it.
602+
func (b *Batcher) AddSweep(ctx context.Context, sweepReq *SweepRequest) error {
603+
// If the batcher is shutting down, quit now.
604+
select {
605+
case <-b.quit:
606+
return ErrBatcherShuttingDown
607+
608+
default:
609+
}
610+
611+
sweeps, err := b.fetchSweeps(ctx, *sweepReq)
612+
if err != nil {
613+
return fmt.Errorf("fetchSweeps failed: %w", err)
614+
}
615+
616+
req := &addSweepsRequest{
617+
sweeps: sweeps,
618+
notifier: sweepReq.Notifier,
619+
}
620+
595621
select {
596-
case b.sweepReqs <- *sweepReq:
622+
case b.addSweepsChan <- req:
597623
return nil
598624

599625
case <-b.quit:

0 commit comments

Comments
 (0)