Skip to content

Commit 07f65b5

Browse files
authored
Merge pull request #9923 from ellemouton/graphCache
graph/db: only fetch required info for graph cache population
2 parents 92a5d35 + 6fb90c8 commit 07f65b5

File tree

9 files changed

+177
-69
lines changed

9 files changed

+177
-69
lines changed

docs/release-notes/release-notes-0.20.0.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ circuit. The indices are only available for forwarding events saved after v0.20.
5353
byte blobs at the end of gossip messages are valid TLV streams.
5454
* Various [preparations](https://github.com/lightningnetwork/lnd/pull/9692)
5555
of the graph code before the SQL implementation is added.
56+
* Only [fetch required
57+
fields](https://github.com/lightningnetwork/lnd/pull/9923) during graph
58+
cache population.
5659
* Add graph schemas, queries and CRUD:
5760
* [1](https://github.com/lightningnetwork/lnd/pull/9866)
5861

graph/db/graph.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,14 @@ func (c *ChannelGraph) populateCache() error {
174174
return err
175175
}
176176

177-
err = c.V1Store.ForEachChannel(func(info *models.ChannelEdgeInfo,
178-
policy1, policy2 *models.ChannelEdgePolicy) error {
177+
err = c.V1Store.ForEachChannelCacheable(
178+
func(info *models.CachedEdgeInfo,
179+
policy1, policy2 *models.CachedEdgePolicy) error {
179180

180-
c.graphCache.AddChannel(info, policy1, policy2)
181+
c.graphCache.AddChannel(info, policy1, policy2)
181182

182-
return nil
183-
})
183+
return nil
184+
})
184185
if err != nil {
185186
return err
186187
}
@@ -312,7 +313,7 @@ func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
312313
}
313314

314315
if c.graphCache != nil {
315-
c.graphCache.AddChannel(edge, nil, nil)
316+
c.graphCache.AddChannel(models.NewCachedEdge(edge), nil, nil)
316317
}
317318

318319
select {
@@ -347,7 +348,11 @@ func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
347348

348349
info := infos[0]
349350

350-
c.graphCache.AddChannel(info.Info, info.Policy1, info.Policy2)
351+
c.graphCache.AddChannel(
352+
models.NewCachedEdge(info.Info),
353+
models.NewCachedPolicy(info.Policy1),
354+
models.NewCachedPolicy(info.Policy2),
355+
)
351356
}
352357

353358
return nil
@@ -566,12 +571,9 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
566571
}
567572

568573
if c.graphCache != nil {
569-
var isUpdate1 bool
570-
if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
571-
isUpdate1 = true
572-
}
573-
574-
c.graphCache.UpdatePolicy(edge, from, to, isUpdate1)
574+
c.graphCache.UpdatePolicy(
575+
models.NewCachedPolicy(edge), from, to,
576+
)
575577
}
576578

