Skip to content

Commit 40efefe

Browse files
authored
Merge pull request #9959 from ellemouton/chanGraphContext2
multi: add context.Context param to more graphdb.V1Store methods
2 parents e0a9705 + 9151362 commit 40efefe

26 files changed

+344
-272
lines changed

autopilot/prefattach_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ func (d *testDBGraph) addRandChannel(node1, node2 *btcec.PublicKey,
488488
Capacity: capacity,
489489
}
490490
edge.AddNodeKeys(lnNode1, lnNode2, lnNode1, lnNode2)
491-
if err := d.db.AddChannelEdge(edge); err != nil {
491+
if err := d.db.AddChannelEdge(ctx, edge); err != nil {
492492
return nil, nil, err
493493
}
494494
edgePolicy := &models.ChannelEdgePolicy{
@@ -504,7 +504,7 @@ func (d *testDBGraph) addRandChannel(node1, node2 *btcec.PublicKey,
504504
ChannelFlags: 0,
505505
}
506506

507-
if err := d.db.UpdateEdgePolicy(edgePolicy); err != nil {
507+
if err := d.db.UpdateEdgePolicy(ctx, edgePolicy); err != nil {
508508
return nil, nil, err
509509
}
510510
edgePolicy = &models.ChannelEdgePolicy{
@@ -519,7 +519,7 @@ func (d *testDBGraph) addRandChannel(node1, node2 *btcec.PublicKey,
519519
MessageFlags: 1,
520520
ChannelFlags: 1,
521521
}
522-
if err := d.db.UpdateEdgePolicy(edgePolicy); err != nil {
522+
if err := d.db.UpdateEdgePolicy(ctx, edgePolicy); err != nil {
523523
return nil, nil, err
524524
}
525525

discovery/chan_series.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package discovery
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/btcsuite/btcd/chaincfg/chainhash"
@@ -22,7 +23,8 @@ type ChannelGraphTimeSeries interface {
2223
// height that's close to the current tip of the main chain as we
2324
// know it. We'll use this to start our QueryChannelRange dance with
2425
// the remote node.
25-
HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error)
26+
HighestChanID(ctx context.Context,
27+
chain chainhash.Hash) (*lnwire.ShortChannelID, error)
2628

2729
// UpdatesInHorizon returns all known channel and node updates with an
2830
// update timestamp between the start time and end time. We'll use this
@@ -87,8 +89,10 @@ func NewChanSeries(graph *graphdb.ChannelGraph) *ChanSeries {
8789
// this to start our QueryChannelRange dance with the remote node.
8890
//
8991
// NOTE: This is part of the ChannelGraphTimeSeries interface.
90-
func (c *ChanSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) {
91-
chanID, err := c.graph.HighestChanID()
92+
func (c *ChanSeries) HighestChanID(ctx context.Context,
93+
_ chainhash.Hash) (*lnwire.ShortChannelID, error) {
94+
95+
chanID, err := c.graph.HighestChanID(ctx)
9296
if err != nil {
9397
return nil, err
9498
}

discovery/gossiper.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2288,7 +2288,7 @@ func (d *AuthenticatedGossiper) isMsgStale(_ context.Context,
22882288

22892289
// updateChannel creates a new fully signed update for the channel, and updates
22902290
// the underlying graph with the new state.
2291-
func (d *AuthenticatedGossiper) updateChannel(_ context.Context,
2291+
func (d *AuthenticatedGossiper) updateChannel(ctx context.Context,
22922292
info *models.ChannelEdgeInfo,
22932293
edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
22942294
*lnwire.ChannelUpdate1, error) {
@@ -2322,7 +2322,7 @@ func (d *AuthenticatedGossiper) updateChannel(_ context.Context,
23222322
}
23232323

23242324
// Finally, we'll write the new edge policy to disk.
2325-
if err := d.cfg.Graph.UpdateEdge(edge); err != nil {
2325+
if err := d.cfg.Graph.UpdateEdge(ctx, edge); err != nil {
23262326
return nil, nil, err
23272327
}
23282328

@@ -2808,7 +2808,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
28082808
// We will add the edge to the channel router. If the nodes present in
28092809
// this channel are not present in the database, a partial node will be
28102810
// added to represent each node while we wait for a node announcement.
2811-
err = d.cfg.Graph.AddEdge(edge, ops...)
2811+
err = d.cfg.Graph.AddEdge(ctx, edge, ops...)
28122812
if err != nil {
28132813
log.Debugf("Graph rejected edge for short_chan_id(%v): %v",
28142814
scid.ToUint64(), err)
@@ -3263,7 +3263,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context,
32633263
ExtraOpaqueData: upd.ExtraOpaqueData,
32643264
}
32653265

3266-
if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil {
3266+
if err := d.cfg.Graph.UpdateEdge(ctx, update, ops...); err != nil {
32673267
if graph.IsError(
32683268
err, graph.ErrOutdated,
32693269
graph.ErrIgnored,

discovery/gossiper_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,8 @@ func (r *mockGraphSource) IsZombieEdge(chanID lnwire.ShortChannelID) (bool,
136136
return ok, nil
137137
}
138138

139-
func (r *mockGraphSource) AddEdge(info *models.ChannelEdgeInfo,
140-
_ ...batch.SchedulerOption) error {
139+
func (r *mockGraphSource) AddEdge(_ context.Context,
140+
info *models.ChannelEdgeInfo, _ ...batch.SchedulerOption) error {
141141

142142
r.mu.Lock()
143143
defer r.mu.Unlock()
@@ -161,8 +161,8 @@ func (r *mockGraphSource) queueValidationFail(chanID uint64) {
161161
r.chansToReject[chanID] = struct{}{}
162162
}
163163

164-
func (r *mockGraphSource) UpdateEdge(edge *models.ChannelEdgePolicy,
165-
_ ...batch.SchedulerOption) error {
164+
func (r *mockGraphSource) UpdateEdge(_ context.Context,
165+
edge *models.ChannelEdgePolicy, _ ...batch.SchedulerOption) error {
166166

167167
r.mu.Lock()
168168
defer func() {

discovery/syncer.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -965,12 +965,14 @@ func (g *GossipSyncer) processChanRangeReply(_ context.Context,
965965
// party when we're kicking off the channel graph synchronization upon
966966
// connection. The historicalQuery boolean can be used to generate a query from
967967
// the genesis block of the chain.
968-
func (g *GossipSyncer) genChanRangeQuery(_ context.Context,
968+
func (g *GossipSyncer) genChanRangeQuery(ctx context.Context,
969969
historicalQuery bool) (*lnwire.QueryChannelRange, error) {
970970

971971
// First, we'll query our channel graph time series for its highest
972972
// known channel ID.
973-
newestChan, err := g.cfg.channelSeries.HighestChanID(g.cfg.chainHash)
973+
newestChan, err := g.cfg.channelSeries.HighestChanID(
974+
ctx, g.cfg.chainHash,
975+
)
974976
if err != nil {
975977
return nil, err
976978
}

discovery/syncer_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,12 @@ func newMockChannelGraphTimeSeries(
7979
}
8080
}
8181

82-
func (m *mockChannelGraphTimeSeries) HighestChanID(chain chainhash.Hash) (*lnwire.ShortChannelID, error) {
82+
func (m *mockChannelGraphTimeSeries) HighestChanID(_ context.Context,
83+
_ chainhash.Hash) (*lnwire.ShortChannelID, error) {
84+
8385
return &m.highestID, nil
8486
}
87+
8588
func (m *mockChannelGraphTimeSeries) UpdatesInHorizon(chain chainhash.Hash,
8689
startTime time.Time, endTime time.Time) ([]lnwire.Message, error) {
8790

graph/builder.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -915,6 +915,8 @@ func (b *Builder) MarkZombieEdge(chanID uint64) error {
915915
// ApplyChannelUpdate validates a channel update and if valid, applies it to the
916916
// database. It returns a bool indicating whether the updates were successful.
917917
func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
918+
ctx := context.TODO()
919+
918920
ch, _, _, err := b.GetChannelByID(msg.ShortChannelID)
919921
if err != nil {
920922
log.Errorf("Unable to retrieve channel by id: %v", err)
@@ -959,7 +961,7 @@ func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
959961
ExtraOpaqueData: msg.ExtraOpaqueData,
960962
}
961963

962-
err = b.UpdateEdge(update)
964+
err = b.UpdateEdge(ctx, update)
963965
if err != nil && !IsError(err, ErrIgnored, ErrOutdated) {
964966
log.Errorf("Unable to apply channel update: %v", err)
965967
return false
@@ -1017,10 +1019,10 @@ func (b *Builder) addNode(ctx context.Context, node *models.LightningNode,
10171019
// in construction of payment path.
10181020
//
10191021
// NOTE: This method is part of the ChannelGraphSource interface.
1020-
func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
1022+
func (b *Builder) AddEdge(ctx context.Context, edge *models.ChannelEdgeInfo,
10211023
op ...batch.SchedulerOption) error {
10221024

1023-
err := b.addEdge(edge, op...)
1025+
err := b.addEdge(ctx, edge, op...)
10241026
if err != nil {
10251027
logNetworkMsgProcessError(err)
10261028

@@ -1038,7 +1040,7 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
10381040
//
10391041
// TODO(elle): this currently also does funding-transaction validation. But this
10401042
// should be moved to the gossiper instead.
1041-
func (b *Builder) addEdge(edge *models.ChannelEdgeInfo,
1043+
func (b *Builder) addEdge(ctx context.Context, edge *models.ChannelEdgeInfo,
10421044
op ...batch.SchedulerOption) error {
10431045

10441046
log.Debugf("Received ChannelEdgeInfo for channel %v", edge.ChannelID)
@@ -1061,7 +1063,7 @@ func (b *Builder) addEdge(edge *models.ChannelEdgeInfo,
10611063
edge.ChannelID)
10621064
}
10631065

1064-
if err := b.cfg.Graph.AddChannelEdge(edge, op...); err != nil {
1066+
if err := b.cfg.Graph.AddChannelEdge(ctx, edge, op...); err != nil {
10651067
return fmt.Errorf("unable to add edge: %w", err)
10661068
}
10671069

@@ -1118,10 +1120,10 @@ func (b *Builder) addEdge(edge *models.ChannelEdgeInfo,
11181120
// considered as not fully constructed.
11191121
//
11201122
// NOTE: This method is part of the ChannelGraphSource interface.
1121-
func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
1122-
op ...batch.SchedulerOption) error {
1123+
func (b *Builder) UpdateEdge(ctx context.Context,
1124+
update *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error {
11231125

1124-
err := b.updateEdge(update, op...)
1126+
err := b.updateEdge(ctx, update, op...)
11251127
if err != nil {
11261128
logNetworkMsgProcessError(err)
11271129

@@ -1135,8 +1137,8 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
11351137
// persisted in the graph, and then applies it to the graph if the update is
11361138
// considered fresh enough and if we actually have a channel persisted for the
11371139
// given update.
1138-
func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy,
1139-
op ...batch.SchedulerOption) error {
1140+
func (b *Builder) updateEdge(ctx context.Context,
1141+
policy *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error {
11401142

11411143
log.Debugf("Received ChannelEdgePolicy for channel %v",
11421144
policy.ChannelID)
@@ -1209,7 +1211,7 @@ func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy,
12091211

12101212
// Now that we know this isn't a stale update, we'll apply the new edge
12111213
// policy to the proper directional edge within the channel graph.
1212-
if err = b.cfg.Graph.UpdateEdgePolicy(policy, op...); err != nil {
1214+
if err = b.cfg.Graph.UpdateEdgePolicy(ctx, policy, op...); err != nil {
12131215
err := errors.Errorf("unable to add channel: %v", err)
12141216
log.Error(err)
12151217
return err

0 commit comments

Comments
 (0)