Skip to content

Commit 472bec0

Browse files
committed
sweepbatcher: customize initialDelay per sweep
Option WithInitialDelay now accepts a function returning initialDelay depending on sweep's data. This is needed to be able to wait longer for sweeps with low priority, but still sweeping high priority sweeps soon.
1 parent 0006341 commit 472bec0

File tree

3 files changed

+363
-59
lines changed

3 files changed

+363
-59
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,13 @@ type batchConfig struct {
151151
// clock provides methods to work with time and timers.
152152
clock clock.Clock
153153

154-
// initialDelay is the delay of first batch publishing after creation.
155-
// It only affects newly created batches, not batches loaded from DB,
156-
// so publishing does happen in case of a daemon restart (especially
157-
// important in case of a crashloop).
158-
initialDelay time.Duration
154+
// initialDelayProvider provides the delay of first batch publishing
155+
// after creation. It only affects newly created batches, not batches
156+
// loaded from DB, so publishing does happen in case of a daemon restart
157+
// (especially important in case of a crashloop). If a sweep is about to
158+
// expire (time until timeout is less that 2x initialDelay), then
159+
// waiting is skipped.
160+
initialDelayProvider InitialDelayProvider
159161

160162
// batchPublishDelay is the delay between receiving a new block or
161163
// initial delay completion and publishing the batch transaction.
@@ -650,6 +652,7 @@ func (b *batch) Run(ctx context.Context) error {
650652

651653
// Cache clock variable.
652654
clock := b.cfg.clock
655+
startTime := clock.Now()
653656

654657
blockChan, blockErrChan, err :=
655658
b.chainNotifier.RegisterBlockEpochNtfn(runCtx)
@@ -679,17 +682,15 @@ func (b *batch) Run(ctx context.Context) error {
679682

680683
// skipBefore is the time before which we skip batch publishing.
681684
// This is needed to facilitate better grouping of sweeps.
685+
// The value is set only if the batch has at least one sweep.
682686
// For batches loaded from DB initialDelay should be 0.
683-
skipBefore := clock.Now().Add(b.cfg.initialDelay)
687+
var skipBefore *time.Time
684688

685689
// initialDelayChan is a timer which fires upon initial delay end.
686690
// If initialDelay is set to 0, it will not trigger to avoid setting up
687691
// timerChan twice, which could lead to double publishing if
688692
// batchPublishDelay is also 0.
689693
var initialDelayChan <-chan time.Time
690-
if b.cfg.initialDelay > 0 {
691-
initialDelayChan = clock.TickAfter(b.cfg.initialDelay)
692-
}
693694

694695
// We use a timer in order to not publish new transactions at the same
695696
// time as the block epoch notification. This is done to prevent
@@ -703,6 +704,45 @@ func (b *batch) Run(ctx context.Context) error {
703704
b.primarySweepID, len(b.sweeps))
704705

705706
for {
707+
// If the batch is not empty, find earliest initialDelay.
708+
var totalSweptAmt btcutil.Amount
709+
for _, sweep := range b.sweeps {
710+
totalSweptAmt += sweep.value
711+
}
712+
713+
skipBeforeUpdated := false
714+
if totalSweptAmt != 0 {
715+
initialDelay, err := b.cfg.initialDelayProvider(
716+
ctx, len(b.sweeps), totalSweptAmt,
717+
)
718+
if err != nil {
719+
b.Warnf("InitialDelayProvider failed: %v. We "+
720+
"publish this batch without a delay.",
721+
err)
722+
initialDelay = 0
723+
}
724+
if initialDelay < 0 {
725+
b.Warnf("Negative delay: %v. We publish this "+
726+
"batch without a delay.", initialDelay)
727+
initialDelay = 0
728+
}
729+
delayStop := startTime.Add(initialDelay)
730+
if skipBefore == nil || delayStop.Before(*skipBefore) {
731+
skipBefore = &delayStop
732+
skipBeforeUpdated = true
733+
}
734+
}
735+
736+
// Create new timer only if the value of skipBefore was updated.
737+
// Don't create the timer if the delay is <= 0 to avoid double
738+
// publishing if batchPublishDelay is also 0.
739+
if skipBeforeUpdated {
740+
delay := skipBefore.Sub(clock.Now())
741+
if delay > 0 {
742+
initialDelayChan = clock.TickAfter(delay)
743+
}
744+
}
745+
706746
select {
707747
case <-b.callEnter:
708748
<-b.callLeave
@@ -718,7 +758,7 @@ func (b *batch) Run(ctx context.Context) error {
718758

719759
case <-initialDelayChan:
720760
b.Debugf("initial delay of duration %v has ended",
721-
b.cfg.initialDelay)
761+
clock.Now().Sub(startTime))
722762

723763
// Set the timer to publish the batch transaction after
724764
// the configured delay.
@@ -732,17 +772,23 @@ func (b *batch) Run(ctx context.Context) error {
732772
continue
733773
}
734774

775+
if skipBefore == nil {
776+
b.Debugf("Skipping publishing, because " +
777+
"the batch is empty.")
778+
continue
779+
}
780+
735781
// If the batch became urgent, skipBefore is set to now.
736-
if b.isUrgent(skipBefore) {
737-
skipBefore = clock.Now()
782+
if b.isUrgent(*skipBefore) {
783+
*skipBefore = clock.Now()
738784
}
739785

740786
// Check that the initial delay has ended. We have also
741787
// batchPublishDelay on top of initialDelay, so if
742788
// initialDelayChan has just fired, this check passes.
743789
now := clock.Now()
744790
if skipBefore.After(now) {
745-
b.Debugf(stillWaitingMsg, skipBefore, now)
791+
b.Debugf(stillWaitingMsg, *skipBefore, now)
746792
continue
747793
}
748794

sweepbatcher/sweep_batcher.go

Lines changed: 57 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,21 @@ type VerifySchnorrSig func(pubKey *btcec.PublicKey, hash, sig []byte) error
165165
type FeeRateProvider func(ctx context.Context,
166166
swapHash lntypes.Hash) (chainfee.SatPerKWeight, error)
167167

168+
// InitialDelayProvider returns the duration after which a newly created batch
169+
// is first published. It allows to customize the duration based on total value
170+
// of the batch. There is a trade-off between better grouping and getting funds
171+
// faster. If the function returns an error, no delay is used and the error is
172+
// logged as a warning.
173+
type InitialDelayProvider func(ctx context.Context, numSweeps int,
174+
value btcutil.Amount) (time.Duration, error)
175+
176+
// zeroInitialDelay returns no delay for any sweeps.
177+
func zeroInitialDelay(_ context.Context, _ int,
178+
_ btcutil.Amount) (time.Duration, error) {
179+
180+
return 0, nil
181+
}
182+
168183
// PublishErrorHandler is a function that handles transaction publishing error.
169184
type PublishErrorHandler func(err error, errMsg string, log btclog.Logger)
170185

@@ -299,13 +314,13 @@ type Batcher struct {
299314
// clock provides methods to work with time and timers.
300315
clock clock.Clock
301316

302-
// initialDelay is the delay of first batch publishing after creation.
303-
// It only affects newly created batches, not batches loaded from DB,
304-
// so publishing does happen in case of a daemon restart (especially
305-
// important in case of a crashloop). If a sweep is about to expire
306-
// (time until timeout is less that 2x initialDelay), then waiting is
307-
// skipped.
308-
initialDelay time.Duration
317+
// initialDelayProvider provides the delay of first batch publishing
318+
// after creation. It only affects newly created batches, not batches
319+
// loaded from DB, so publishing does happen in case of a daemon restart
320+
// (especially important in case of a crashloop). If a sweep is about to
321+
// expire (time until timeout is less that 2x initialDelay), then
322+
// waiting is skipped.
323+
initialDelayProvider InitialDelayProvider
309324

310325
// publishDelay is the delay of batch publishing that is applied in the
311326
// beginning, after the appearance of a new block in the network or
@@ -339,13 +354,13 @@ type BatcherConfig struct {
339354
// clock provides methods to work with time and timers.
340355
clock clock.Clock
341356

342-
// initialDelay is the delay of first batch publishing after creation.
343-
// It only affects newly created batches, not batches loaded from DB,
344-
// so publishing does happen in case of a daemon restart (especially
345-
// important in case of a crashloop). If a sweep is about to expire
346-
// (time until timeout is less that 2x initialDelay), then waiting is
347-
// skipped.
348-
initialDelay time.Duration
357+
// initialDelayProvider provides the delay of first batch publishing
358+
// after creation. It only affects newly created batches, not batches
359+
// loaded from DB, so publishing does happen in case of a daemon restart
360+
// (especially important in case of a crashloop). If a sweep is about to
361+
// expire (time until timeout is less that 2x initialDelay), then
362+
// waiting is skipped.
363+
initialDelayProvider InitialDelayProvider
349364

350365
// publishDelay is the delay of batch publishing that is applied in the
351366
// beginning, after the appearance of a new block in the network or
@@ -390,9 +405,9 @@ func WithClock(clock clock.Clock) BatcherOption {
390405
// better grouping. Defaults to 0s (no initial delay). If a sweep is about
391406
// to expire (time until timeout is less that 2x initialDelay), then waiting
392407
// is skipped.
393-
func WithInitialDelay(initialDelay time.Duration) BatcherOption {
408+
func WithInitialDelay(provider InitialDelayProvider) BatcherOption {
394409
return func(cfg *BatcherConfig) {
395-
cfg.initialDelay = initialDelay
410+
cfg.initialDelayProvider = provider
396411
}
397412
}
398413

@@ -478,27 +493,27 @@ func NewBatcher(wallet lndclient.WalletKitClient,
478493
}
479494

480495
return &Batcher{
481-
batches: make(map[int32]*batch),
482-
sweepReqs: make(chan SweepRequest),
483-
testReqs: make(chan *testRequest),
484-
errChan: make(chan error, 1),
485-
quit: make(chan struct{}),
486-
initDone: make(chan struct{}),
487-
wallet: wallet,
488-
chainNotifier: chainNotifier,
489-
signerClient: signerClient,
490-
musig2ServerSign: musig2ServerSigner,
491-
VerifySchnorrSig: verifySchnorrSig,
492-
chainParams: chainparams,
493-
store: store,
494-
sweepStore: sweepStore,
495-
clock: cfg.clock,
496-
initialDelay: cfg.initialDelay,
497-
publishDelay: cfg.publishDelay,
498-
customFeeRate: cfg.customFeeRate,
499-
txLabeler: cfg.txLabeler,
500-
customMuSig2Signer: cfg.customMuSig2Signer,
501-
publishErrorHandler: cfg.publishErrorHandler,
496+
batches: make(map[int32]*batch),
497+
sweepReqs: make(chan SweepRequest),
498+
testReqs: make(chan *testRequest),
499+
errChan: make(chan error, 1),
500+
quit: make(chan struct{}),
501+
initDone: make(chan struct{}),
502+
wallet: wallet,
503+
chainNotifier: chainNotifier,
504+
signerClient: signerClient,
505+
musig2ServerSign: musig2ServerSigner,
506+
VerifySchnorrSig: verifySchnorrSig,
507+
chainParams: chainparams,
508+
store: store,
509+
sweepStore: sweepStore,
510+
clock: cfg.clock,
511+
initialDelayProvider: cfg.initialDelayProvider,
512+
publishDelay: cfg.publishDelay,
513+
customFeeRate: cfg.customFeeRate,
514+
txLabeler: cfg.txLabeler,
515+
customMuSig2Signer: cfg.customMuSig2Signer,
516+
publishErrorHandler: cfg.publishErrorHandler,
502517
}
503518
}
504519

@@ -749,11 +764,10 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
749764
cfg.batchPublishDelay = b.publishDelay
750765
}
751766

752-
if b.initialDelay < 0 {
753-
return nil, fmt.Errorf("negative initialDelay: %v",
754-
b.initialDelay)
767+
cfg.initialDelayProvider = b.initialDelayProvider
768+
if cfg.initialDelayProvider == nil {
769+
cfg.initialDelayProvider = zeroInitialDelay
755770
}
756-
cfg.initialDelay = b.initialDelay
757771

758772
batchKit := b.newBatchKit()
759773

@@ -847,6 +861,8 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
847861
// Note that initialDelay and batchPublishDelay are 0 for batches
848862
// recovered from DB so publishing happen in case of a daemon restart
849863
// (especially important in case of a crashloop).
864+
cfg.initialDelayProvider = zeroInitialDelay
865+
850866
newBatch, err := NewBatchFromDB(cfg, batchKit)
851867
if err != nil {
852868
return fmt.Errorf("failed in NewBatchFromDB: %w", err)

0 commit comments

Comments
 (0)