Skip to content

Commit c1740c1

Browse files
authored
Merge pull request #9887 from ellemouton/graphSQL9-chan-policies-schema
graph/db+sqldb: channel policy SQL schemas, queries and upsert CRUD
2 parents a5c4a7c + c327988 commit c1740c1

File tree

13 files changed

+594
-11
lines changed

13 files changed

+594
-11
lines changed

docs/release-notes/release-notes-0.20.0.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ circuit. The indices are only available for forwarding events saved after v0.20.
7474
cache population.
7575
* Add graph schemas, queries and CRUD:
7676
* [1](https://github.com/lightningnetwork/lnd/pull/9866)
77+
* [2](https://github.com/lightningnetwork/lnd/pull/9869)
78+
* [3](https://github.com/lightningnetwork/lnd/pull/9887)
7779

7880
## RPC Updates
7981

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,10 @@ require (
199199
sigs.k8s.io/yaml v1.2.0 // indirect
200200
)
201201

202+
// TODO(elle): remove once all the schemas and queries for the graph
203+
// store have been included in a tagged sqldb version.
204+
replace github.com/lightningnetwork/lnd/sqldb => ./sqldb
205+
202206
// This replace is for https://github.com/advisories/GHSA-25xm-hr59-7c27
203207
replace github.com/ulikunitz/xz => github.com/ulikunitz/xz v0.5.11
204208

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,8 +375,6 @@ github.com/lightningnetwork/lnd/kvdb v1.4.16 h1:9BZgWdDfjmHRHLS97cz39bVuBAqMc4/p
375375
github.com/lightningnetwork/lnd/kvdb v1.4.16/go.mod h1:HW+bvwkxNaopkz3oIgBV6NEnV4jCEZCACFUcNg4xSjM=
376376
github.com/lightningnetwork/lnd/queue v1.1.1 h1:99ovBlpM9B0FRCGYJo6RSFDlt8/vOkQQZznVb18iNMI=
377377
github.com/lightningnetwork/lnd/queue v1.1.1/go.mod h1:7A6nC1Qrm32FHuhx/mi1cieAiBZo5O6l8IBIoQxvkz4=
378-
github.com/lightningnetwork/lnd/sqldb v1.0.10 h1:ZLV7TGwjnKupVfCd+DJ43MAc9BKVSFCnvhpSPGKdN3M=
379-
github.com/lightningnetwork/lnd/sqldb v1.0.10/go.mod h1:c/vWoQfcxu6FAfHzGajkIQi7CEIeIZFhhH4DYh1BJpc=
380378
github.com/lightningnetwork/lnd/ticker v1.1.1 h1:J/b6N2hibFtC7JLV77ULQp++QLtCwT6ijJlbdiZFbSM=
381379
github.com/lightningnetwork/lnd/ticker v1.1.1/go.mod h1:waPTRAAcwtu7Ji3+3k+u/xH5GHovTsCoSVpho0KDvdA=
382380
github.com/lightningnetwork/lnd/tlv v1.3.2 h1:MO4FCk7F4k5xPMqVZF6Nb/kOpxlwPrUQpYjmyKny5s0=

graph/db/graph_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4038,7 +4038,7 @@ func TestBatchedAddChannelEdge(t *testing.T) {
40384038
func TestBatchedUpdateEdgePolicy(t *testing.T) {
40394039
t.Parallel()
40404040

4041-
graph := MakeTestGraph(t)
4041+
graph := MakeTestGraphNew(t)
40424042

40434043
// We'd like to test the update of edges inserted into the database, so
40444044
// we create two vertexes to connect.

graph/db/sql_store.go

Lines changed: 241 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"time"
1515

1616
"github.com/btcsuite/btcd/btcec/v2"
17+
"github.com/btcsuite/btcd/chaincfg/chainhash"
1718
"github.com/lightningnetwork/lnd/batch"
1819
"github.com/lightningnetwork/lnd/graph/db/models"
1920
"github.com/lightningnetwork/lnd/lnwire"
@@ -75,10 +76,19 @@ type SQLQueries interface {
7576
*/
7677
CreateChannel(ctx context.Context, arg sqlc.CreateChannelParams) (int64, error)
7778
GetChannelBySCID(ctx context.Context, arg sqlc.GetChannelBySCIDParams) (sqlc.Channel, error)
79+
GetChannelAndNodesBySCID(ctx context.Context, arg sqlc.GetChannelAndNodesBySCIDParams) (sqlc.GetChannelAndNodesBySCIDRow, error)
7880
HighestSCID(ctx context.Context, version int16) ([]byte, error)
7981

8082
CreateChannelExtraType(ctx context.Context, arg sqlc.CreateChannelExtraTypeParams) error
8183
InsertChannelFeature(ctx context.Context, arg sqlc.InsertChannelFeatureParams) error
84+
85+
/*
86+
Channel Policy table queries.
87+
*/
88+
UpsertEdgePolicy(ctx context.Context, arg sqlc.UpsertEdgePolicyParams) (int64, error)
89+
90+
InsertChanPolicyExtraType(ctx context.Context, arg sqlc.InsertChanPolicyExtraTypeParams) error
91+
DeleteChannelPolicyExtraTypes(ctx context.Context, channelPolicyID int64) error
8292
}
8393

8494
// BatchedSQLQueries is a version of SQLQueries that's capable of batched
@@ -96,7 +106,8 @@ type BatchedSQLQueries interface {
96106
// implemented, things will fall back to the KVStore. This is ONLY the case
97107
// for the time being while this struct is purely used in unit tests only.
98108
type SQLStore struct {
99-
db BatchedSQLQueries
109+
cfg *SQLStoreConfig
110+
db BatchedSQLQueries
100111

101112
// cacheMu guards all caches (rejectCache and chanCache). If
102113
// this mutex will be acquired at the same time as the DB mutex then
@@ -117,9 +128,16 @@ type SQLStore struct {
117128
// interface.
118129
var _ V1Store = (*SQLStore)(nil)
119130

131+
// SQLStoreConfig holds the configuration for the SQLStore.
132+
type SQLStoreConfig struct {
133+
// ChainHash is the genesis hash for the chain that all the gossip
134+
// messages in this store are aimed at.
135+
ChainHash chainhash.Hash
136+
}
137+
120138
// NewSQLStore creates a new SQLStore instance given an open BatchedSQLQueries
121139
// storage backend.
122-
func NewSQLStore(db BatchedSQLQueries, kvStore *KVStore,
140+
func NewSQLStore(cfg *SQLStoreConfig, db BatchedSQLQueries, kvStore *KVStore,
123141
options ...StoreOptionModifier) (*SQLStore, error) {
124142

125143
opts := DefaultOptions()
@@ -133,6 +151,7 @@ func NewSQLStore(db BatchedSQLQueries, kvStore *KVStore,
133151
}
134152

135153
s := &SQLStore{
154+
cfg: cfg,
136155
db: db,
137156
KVStore: kvStore,
138157
rejectCache: newRejectCache(opts.RejectCacheSize),
@@ -542,6 +561,193 @@ func (s *SQLStore) HighestChanID() (uint64, error) {
542561
return highestChanID, nil
543562
}
544563

564+
// UpdateEdgePolicy updates the edge routing policy for a single directed edge
565+
// within the database for the referenced channel. The `flags` attribute within
566+
// the ChannelEdgePolicy determines which of the directed edges are being
567+
// updated. If the flag is 1, then the first node's information is being
568+
// updated, otherwise it's the second node's information. The node ordering is
569+
// determined by the lexicographical ordering of the identity public keys of the
570+
// nodes on either side of the channel.
571+
//
572+
// NOTE: part of the V1Store interface.
573+
func (s *SQLStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
574+
opts ...batch.SchedulerOption) (route.Vertex, route.Vertex, error) {
575+
576+
ctx := context.TODO()
577+
578+
var (
579+
isUpdate1 bool
580+
edgeNotFound bool
581+
from, to route.Vertex
582+
)
583+
584+
r := &batch.Request[SQLQueries]{
585+
Opts: batch.NewSchedulerOptions(opts...),
586+
Reset: func() {
587+
isUpdate1 = false
588+
edgeNotFound = false
589+
},
590+
Do: func(tx SQLQueries) error {
591+
var err error
592+
from, to, isUpdate1, err = updateChanEdgePolicy(
593+
ctx, tx, edge,
594+
)
595+
if err != nil {
596+
log.Errorf("UpdateEdgePolicy faild: %v", err)
597+
}
598+
599+
// Silence ErrEdgeNotFound so that the batch can
600+
// succeed, but propagate the error via local state.
601+
if errors.Is(err, ErrEdgeNotFound) {
602+
edgeNotFound = true
603+
return nil
604+
}
605+
606+
return err
607+
},
608+
OnCommit: func(err error) error {
609+
switch {
610+
case err != nil:
611+
return err
612+
case edgeNotFound:
613+
return ErrEdgeNotFound
614+
default:
615+
s.updateEdgeCache(edge, isUpdate1)
616+
return nil
617+
}
618+
},
619+
}
620+
621+
err := s.chanScheduler.Execute(ctx, r)
622+
623+
return from, to, err
624+
}
625+
626+
// updateEdgeCache updates our reject and channel caches with the new
627+
// edge policy information.
628+
func (s *SQLStore) updateEdgeCache(e *models.ChannelEdgePolicy,
629+
isUpdate1 bool) {
630+
631+
// If an entry for this channel is found in reject cache, we'll modify
632+
// the entry with the updated timestamp for the direction that was just
633+
// written. If the edge doesn't exist, we'll load the cache entry lazily
634+
// during the next query for this edge.
635+
if entry, ok := s.rejectCache.get(e.ChannelID); ok {
636+
if isUpdate1 {
637+
entry.upd1Time = e.LastUpdate.Unix()
638+
} else {
639+
entry.upd2Time = e.LastUpdate.Unix()
640+
}
641+
s.rejectCache.insert(e.ChannelID, entry)
642+
}
643+
644+
// If an entry for this channel is found in channel cache, we'll modify
645+
// the entry with the updated policy for the direction that was just
646+
// written. If the edge doesn't exist, we'll defer loading the info and
647+
// policies and lazily read from disk during the next query.
648+
if channel, ok := s.chanCache.get(e.ChannelID); ok {
649+
if isUpdate1 {
650+
channel.Policy1 = e
651+
} else {
652+
channel.Policy2 = e
653+
}
654+
s.chanCache.insert(e.ChannelID, channel)
655+
}
656+
}
657+
658+
// updateChanEdgePolicy upserts the channel policy info we have stored for
659+
// a channel we already know of.
660+
func updateChanEdgePolicy(ctx context.Context, tx SQLQueries,
661+
edge *models.ChannelEdgePolicy) (route.Vertex, route.Vertex, bool,
662+
error) {
663+
664+
var (
665+
node1Pub, node2Pub route.Vertex
666+
isNode1 bool
667+
chanIDB [8]byte
668+
)
669+
byteOrder.PutUint64(chanIDB[:], edge.ChannelID)
670+
671+
// Check that this edge policy refers to a channel that we already
672+
// know of. We do this explicitly so that we can return the appropriate
673+
// ErrEdgeNotFound error if the channel doesn't exist, rather than
674+
// abort the transaction which would abort the entire batch.
675+
dbChan, err := tx.GetChannelAndNodesBySCID(
676+
ctx, sqlc.GetChannelAndNodesBySCIDParams{
677+
Scid: chanIDB[:],
678+
Version: int16(ProtocolV1),
679+
},
680+
)
681+
if errors.Is(err, sql.ErrNoRows) {
682+
return node1Pub, node2Pub, false, ErrEdgeNotFound
683+
} else if err != nil {
684+
return node1Pub, node2Pub, false, fmt.Errorf("unable to "+
685+
"fetch channel(%v): %w", edge.ChannelID, err)
686+
}
687+
688+
copy(node1Pub[:], dbChan.Node1PubKey)
689+
copy(node2Pub[:], dbChan.Node2PubKey)
690+
691+
// Figure out which node this edge is from.
692+
isNode1 = edge.ChannelFlags&lnwire.ChanUpdateDirection == 0
693+
nodeID := dbChan.NodeID1
694+
if !isNode1 {
695+
nodeID = dbChan.NodeID2
696+
}
697+
698+
var (
699+
inboundBase sql.NullInt64
700+
inboundRate sql.NullInt64
701+
)
702+
edge.InboundFee.WhenSome(func(fee lnwire.Fee) {
703+
inboundRate = sqldb.SQLInt64(fee.FeeRate)
704+
inboundBase = sqldb.SQLInt64(fee.BaseFee)
705+
})
706+
707+
id, err := tx.UpsertEdgePolicy(ctx, sqlc.UpsertEdgePolicyParams{
708+
Version: int16(ProtocolV1),
709+
ChannelID: dbChan.ID,
710+
NodeID: nodeID,
711+
Timelock: int32(edge.TimeLockDelta),
712+
FeePpm: int64(edge.FeeProportionalMillionths),
713+
BaseFeeMsat: int64(edge.FeeBaseMSat),
714+
MinHtlcMsat: int64(edge.MinHTLC),
715+
LastUpdate: sqldb.SQLInt64(edge.LastUpdate.Unix()),
716+
Disabled: sql.NullBool{
717+
Valid: true,
718+
Bool: edge.IsDisabled(),
719+
},
720+
MaxHtlcMsat: sql.NullInt64{
721+
Valid: edge.MessageFlags.HasMaxHtlc(),
722+
Int64: int64(edge.MaxHTLC),
723+
},
724+
InboundBaseFeeMsat: inboundBase,
725+
InboundFeeRateMilliMsat: inboundRate,
726+
Signature: edge.SigBytes,
727+
})
728+
if err != nil {
729+
return node1Pub, node2Pub, isNode1,
730+
fmt.Errorf("unable to upsert edge policy: %w", err)
731+
}
732+
733+
// Convert the flat extra opaque data into a map of TLV types to
734+
// values.
735+
extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData)
736+
if err != nil {
737+
return node1Pub, node2Pub, false, fmt.Errorf("unable to "+
738+
"marshal extra opaque data: %w", err)
739+
}
740+
741+
// Update the channel policy's extra signed fields.
742+
err = upsertChanPolicyExtraSignedFields(ctx, tx, id, extra)
743+
if err != nil {
744+
return node1Pub, node2Pub, false, fmt.Errorf("inserting chan "+
745+
"policy extra TLVs: %w", err)
746+
}
747+
748+
return node1Pub, node2Pub, isNode1, nil
749+
}
750+
545751
// getNodeByPubKey attempts to look up a target node by its public key.
546752
func getNodeByPubKey(ctx context.Context, db SQLQueries,
547753
pubKey route.Vertex) (int64, *models.LightningNode, error) {
@@ -1257,3 +1463,36 @@ func maybeCreateShellNode(ctx context.Context, db SQLQueries,
12571463

12581464
return id, nil
12591465
}
1466+
1467+
// upsertChanPolicyExtraSignedFields updates the policy's extra signed fields in
1468+
// the database. This includes deleting any existing types and then inserting
1469+
// the new types.
1470+
func upsertChanPolicyExtraSignedFields(ctx context.Context, db SQLQueries,
1471+
chanPolicyID int64, extraFields map[uint64][]byte) error {
1472+
1473+
// Delete all existing extra signed fields for the channel policy.
1474+
err := db.DeleteChannelPolicyExtraTypes(ctx, chanPolicyID)
1475+
if err != nil {
1476+
return fmt.Errorf("unable to delete "+
1477+
"existing policy extra signed fields for policy %d: %w",
1478+
chanPolicyID, err)
1479+
}
1480+
1481+
// Insert all new extra signed fields for the channel policy.
1482+
for tlvType, value := range extraFields {
1483+
err = db.InsertChanPolicyExtraType(
1484+
ctx, sqlc.InsertChanPolicyExtraTypeParams{
1485+
ChannelPolicyID: chanPolicyID,
1486+
Type: int64(tlvType),
1487+
Value: value,
1488+
},
1489+
)
1490+
if err != nil {
1491+
return fmt.Errorf("unable to insert "+
1492+
"channel_policy(%d) extra signed field(%v): %w",
1493+
chanPolicyID, tlvType, err)
1494+
}
1495+
}
1496+
1497+
return nil
1498+
}

graph/db/test_postgres.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"database/sql"
77
"testing"
88

9+
"github.com/btcsuite/btcd/chaincfg"
910
"github.com/lightningnetwork/lnd/kvdb"
1011
"github.com/lightningnetwork/lnd/sqldb"
1112
"github.com/stretchr/testify/require"
@@ -38,7 +39,11 @@ func NewTestDB(t testing.TB) V1Store {
3839
},
3940
)
4041

41-
store, err := NewSQLStore(executor, graphStore)
42+
store, err := NewSQLStore(
43+
&SQLStoreConfig{
44+
ChainHash: *chaincfg.MainNetParams.GenesisHash,
45+
}, executor, graphStore,
46+
)
4247
require.NoError(t, err)
4348

4449
return store

graph/db/test_sqlite.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"database/sql"
77
"testing"
88

9+
"github.com/btcsuite/btcd/chaincfg"
910
"github.com/lightningnetwork/lnd/kvdb"
1011
"github.com/lightningnetwork/lnd/sqldb"
1112
"github.com/stretchr/testify/require"
@@ -31,7 +32,11 @@ func NewTestDB(t testing.TB) V1Store {
3132
},
3233
)
3334

34-
store, err := NewSQLStore(executor, graphStore)
35+
store, err := NewSQLStore(
36+
&SQLStoreConfig{
37+
ChainHash: *chaincfg.MainNetParams.GenesisHash,
38+
}, executor, graphStore,
39+
)
3540
require.NoError(t, err)
3641

3742
return store

0 commit comments

Comments
 (0)