Skip to content

Commit e9692f8

Browse files
authored
Merge pull request #9869 from ellemouton/graphSQL8-channels-schema
sqldb+graph/db: add channel tables and implement some channel CRUD
2 parents f93c675 + cf54245 commit e9692f8

File tree

9 files changed

+604
-20
lines changed

9 files changed

+604
-20
lines changed

graph/db/graph_test.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ func createLightningNode(priv *btcec.PrivateKey) *models.LightningNode {
8282
Alias: "kek" + hex.EncodeToString(pub),
8383
Features: testFeatures,
8484
Addresses: testAddrs,
85-
ExtraOpaqueData: make([]byte, 0),
8685
}
8786
copy(n.PubKeyBytes[:], priv.PubKey().SerializeCompressed())
8887

@@ -262,7 +261,7 @@ func TestNodeInsertionAndDeletion(t *testing.T) {
262261
func TestPartialNode(t *testing.T) {
263262
t.Parallel()
264263

265-
graph := MakeTestGraph(t)
264+
graph := MakeTestGraphNew(t)
266265

267266
// To insert a partial node, we need to add a channel edge that has
268267
// node keys for nodes we are not yet aware
@@ -1862,7 +1861,7 @@ func TestGraphPruning(t *testing.T) {
18621861
func TestHighestChanID(t *testing.T) {
18631862
t.Parallel()
18641863

1865-
graph := MakeTestGraph(t)
1864+
graph := MakeTestGraphNew(t)
18661865

18671866
// If we don't yet have any channels in the database, then we should
18681867
// get a channel ID of zero if we ask for the highest channel ID.
@@ -3333,34 +3332,33 @@ func TestPruneGraphNodes(t *testing.T) {
33333332
func TestAddChannelEdgeShellNodes(t *testing.T) {
33343333
t.Parallel()
33353334

3336-
graph := MakeTestGraph(t)
3335+
graph := MakeTestGraphNew(t)
33373336

33383337
// To start, we'll create two nodes, and only add one of them to the
33393338
// channel graph.
33403339
node1 := createTestVertex(t)
3341-
if err := graph.AddLightningNode(node1); err != nil {
3342-
t.Fatalf("unable to add node: %v", err)
3343-
}
3340+
require.NoError(t, graph.SetSourceNode(node1))
33443341
node2 := createTestVertex(t)
33453342

33463343
// We'll now create an edge between the two nodes, as a result, node2
33473344
// should be inserted into the database as a shell node.
33483345
edgeInfo, _ := createEdge(100, 0, 0, 0, node1, node2)
3349-
if err := graph.AddChannelEdge(&edgeInfo); err != nil {
3350-
t.Fatalf("unable to add edge: %v", err)
3351-
}
3346+
require.NoError(t, graph.AddChannelEdge(&edgeInfo))
33523347

33533348
// Ensure that node1 was inserted as a full node, while node2 only has
33543349
// a shell node present.
33553350
node1, err := graph.FetchLightningNode(node1.PubKeyBytes)
33563351
require.NoError(t, err, "unable to fetch node1")
3357-
if !node1.HaveNodeAnnouncement {
3358-
t.Fatalf("have shell announcement for node1, shouldn't")
3359-
}
3352+
require.True(t, node1.HaveNodeAnnouncement)
33603353

33613354
node2, err = graph.FetchLightningNode(node2.PubKeyBytes)
33623355
require.NoError(t, err, "unable to fetch node2")
33633356
require.False(t, node2.HaveNodeAnnouncement)
3357+
3358+
// Show that attempting to add the channel again will result in an
3359+
// error.
3360+
err = graph.AddChannelEdge(&edgeInfo)
3361+
require.ErrorIs(t, err, ErrEdgeAlreadyExist)
33643362
}
33653363

33663364
// TestNodePruningUpdateIndexDeletion tests that once a node has been removed

graph/db/kv_store.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4122,7 +4122,7 @@ func deserializeLightningNode(r io.Reader) (models.LightningNode, error) {
41224122

41234123
// We'll try and see if there are any opaque bytes left, if not, then
41244124
// we'll ignore the EOF error and return the node as is.
4125-
node.ExtraOpaqueData, err = wire.ReadVarBytes(
4125+
extraBytes, err := wire.ReadVarBytes(
41264126
r, 0, MaxAllowedExtraOpaqueBytes, "blob",
41274127
)
41284128
switch {
@@ -4132,6 +4132,10 @@ func deserializeLightningNode(r io.Reader) (models.LightningNode, error) {
41324132
return models.LightningNode{}, err
41334133
}
41344134

4135+
if len(extraBytes) > 0 {
4136+
node.ExtraOpaqueData = extraBytes
4137+
}
4138+
41354139
return node, nil
41364140
}
41374141

graph/db/sql_store.go

Lines changed: 238 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,16 @@ type SQLQueries interface {
6969
*/
7070
AddSourceNode(ctx context.Context, nodeID int64) error
7171
GetSourceNodesByVersion(ctx context.Context, version int16) ([]sqlc.GetSourceNodesByVersionRow, error)
72+
73+
/*
74+
Channel queries.
75+
*/
76+
CreateChannel(ctx context.Context, arg sqlc.CreateChannelParams) (int64, error)
77+
GetChannelBySCID(ctx context.Context, arg sqlc.GetChannelBySCIDParams) (sqlc.Channel, error)
78+
HighestSCID(ctx context.Context, version int16) ([]byte, error)
79+
80+
CreateChannelExtraType(ctx context.Context, arg sqlc.CreateChannelExtraTypeParams) error
81+
InsertChannelFeature(ctx context.Context, arg sqlc.InsertChannelFeatureParams) error
7282
}
7383

7484
// BatchedSQLQueries is a version of SQLQueries that's capable of batched
@@ -455,6 +465,83 @@ func (s *SQLStore) NodeUpdatesInHorizon(startTime,
455465
return nodes, nil
456466
}
457467

468+
// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
469+
// undirected edge from the two target nodes are created. The information stored
470+
// denotes the static attributes of the channel, such as the channelID, the keys
471+
// involved in creation of the channel, and the set of features that the channel
472+
// supports. The chanPoint and chanID are used to uniquely identify the edge
473+
// globally within the database.
474+
//
475+
// NOTE: part of the V1Store interface.
476+
func (s *SQLStore) AddChannelEdge(edge *models.ChannelEdgeInfo,
477+
opts ...batch.SchedulerOption) error {
478+
479+
ctx := context.TODO()
480+
481+
var alreadyExists bool
482+
r := &batch.Request[SQLQueries]{
483+
Opts: batch.NewSchedulerOptions(opts...),
484+
Reset: func() {
485+
alreadyExists = false
486+
},
487+
Do: func(tx SQLQueries) error {
488+
err := insertChannel(ctx, tx, edge)
489+
490+
// Silence ErrEdgeAlreadyExist so that the batch can
491+
// succeed, but propagate the error via local state.
492+
if errors.Is(err, ErrEdgeAlreadyExist) {
493+
alreadyExists = true
494+
return nil
495+
}
496+
497+
return err
498+
},
499+
OnCommit: func(err error) error {
500+
switch {
501+
case err != nil:
502+
return err
503+
case alreadyExists:
504+
return ErrEdgeAlreadyExist
505+
default:
506+
s.rejectCache.remove(edge.ChannelID)
507+
s.chanCache.remove(edge.ChannelID)
508+
return nil
509+
}
510+
},
511+
}
512+
513+
return s.chanScheduler.Execute(ctx, r)
514+
}
515+
516+
// HighestChanID returns the "highest" known channel ID in the channel graph.
517+
// This represents the "newest" channel from the PoV of the chain. This method
518+
// can be used by peers to quickly determine if their graphs are in sync.
519+
//
520+
// NOTE: This is part of the V1Store interface.
521+
func (s *SQLStore) HighestChanID() (uint64, error) {
522+
ctx := context.TODO()
523+
524+
var highestChanID uint64
525+
err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
526+
chanID, err := db.HighestSCID(ctx, int16(ProtocolV1))
527+
if errors.Is(err, sql.ErrNoRows) {
528+
return nil
529+
} else if err != nil {
530+
return fmt.Errorf("unable to fetch highest chan ID: %w",
531+
err)
532+
}
533+
534+
highestChanID = byteOrder.Uint64(chanID)
535+
536+
return nil
537+
}, sqldb.NoOpReset)
538+
if err != nil {
539+
return 0, fmt.Errorf("unable to fetch highest chan ID: %w", err)
540+
}
541+
542+
return highestChanID, nil
543+
}
544+
458545
// getNodeByPubKey attempts to look up a target node by its public key.
459546
func getNodeByPubKey(ctx context.Context, db SQLQueries,
460547
pubKey route.Vertex) (int64, *models.LightningNode, error) {
@@ -494,10 +581,9 @@ func buildNode(ctx context.Context, db SQLQueries, dbNode *sqlc.Node) (
494581
copy(pub[:], dbNode.PubKey)
495582

496583
node := &models.LightningNode{
497-
PubKeyBytes: pub,
498-
Features: lnwire.EmptyFeatureVector(),
499-
LastUpdate: time.Unix(0, 0),
500-
ExtraOpaqueData: make([]byte, 0),
584+
PubKeyBytes: pub,
585+
Features: lnwire.EmptyFeatureVector(),
586+
LastUpdate: time.Unix(0, 0),
501587
}
502588

503589
if len(dbNode.Signature) == 0 {
@@ -1023,3 +1109,151 @@ func marshalExtraOpaqueData(data []byte) (map[uint64][]byte, error) {
10231109

10241110
return records, nil
10251111
}
1112+
1113+
// insertChannel inserts a new channel record into the database.
1114+
func insertChannel(ctx context.Context, db SQLQueries,
1115+
edge *models.ChannelEdgeInfo) error {
1116+
1117+
var chanIDB [8]byte
1118+
byteOrder.PutUint64(chanIDB[:], edge.ChannelID)
1119+
1120+
// Make sure that the channel doesn't already exist. We do this
1121+
// explicitly instead of relying on catching a unique constraint error
1122+
// because relying on SQL to throw that error would abort the entire
1123+
// batch of transactions.
1124+
_, err := db.GetChannelBySCID(
1125+
ctx, sqlc.GetChannelBySCIDParams{
1126+
Scid: chanIDB[:],
1127+
Version: int16(ProtocolV1),
1128+
},
1129+
)
1130+
if err == nil {
1131+
return ErrEdgeAlreadyExist
1132+
} else if !errors.Is(err, sql.ErrNoRows) {
1133+
return fmt.Errorf("unable to fetch channel: %w", err)
1134+
}
1135+
1136+
// Make sure that at least a "shell" entry for each node is present in
1137+
// the nodes table.
1138+
node1DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey1Bytes)
1139+
if err != nil {
1140+
return fmt.Errorf("unable to create shell node: %w", err)
1141+
}
1142+
1143+
node2DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey2Bytes)
1144+
if err != nil {
1145+
return fmt.Errorf("unable to create shell node: %w", err)
1146+
}
1147+
1148+
var capacity sql.NullInt64
1149+
if edge.Capacity != 0 {
1150+
capacity = sqldb.SQLInt64(int64(edge.Capacity))
1151+
}
1152+
1153+
createParams := sqlc.CreateChannelParams{
1154+
Version: int16(ProtocolV1),
1155+
Scid: chanIDB[:],
1156+
NodeID1: node1DBID,
1157+
NodeID2: node2DBID,
1158+
Outpoint: edge.ChannelPoint.String(),
1159+
Capacity: capacity,
1160+
BitcoinKey1: edge.BitcoinKey1Bytes[:],
1161+
BitcoinKey2: edge.BitcoinKey2Bytes[:],
1162+
}
1163+
1164+
if edge.AuthProof != nil {
1165+
proof := edge.AuthProof
1166+
1167+
createParams.Node1Signature = proof.NodeSig1Bytes
1168+
createParams.Node2Signature = proof.NodeSig2Bytes
1169+
createParams.Bitcoin1Signature = proof.BitcoinSig1Bytes
1170+
createParams.Bitcoin2Signature = proof.BitcoinSig2Bytes
1171+
}
1172+
1173+
// Insert the new channel record.
1174+
dbChanID, err := db.CreateChannel(ctx, createParams)
1175+
if err != nil {
1176+
return err
1177+
}
1178+
1179+
// Insert any channel features.
1180+
if len(edge.Features) != 0 {
1181+
chanFeatures := lnwire.NewRawFeatureVector()
1182+
err := chanFeatures.Decode(bytes.NewReader(edge.Features))
1183+
if err != nil {
1184+
return err
1185+
}
1186+
1187+
fv := lnwire.NewFeatureVector(chanFeatures, lnwire.Features)
1188+
for feature := range fv.Features() {
1189+
err = db.InsertChannelFeature(
1190+
ctx, sqlc.InsertChannelFeatureParams{
1191+
ChannelID: dbChanID,
1192+
FeatureBit: int32(feature),
1193+
},
1194+
)
1195+
if err != nil {
1196+
return fmt.Errorf("unable to insert "+
1197+
"channel(%d) feature(%v): %w", dbChanID,
1198+
feature, err)
1199+
}
1200+
}
1201+
}
1202+
1203+
// Finally, insert any extra TLV fields in the channel announcement.
1204+
extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData)
1205+
if err != nil {
1206+
return fmt.Errorf("unable to marshal extra opaque data: %w",
1207+
err)
1208+
}
1209+
1210+
for tlvType, value := range extra {
1211+
err := db.CreateChannelExtraType(
1212+
ctx, sqlc.CreateChannelExtraTypeParams{
1213+
ChannelID: dbChanID,
1214+
Type: int64(tlvType),
1215+
Value: value,
1216+
},
1217+
)
1218+
if err != nil {
1219+
return fmt.Errorf("unable to upsert channel(%d) extra "+
1220+
"signed field(%v): %w", edge.ChannelID,
1221+
tlvType, err)
1222+
}
1223+
}
1224+
1225+
return nil
1226+
}
1227+
1228+
// maybeCreateShellNode checks if a shell node entry exists for the
1229+
// given public key. If it does not exist, then a new shell node entry is
1230+
// created. The ID of the node is returned. A shell node only has a protocol
1231+
// version and public key persisted.
1232+
func maybeCreateShellNode(ctx context.Context, db SQLQueries,
1233+
pubKey route.Vertex) (int64, error) {
1234+
1235+
dbNode, err := db.GetNodeByPubKey(
1236+
ctx, sqlc.GetNodeByPubKeyParams{
1237+
PubKey: pubKey[:],
1238+
Version: int16(ProtocolV1),
1239+
},
1240+
)
1241+
// The node exists. Return the ID.
1242+
if err == nil {
1243+
return dbNode.ID, nil
1244+
} else if !errors.Is(err, sql.ErrNoRows) {
1245+
return 0, err
1246+
}
1247+
1248+
// Otherwise, the node does not exist, so we create a shell entry for
1249+
// it.
1250+
id, err := db.UpsertNode(ctx, sqlc.UpsertNodeParams{
1251+
Version: int16(ProtocolV1),
1252+
PubKey: pubKey[:],
1253+
})
1254+
if err != nil {
1255+
return 0, fmt.Errorf("unable to create shell node: %w", err)
1256+
}
1257+
1258+
return id, nil
1259+
}

0 commit comments

Comments
 (0)