Skip to content

Commit e178d32

Browse files
committed
sweepbatcher: add option WithInitialDelay
WithInitialDelay instructs sweepbatcher to wait for the duration provided after new batch creation before it is first published. This facilitates better grouping. It only affects newly created batches, not batches loaded from DB, so publishing does happen in case of a daemon restart (especially important in case of a crashloop). Defaults to 0s. If a sweep is about to expire (time until timeout is less that 2x initialDelay), then waiting is skipped.
1 parent 429eb85 commit e178d32

File tree

2 files changed

+150
-8
lines changed

2 files changed

+150
-8
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 113 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,14 @@ type batchConfig struct {
138138
// clock provides methods to work with time and timers.
139139
clock clock.Clock
140140

141-
// batchPublishDelay is the delay between receiving a new block and
142-
// publishing the batch transaction.
141+
// initialDelay is the delay of first batch publishing after creation.
142+
// It only affects newly created batches, not batches loaded from DB,
143+
// so publishing does happen in case of a daemon restart (especially
144+
// important in case of a crashloop).
145+
initialDelay time.Duration
146+
147+
// batchPublishDelay is the delay between receiving a new block or
148+
// initial delay completion and publishing the batch transaction.
143149
batchPublishDelay time.Duration
144150

145151
// noBumping instructs sweepbatcher not to fee bump itself and rely on
@@ -511,6 +517,11 @@ func (b *batch) Wait() {
511517
<-b.finished
512518
}
513519

520+
// stillWaitingMsg is the format of the message printed if the batch is about
521+
// to publish, but initial delay has not ended yet.
522+
const stillWaitingMsg = "Skipping publishing, initial delay will end at " +
523+
"%v, now is %v."
524+
514525
// Run is the batch's main event loop.
515526
func (b *batch) Run(ctx context.Context) error {
516527
runCtx, cancel := context.WithCancel(ctx)
@@ -550,10 +561,25 @@ func (b *batch) Run(ctx context.Context) error {
550561
}
551562
}
552563

564+
// skipBefore is the time before which we skip batch publishing.
565+
// This is needed to facilitate better grouping of sweeps.
566+
// For batches loaded from DB initialDelay should be 0.
567+
skipBefore := clock.Now().Add(b.cfg.initialDelay)
568+
569+
// initialDelayChan is a timer which fires upon initial delay end.
570+
// If initialDelay is 0, it does not fire to prevent race with
571+
// blockChan which also fires immediately with current tip. Such a race
572+
// may result in double publishing if batchPublishDelay is also 0.
573+
var initialDelayChan <-chan time.Time
574+
if b.cfg.initialDelay > 0 {
575+
initialDelayChan = clock.TickAfter(b.cfg.initialDelay)
576+
}
577+
553578
// We use a timer in order to not publish new transactions at the same
554579
// time as the block epoch notification. This is done to prevent
555580
// unnecessary transaction publishments when a spend is detected on that
556-
// block.
581+
// block. This timer starts after new block arrives or initialDelay
582+
// completes.
557583
var timerChan <-chan time.Time
558584

559585
b.log.Infof("started, primary %x, total sweeps %v",
@@ -564,6 +590,7 @@ func (b *batch) Run(ctx context.Context) error {
564590
case <-b.callEnter:
565591
<-b.callLeave
566592

593+
// blockChan provides immediately the current tip.
567594
case height := <-blockChan:
568595
b.log.Debugf("received block %v", height)
569596

@@ -572,12 +599,39 @@ func (b *batch) Run(ctx context.Context) error {
572599
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
573600
b.currentHeight = height
574601

602+
case <-initialDelayChan:
603+
b.log.Debugf("initial delay of duration %v has ended",
604+
b.cfg.initialDelay)
605+
606+
// Set the timer to publish the batch transaction after
607+
// the configured delay.
608+
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
609+
575610
case <-timerChan:
576-
if b.state == Open {
577-
err := b.publish(ctx)
578-
if err != nil {
579-
return err
580-
}
611+
// Check that batch is still open.
612+
if b.state != Open {
613+
b.log.Debugf("Skipping publishing, because the"+
614+
"batch is not open (%v).", b.state)
615+
continue
616+
}
617+
618+
// If the batch became urgent, skipBefore is set to now.
619+
if b.isUrgent(skipBefore) {
620+
skipBefore = clock.Now()
621+
}
622+
623+
// Check that the initial delay has ended. We have also
624+
// batchPublishDelay on top of initialDelay, so if
625+
// initialDelayChan has just fired, this check passes.
626+
now := clock.Now()
627+
if skipBefore.After(now) {
628+
b.log.Debugf(stillWaitingMsg, skipBefore, now)
629+
continue
630+
}
631+
632+
err := b.publish(ctx)
633+
if err != nil {
634+
return err
581635
}
582636

583637
case spend := <-b.spendChan:
@@ -611,6 +665,57 @@ func (b *batch) Run(ctx context.Context) error {
611665
}
612666
}
613667

668+
// timeout returns minimum timeout as block height among sweeps of the batch.
669+
// If the batch is empty, return -1.
670+
func (b *batch) timeout() int32 {
671+
// Find minimum among sweeps' timeouts.
672+
minTimeout := int32(-1)
673+
for _, sweep := range b.sweeps {
674+
if minTimeout == -1 || minTimeout > sweep.timeout {
675+
minTimeout = sweep.timeout
676+
}
677+
}
678+
679+
return minTimeout
680+
}
681+
682+
// isUrgent checks if the batch became urgent. This is determined by comparing
683+
// the remaining number of blocks until timeout to the initial delay remained,
684+
// given one block is 10 minutes.
685+
func (b *batch) isUrgent(skipBefore time.Time) bool {
686+
timeout := b.timeout()
687+
if timeout <= 0 {
688+
b.log.Warnf("Method timeout() returned %v. Number of"+
689+
" sweeps: %d. It may be an empty batch.",
690+
timeout, len(b.sweeps))
691+
return false
692+
}
693+
694+
if b.currentHeight == 0 {
695+
// currentHeight is not initiated yet.
696+
return false
697+
}
698+
699+
blocksToTimeout := timeout - b.currentHeight
700+
const blockTime = 10 * time.Minute
701+
timeBank := time.Duration(blocksToTimeout) * blockTime
702+
703+
// We want to have at least 2x as much time to be safe.
704+
const safetyFactor = 2
705+
remainingWaiting := skipBefore.Sub(b.cfg.clock.Now())
706+
707+
if timeBank >= safetyFactor*remainingWaiting {
708+
// There is enough time, keep waiting.
709+
return false
710+
}
711+
712+
b.log.Debugf("cancelling waiting for urgent sweep (timeBank is %v, "+
713+
"remainingWaiting is %v)", timeBank, remainingWaiting)
714+
715+
// Signal to the caller to cancel initialDelay.
716+
return true
717+
}
718+
614719
// publish creates and publishes the latest batch transaction to the network.
615720
func (b *batch) publish(ctx context.Context) error {
616721
var (

sweepbatcher/sweep_batcher.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,14 @@ type Batcher struct {
257257
// clock provides methods to work with time and timers.
258258
clock clock.Clock
259259

260+
// initialDelay is the delay of first batch publishing after creation.
261+
// It only affects newly created batches, not batches loaded from DB,
262+
// so publishing does happen in case of a daemon restart (especially
263+
// important in case of a crashloop). If a sweep is about to expire
264+
// (time until timeout is less that 2x initialDelay), then waiting is
265+
// skipped.
266+
initialDelay time.Duration
267+
260268
// customFeeRate provides custom min fee rate per swap. The batch uses
261269
// max of the fee rates of its swaps. In this mode confTarget is
262270
// ignored and fee bumping by sweepbatcher is disabled.
@@ -274,6 +282,14 @@ type BatcherConfig struct {
274282
// clock provides methods to work with time and timers.
275283
clock clock.Clock
276284

285+
// initialDelay is the delay of first batch publishing after creation.
286+
// It only affects newly created batches, not batches loaded from DB,
287+
// so publishing does happen in case of a daemon restart (especially
288+
// important in case of a crashloop). If a sweep is about to expire
289+
// (time until timeout is less that 2x initialDelay), then waiting is
290+
// skipped.
291+
initialDelay time.Duration
292+
277293
// customFeeRate provides custom min fee rate per swap. The batch uses
278294
// max of the fee rates of its swaps. In this mode confTarget is
279295
// ignored and fee bumping by sweepbatcher is disabled.
@@ -297,6 +313,17 @@ func WithClock(clock clock.Clock) BatcherOption {
297313
}
298314
}
299315

316+
// WithInitialDelay instructs sweepbatcher to wait for the duration provided
317+
// after new batch creation before it is first published. This facilitates
318+
// better grouping. Defaults to 0s (no initial delay). If a sweep is about
319+
// to expire (time until timeout is less that 2x initialDelay), then waiting
320+
// is skipped.
321+
func WithInitialDelay(initialDelay time.Duration) BatcherOption {
322+
return func(cfg *BatcherConfig) {
323+
cfg.initialDelay = initialDelay
324+
}
325+
}
326+
300327
// WithCustomFeeRate instructs sweepbatcher not to fee bump itself and rely on
301328
// external source of fee rates (FeeRateProvider). To apply a fee rate change,
302329
// the caller should re-add the sweep by calling AddSweep.
@@ -355,6 +382,7 @@ func NewBatcher(wallet lndclient.WalletKitClient,
355382
store: store,
356383
sweepStore: sweepStore,
357384
clock: cfg.clock,
385+
initialDelay: cfg.initialDelay,
358386
customFeeRate: cfg.customFeeRate,
359387
customMuSig2Signer: cfg.customMuSig2Signer,
360388
}
@@ -560,6 +588,12 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
560588
cfg.batchPublishDelay = defaultTestnetPublishDelay
561589
}
562590

591+
if b.initialDelay < 0 {
592+
return nil, fmt.Errorf("negative initialDelay: %v",
593+
b.initialDelay)
594+
}
595+
cfg.initialDelay = b.initialDelay
596+
563597
batchKit := b.newBatchKit()
564598

565599
batch := NewBatch(cfg, batchKit)
@@ -647,6 +681,9 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
647681

648682
cfg := b.newBatchConfig(batch.cfg.maxTimeoutDistance)
649683

684+
// Note that initialDelay and batchPublishDelay are 0 for batches
685+
// recovered from DB so publishing happen in case of a daemon restart
686+
// (especially important in case of a crashloop).
650687
newBatch, err := NewBatchFromDB(cfg, batchKit)
651688
if err != nil {
652689
return fmt.Errorf("failed in NewBatchFromDB: %w", err)

0 commit comments

Comments
 (0)