Skip to content

Commit c52c6c9

Browse files
committed
graph/db: introduce ForEachChannelCacheable
Implement ForEachChannelCacheable which is like ForEachChannel but its call-back takes the cached versions of channel info & policies. This is then used during graph cache population. This will be useful once the SQL implementation is added so that we can reduce the number of DB trips on cache population.
1 parent 3069a67 commit c52c6c9

File tree

3 files changed

+90
-9
lines changed

3 files changed

+90
-9
lines changed

graph/db/graph.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -174,17 +174,14 @@ func (c *ChannelGraph) populateCache() error {
174174
return err
175175
}
176176

177-
err = c.V1Store.ForEachChannel(func(info *models.ChannelEdgeInfo,
178-
policy1, policy2 *models.ChannelEdgePolicy) error {
177+
err = c.V1Store.ForEachChannelCacheable(
178+
func(info *models.CachedEdgeInfo,
179+
policy1, policy2 *models.CachedEdgePolicy) error {
179180

180-
c.graphCache.AddChannel(
181-
models.NewCachedEdge(info),
182-
models.NewCachedPolicy(policy1),
183-
models.NewCachedPolicy(policy2),
184-
)
181+
c.graphCache.AddChannel(info, policy1, policy2)
185182

186-
return nil
187-
})
183+
return nil
184+
})
188185
if err != nil {
189186
return err
190187
}

graph/db/interfaces.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,22 @@ type V1Store interface { //nolint:interfacebloat
159159
*models.ChannelEdgePolicy,
160160
*models.ChannelEdgePolicy) error) error
161161

162+
// ForEachChannelCacheable iterates through all the channel edges stored
163+
// within the graph and invokes the passed callback for each edge. The
164+
// callback takes two edges as since this is a directed graph, both the
165+
// in/out edges are visited. If the callback returns an error, then the
166+
// transaction is aborted and the iteration stops early.
167+
//
168+
// NOTE: If an edge can't be found, or wasn't advertised, then a nil
169+
// pointer for that particular channel edge routing policy will be
170+
// passed into the callback.
171+
//
172+
// NOTE: this method is like ForEachChannel but fetches only the data
173+
// required for the graph cache.
174+
ForEachChannelCacheable(cb func(*models.CachedEdgeInfo,
175+
*models.CachedEdgePolicy,
176+
*models.CachedEdgePolicy) error) error
177+
162178
// DisabledChannelIDs returns the channel ids of disabled channels.
163179
// A channel is disabled when two of the associated ChanelEdgePolicies
164180
// have their disabled bit on.

graph/db/kv_store.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,74 @@ func (c *KVStore) ForEachChannel(cb func(*models.ChannelEdgeInfo,
453453
}, func() {})
454454
}
455455

456+
// ForEachChannelCacheable iterates through all the channel edges stored within
457+
// the graph and invokes the passed callback for each edge. The callback takes
458+
// two edges as since this is a directed graph, both the in/out edges are
459+
// visited. If the callback returns an error, then the transaction is aborted
460+
// and the iteration stops early.
461+
//
462+
// NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer
463+
// for that particular channel edge routing policy will be passed into the
464+
// callback.
465+
//
466+
// NOTE: this method is like ForEachChannel but fetches only the data required
467+
// for the graph cache.
468+
func (c *KVStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo,
469+
*models.CachedEdgePolicy, *models.CachedEdgePolicy) error) error {
470+
471+
return c.db.View(func(tx kvdb.RTx) error {
472+
edges := tx.ReadBucket(edgeBucket)
473+
if edges == nil {
474+
return ErrGraphNoEdgesFound
475+
}
476+
477+
// First, load all edges in memory indexed by node and channel
478+
// id.
479+
channelMap, err := c.getChannelMap(edges)
480+
if err != nil {
481+
return err
482+
}
483+
484+
edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
485+
if edgeIndex == nil {
486+
return ErrGraphNoEdgesFound
487+
}
488+
489+
// Load edge index, recombine each channel with the policies
490+
// loaded above and invoke the callback.
491+
return kvdb.ForAll(
492+
edgeIndex, func(k, edgeInfoBytes []byte) error {
493+
var chanID [8]byte
494+
copy(chanID[:], k)
495+
496+
edgeInfoReader := bytes.NewReader(edgeInfoBytes)
497+
info, err := deserializeChanEdgeInfo(
498+
edgeInfoReader,
499+
)
500+
if err != nil {
501+
return err
502+
}
503+
504+
policy1 := channelMap[channelMapKey{
505+
nodeKey: info.NodeKey1Bytes,
506+
chanID: chanID,
507+
}]
508+
509+
policy2 := channelMap[channelMapKey{
510+
nodeKey: info.NodeKey2Bytes,
511+
chanID: chanID,
512+
}]
513+
514+
return cb(
515+
models.NewCachedEdge(&info),
516+
models.NewCachedPolicy(policy1),
517+
models.NewCachedPolicy(policy2),
518+
)
519+
},
520+
)
521+
}, func() {})
522+
}
523+
456524
// forEachNodeDirectedChannel iterates through all channels of a given node,
457525
// executing the passed callback on the directed edge representing the channel
458526
// and its incoming policy. If the callback returns an error, then the iteration

0 commit comments

Comments
 (0)