Skip to content

Commit fb1fef9

Browse files
authored
Merge pull request #9971 from ellemouton/graphSQL16-closed-scids
[16] graph/db: SQL closed SCIDs table and last few methods
2 parents 1fcf112 + daf6a08 commit fb1fef9

File tree

11 files changed

+385
-20
lines changed

11 files changed

+385
-20
lines changed

docs/release-notes/release-notes-0.20.0.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ circuit. The indices are only available for forwarding events saved after v0.20.
9090
* [7](https://github.com/lightningnetwork/lnd/pull/9937)
9191
* [8](https://github.com/lightningnetwork/lnd/pull/9938)
9292
* [9](https://github.com/lightningnetwork/lnd/pull/9939)
93+
* [10](https://github.com/lightningnetwork/lnd/pull/9971)
9394

9495
## RPC Updates
9596

graph/db/graph_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,7 +1128,7 @@ func TestAddEdgeProof(t *testing.T) {
11281128
t.Parallel()
11291129
ctx := context.Background()
11301130

1131-
graph := MakeTestGraph(t)
1131+
graph := MakeTestGraphNew(t)
11321132

11331133
// Add an edge with no proof.
11341134
node1 := createTestVertex(t)
@@ -4325,7 +4325,7 @@ func TestGraphLoading(t *testing.T) {
43254325
func TestClosedScid(t *testing.T) {
43264326
t.Parallel()
43274327

4328-
graph := MakeTestGraph(t)
4328+
graph := MakeTestGraphNew(t)
43294329

43304330
scid := lnwire.ShortChannelID{}
43314331

graph/db/kv_store.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2150,8 +2150,8 @@ func (c *KVStore) ChanUpdatesInHorizon(startTime,
21502150
}
21512151

21522152
if len(edgesInHorizon) > 0 {
2153-
log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)",
2154-
float64(hits)/float64(len(edgesInHorizon)), hits,
2153+
log.Debugf("ChanUpdatesInHorizon hit percentage: %.2f (%d/%d)",
2154+
float64(hits)*100/float64(len(edgesInHorizon)), hits,
21552155
len(edgesInHorizon))
21562156
} else {
21572157
log.Debugf("ChanUpdatesInHorizon returned no edges in "+

graph/db/sql_store.go

Lines changed: 257 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ type SQLQueries interface {
9191
Channel queries.
9292
*/
9393
CreateChannel(ctx context.Context, arg sqlc.CreateChannelParams) (int64, error)
94+
AddV1ChannelProof(ctx context.Context, arg sqlc.AddV1ChannelProofParams) (sql.Result, error)
9495
GetChannelBySCID(ctx context.Context, arg sqlc.GetChannelBySCIDParams) (sqlc.Channel, error)
9596
GetChannelByOutpoint(ctx context.Context, outpoint string) (sqlc.GetChannelByOutpointRow, error)
9697
GetChannelsBySCIDRange(ctx context.Context, arg sqlc.GetChannelsBySCIDRangeParams) ([]sqlc.GetChannelsBySCIDRangeRow, error)
@@ -136,6 +137,12 @@ type SQLQueries interface {
136137
GetPruneTip(ctx context.Context) (sqlc.PruneLog, error)
137138
UpsertPruneLogEntry(ctx context.Context, arg sqlc.UpsertPruneLogEntryParams) error
138139
DeletePruneLogEntriesInRange(ctx context.Context, arg sqlc.DeletePruneLogEntriesInRangeParams) error
140+
141+
/*
142+
Closed SCID table queries.
143+
*/
144+
InsertClosedChannel(ctx context.Context, scid []byte) error
145+
IsClosedChannel(ctx context.Context, scid []byte) (bool, error)
139146
}
140147

141148
// BatchedSQLQueries is a version of SQLQueries that's capable of batched
@@ -1096,8 +1103,8 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime,
10961103
}
10971104

10981105
if len(edges) > 0 {
1099-
log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)",
1100-
float64(hits)/float64(len(edges)), hits, len(edges))
1106+
log.Debugf("ChanUpdatesInHorizon hit percentage: %.2f (%d/%d)",
1107+
float64(hits)*100/float64(len(edges)), hits, len(edges))
11011108
} else {
11021109
log.Debugf("ChanUpdatesInHorizon returned no edges in "+
11031110
"horizon (%s, %s)", startTime, endTime)
@@ -1231,6 +1238,103 @@ func (s *SQLStore) ForEachNodeCached(cb func(node route.Vertex,
12311238
}, sqldb.NoOpReset)
12321239
}
12331240

1241+
// ForEachChannelCacheable iterates through all the channel edges stored
1242+
// within the graph and invokes the passed callback for each edge. The
1243+
// callback takes two edges as since this is a directed graph, both the
1244+
// in/out edges are visited. If the callback returns an error, then the
1245+
// transaction is aborted and the iteration stops early.
1246+
//
1247+
// NOTE: If an edge can't be found, or wasn't advertised, then a nil
1248+
// pointer for that particular channel edge routing policy will be
1249+
// passed into the callback.
1250+
//
1251+
// NOTE: this method is like ForEachChannel but fetches only the data
1252+
// required for the graph cache.
1253+
func (s *SQLStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo,
1254+
*models.CachedEdgePolicy,
1255+
*models.CachedEdgePolicy) error) error {
1256+
1257+
ctx := context.TODO()
1258+
1259+
handleChannel := func(db SQLQueries,
1260+
row sqlc.ListChannelsWithPoliciesPaginatedRow) error {
1261+
1262+
node1, node2, err := buildNodeVertices(
1263+
row.Node1Pubkey, row.Node2Pubkey,
1264+
)
1265+
if err != nil {
1266+
return err
1267+
}
1268+
1269+
edge := buildCacheableChannelInfo(row.Channel, node1, node2)
1270+
1271+
dbPol1, dbPol2, err := extractChannelPolicies(row)
1272+
if err != nil {
1273+
return err
1274+
}
1275+
1276+
var pol1, pol2 *models.CachedEdgePolicy
1277+
if dbPol1 != nil {
1278+
policy1, err := buildChanPolicy(
1279+
*dbPol1, edge.ChannelID, nil, node2, true,
1280+
)
1281+
if err != nil {
1282+
return err
1283+
}
1284+
1285+
pol1 = models.NewCachedPolicy(policy1)
1286+
}
1287+
if dbPol2 != nil {
1288+
policy2, err := buildChanPolicy(
1289+
*dbPol2, edge.ChannelID, nil, node1, false,
1290+
)
1291+
if err != nil {
1292+
return err
1293+
}
1294+
1295+
pol2 = models.NewCachedPolicy(policy2)
1296+
}
1297+
1298+
if err := cb(edge, pol1, pol2); err != nil {
1299+
return err
1300+
}
1301+
1302+
return nil
1303+
}
1304+
1305+
return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
1306+
lastID := int64(-1)
1307+
for {
1308+
//nolint:ll
1309+
rows, err := db.ListChannelsWithPoliciesPaginated(
1310+
ctx, sqlc.ListChannelsWithPoliciesPaginatedParams{
1311+
Version: int16(ProtocolV1),
1312+
ID: lastID,
1313+
Limit: pageSize,
1314+
},
1315+
)
1316+
if err != nil {
1317+
return err
1318+
}
1319+
1320+
if len(rows) == 0 {
1321+
break
1322+
}
1323+
1324+
for _, row := range rows {
1325+
err := handleChannel(db, row)
1326+
if err != nil {
1327+
return err
1328+
}
1329+
1330+
lastID = row.Channel.ID
1331+
}
1332+
}
1333+
1334+
return nil
1335+
}, sqldb.NoOpReset)
1336+
}
1337+
12341338
// ForEachChannel iterates through all the channel edges stored within the
12351339
// graph and invokes the passed callback for each edge. The callback takes two
12361340
// edges as since this is a directed graph, both the in/out edges are visited.
@@ -1291,7 +1395,7 @@ func (s *SQLStore) ForEachChannel(cb func(*models.ChannelEdgeInfo,
12911395
}
12921396

12931397
return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
1294-
var lastID int64
1398+
lastID := int64(-1)
12951399
for {
12961400
//nolint:ll
12971401
rows, err := db.ListChannelsWithPoliciesPaginated(
@@ -2575,6 +2679,155 @@ func (s *SQLStore) DisconnectBlockAtHeight(height uint32) (
25752679
return removedChans, nil
25762680
}
25772681

2682+
// AddEdgeProof sets the proof of an existing edge in the graph database.
2683+
//
2684+
// NOTE: part of the V1Store interface.
2685+
func (s *SQLStore) AddEdgeProof(scid lnwire.ShortChannelID,
2686+
proof *models.ChannelAuthProof) error {
2687+
2688+
var (
2689+
ctx = context.TODO()
2690+
scidBytes = channelIDToBytes(scid.ToUint64())
2691+
)
2692+
2693+
err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error {
2694+
res, err := db.AddV1ChannelProof(
2695+
ctx, sqlc.AddV1ChannelProofParams{
2696+
Scid: scidBytes[:],
2697+
Node1Signature: proof.NodeSig1Bytes,
2698+
Node2Signature: proof.NodeSig2Bytes,
2699+
Bitcoin1Signature: proof.BitcoinSig1Bytes,
2700+
Bitcoin2Signature: proof.BitcoinSig2Bytes,
2701+
},
2702+
)
2703+
if err != nil {
2704+
return fmt.Errorf("unable to add edge proof: %w", err)
2705+
}
2706+
2707+
n, err := res.RowsAffected()
2708+
if err != nil {
2709+
return err
2710+
}
2711+
2712+
if n == 0 {
2713+
return fmt.Errorf("no rows affected when adding edge "+
2714+
"proof for SCID %v", scid)
2715+
} else if n > 1 {
2716+
return fmt.Errorf("multiple rows affected when adding "+
2717+
"edge proof for SCID %v: %d rows affected",
2718+
scid, n)
2719+
}
2720+
2721+
return nil
2722+
}, sqldb.NoOpReset)
2723+
if err != nil {
2724+
return fmt.Errorf("unable to add edge proof: %w", err)
2725+
}
2726+
2727+
return nil
2728+
}
2729+
2730+
// PutClosedScid stores a SCID for a closed channel in the database. This is so
2731+
// that we can ignore channel announcements that we know to be closed without
2732+
// having to validate them and fetch a block.
2733+
//
2734+
// NOTE: part of the V1Store interface.
2735+
func (s *SQLStore) PutClosedScid(scid lnwire.ShortChannelID) error {
2736+
var (
2737+
ctx = context.TODO()
2738+
chanIDB = channelIDToBytes(scid.ToUint64())
2739+
)
2740+
2741+
return s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error {
2742+
return db.InsertClosedChannel(ctx, chanIDB[:])
2743+
}, sqldb.NoOpReset)
2744+
}
2745+
2746+
// IsClosedScid checks whether a channel identified by the passed in scid is
2747+
// closed. This helps avoid having to perform expensive validation checks.
2748+
//
2749+
// NOTE: part of the V1Store interface.
2750+
func (s *SQLStore) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) {
2751+
var (
2752+
ctx = context.TODO()
2753+
isClosed bool
2754+
chanIDB = channelIDToBytes(scid.ToUint64())
2755+
)
2756+
err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
2757+
var err error
2758+
isClosed, err = db.IsClosedChannel(ctx, chanIDB[:])
2759+
if err != nil {
2760+
return fmt.Errorf("unable to fetch closed channel: %w",
2761+
err)
2762+
}
2763+
2764+
return nil
2765+
}, sqldb.NoOpReset)
2766+
if err != nil {
2767+
return false, fmt.Errorf("unable to fetch closed channel: %w",
2768+
err)
2769+
}
2770+
2771+
return isClosed, nil
2772+
}
2773+
2774+
// GraphSession will provide the call-back with access to a NodeTraverser
2775+
// instance which can be used to perform queries against the channel graph.
2776+
//
2777+
// NOTE: part of the V1Store interface.
2778+
func (s *SQLStore) GraphSession(cb func(graph NodeTraverser) error) error {
2779+
var ctx = context.TODO()
2780+
2781+
return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
2782+
return cb(newSQLNodeTraverser(db, s.cfg.ChainHash))
2783+
}, sqldb.NoOpReset)
2784+
}
2785+
2786+
// sqlNodeTraverser implements the NodeTraverser interface but with a backing
2787+
// read only transaction for a consistent view of the graph.
2788+
type sqlNodeTraverser struct {
2789+
db SQLQueries
2790+
chain chainhash.Hash
2791+
}
2792+
2793+
// A compile-time assertion to ensure that sqlNodeTraverser implements the
2794+
// NodeTraverser interface.
2795+
var _ NodeTraverser = (*sqlNodeTraverser)(nil)
2796+
2797+
// newSQLNodeTraverser creates a new instance of the sqlNodeTraverser.
2798+
func newSQLNodeTraverser(db SQLQueries,
2799+
chain chainhash.Hash) *sqlNodeTraverser {
2800+
2801+
return &sqlNodeTraverser{
2802+
db: db,
2803+
chain: chain,
2804+
}
2805+
}
2806+
2807+
// ForEachNodeDirectedChannel calls the callback for every channel of the given
2808+
// node.
2809+
//
2810+
// NOTE: Part of the NodeTraverser interface.
2811+
func (s *sqlNodeTraverser) ForEachNodeDirectedChannel(nodePub route.Vertex,
2812+
cb func(channel *DirectedChannel) error) error {
2813+
2814+
ctx := context.TODO()
2815+
2816+
return forEachNodeDirectedChannel(ctx, s.db, nodePub, cb)
2817+
}
2818+
2819+
// FetchNodeFeatures returns the features of the given node. If the node is
2820+
// unknown, assume no additional features are supported.
2821+
//
2822+
// NOTE: Part of the NodeTraverser interface.
2823+
func (s *sqlNodeTraverser) FetchNodeFeatures(nodePub route.Vertex) (
2824+
*lnwire.FeatureVector, error) {
2825+
2826+
ctx := context.TODO()
2827+
2828+
return fetchNodeFeatures(ctx, s.db, nodePub)
2829+
}
2830+
25782831
// forEachNodeDirectedChannel iterates through all channels of a given
25792832
// node, executing the passed callback on the directed edge representing the
25802833
// channel and its incoming policy. If the node is not found, no error is
@@ -2704,7 +2957,7 @@ func forEachNodeDirectedChannel(ctx context.Context, db SQLQueries,
27042957
func forEachNodeCacheable(ctx context.Context, db SQLQueries,
27052958
cb func(nodeID int64, nodePub route.Vertex) error) error {
27062959

2707-
var lastID int64
2960+
lastID := int64(-1)
27082961

27092962
for {
27102963
nodes, err := db.ListNodeIDsAndPubKeys(

0 commit comments

Comments
 (0)