Skip to content

Commit ab59bda

Browse files
committed
sweepbatcher: replace batch logger atomically
This is needed to fix crashes in unit tests under -race.
1 parent a0f8724 commit ab59bda

File tree

2 files changed

+87
-50
lines changed

2 files changed

+87
-50
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 83 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"math"
1010
"strings"
1111
"sync"
12+
"sync/atomic"
1213
"time"
1314

1415
"github.com/btcsuite/btcd/blockchain"
@@ -284,8 +285,8 @@ type batch struct {
284285
// cfg is the configuration for this batch.
285286
cfg *batchConfig
286287

287-
// log is the logger for this batch.
288-
log btclog.Logger
288+
// log_ is the logger for this batch.
289+
log_ atomic.Pointer[btclog.Logger]
289290

290291
wg sync.WaitGroup
291292
}
@@ -387,7 +388,7 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
387388
}
388389
}
389390

390-
return &batch{
391+
b := &batch{
391392
id: bk.id,
392393
state: bk.state,
393394
primarySweepID: bk.primaryID,
@@ -412,9 +413,42 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
412413
publishErrorHandler: bk.publishErrorHandler,
413414
purger: bk.purger,
414415
store: bk.store,
415-
log: bk.log,
416416
cfg: &cfg,
417-
}, nil
417+
}
418+
419+
b.setLog(bk.log)
420+
421+
return b, nil
422+
}
423+
424+
// log returns current logger.
425+
func (b *batch) log() btclog.Logger {
426+
return *b.log_.Load()
427+
}
428+
429+
// setLog atomically replaces the logger.
430+
func (b *batch) setLog(logger btclog.Logger) {
431+
b.log_.Store(&logger)
432+
}
433+
434+
// Debugf logs a message with level DEBUG.
435+
func (b *batch) Debugf(format string, params ...interface{}) {
436+
b.log().Debugf(format, params...)
437+
}
438+
439+
// Infof logs a message with level INFO.
440+
func (b *batch) Infof(format string, params ...interface{}) {
441+
b.log().Infof(format, params...)
442+
}
443+
444+
// Warnf logs a message with level WARN.
445+
func (b *batch) Warnf(format string, params ...interface{}) {
446+
b.log().Warnf(format, params...)
447+
}
448+
449+
// Errorf logs a message with level ERROR.
450+
func (b *batch) Errorf(format string, params ...interface{}) {
451+
b.log().Errorf(format, params...)
418452
}
419453

