Commit 49affa2

Merge pull request #9424 from yyforyongyu/fix-gossip-ann
multi: fix inconsistent state in gossip syncer
2 parents 1f20bd3 + 56ff6d1 commit 49affa2

10 files changed: +99 −42 lines changed

discovery/gossiper.go

Lines changed: 15 additions & 7 deletions

@@ -813,6 +813,8 @@ func (d *AuthenticatedGossiper) stop() {
 func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
 	peer lnpeer.Peer) chan error {

+	log.Debugf("Processing remote msg %T from peer=%x", msg, peer.PubKey())
+
 	errChan := make(chan error, 1)

 	// For messages in the known set of channel series queries, we'll
@@ -835,9 +837,13 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,

 		// If we've found the message target, then we'll dispatch the
 		// message directly to it.
-		syncer.ProcessQueryMsg(m, peer.QuitSignal())
+		err := syncer.ProcessQueryMsg(m, peer.QuitSignal())
+		if err != nil {
+			log.Errorf("Process query msg from peer %x got %v",
+				peer.PubKey(), err)
+		}

-		errChan <- nil
+		errChan <- err
 		return errChan

 	// If a peer is updating its current update horizon, then we'll dispatch
@@ -2382,7 +2388,8 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
 	timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)

 	log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
-		"node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
+		"node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
+		nMsg.source.SerializeCompressed())

 	// We'll quickly ask the router if it already has a newer update for
 	// this node so we can skip validating signatures if not required.
@@ -2441,7 +2448,8 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
 	// TODO(roasbeef): get rid of the above

 	log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
-		"node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
+		"node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
+		nMsg.source.SerializeCompressed())

 	return announcements, true
 }
@@ -3045,9 +3053,9 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
 		edgeToUpdate = e2
 	}

-	log.Debugf("Validating ChannelUpdate: channel=%v, from node=%x, has "+
-		"edge=%v", chanInfo.ChannelID, pubKey.SerializeCompressed(),
-		edgeToUpdate != nil)
+	log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+
+		"edge policy=%v", chanInfo.ChannelID,
+		pubKey.SerializeCompressed(), edgeToUpdate != nil)

 	// Validate the channel announcement with the expected public key and
 	// channel capacity. In the case of an invalid channel update, we'll
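
Before this change, ProcessRemoteAnnouncement ignored the return value of syncer.ProcessQueryMsg and always pushed nil onto the error channel, so callers never learned about query-handling failures. A minimal standalone sketch of the new propagation pattern (toy function names, not the lnd API):

```go
package main

import (
	"errors"
	"fmt"
)

// processQuery stands in for GossipSyncer.ProcessQueryMsg: it can fail, for
// example when a reply arrives while the syncer is in an unexpected state.
func processQuery(msg string) error {
	if msg == "ReplyChannelRange" {
		return errors.New("unexpected msg received in state chansSynced")
	}
	return nil
}

// processRemoteAnnouncement mirrors the pattern in the diff: instead of always
// writing nil, the handler logs and then forwards whatever processQuery
// returned, so the caller reading from the channel actually sees the failure.
func processRemoteAnnouncement(msg string) chan error {
	errChan := make(chan error, 1)

	err := processQuery(msg)
	if err != nil {
		fmt.Printf("process query msg got: %v\n", err)
	}
	errChan <- err

	return errChan
}

func main() {
	if err := <-processRemoteAnnouncement("ReplyChannelRange"); err != nil {
		fmt.Println("caller sees:", err)
	}
}
```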

discovery/sync_manager.go

Lines changed: 2 additions & 2 deletions

@@ -529,8 +529,8 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
 	s.setSyncState(chansSynced)
 	s.setSyncType(PassiveSync)

-	log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%v",
-		s.syncState(), s.SyncType(), peer)
+	log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%x",
+		s.syncState(), s.SyncType(), peer.PubKey())

 	return s
 }

discovery/sync_manager_test.go

Lines changed: 4 additions & 2 deletions

@@ -28,7 +28,7 @@ func randPeer(t *testing.T, quit chan struct{}) *mockPeer {
 func peerWithPubkey(pk *btcec.PublicKey, quit chan struct{}) *mockPeer {
 	return &mockPeer{
 		pk:       pk,
-		sentMsgs: make(chan lnwire.Message),
+		sentMsgs: make(chan lnwire.Message, 1),
 		quit:     quit,
 	}
 }
@@ -483,7 +483,9 @@ func TestSyncManagerWaitUntilInitialHistoricalSync(t *testing.T) {
 		// transition it to chansSynced to ensure the remaining syncers
 		// aren't started as active.
 		if i == 0 {
-			assertSyncerStatus(t, s, syncingChans, PassiveSync)
+			assertSyncerStatus(
+				t, s, waitingQueryRangeReply, PassiveSync,
+			)
 			continue
 		}
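
The mock peer's sentMsgs channel gains a one-slot buffer, presumably so the syncer's QueryChannelRange send (now issued while the syncer holds its lock, see the syncer.go diff below) cannot block a test that is not yet reading from the mock. A small self-contained sketch of the difference, with made-up channel names:

```go
package main

import "fmt"

func main() {
	// An unbuffered channel: a send blocks until another goroutine
	// receives. If the sender holds a lock at that point, a test with no
	// active reader can deadlock.
	unbuffered := make(chan string)
	done := make(chan struct{})
	go func() {
		fmt.Println("unbuffered got:", <-unbuffered)
		close(done)
	}()
	unbuffered <- "QueryChannelRange" // succeeds only because a reader exists
	<-done

	// A channel with capacity 1, as in the updated mockPeer: the first
	// send completes immediately, so the peer can absorb the query even
	// before the test starts draining messages.
	buffered := make(chan string, 1)
	buffered <- "QueryChannelRange"
	fmt.Println("buffered send returned without a reader")
	fmt.Println("buffered got:", <-buffered)
}
```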

discovery/syncer.go

Lines changed: 42 additions & 26 deletions

@@ -475,6 +475,39 @@ func (g *GossipSyncer) Stop() {
 	})
 }

+// handleSyncingChans handles the state syncingChans for the GossipSyncer. When
+// in this state, we will send a QueryChannelRange msg to our peer and advance
+// the syncer's state to waitingQueryRangeReply.
+func (g *GossipSyncer) handleSyncingChans() {
+	// Prepare the query msg.
+	queryRangeMsg, err := g.genChanRangeQuery(g.genHistoricalChanRangeQuery)
+	if err != nil {
+		log.Errorf("Unable to gen chan range query: %v", err)
+		return
+	}
+
+	// Acquire a lock so the following state transition is atomic.
+	//
+	// NOTE: We must lock the following steps as it's possible we get an
+	// immediate response (ReplyChannelRange) after sending the query msg.
+	// The response is handled in ProcessQueryMsg, which requires the
+	// current state to be waitingQueryRangeReply.
+	g.Lock()
+	defer g.Unlock()
+
+	// Send the msg to the remote peer, which is non-blocking as
+	// `sendToPeer` only queues the msg in Brontide.
+	err = g.cfg.sendToPeer(queryRangeMsg)
+	if err != nil {
+		log.Errorf("Unable to send chan range query: %v", err)
+		return
+	}
+
+	// With the message sent successfully, we'll transition into the next
+	// state where we wait for their reply.
+	g.setSyncState(waitingQueryRangeReply)
+}
+
 // channelGraphSyncer is the main goroutine responsible for ensuring that we
 // properly channel graph state with the remote peer, and also that we only
 // send them messages which actually pass their defined update horizon.
@@ -495,27 +528,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
 		// understand, as we'll as responding to any other queries by
 		// them.
 		case syncingChans:
-			// If we're in this state, then we'll send the remote
-			// peer our opening QueryChannelRange message.
-			queryRangeMsg, err := g.genChanRangeQuery(
-				g.genHistoricalChanRangeQuery,
-			)
-			if err != nil {
-				log.Errorf("Unable to gen chan range "+
-					"query: %v", err)
-				return
-			}
-
-			err = g.cfg.sendToPeer(queryRangeMsg)
-			if err != nil {
-				log.Errorf("Unable to send chan range "+
-					"query: %v", err)
-				return
-			}
-
-			// With the message sent successfully, we'll transition
-			// into the next state where we wait for their reply.
-			g.setSyncState(waitingQueryRangeReply)
+			g.handleSyncingChans()

 		// In this state, we've sent out our initial channel range
 		// query and are waiting for the final response from the remote
@@ -1342,9 +1355,9 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er
 		return err
 	}

-	log.Infof("GossipSyncer(%x): applying new update horizon: start=%v, "+
-		"end=%v, backlog_size=%v", g.cfg.peerPub[:], startTime, endTime,
-		len(newUpdatestoSend))
+	log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
+		"start=%v, end=%v, backlog_size=%v", g.cfg.peerPub[:],
+		startTime, endTime, len(newUpdatestoSend))

 	// If we don't have any to send, then we can return early.
 	if len(newUpdatestoSend) == 0 {
@@ -1515,12 +1528,15 @@ func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struc
 	// Reply messages should only be expected in states where we're waiting
 	// for a reply.
 	case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
+		g.Lock()
 		syncState := g.syncState()
+		g.Unlock()
+
 		if syncState != waitingQueryRangeReply &&
 			syncState != waitingQueryChanReply {

-			return fmt.Errorf("received unexpected query reply "+
-				"message %T", msg)
+			return fmt.Errorf("unexpected msg %T received in "+
+				"state %v", msg, syncState)
 		}
 		msgChan = g.gossipMsgs
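
This is the core of the fix: previously the syncer sent QueryChannelRange and only afterwards moved to waitingQueryRangeReply, so a fast ReplyChannelRange could be processed while the state still read syncingChans and be rejected as unexpected, stalling the initial historical sync. Holding the syncer's lock across the send and the state transition, and reading the state under the same lock in ProcessQueryMsg, closes that window. A toy, self-contained sketch of the pattern (the type and field names below are illustrative, not lnd's):

```go
package main

import (
	"fmt"
	"sync"
)

type state int

const (
	syncingChans state = iota
	waitingQueryRangeReply
)

// syncer is a toy stand-in for GossipSyncer: it guards its state with a mutex
// so that "queue the query" and "advance the state" appear atomic to the
// goroutine that handles replies.
type syncer struct {
	mu    sync.Mutex
	state state
	out   chan string // stands in for the non-blocking sendToPeer queue
}

func (s *syncer) handleSyncingChans() {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Queue the query and transition while still holding the lock, so a
	// reply processed concurrently cannot observe the old state.
	s.out <- "QueryChannelRange"
	s.state = waitingQueryRangeReply
}

func (s *syncer) processReply(msg string) error {
	s.mu.Lock()
	cur := s.state
	s.mu.Unlock()

	if cur != waitingQueryRangeReply {
		return fmt.Errorf("unexpected msg %v received in state %v", msg, cur)
	}
	fmt.Println("reply accepted in state", cur)
	return nil
}

func main() {
	s := &syncer{state: syncingChans, out: make(chan string, 1)}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// The "remote peer" answers as soon as it sees the query.
		<-s.out
		if err := s.processReply("ReplyChannelRange"); err != nil {
			fmt.Println("reply rejected:", err)
		}
	}()

	s.handleSyncingChans()
	wg.Wait()
}
```

Because processReply reads the state under the same mutex, the reply handler either waits for the transition to complete or sees the post-transition state; it can never observe the stale syncingChans value.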

docs/release-notes/release-notes-0.19.0.md

Lines changed: 4 additions & 0 deletions

@@ -74,6 +74,10 @@
   This is a protocol gadget required for Dynamic Commitments and Splicing that
   will be added later.

+* [Fixed](https://github.com/lightningnetwork/lnd/pull/9424) a case where the
+  initial historical sync may be blocked due to a race condition in handling the
+  syncer's internal state.
+
 ## Functional Enhancements
 * [Add ability](https://github.com/lightningnetwork/lnd/pull/8998) to paginate
   wallet transactions.

graph/builder.go

Lines changed: 6 additions & 5 deletions

@@ -1400,16 +1400,18 @@ func (b *Builder) processUpdate(msg interface{},
 				msg.ChannelID)
 		}

+		log.Debugf("Found edge1Timestamp=%v, edge2Timestamp=%v",
+			edge1Timestamp, edge2Timestamp)
+
 		// As edges are directional edge node has a unique policy for
 		// the direction of the edge they control. Therefore, we first
 		// check if we already have the most up-to-date information for
 		// that edge. If this message has a timestamp not strictly
 		// newer than what we already know of we can exit early.
-		switch {
+		switch msg.ChannelFlags & lnwire.ChanUpdateDirection {
 		// A flag set of 0 indicates this is an announcement for the
 		// "first" node in the channel.
-		case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0:
-
+		case 0:
 			// Ignore outdated message.
 			if !edge1Timestamp.Before(msg.LastUpdate) {
 				return NewErrf(ErrOutdated, "Ignoring "+
@@ -1420,8 +1422,7 @@

 		// Similarly, a flag set of 1 indicates this is an announcement
 		// for the "second" node in the channel.
-		case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1:
-
+		case 1:
 			// Ignore outdated message.
 			if !edge2Timestamp.Before(msg.LastUpdate) {
 				return NewErrf(ErrOutdated, "Ignoring "+
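
The builder now switches on the masked direction bit once instead of repeating the mask in every case; that bit selects which of the channel's two edge policies the update applies to. A self-contained sketch of the same dispatch, using a stand-in flag type rather than lnwire's:

```go
package main

import "fmt"

// chanUpdateFlag mimics the bitfield carried in a ChannelUpdate; the lowest
// bit encodes which of the channel's two nodes the update is for.
type chanUpdateFlag uint8

const chanUpdateDirection chanUpdateFlag = 1 << 0

func edgeForUpdate(flags chanUpdateFlag) string {
	// Switching on the masked value keeps each case a plain constant,
	// as in the refactored builder code.
	switch flags & chanUpdateDirection {
	case 0:
		return "edge 1 (first node)"
	case 1:
		return "edge 2 (second node)"
	default:
		return "unreachable"
	}
}

func main() {
	fmt.Println(edgeForUpdate(0b0000_0000)) // edge 1
	fmt.Println(edgeForUpdate(0b0000_0001)) // edge 2
	fmt.Println(edgeForUpdate(0b0000_0011)) // a higher bit set, still edge 2
}
```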

graph/db/graph.go

Lines changed: 4 additions & 0 deletions

@@ -2797,6 +2797,10 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
 			tx, edge, c.graphCache,
 		)

+		if err != nil {
+			log.Errorf("UpdateEdgePolicy faild: %v", err)
+		}
+
 		// Silence ErrEdgeNotFound so that the batch can
 		// succeed, but propagate the error via local state.
 		if errors.Is(err, ErrEdgeNotFound) {

graph/db/graph_cache.go

Lines changed: 8 additions & 0 deletions

@@ -228,6 +228,9 @@ func (c *GraphCache) UpdatePolicy(policy *models.ChannelEdgePolicy, fromNode,
 	var inboundFee lnwire.Fee
 	_, err := policy.ExtraOpaqueData.ExtractRecords(&inboundFee)
 	if err != nil {
+		log.Errorf("Failed to extract records from edge policy %v: %v",
+			policy.ChannelID, err)
+
 		return
 	}

@@ -236,11 +239,16 @@

 	updatePolicy := func(nodeKey route.Vertex) {
 		if len(c.nodeChannels[nodeKey]) == 0 {
+			log.Warnf("Node=%v not found in graph cache", nodeKey)
+
 			return
 		}

 		channel, ok := c.nodeChannels[nodeKey][policy.ChannelID]
 		if !ok {
+			log.Warnf("Channel=%v not found in graph cache",
+				policy.ChannelID)
+
 			return
 		}

graph/db/models/channel_edge_policy.go

Lines changed: 8 additions & 0 deletions

@@ -1,6 +1,7 @@
 package models

 import (
+	"fmt"
 	"time"

 	"github.com/btcsuite/btcd/btcec/v2/ecdsa"
@@ -113,3 +114,10 @@ func (c *ChannelEdgePolicy) ComputeFee(

 	return c.FeeBaseMSat + (amt*c.FeeProportionalMillionths)/feeRateParts
 }
+
+// String returns a human-readable version of the channel edge policy.
+func (c *ChannelEdgePolicy) String() string {
+	return fmt.Sprintf("ChannelID=%v, MessageFlags=%v, ChannelFlags=%v, "+
+		"LastUpdate=%v", c.ChannelID, c.MessageFlags, c.ChannelFlags,
+		c.LastUpdate)
+}
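
Adding String() makes ChannelEdgePolicy satisfy fmt.Stringer, so a policy can be passed directly to %v/%s verbs in log statements instead of hand-picking fields each time. A small sketch using a trimmed-down stand-in struct (only the fields the new method prints; values are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// channelEdgePolicy is a pared-down stand-in for models.ChannelEdgePolicy,
// keeping just the fields referenced by the new String method.
type channelEdgePolicy struct {
	ChannelID    uint64
	MessageFlags uint8
	ChannelFlags uint8
	LastUpdate   time.Time
}

// String mirrors the method added in the diff.
func (c *channelEdgePolicy) String() string {
	return fmt.Sprintf("ChannelID=%v, MessageFlags=%v, ChannelFlags=%v, "+
		"LastUpdate=%v", c.ChannelID, c.MessageFlags, c.ChannelFlags,
		c.LastUpdate)
}

func main() {
	policy := &channelEdgePolicy{
		ChannelID:    123456789,
		MessageFlags: 1,
		ChannelFlags: 0,
		LastUpdate:   time.Unix(1736380800, 0),
	}

	// Because *channelEdgePolicy implements fmt.Stringer, %v picks up the
	// human-readable form automatically.
	fmt.Printf("updating policy: %v\n", policy)
}
```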

routing/unified_edges.go

Lines changed: 6 additions & 0 deletions

@@ -59,6 +59,9 @@ func (u *nodeEdgeUnifier) addPolicy(fromNode route.Vertex,
 	// Skip channels if there is an outgoing channel restriction.
 	if localChan && u.outChanRestr != nil {
 		if _, ok := u.outChanRestr[edge.ChannelID]; !ok {
+			log.Debugf("Skipped adding policy for restricted edge "+
+				"%v", edge.ChannelID)
+
 			return
 		}
 	}
@@ -100,6 +103,9 @@ func (u *nodeEdgeUnifier) addGraphPolicies(g Graph) error {
 		// Note that we are searching backwards so this node would have
 		// come prior to the pivot node in the route.
 		if channel.InPolicy == nil {
+			log.Debugf("Skipped adding edge %v due to nil policy",
+				channel.ChannelID)
+
 			return nil
 		}