Skip to content

Commit 98fa740

Browse files
authored
Merge pull request #801 from starius/sweepbatcher-wait
sweepbatcher: add options WithInitialDelay and WithPublishDelay
2 parents 6d3a488 + 9af6718 commit 98fa740

File tree

5 files changed

+733
-16
lines changed

5 files changed

+733
-16
lines changed

sweepbatcher/store.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/btcsuite/btcd/wire"
1212
"github.com/lightninglabs/loop/loopdb"
1313
"github.com/lightninglabs/loop/loopdb/sqlc"
14-
"github.com/lightningnetwork/lnd/clock"
1514
"github.com/lightningnetwork/lnd/lntypes"
1615
)
1716

@@ -72,15 +71,13 @@ type SQLStore struct {
7271
baseDb BaseDB
7372

7473
network *chaincfg.Params
75-
clock clock.Clock
7674
}
7775

7876
// NewSQLStore creates a new SQLStore.
7977
func NewSQLStore(db BaseDB, network *chaincfg.Params) *SQLStore {
8078
return &SQLStore{
8179
baseDb: db,
8280
network: network,
83-
clock: clock.NewDefaultClock(),
8481
}
8582
}
8683

sweepbatcher/sweep_batch.go

Lines changed: 121 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/lightninglabs/loop/swap"
2525
sweeppkg "github.com/lightninglabs/loop/sweep"
2626
"github.com/lightningnetwork/lnd/chainntnfs"
27+
"github.com/lightningnetwork/lnd/clock"
2728
"github.com/lightningnetwork/lnd/input"
2829
"github.com/lightningnetwork/lnd/keychain"
2930
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
@@ -134,8 +135,17 @@ type batchConfig struct {
134135
// batchConfTarget is the confirmation target of the batch transaction.
135136
batchConfTarget int32
136137

137-
// batchPublishDelay is the delay between receiving a new block and
138-
// publishing the batch transaction.
138+
// clock provides methods to work with time and timers.
139+
clock clock.Clock
140+
141+
// initialDelay is the delay of first batch publishing after creation.
142+
// It only affects newly created batches, not batches loaded from DB,
143+
// so publishing does happen in case of a daemon restart (especially
144+
// important in case of a crashloop).
145+
initialDelay time.Duration
146+
147+
// batchPublishDelay is the delay between receiving a new block or
148+
// initial delay completion and publishing the batch transaction.
139149
batchPublishDelay time.Duration
140150

141151
// noBumping instructs sweepbatcher not to fee bump itself and rely on
@@ -507,6 +517,11 @@ func (b *batch) Wait() {
507517
<-b.finished
508518
}
509519

520+
// stillWaitingMsg is the format of the message printed if the batch is about
521+
// to publish, but initial delay has not ended yet.
522+
const stillWaitingMsg = "Skipping publishing, initial delay will end at " +
523+
"%v, now is %v."
524+
510525
// Run is the batch's main event loop.
511526
func (b *batch) Run(ctx context.Context) error {
512527
runCtx, cancel := context.WithCancel(ctx)
@@ -527,6 +542,9 @@ func (b *batch) Run(ctx context.Context) error {
527542
return fmt.Errorf("both musig2 signers provided")
528543
}
529544

545+
// Cache clock variable.
546+
clock := b.cfg.clock
547+
530548
blockChan, blockErrChan, err :=
531549
b.chainNotifier.RegisterBlockEpochNtfn(runCtx)
532550
if err != nil {
@@ -543,10 +561,25 @@ func (b *batch) Run(ctx context.Context) error {
543561
}
544562
}
545563

564+
// skipBefore is the time before which we skip batch publishing.
565+
// This is needed to facilitate better grouping of sweeps.
566+
// For batches loaded from DB initialDelay should be 0.
567+
skipBefore := clock.Now().Add(b.cfg.initialDelay)
568+
569+
// initialDelayChan is a timer which fires upon initial delay end.
570+
// If initialDelay is 0, it does not fire to prevent race with
571+
// blockChan which also fires immediately with current tip. Such a race
572+
// may result in double publishing if batchPublishDelay is also 0.
573+
var initialDelayChan <-chan time.Time
574+
if b.cfg.initialDelay > 0 {
575+
initialDelayChan = clock.TickAfter(b.cfg.initialDelay)
576+
}
577+
546578
// We use a timer in order to not publish new transactions at the same
547579
// time as the block epoch notification. This is done to prevent
548580
// unnecessary transaction publishments when a spend is detected on that
549-
// block.
581+
// block. This timer starts after new block arrives or initialDelay
582+
// completes.
550583
var timerChan <-chan time.Time
551584

552585
b.log.Infof("started, primary %x, total sweeps %v",
@@ -557,20 +590,48 @@ func (b *batch) Run(ctx context.Context) error {
557590
case <-b.callEnter:
558591
<-b.callLeave
559592

593+
// blockChan provides immediately the current tip.
560594
case height := <-blockChan:
561595
b.log.Debugf("received block %v", height)
562596

563597
// Set the timer to publish the batch transaction after
564598
// the configured delay.
565-
timerChan = time.After(b.cfg.batchPublishDelay)
599+
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
566600
b.currentHeight = height
567601

602+
case <-initialDelayChan:
603+
b.log.Debugf("initial delay of duration %v has ended",
604+
b.cfg.initialDelay)
605+
606+
// Set the timer to publish the batch transaction after
607+
// the configured delay.
608+
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
609+
568610
case <-timerChan:
569-
if b.state == Open {
570-
err := b.publish(ctx)
571-
if err != nil {
572-
return err
573-
}
611+
// Check that batch is still open.
612+
if b.state != Open {
613+
b.log.Debugf("Skipping publishing, because the"+
614+
"batch is not open (%v).", b.state)
615+
continue
616+
}
617+
618+
// If the batch became urgent, skipBefore is set to now.
619+
if b.isUrgent(skipBefore) {
620+
skipBefore = clock.Now()
621+
}
622+
623+
// Check that the initial delay has ended. We have also
624+
// batchPublishDelay on top of initialDelay, so if
625+
// initialDelayChan has just fired, this check passes.
626+
now := clock.Now()
627+
if skipBefore.After(now) {
628+
b.log.Debugf(stillWaitingMsg, skipBefore, now)
629+
continue
630+
}
631+
632+
err := b.publish(ctx)
633+
if err != nil {
634+
return err
574635
}
575636

576637
case spend := <-b.spendChan:
@@ -604,6 +665,57 @@ func (b *batch) Run(ctx context.Context) error {
604665
}
605666
}
606667

668+
// timeout returns minimum timeout as block height among sweeps of the batch.
669+
// If the batch is empty, return -1.
670+
func (b *batch) timeout() int32 {
671+
// Find minimum among sweeps' timeouts.
672+
minTimeout := int32(-1)
673+
for _, sweep := range b.sweeps {
674+
if minTimeout == -1 || minTimeout > sweep.timeout {
675+
minTimeout = sweep.timeout
676+
}
677+
}
678+
679+
return minTimeout
680+
}
681+
682+
// isUrgent checks if the batch became urgent. This is determined by comparing
683+
// the remaining number of blocks until timeout to the initial delay remained,
684+
// given one block is 10 minutes.
685+
func (b *batch) isUrgent(skipBefore time.Time) bool {
686+
timeout := b.timeout()
687+
if timeout <= 0 {
688+
b.log.Warnf("Method timeout() returned %v. Number of"+
689+
" sweeps: %d. It may be an empty batch.",
690+
timeout, len(b.sweeps))
691+
return false
692+
}
693+
694+
if b.currentHeight == 0 {
695+
// currentHeight is not initiated yet.
696+
return false
697+
}
698+
699+
blocksToTimeout := timeout - b.currentHeight
700+
const blockTime = 10 * time.Minute
701+
timeBank := time.Duration(blocksToTimeout) * blockTime
702+
703+
// We want to have at least 2x as much time to be safe.
704+
const safetyFactor = 2
705+
remainingWaiting := skipBefore.Sub(b.cfg.clock.Now())
706+
707+
if timeBank >= safetyFactor*remainingWaiting {
708+
// There is enough time, keep waiting.
709+
return false
710+
}
711+
712+
b.log.Debugf("cancelling waiting for urgent sweep (timeBank is %v, "+
713+
"remainingWaiting is %v)", timeBank, remainingWaiting)
714+
715+
// Signal to the caller to cancel initialDelay.
716+
return true
717+
}
718+
607719
// publish creates and publishes the latest batch transaction to the network.
608720
func (b *batch) publish(ctx context.Context) error {
609721
var (

sweepbatcher/sweep_batcher.go

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/lightninglabs/loop/loopdb"
1717
"github.com/lightninglabs/loop/swap"
1818
"github.com/lightninglabs/loop/utils"
19+
"github.com/lightningnetwork/lnd/clock"
1920
"github.com/lightningnetwork/lnd/input"
2021
"github.com/lightningnetwork/lnd/lntypes"
2122
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
@@ -44,7 +45,7 @@ const (
4445

4546
// defaultTestnetPublishDelay is the default publish delay that is used
4647
// for testnet.
47-
defaultPublishDelay = 500 * time.Millisecond
48+
defaultTestnetPublishDelay = 500 * time.Millisecond
4849
)
4950

5051
type BatcherStore interface {
@@ -253,6 +254,23 @@ type Batcher struct {
253254
// exit.
254255
wg sync.WaitGroup
255256

257+
// clock provides methods to work with time and timers.
258+
clock clock.Clock
259+
260+
// initialDelay is the delay of first batch publishing after creation.
261+
// It only affects newly created batches, not batches loaded from DB,
262+
// so publishing does happen in case of a daemon restart (especially
263+
// important in case of a crashloop). If a sweep is about to expire
264+
// (time until timeout is less that 2x initialDelay), then waiting is
265+
// skipped.
266+
initialDelay time.Duration
267+
268+
// publishDelay is the delay of batch publishing that is applied in the
269+
// beginning, after the appearance of a new block in the network or
270+
// after the end of initial delay. For batches recovered from DB this
271+
// value is always 0s, regardless of this setting.
272+
publishDelay time.Duration
273+
256274
// customFeeRate provides custom min fee rate per swap. The batch uses
257275
// max of the fee rates of its swaps. In this mode confTarget is
258276
// ignored and fee bumping by sweepbatcher is disabled.
@@ -267,6 +285,23 @@ type Batcher struct {
267285

268286
// BatcherConfig holds batcher configuration.
269287
type BatcherConfig struct {
288+
// clock provides methods to work with time and timers.
289+
clock clock.Clock
290+
291+
// initialDelay is the delay of first batch publishing after creation.
292+
// It only affects newly created batches, not batches loaded from DB,
293+
// so publishing does happen in case of a daemon restart (especially
294+
// important in case of a crashloop). If a sweep is about to expire
295+
// (time until timeout is less that 2x initialDelay), then waiting is
296+
// skipped.
297+
initialDelay time.Duration
298+
299+
// publishDelay is the delay of batch publishing that is applied in the
300+
// beginning, after the appearance of a new block in the network or
301+
// after the end of initial delay. For batches recovered from DB this
302+
// value is always 0s, regardless of this setting.
303+
publishDelay time.Duration
304+
270305
// customFeeRate provides custom min fee rate per swap. The batch uses
271306
// max of the fee rates of its swaps. In this mode confTarget is
272307
// ignored and fee bumping by sweepbatcher is disabled.
@@ -282,6 +317,37 @@ type BatcherConfig struct {
282317
// BatcherOption configures batcher behaviour.
283318
type BatcherOption func(*BatcherConfig)
284319

320+
// WithClock sets the clock used by sweepbatcher and its batches. It is needed
321+
// to manipulate time in tests.
322+
func WithClock(clock clock.Clock) BatcherOption {
323+
return func(cfg *BatcherConfig) {
324+
cfg.clock = clock
325+
}
326+
}
327+
328+
// WithInitialDelay instructs sweepbatcher to wait for the duration provided
329+
// after new batch creation before it is first published. This facilitates
330+
// better grouping. Defaults to 0s (no initial delay). If a sweep is about
331+
// to expire (time until timeout is less that 2x initialDelay), then waiting
332+
// is skipped.
333+
func WithInitialDelay(initialDelay time.Duration) BatcherOption {
334+
return func(cfg *BatcherConfig) {
335+
cfg.initialDelay = initialDelay
336+
}
337+
}
338+
339+
// WithPublishDelay sets the delay of batch publishing that is applied in the
340+
// beginning, after the appearance of a new block in the network or after the
341+
// end of initial delay (see WithInitialDelay). It is needed to prevent
342+
// unnecessary transaction publishments when a spend is detected on that block.
343+
// Default value depends on the network: 5 seconds in mainnet, 0.5s in testnet.
344+
// For batches recovered from DB this value is always 0s.
345+
func WithPublishDelay(publishDelay time.Duration) BatcherOption {
346+
return func(cfg *BatcherConfig) {
347+
cfg.publishDelay = publishDelay
348+
}
349+
}
350+
285351
// WithCustomFeeRate instructs sweepbatcher not to fee bump itself and rely on
286352
// external source of fee rates (FeeRateProvider). To apply a fee rate change,
287353
// the caller should re-add the sweep by calling AddSweep.
@@ -315,6 +381,11 @@ func NewBatcher(wallet lndclient.WalletKitClient,
315381
opt(&cfg)
316382
}
317383

384+
// If WithClock was not provided, use default clock.
385+
if cfg.clock == nil {
386+
cfg.clock = clock.NewDefaultClock()
387+
}
388+
318389
if cfg.customMuSig2Signer != nil && musig2ServerSigner != nil {
319390
panic("customMuSig2Signer must not be used with " +
320391
"musig2ServerSigner")
@@ -334,6 +405,9 @@ func NewBatcher(wallet lndclient.WalletKitClient,
334405
chainParams: chainparams,
335406
store: store,
336407
sweepStore: sweepStore,
408+
clock: cfg.clock,
409+
initialDelay: cfg.initialDelay,
410+
publishDelay: cfg.publishDelay,
337411
customFeeRate: cfg.customFeeRate,
338412
customMuSig2Signer: cfg.customMuSig2Signer,
339413
}
@@ -536,8 +610,22 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
536610
cfg.batchPublishDelay = defaultMainnetPublishDelay
537611

538612
default:
539-
cfg.batchPublishDelay = defaultPublishDelay
613+
cfg.batchPublishDelay = defaultTestnetPublishDelay
614+
}
615+
616+
if b.publishDelay != 0 {
617+
if b.publishDelay < 0 {
618+
return nil, fmt.Errorf("negative publishDelay: %v",
619+
b.publishDelay)
620+
}
621+
cfg.batchPublishDelay = b.publishDelay
622+
}
623+
624+
if b.initialDelay < 0 {
625+
return nil, fmt.Errorf("negative initialDelay: %v",
626+
b.initialDelay)
540627
}
628+
cfg.initialDelay = b.initialDelay
541629

542630
batchKit := b.newBatchKit()
543631

@@ -626,6 +714,9 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
626714

627715
cfg := b.newBatchConfig(batch.cfg.maxTimeoutDistance)
628716

717+
// Note that initialDelay and batchPublishDelay are 0 for batches
718+
// recovered from DB so publishing happen in case of a daemon restart
719+
// (especially important in case of a crashloop).
629720
newBatch, err := NewBatchFromDB(cfg, batchKit)
630721
if err != nil {
631722
return fmt.Errorf("failed in NewBatchFromDB: %w", err)
@@ -934,6 +1025,7 @@ func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig {
9341025
maxTimeoutDistance: maxTimeoutDistance,
9351026
noBumping: b.customFeeRate != nil,
9361027
customMuSig2Signer: b.customMuSig2Signer,
1028+
clock: b.clock,
9371029
}
9381030
}
9391031

0 commit comments

Comments
 (0)