420454
// addSweep tries to add a sweep to the batch. If this is the first sweep being
@@ -430,7 +464,7 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
430464
// If the provided sweep is nil, we can't proceed with any checks, so
431465
// we just return early.
432466
if sweep == nil {
433-
b.log.Infof("the sweep is nil")
467+
b.Infof("the sweep is nil")
434468

435469
return false, nil
436470
}
@@ -473,7 +507,7 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
473507
// the batch, do not add another sweep to prevent the tx from becoming
474508
// non-standard.
475509
if len(b.sweeps) >= MaxSweepsPerBatch {
476-
b.log.Infof("the batch has already too many sweeps (%d >= %d)",
510+
b.Infof("the batch has already too many sweeps %d >= %d",
477511
len(b.sweeps), MaxSweepsPerBatch)
478512

479513
return false, nil
@@ -483,7 +517,7 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
483517
// arrive here after the batch got closed because of a spend. In this
484518
// case we cannot add the sweep to this batch.
485519
if b.state != Open {
486-
b.log.Infof("the batch state (%v) is not open", b.state)
520+
b.Infof("the batch state (%v) is not open", b.state)
487521

488522
return false, nil
489523
}
@@ -493,15 +527,15 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
493527
// we cannot add this sweep to the batch.
494528
for _, s := range b.sweeps {
495529
if s.isExternalAddr {
496-
b.log.Infof("the batch already has a sweep (%x) with "+
530+
b.Infof("the batch already has a sweep %x with "+
497531
"an external address", s.swapHash[:6])
498532

499533
return false, nil
500534
}
501535

502536
if sweep.isExternalAddr {
503-
b.log.Infof("the batch is not empty and new sweep (%x)"+
504-
" has an external address", sweep.swapHash[:6])
537+
b.Infof("the batch is not empty and new sweep %x "+
538+
"has an external address", sweep.swapHash[:6])
505539

506540
return false, nil
507541
}
@@ -515,7 +549,7 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
515549
int32(math.Abs(float64(sweep.timeout - s.timeout)))
516550

517551
if timeoutDistance > b.cfg.maxTimeoutDistance {
518-
b.log.Infof("too long timeout distance between the "+
552+
b.Infof("too long timeout distance between the "+
519553
"batch and sweep %x: %d > %d",
520554
sweep.swapHash[:6], timeoutDistance,
521555
b.cfg.maxTimeoutDistance)
@@ -544,7 +578,7 @@ func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) {
544578
}
545579

546580
// Add the sweep to the batch's sweeps.
547-
b.log.Infof("adding sweep %x", sweep.swapHash[:6])
581+
b.Infof("adding sweep %x", sweep.swapHash[:6])
548582
b.sweeps[sweep.swapHash] = *sweep
549583

550584
// Update FeeRate. Max(sweep.minFeeRate) for all the sweeps of
@@ -572,7 +606,7 @@ func (b *batch) sweepExists(hash lntypes.Hash) bool {
572606

573607
// Wait waits for the batch to gracefully stop.
574608
func (b *batch) Wait() {
575-
b.log.Infof("Stopping")
609+
b.Infof("Stopping")
576610
<-b.finished
577611
}
578612

@@ -613,8 +647,7 @@ func (b *batch) Run(ctx context.Context) error {
613647
// Set currentHeight here, because it may be needed in monitorSpend.
614648
select {
615649
case b.currentHeight = <-blockChan:
616-
b.log.Debugf("initial height for the batch is %v",
617-
b.currentHeight)
650+
b.Debugf("initial height for the batch is %v", b.currentHeight)
618651

619652
case <-runCtx.Done():
620653
return runCtx.Err()
@@ -652,7 +685,7 @@ func (b *batch) Run(ctx context.Context) error {
652685
// completes.
653686
timerChan := clock.TickAfter(b.cfg.batchPublishDelay)
654687

655-
b.log.Infof("started, primary %x, total sweeps %v",
688+
b.Infof("started, primary %x, total sweeps %v",
656689
b.primarySweepID[0:6], len(b.sweeps))
657690

658691
for {
@@ -662,15 +695,15 @@ func (b *batch) Run(ctx context.Context) error {
662695

663696
// blockChan provides immediately the current tip.
664697
case height := <-blockChan:
665-
b.log.Debugf("received block %v", height)
698+
b.Debugf("received block %v", height)
666699

667700
// Set the timer to publish the batch transaction after
668701
// the configured delay.
669702
timerChan = clock.TickAfter(b.cfg.batchPublishDelay)
670703
b.currentHeight = height
671704

672705
case <-initialDelayChan:
673-
b.log.Debugf("initial delay of duration %v has ended",
706+
b.Debugf("initial delay of duration %v has ended",
674707
b.cfg.initialDelay)
675708

676709
// Set the timer to publish the batch transaction after
@@ -680,8 +713,8 @@ func (b *batch) Run(ctx context.Context) error {
680713
case <-timerChan:
681714
// Check that batch is still open.
682715
if b.state != Open {
683-
b.log.Debugf("Skipping publishing, because the"+
684-
" batch is not open (%v).", b.state)
716+
b.Debugf("Skipping publishing, because "+
717+
"the batch is not open (%v).", b.state)
685718
continue
686719
}
687720

@@ -695,7 +728,7 @@ func (b *batch) Run(ctx context.Context) error {
695728
// initialDelayChan has just fired, this check passes.
696729
now := clock.Now()
697730
if skipBefore.After(now) {
698-
b.log.Debugf(stillWaitingMsg, skipBefore, now)
731+
b.Debugf(stillWaitingMsg, skipBefore, now)
699732
continue
700733
}
701734

@@ -715,8 +748,8 @@ func (b *batch) Run(ctx context.Context) error {
715748

716749
case <-b.reorgChan:
717750
b.state = Open
718-
b.log.Warnf("reorg detected, batch is able to accept " +
719-
"new sweeps")
751+
b.Warnf("reorg detected, batch is able to " +
752+
"accept new sweeps")
720753

721754
err := b.monitorSpend(ctx, b.sweeps[b.primarySweepID])
722755
if err != nil {
@@ -755,8 +788,10 @@ func (b *batch) timeout() int32 {
755788
func (b *batch) isUrgent(skipBefore time.Time) bool {
756789
timeout := b.timeout()
757790
if timeout <= 0 {
758-
b.log.Warnf("Method timeout() returned %v. Number of"+
759-
" sweeps: %d. It may be an empty batch.",
791+
// This may happen if the batch is empty or if SweepInfo.Timeout
792+
// is not set, may be possible in tests or if there is a bug.
793+
b.Warnf("Method timeout() returned %v. Number of "+
794+
"sweeps: %d. It may be an empty batch.",
760795
timeout, len(b.sweeps))
761796
return false
762797
}
@@ -779,7 +814,7 @@ func (b *batch) isUrgent(skipBefore time.Time) bool {
779814
return false
780815
}
781816

782-
b.log.Debugf("cancelling waiting for urgent sweep (timeBank is %v, "+
817+
b.Debugf("cancelling waiting for urgent sweep (timeBank is %v, "+
783818
"remainingWaiting is %v)", timeBank, remainingWaiting)
784819

785820
// Signal to the caller to cancel initialDelay.
@@ -795,7 +830,7 @@ func (b *batch) publish(ctx context.Context) error {
795830
)
796831

797832
if len(b.sweeps) == 0 {
798-
b.log.Debugf("skipping publish: no sweeps in the batch")
833+
b.Debugf("skipping publish: no sweeps in the batch")
799834

800835
return nil
801836
}
@@ -808,7 +843,7 @@ func (b *batch) publish(ctx context.Context) error {
808843

809844
// logPublishError is a function which logs publish errors.
810845
logPublishError := func(errMsg string, err error) {
811-
b.publishErrorHandler(err, errMsg, b.log)
846+
b.publishErrorHandler(err, errMsg, b.log())
812847
}
813848

814849
fee, err, signSuccess = b.publishMixedBatch(ctx)
@@ -830,9 +865,9 @@ func (b *batch) publish(ctx context.Context) error {
830865
}
831866
}
832867

833-
b.log.Infof("published, total sweeps: %v, fees: %v", len(b.sweeps), fee)
868+
b.Infof("published, total sweeps: %v, fees: %v", len(b.sweeps), fee)
834869
for _, sweep := range b.sweeps {
835-
b.log.Infof("published sweep %x, value: %v",
870+
b.Infof("published sweep %x, value: %v",
836871
sweep.swapHash[:6], sweep.value)
837872
}
838873

@@ -1026,7 +1061,7 @@ func (b *batch) publishMixedBatch(ctx context.Context) (btcutil.Amount, error,
10261061
coopInputs int
10271062
)
10281063
for attempt := 1; ; attempt++ {
1029-
b.log.Infof("Attempt %d of collecting cooperative signatures.",
1064+
b.Infof("Attempt %d of collecting cooperative signatures.",
10301065
attempt)
10311066

10321067
// Construct unsigned batch transaction.
@@ -1062,7 +1097,7 @@ func (b *batch) publishMixedBatch(ctx context.Context) (btcutil.Amount, error,
10621097
ctx, i, sweep, tx, prevOutsMap, psbtBytes,
10631098
)
10641099
if err != nil {
1065-
b.log.Infof("cooperative signing failed for "+
1100+
b.Infof("cooperative signing failed for "+
10661101
"sweep %x: %v", sweep.swapHash[:6], err)
10671102

10681103
// Set coopFailed flag for this sweep in all the
@@ -1201,7 +1236,7 @@ func (b *batch) publishMixedBatch(ctx context.Context) (btcutil.Amount, error,
12011236
}
12021237
}
12031238
txHash := tx.TxHash()
1204-
b.log.Infof("attempting to publish batch tx=%v with feerate=%v, "+
1239+
b.Infof("attempting to publish batch tx=%v with feerate=%v, "+
12051240
"weight=%v, feeForWeight=%v, fee=%v, sweeps=%d, "+
12061241
"%d cooperative: (%s) and %d non-cooperative (%s), destAddr=%s",
12071242
txHash, b.rbfCache.FeeRate, weight, feeForWeight, fee,
@@ -1215,7 +1250,7 @@ func (b *batch) publishMixedBatch(ctx context.Context) (btcutil.Amount, error,
12151250
blockchain.GetTransactionWeight(btcutil.NewTx(tx)),
12161251
)
12171252
if realWeight != weight {
1218-
b.log.Warnf("actual weight of tx %v is %v, estimated as %d",
1253+
b.Warnf("actual weight of tx %v is %v, estimated as %d",
12191254
txHash, realWeight, weight)
12201255
}
12211256

@@ -1239,11 +1274,11 @@ func (b *batch) debugLogTx(msg string, tx *wire.MsgTx) {
12391274
// Serialize the transaction and convert to hex string.
12401275
buf := bytes.NewBuffer(make([]byte, 0, tx.SerializeSize()))
12411276
if err := tx.Serialize(buf); err != nil {
1242-
b.log.Errorf("failed to serialize tx for debug log: %v", err)
1277+
b.Errorf("failed to serialize tx for debug log: %v", err)
12431278
return
12441279
}
12451280

1246-
b.log.Debugf("%s: %s", msg, hex.EncodeToString(buf.Bytes()))
1281+
b.Debugf("%s: %s", msg, hex.EncodeToString(buf.Bytes()))
12471282
}
12481283

12491284
// musig2sign signs one sweep using musig2.
@@ -1405,15 +1440,16 @@ func (b *batch) updateRbfRate(ctx context.Context) error {
14051440
if b.rbfCache.FeeRate == 0 {
14061441
// We set minFeeRate in each sweep, so fee rate is expected to
14071442
// be initiated here.
1408-
b.log.Warnf("rbfCache.FeeRate is 0, which must not happen.")
1443+
b.Warnf("rbfCache.FeeRate is 0, which must not happen.")
14091444

14101445
if b.cfg.batchConfTarget == 0 {
1411-
b.log.Warnf("updateRbfRate called with zero " +
1446+
b.Warnf("updateRbfRate called with zero " +
14121447
"batchConfTarget")
14131448
}
14141449

1415-
b.log.Infof("initializing rbf fee rate for conf target=%v",
1450+
b.Infof("initializing rbf fee rate for conf target=%v",
14161451
b.cfg.batchConfTarget)
1452+
14171453
rate, err := b.wallet.EstimateFeeRate(
14181454
ctx, b.cfg.batchConfTarget,
14191455
)
@@ -1453,6 +1489,7 @@ func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error {
14531489
)
14541490
if err != nil {
14551491
cancel()
1492+
14561493
return err
14571494
}
14581495

@@ -1461,7 +1498,7 @@ func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error {
14611498
defer cancel()
14621499
defer b.wg.Done()
14631500

1464-
b.log.Infof("monitoring spend for outpoint %s",
1501+
b.Infof("monitoring spend for outpoint %s",
14651502
primarySweep.outpoint.String())
14661503

14671504
for {
@@ -1584,7 +1621,7 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
15841621
if len(spendTx.TxOut) > 0 {
15851622
b.batchPkScript = spendTx.TxOut[0].PkScript
15861623
} else {
1587-
b.log.Warnf("transaction %v has no outputs", txHash)
1624+
b.Warnf("transaction %v has no outputs", txHash)
15881625
}
15891626

15901627
// As a previous version of the batch transaction may get confirmed,
@@ -1666,13 +1703,13 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
16661703

16671704
err := b.purger(&sweep)
16681705
if err != nil {
1669-
b.log.Errorf("unable to purge sweep %x: %v",
1706+
b.Errorf("unable to purge sweep %x: %v",
16701707
sweep.SwapHash[:6], err)
16711708
}
16721709
}
16731710
}()
16741711

1675-
b.log.Infof("spent, total sweeps: %v, purged sweeps: %v",
1712+
b.Infof("spent, total sweeps: %v, purged sweeps: %v",
16761713
len(notifyList), len(purgeList))
16771714

16781715
err := b.monitorConfirmations(ctx)
@@ -1690,7 +1727,7 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
16901727
// handleConf handles a confirmation notification. This is the final step of the
16911728
// batch. Here we signal to the batcher that this batch was completed.
16921729
func (b *batch) handleConf(ctx context.Context) error {
1693-
b.log.Infof("confirmed in txid %s", b.batchTxid)
1730+
b.Infof("confirmed in txid %s", b.batchTxid)
16941731
b.state = Confirmed
16951732

16961733
return b.store.ConfirmBatch(ctx, b.id)
@@ -1769,7 +1806,7 @@ func (b *batch) insertAndAcquireID(ctx context.Context) (int32, error) {
17691806
}
17701807

17711808
b.id = id
1772-
b.log = batchPrefixLogger(fmt.Sprintf("%d", b.id))
1809+
b.setLog(batchPrefixLogger(fmt.Sprintf("%d", b.id)))
17731810

17741811
return id, nil
17751812
}

0 commit comments

Comments
 (0)