Skip to content

Commit 4dbbd83

Browse files
authored
Merge pull request #9496 from ellemouton/graph4
graph: remove redundant iteration through a node's persisted channels
2 parents 7b29431 + 5c2c00e commit 4dbbd83

File tree

5 files changed

+81
-217
lines changed

5 files changed

+81
-217
lines changed

docs/release-notes/release-notes-0.19.0.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,11 @@ The underlying functionality between those two options remain the same.
262262

263263
* Log rotation can now use ZSTD
264264

265+
* [Remove redundant
266+
iteration](https://github.com/lightningnetwork/lnd/pull/9496) over a node's
267+
persisted channels when updating the graph cache with a new node or node
268+
update.
269+
265270
## Deprecations
266271

267272
### ⚠️ **Warning:** The following RPCs will be removed in release version **0.21**:

graph/db/graph.go

Lines changed: 30 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -231,13 +231,13 @@ func NewChannelGraph(db kvdb.Backend, options ...OptionModifier) (*ChannelGraph,
231231
log.Debugf("Populating in-memory channel graph, this might " +
232232
"take a while...")
233233

234-
err := g.ForEachNodeCacheable(
235-
func(tx kvdb.RTx, node GraphCacheNode) error {
236-
g.graphCache.AddNodeFeatures(node)
234+
err := g.ForEachNodeCacheable(func(node route.Vertex,
235+
features *lnwire.FeatureVector) error {
237236

238-
return nil
239-
},
240-
)
237+
g.graphCache.AddNodeFeatures(node, features)
238+
239+
return nil
240+
})
241241
if err != nil {
242242
return nil, err
243243
}
@@ -772,8 +772,8 @@ func (c *ChannelGraph) forEachNode(
772772
// graph, executing the passed callback with each node encountered. If the
773773
// callback returns an error, then the transaction is aborted and the iteration
774774
// stops early.
775-
func (c *ChannelGraph) ForEachNodeCacheable(cb func(kvdb.RTx,
776-
GraphCacheNode) error) error {
775+
func (c *ChannelGraph) ForEachNodeCacheable(cb func(route.Vertex,
776+
*lnwire.FeatureVector) error) error {
777777

778778
traversal := func(tx kvdb.RTx) error {
779779
// First grab the nodes bucket which stores the mapping from
@@ -792,7 +792,7 @@ func (c *ChannelGraph) ForEachNodeCacheable(cb func(kvdb.RTx,
792792
}
793793

794794
nodeReader := bytes.NewReader(nodeBytes)
795-
cacheableNode, err := deserializeLightningNodeCacheable(
795+
node, features, err := deserializeLightningNodeCacheable( //nolint:ll
796796
nodeReader,
797797
)
798798
if err != nil {
@@ -801,7 +801,7 @@ func (c *ChannelGraph) ForEachNodeCacheable(cb func(kvdb.RTx,
801801

802802
// Execute the callback, the transaction will abort if
803803
// this returns an error.
804-
return cb(tx, cacheableNode)
804+
return cb(node, features)
805805
})
806806
}
807807

@@ -901,13 +901,9 @@ func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
901901
r := &batch.Request{
902902
Update: func(tx kvdb.RwTx) error {
903903
if c.graphCache != nil {
904-
cNode := newGraphCacheNode(
904+
c.graphCache.AddNodeFeatures(
905905
node.PubKeyBytes, node.Features,
906906
)
907-
err := c.graphCache.AddNode(tx, cNode)
908-
if err != nil {
909-
return err
910-
}
911907
}
912908

913909
return addLightningNode(tx, node)
@@ -3059,50 +3055,6 @@ func (c *ChannelGraph) fetchLightningNode(tx kvdb.RTx,
30593055
return node, nil
30603056
}
30613057

3062-
// graphCacheNode is a struct that wraps a LightningNode in a way that it can be
3063-
// cached in the graph cache.
3064-
type graphCacheNode struct {
3065-
pubKeyBytes route.Vertex
3066-
features *lnwire.FeatureVector
3067-
}
3068-
3069-
// newGraphCacheNode returns a new cache optimized node.
3070-
func newGraphCacheNode(pubKey route.Vertex,
3071-
features *lnwire.FeatureVector) *graphCacheNode {
3072-
3073-
return &graphCacheNode{
3074-
pubKeyBytes: pubKey,
3075-
features: features,
3076-
}
3077-
}
3078-
3079-
// PubKey returns the node's public identity key.
3080-
func (n *graphCacheNode) PubKey() route.Vertex {
3081-
return n.pubKeyBytes
3082-
}
3083-
3084-
// Features returns the node's features.
3085-
func (n *graphCacheNode) Features() *lnwire.FeatureVector {
3086-
return n.features
3087-
}
3088-
3089-
// ForEachChannel iterates through all channels of this node, executing the
3090-
// passed callback with an edge info structure and the policies of each end
3091-
// of the channel. The first edge policy is the outgoing edge *to* the
3092-
// connecting node, while the second is the incoming edge *from* the
3093-
// connecting node. If the callback returns an error, then the iteration is
3094-
// halted with the error propagated back up to the caller.
3095-
//
3096-
// Unknown policies are passed into the callback as nil values.
3097-
func (n *graphCacheNode) ForEachChannel(tx kvdb.RTx,
3098-
cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
3099-
*models.ChannelEdgePolicy) error) error {
3100-
3101-
return nodeTraversal(tx, n.pubKeyBytes[:], nil, cb)
3102-
}
3103-
3104-
var _ GraphCacheNode = (*graphCacheNode)(nil)
3105-
31063058
// HasLightningNode determines if the graph has a vertex identified by the
31073059
// target node identity public key. If the node exists in the database, a
31083060
// timestamp of when the data for the node was lasted updated is returned along
@@ -4062,60 +4014,59 @@ func fetchLightningNode(nodeBucket kvdb.RBucket,
40624014
return deserializeLightningNode(nodeReader)
40634015
}
40644016

4065-
func deserializeLightningNodeCacheable(r io.Reader) (*graphCacheNode, error) {
4066-
// Always populate a feature vector, even if we don't have a node
4067-
// announcement and short circuit below.
4068-
node := newGraphCacheNode(
4069-
route.Vertex{},
4070-
lnwire.EmptyFeatureVector(),
4071-
)
4017+
func deserializeLightningNodeCacheable(r io.Reader) (route.Vertex,
4018+
*lnwire.FeatureVector, error) {
40724019

4073-
var nodeScratch [8]byte
4020+
var (
4021+
pubKey route.Vertex
4022+
features = lnwire.EmptyFeatureVector()
4023+
nodeScratch [8]byte
4024+
)
40744025

40754026
// Skip ahead:
40764027
// - LastUpdate (8 bytes)
40774028
if _, err := r.Read(nodeScratch[:]); err != nil {
4078-
return nil, err
4029+
return pubKey, nil, err
40794030
}
40804031

4081-
if _, err := io.ReadFull(r, node.pubKeyBytes[:]); err != nil {
4082-
return nil, err
4032+
if _, err := io.ReadFull(r, pubKey[:]); err != nil {
4033+
return pubKey, nil, err
40834034
}
40844035

40854036
// Read the node announcement flag.
40864037
if _, err := r.Read(nodeScratch[:2]); err != nil {
4087-
return nil, err
4038+
return pubKey, nil, err
40884039
}
40894040
hasNodeAnn := byteOrder.Uint16(nodeScratch[:2])
40904041

40914042
// The rest of the data is optional, and will only be there if we got a
40924043
// node announcement for this node.
40934044
if hasNodeAnn == 0 {
4094-
return node, nil
4045+
return pubKey, features, nil
40954046
}
40964047

40974048
// We did get a node announcement for this node, so we'll have the rest
40984049
// of the data available.
40994050
var rgb uint8
41004051
if err := binary.Read(r, byteOrder, &rgb); err != nil {
4101-
return nil, err
4052+
return pubKey, nil, err
41024053
}
41034054
if err := binary.Read(r, byteOrder, &rgb); err != nil {
4104-
return nil, err
4055+
return pubKey, nil, err
41054056
}
41064057
if err := binary.Read(r, byteOrder, &rgb); err != nil {
4107-
return nil, err
4058+
return pubKey, nil, err
41084059
}
41094060

41104061
if _, err := wire.ReadVarString(r, 0); err != nil {
4111-
return nil, err
4062+
return pubKey, nil, err
41124063
}
41134064

4114-
if err := node.features.Decode(r); err != nil {
4115-
return nil, err
4065+
if err := features.Decode(r); err != nil {
4066+
return pubKey, nil, err
41164067
}
41174068

4118-
return node, nil
4069+
return pubKey, features, nil
41194070
}
41204071

41214072
func deserializeLightningNode(r io.Reader) (models.LightningNode, error) {

graph/db/graph_cache.go

Lines changed: 4 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,10 @@ import (
66

77
"github.com/btcsuite/btcd/btcutil"
88
"github.com/lightningnetwork/lnd/graph/db/models"
9-
"github.com/lightningnetwork/lnd/kvdb"
109
"github.com/lightningnetwork/lnd/lnwire"
1110
"github.com/lightningnetwork/lnd/routing/route"
1211
)
1312

14-
// GraphCacheNode is an interface for all the information the cache needs to know
15-
// about a lightning node.
16-
type GraphCacheNode interface {
17-
// PubKey is the node's public identity key.
18-
PubKey() route.Vertex
19-
20-
// Features returns the node's p2p features.
21-
Features() *lnwire.FeatureVector
22-
23-
// ForEachChannel iterates through all channels of a given node,
24-
// executing the passed callback with an edge info structure and the
25-
// policies of each end of the channel. The first edge policy is the
26-
// outgoing edge *to* the connecting node, while the second is the
27-
// incoming edge *from* the connecting node. If the callback returns an
28-
// error, then the iteration is halted with the error propagated back up
29-
// to the caller.
30-
ForEachChannel(kvdb.RTx,
31-
func(kvdb.RTx, *models.ChannelEdgeInfo,
32-
*models.ChannelEdgePolicy,
33-
*models.ChannelEdgePolicy) error) error
34-
}
35-
3613
// DirectedChannel is a type that stores the channel information as seen from
3714
// one side of the channel.
3815
type DirectedChannel struct {
@@ -124,33 +101,13 @@ func (c *GraphCache) Stats() string {
124101
}
125102

126103
// AddNodeFeatures adds a graph node and its features to the cache.
127-
func (c *GraphCache) AddNodeFeatures(node GraphCacheNode) {
128-
nodePubKey := node.PubKey()
104+
func (c *GraphCache) AddNodeFeatures(node route.Vertex,
105+
features *lnwire.FeatureVector) {
129106

130-
// Only hold the lock for a short time. The `ForEachChannel()` below is
131-
// possibly slow as it has to go to the backend, so we can unlock
132-
// between the calls. And the AddChannel() method will acquire its own
133-
// lock anyway.
134107
c.mtx.Lock()
135-
c.nodeFeatures[nodePubKey] = node.Features()
136-
c.mtx.Unlock()
137-
}
138-
139-
// AddNode adds a graph node, including all the (directed) channels of that
140-
// node.
141-
func (c *GraphCache) AddNode(tx kvdb.RTx, node GraphCacheNode) error {
142-
c.AddNodeFeatures(node)
143-
144-
return node.ForEachChannel(
145-
tx, func(tx kvdb.RTx, info *models.ChannelEdgeInfo,
146-
outPolicy *models.ChannelEdgePolicy,
147-
inPolicy *models.ChannelEdgePolicy) error {
148-
149-
c.AddChannel(info, outPolicy, inPolicy)
108+
defer c.mtx.Unlock()
150109

151-
return nil
152-
},
153-
)
110+
c.nodeFeatures[node] = features
154111
}
155112

156113
// AddChannel adds a non-directed channel, meaning that the order of policy 1

graph/db/graph_cache_test.go

Lines changed: 8 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"testing"
66

77
"github.com/lightningnetwork/lnd/graph/db/models"
8-
"github.com/lightningnetwork/lnd/kvdb"
98
"github.com/lightningnetwork/lnd/lnwire"
109
"github.com/lightningnetwork/lnd/routing/route"
1110
"github.com/stretchr/testify/require"
@@ -25,39 +24,6 @@ var (
2524
pubKey2, _ = route.NewVertexFromBytes(pubKey2Bytes)
2625
)
2726

28-
type node struct {
29-
pubKey route.Vertex
30-
features *lnwire.FeatureVector
31-
32-
edgeInfos []*models.ChannelEdgeInfo
33-
outPolicies []*models.ChannelEdgePolicy
34-
inPolicies []*models.ChannelEdgePolicy
35-
}
36-
37-
func (n *node) PubKey() route.Vertex {
38-
return n.pubKey
39-
}
40-
func (n *node) Features() *lnwire.FeatureVector {
41-
return n.features
42-
}
43-
44-
func (n *node) ForEachChannel(tx kvdb.RTx,
45-
cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
46-
*models.ChannelEdgePolicy) error) error {
47-
48-
for idx := range n.edgeInfos {
49-
err := cb(
50-
tx, n.edgeInfos[idx], n.outPolicies[idx],
51-
n.inPolicies[idx],
52-
)
53-
if err != nil {
54-
return err
55-
}
56-
}
57-
58-
return nil
59-
}
60-
6127
// TestGraphCacheAddNode tests that a channel going from node A to node B can be
6228
// cached correctly, independent of the direction we add the channel as.
6329
func TestGraphCacheAddNode(t *testing.T) {
@@ -85,21 +51,15 @@ func TestGraphCacheAddNode(t *testing.T) {
8551
ChannelFlags: lnwire.ChanUpdateChanFlags(channelFlagB),
8652
ToNode: nodeA,
8753
}
88-
node := &node{
89-
pubKey: nodeA,
90-
features: lnwire.EmptyFeatureVector(),
91-
edgeInfos: []*models.ChannelEdgeInfo{{
92-
ChannelID: 1000,
93-
// Those are direction independent!
94-
NodeKey1Bytes: pubKey1,
95-
NodeKey2Bytes: pubKey2,
96-
Capacity: 500,
97-
}},
98-
outPolicies: []*models.ChannelEdgePolicy{outPolicy1},
99-
inPolicies: []*models.ChannelEdgePolicy{inPolicy1},
100-
}
10154
cache := NewGraphCache(10)
102-
require.NoError(t, cache.AddNode(nil, node))
55+
cache.AddNodeFeatures(nodeA, lnwire.EmptyFeatureVector())
56+
cache.AddChannel(&models.ChannelEdgeInfo{
57+
ChannelID: 1000,
58+
// Those are direction independent!
59+
NodeKey1Bytes: pubKey1,
60+
NodeKey2Bytes: pubKey2,
61+
Capacity: 500,
62+
}, outPolicy1, inPolicy1)
10363

10464
var fromChannels, toChannels []*DirectedChannel
10565
_ = cache.ForEachChannel(nodeA, func(c *DirectedChannel) error {

0 commit comments

Comments
 (0)