Skip to content

Commit c3cbfd8

Browse files
authored
Merge pull request #9241 from Crypt-iQ/fix_vb
discovery+graph: track job set dependencies in vb
2 parents baa34b0 + 323b633 commit c3cbfd8

File tree

8 files changed

+815
-567
lines changed

8 files changed

+815
-567
lines changed

discovery/gossiper.go

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,9 @@ type AuthenticatedGossiper struct {
517517
// AuthenticatedGossiper lock.
518518
chanUpdateRateLimiter map[uint64][2]*rate.Limiter
519519

520+
// vb is used to enforce job dependency ordering of gossip messages.
521+
vb *ValidationBarrier
522+
520523
sync.Mutex
521524
}
522525

@@ -542,6 +545,8 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
542545
banman: newBanman(),
543546
}
544547

548+
gossiper.vb = NewValidationBarrier(1000, gossiper.quit)
549+
545550
gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
546551
ChainHash: cfg.ChainHash,
547552
ChanSeries: cfg.ChanSeries,
@@ -1409,10 +1414,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
14091414
log.Errorf("Unable to rebroadcast stale announcements: %v", err)
14101415
}
14111416

1412-
// We'll use this validation to ensure that we process jobs in their
1413-
// dependency order during parallel validation.
1414-
validationBarrier := graph.NewValidationBarrier(1000, d.quit)
1415-
14161417
for {
14171418
select {
14181419
// A new policy update has arrived. We'll commit it to the
@@ -1481,11 +1482,17 @@ func (d *AuthenticatedGossiper) networkHandler() {
14811482
// We'll set up any dependent, and wait until a free
14821483
// slot for this job opens up, this allow us to not
14831484
// have thousands of goroutines active.
1484-
validationBarrier.InitJobDependencies(announcement.msg)
1485+
annJobID, err := d.vb.InitJobDependencies(
1486+
announcement.msg,
1487+
)
1488+
if err != nil {
1489+
announcement.err <- err
1490+
continue
1491+
}
14851492

14861493
d.wg.Add(1)
14871494
go d.handleNetworkMessages(
1488-
announcement, &announcements, validationBarrier,
1495+
announcement, &announcements, annJobID,
14891496
)
14901497

14911498
// The trickle timer has ticked, which indicates we should
@@ -1536,28 +1543,23 @@ func (d *AuthenticatedGossiper) networkHandler() {
15361543
//
15371544
// NOTE: must be run as a goroutine.
15381545
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
1539-
deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) {
1546+
deDuped *deDupedAnnouncements, jobID JobID) {
15401547

15411548
defer d.wg.Done()
1542-
defer vb.CompleteJob()
1549+
defer d.vb.CompleteJob()
15431550

15441551
// We should only broadcast this message forward if it originated from
15451552
// us or it wasn't received as part of our initial historical sync.
15461553
shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
15471554

15481555
// If this message has an existing dependency, then we'll wait until
15491556
// that has been fully validated before we proceed.
1550-
err := vb.WaitForDependants(nMsg.msg)
1557+
err := d.vb.WaitForParents(jobID, nMsg.msg)
15511558
if err != nil {
15521559
log.Debugf("Validating network message %s got err: %v",
15531560
nMsg.msg.MsgType(), err)
15541561

1555-
if !graph.IsError(
1556-
err,
1557-
graph.ErrVBarrierShuttingDown,
1558-
graph.ErrParentValidationFailed,
1559-
) {
1560-
1562+
if errors.Is(err, ErrVBarrierShuttingDown) {
15611563
log.Warnf("unexpected error during validation "+
15621564
"barrier shutdown: %v", err)
15631565
}
@@ -1577,7 +1579,16 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
15771579

15781580
// If this message had any dependencies, then we can now signal them to
15791581
// continue.
1580-
vb.SignalDependants(nMsg.msg, allow)
1582+
err = d.vb.SignalDependents(nMsg.msg, jobID)
1583+
if err != nil {
1584+
// Something is wrong if SignalDependents returns an error.
1585+
log.Errorf("SignalDependents returned error for msg=%v with "+
1586+
"JobID=%v", spew.Sdump(nMsg.msg), jobID)
1587+
1588+
nMsg.err <- err
1589+
1590+
return
1591+
}
15811592

15821593
// If the announcement was accepted, then add the emitted announcements
15831594
// to our announce batch to be broadcast once the trickle timer ticks
@@ -2407,7 +2418,6 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
24072418
err,
24082419
graph.ErrOutdated,
24092420
graph.ErrIgnored,
2410-
graph.ErrVBarrierShuttingDown,
24112421
) {
24122422

24132423
log.Error(err)
@@ -3148,7 +3158,6 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
31483158
if graph.IsError(
31493159
err, graph.ErrOutdated,
31503160
graph.ErrIgnored,
3151-
graph.ErrVBarrierShuttingDown,
31523161
) {
31533162

31543163
log.Debugf("Update edge for short_chan_id(%v) got: %v",

0 commit comments

Comments
 (0)