577579
select {

graph/db/graph_cache.go

Lines changed: 11 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ func (c *GraphCache) AddNodeFeatures(node route.Vertex,
114114
// and policy 2 does not matter, the directionality is extracted from the info
115115
// and policy flags automatically. The policy will be set as the outgoing policy
116116
// on one node and the incoming policy on the peer's side.
117-
func (c *GraphCache) AddChannel(info *models.ChannelEdgeInfo,
118-
policy1 *models.ChannelEdgePolicy, policy2 *models.ChannelEdgePolicy) {
117+
func (c *GraphCache) AddChannel(info *models.CachedEdgeInfo,
118+
policy1, policy2 *models.CachedEdgePolicy) {
119119

120120
if info == nil {
121121
return
@@ -147,19 +147,17 @@ func (c *GraphCache) AddChannel(info *models.ChannelEdgeInfo,
147147
// of node 2 then we have the policy 1 as seen from node 1.
148148
if policy1 != nil {
149149
fromNode, toNode := info.NodeKey1Bytes, info.NodeKey2Bytes
150-
if policy1.ToNode != info.NodeKey2Bytes {
150+
if !policy1.IsNode1() {
151151
fromNode, toNode = toNode, fromNode
152152
}
153-
isEdge1 := policy1.ChannelFlags&lnwire.ChanUpdateDirection == 0
154-
c.UpdatePolicy(policy1, fromNode, toNode, isEdge1)
153+
c.UpdatePolicy(policy1, fromNode, toNode)
155154
}
156155
if policy2 != nil {
157156
fromNode, toNode := info.NodeKey2Bytes, info.NodeKey1Bytes
158-
if policy2.ToNode != info.NodeKey1Bytes {
157+
if policy2.IsNode1() {
159158
fromNode, toNode = toNode, fromNode
160159
}
161-
isEdge1 := policy2.ChannelFlags&lnwire.ChanUpdateDirection == 0
162-
c.UpdatePolicy(policy2, fromNode, toNode, isEdge1)
160+
c.UpdatePolicy(policy2, fromNode, toNode)
163161
}
164162
}
165163

@@ -177,8 +175,8 @@ func (c *GraphCache) updateOrAddEdge(node route.Vertex, edge *DirectedChannel) {
177175
// of the from and to node is not strictly important. But we assume that a
178176
// channel edge was added beforehand so that the directed channel struct already
179177
// exists in the cache.
180-
func (c *GraphCache) UpdatePolicy(policy *models.ChannelEdgePolicy, fromNode,
181-
toNode route.Vertex, edge1 bool) {
178+
func (c *GraphCache) UpdatePolicy(policy *models.CachedEdgePolicy, fromNode,
179+
toNode route.Vertex) {
182180

183181
c.mtx.Lock()
184182
defer c.mtx.Unlock()
@@ -203,15 +201,15 @@ func (c *GraphCache) UpdatePolicy(policy *models.ChannelEdgePolicy, fromNode,
203201
switch {
204202
// This is node 1, and it is edge 1, so this is the outgoing
205203
// policy for node 1.
206-
case channel.IsNode1 && edge1:
204+
case channel.IsNode1 && policy.IsNode1():
207205
channel.OutPolicySet = true
208206
policy.InboundFee.WhenSome(func(fee lnwire.Fee) {
209207
channel.InboundFee = fee
210208
})
211209

212210
// This is node 2, and it is edge 2, so this is the outgoing
213211
// policy for node 2.
214-
case !channel.IsNode1 && !edge1:
212+
case !channel.IsNode1 && !policy.IsNode1():
215213
channel.OutPolicySet = true
216214
policy.InboundFee.WhenSome(func(fee lnwire.Fee) {
217215
channel.InboundFee = fee
@@ -220,7 +218,7 @@ func (c *GraphCache) UpdatePolicy(policy *models.ChannelEdgePolicy, fromNode,
220218
// The other two cases left mean it's the inbound policy for the
221219
// node.
222220
default:
223-
channel.InPolicy = models.NewCachedPolicy(policy)
221+
channel.InPolicy = policy
224222
}
225223
}
226224

@@ -264,34 +262,6 @@ func (c *GraphCache) removeChannelIfFound(node route.Vertex, chanID uint64) {
264262
delete(c.nodeChannels[node], chanID)
265263
}
266264

267-
// UpdateChannel updates the channel edge information for a specific edge. We
268-
// expect the edge to already exist and be known. If it does not yet exist, this
269-
// call is a no-op.
270-
func (c *GraphCache) UpdateChannel(info *models.ChannelEdgeInfo) {
271-
c.mtx.Lock()
272-
defer c.mtx.Unlock()
273-
274-
if len(c.nodeChannels[info.NodeKey1Bytes]) == 0 ||
275-
len(c.nodeChannels[info.NodeKey2Bytes]) == 0 {
276-
277-
return
278-
}
279-
280-
channel, ok := c.nodeChannels[info.NodeKey1Bytes][info.ChannelID]
281-
if ok {
282-
// We only expect to be called when the channel is already
283-
// known.
284-
channel.Capacity = info.Capacity
285-
channel.OtherNode = info.NodeKey2Bytes
286-
}
287-
288-
channel, ok = c.nodeChannels[info.NodeKey2Bytes][info.ChannelID]
289-
if ok {
290-
channel.Capacity = info.Capacity
291-
channel.OtherNode = info.NodeKey1Bytes
292-
}
293-
}
294-
295265
// getChannels returns a copy of the passed node's channels or nil if there
296266
// isn't any.
297267
func (c *GraphCache) getChannels(node route.Vertex) []*DirectedChannel {

graph/db/graph_cache_test.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,24 +43,25 @@ func TestGraphCacheAddNode(t *testing.T) {
4343
FeeRate: 20,
4444
}
4545

46-
outPolicy1 := &models.ChannelEdgePolicy{
46+
outPolicy1 := &models.CachedEdgePolicy{
4747
ChannelID: 1000,
4848
ChannelFlags: lnwire.ChanUpdateChanFlags(channelFlagA),
49-
ToNode: nodeB,
49+
ToNodePubKey: func() route.Vertex {
50+
return nodeB
51+
},
5052
// Define an inbound fee.
5153
InboundFee: fn.Some(inboundFee),
52-
ExtraOpaqueData: []byte{
53-
253, 217, 3, 8, 0, 0, 0, 10, 0, 0, 0, 20,
54-
},
5554
}
56-
inPolicy1 := &models.ChannelEdgePolicy{
55+
inPolicy1 := &models.CachedEdgePolicy{
5756
ChannelID: 1000,
5857
ChannelFlags: lnwire.ChanUpdateChanFlags(channelFlagB),
59-
ToNode: nodeA,
58+
ToNodePubKey: func() route.Vertex {
59+
return nodeA
60+
},
6061
}
6162
cache := NewGraphCache(10)
6263
cache.AddNodeFeatures(nodeA, lnwire.EmptyFeatureVector())
63-
cache.AddChannel(&models.ChannelEdgeInfo{
64+
cache.AddChannel(&models.CachedEdgeInfo{
6465
ChannelID: 1000,
6566
// Those are direction independent!
6667
NodeKey1Bytes: pubKey1,
@@ -120,7 +121,7 @@ func TestGraphCacheAddNode(t *testing.T) {
120121
runTest(pubKey2, pubKey1)
121122
}
122123

123-
func assertCachedPolicyEqual(t *testing.T, original *models.ChannelEdgePolicy,
124+
func assertCachedPolicyEqual(t *testing.T, original,
124125
cached *models.CachedEdgePolicy) {
125126

126127
require.Equal(t, original.ChannelID, cached.ChannelID)
@@ -134,7 +135,7 @@ func assertCachedPolicyEqual(t *testing.T, original *models.ChannelEdgePolicy,
134135
t, original.FeeProportionalMillionths,
135136
cached.FeeProportionalMillionths,
136137
)
137-
require.Equal(
138-
t, route.Vertex(original.ToNode), cached.ToNodePubKey(),
139-
)
138+
if original.ToNodePubKey != nil {
139+
require.Equal(t, original.ToNodePubKey(), cached.ToNodePubKey())
140+
}
140141
}

graph/db/interfaces.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,22 @@ type V1Store interface { //nolint:interfacebloat
159159
*models.ChannelEdgePolicy,
160160
*models.ChannelEdgePolicy) error) error
161161

162+
// ForEachChannelCacheable iterates through all the channel edges stored
163+
// within the graph and invokes the passed callback for each edge. The
164+
// callback takes two edges as since this is a directed graph, both the
165+
// in/out edges are visited. If the callback returns an error, then the
166+
// transaction is aborted and the iteration stops early.
167+
//
168+
// NOTE: If an edge can't be found, or wasn't advertised, then a nil
169+
// pointer for that particular channel edge routing policy will be
170+
// passed into the callback.
171+
//
172+
// NOTE: this method is like ForEachChannel but fetches only the data
173+
// required for the graph cache.
174+
ForEachChannelCacheable(cb func(*models.CachedEdgeInfo,
175+
*models.CachedEdgePolicy,
176+
*models.CachedEdgePolicy) error) error
177+
162178
// DisabledChannelIDs returns the channel ids of disabled channels.
163179
// A channel is disabled when two of the associated ChanelEdgePolicies
164180
// have their disabled bit on.

graph/db/kv_store.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,74 @@ func (c *KVStore) ForEachChannel(cb func(*models.ChannelEdgeInfo,
453453
}, func() {})
454454
}
455455

456+
// ForEachChannelCacheable iterates through all the channel edges stored within
457+
// the graph and invokes the passed callback for each edge. The callback takes
458+
// two edges as since this is a directed graph, both the in/out edges are
459+
// visited. If the callback returns an error, then the transaction is aborted
460+
// and the iteration stops early.
461+
//
462+
// NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer
463+
// for that particular channel edge routing policy will be passed into the
464+
// callback.
465+
//
466+
// NOTE: this method is like ForEachChannel but fetches only the data required
467+
// for the graph cache.
468+
func (c *KVStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo,
469+
*models.CachedEdgePolicy, *models.CachedEdgePolicy) error) error {
470+
471+
return c.db.View(func(tx kvdb.RTx) error {
472+
edges := tx.ReadBucket(edgeBucket)
473+
if edges == nil {
474+
return ErrGraphNoEdgesFound
475+
}
476+
477+
// First, load all edges in memory indexed by node and channel
478+
// id.
479+
channelMap, err := c.getChannelMap(edges)
480+
if err != nil {
481+
return err
482+
}
483+
484+
edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
485+
if edgeIndex == nil {
486+
return ErrGraphNoEdgesFound
487+
}
488+
489+
// Load edge index, recombine each channel with the policies
490+
// loaded above and invoke the callback.
491+
return kvdb.ForAll(
492+
edgeIndex, func(k, edgeInfoBytes []byte) error {
493+
var chanID [8]byte
494+
copy(chanID[:], k)
495+
496+
edgeInfoReader := bytes.NewReader(edgeInfoBytes)
497+
info, err := deserializeChanEdgeInfo(
498+
edgeInfoReader,
499+
)
500+
if err != nil {
501+
return err
502+
}
503+
504+
policy1 := channelMap[channelMapKey{
505+
nodeKey: info.NodeKey1Bytes,
506+
chanID: chanID,
507+
}]
508+
509+
policy2 := channelMap[channelMapKey{
510+
nodeKey: info.NodeKey2Bytes,
511+
chanID: chanID,
512+
}]
513+
514+
return cb(
515+
models.NewCachedEdge(&info),
516+
models.NewCachedPolicy(policy1),
517+
models.NewCachedPolicy(policy2),
518+
)
519+
},
520+
)
521+
}, func() {})
522+
}
523+
456524
// forEachNodeDirectedChannel iterates through all channels of a given node,
457525
// executing the passed callback on the directed edge representing the channel
458526
// and its incoming policy. If the callback returns an error, then the iteration

graph/db/models/cached_edge_info.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package models
2+
3+
import "github.com/btcsuite/btcd/btcutil"
4+
5+
// CachedEdgeInfo is a struct that only caches the information of a
6+
// ChannelEdgeInfo that we actually use for pathfinding and therefore need to
7+
// store in the cache.
8+
type CachedEdgeInfo struct {
9+
// ChannelID is the unique channel ID for the channel. The first 3
10+
// bytes are the block height, the next 3 the index within the block,
11+
// and the last 2 bytes are the output index for the channel.
12+
ChannelID uint64
13+
14+
// NodeKey1Bytes is the raw public key of the first node.
15+
NodeKey1Bytes [33]byte
16+
17+
// NodeKey2Bytes is the raw public key of the second node.
18+
NodeKey2Bytes [33]byte
19+
20+
// Capacity is the total capacity of the channel, this is determined by
21+
// the value output in the outpoint that created this channel.
22+
Capacity btcutil.Amount
23+
}
24+
25+
// NewCachedEdge creates a new CachedEdgeInfo from the provided ChannelEdgeInfo.
26+
func NewCachedEdge(edgeInfo *ChannelEdgeInfo) *CachedEdgeInfo {
27+
return &CachedEdgeInfo{
28+
ChannelID: edgeInfo.ChannelID,
29+
NodeKey1Bytes: edgeInfo.NodeKey1Bytes,
30+
NodeKey2Bytes: edgeInfo.NodeKey2Bytes,
31+
Capacity: edgeInfo.Capacity,
32+
}
33+
}

0 commit comments

Comments
 (0)