Skip to content

Commit f6baa2c

Browse files
authored
Merge pull request #1457 from lightninglabs/multi-rfq-receive
Multi rfq receive (`AddInvoice` multiple hop hints)
2 parents 1660f9c + 8abf803 commit f6baa2c

File tree

10 files changed

+841
-335
lines changed

10 files changed

+841
-335
lines changed

rfq/manager.go

Lines changed: 272 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9+
"sort"
10+
"strings"
911
"sync"
1012
"time"
1113

@@ -17,10 +19,12 @@ import (
1719
"github.com/lightninglabs/taproot-assets/fn"
1820
"github.com/lightninglabs/taproot-assets/rfqmath"
1921
"github.com/lightninglabs/taproot-assets/rfqmsg"
20-
lfn "github.com/lightningnetwork/lnd/fn/v2"
22+
"github.com/lightninglabs/taproot-assets/taprpc/rfqrpc"
23+
"github.com/lightningnetwork/lnd/lnrpc"
2124
"github.com/lightningnetwork/lnd/lnutils"
2225
"github.com/lightningnetwork/lnd/lnwire"
2326
"github.com/lightningnetwork/lnd/routing/route"
27+
"github.com/lightningnetwork/lnd/zpay32"
2428
)
2529

2630
const (
@@ -41,7 +45,9 @@ const (
4145
type ChannelLister interface {
4246
// ListChannels returns a list of channels that are available for
4347
// routing.
44-
ListChannels(ctx context.Context) ([]lndclient.ChannelInfo, error)
48+
ListChannels(ctx context.Context, activeOnly, publicOnly bool,
49+
_ ...lndclient.ListChannelsOption) ([]lndclient.ChannelInfo,
50+
error)
4551
}
4652

4753
// ScidAliasManager is an interface that can add short channel ID (SCID) aliases
@@ -546,63 +552,38 @@ func (m *Manager) handleOutgoingMessage(outgoingMsg rfqmsg.OutgoingMsg) error {
546552
func (m *Manager) addScidAlias(scidAlias uint64, assetSpecifier asset.Specifier,
547553
peer route.Vertex) error {
548554

549-
// Retrieve all local channels.
550555
ctxb := context.Background()
551-
localChans, err := m.cfg.ChannelLister.ListChannels(ctxb)
552-
if err != nil {
553-
// Not being able to call lnd to add the alias is a critical
554-
// error, which warrants shutting down, as something is wrong.
555-
return fn.NewCriticalError(
556-
fmt.Errorf("add alias: error listing local channels: "+
557-
"%w", err),
558-
)
559-
}
560-
561-
// Filter for channels with the given peer.
562-
peerChannels := lfn.Filter(
563-
localChans, func(c lndclient.ChannelInfo) bool {
564-
return c.PubKeyBytes == peer
565-
},
556+
peerChans, err := m.FetchChannel(
557+
ctxb, assetSpecifier, &peer, NoIntention,
566558
)
559+
if err != nil && !strings.Contains(
560+
err.Error(), "no asset channel balance found for",
561+
) {
567562

568-
var baseSCID uint64
569-
for _, localChan := range peerChannels {
570-
if len(localChan.CustomChannelData) == 0 {
571-
continue
572-
}
573-
574-
var assetData rfqmsg.JsonAssetChannel
575-
err = json.Unmarshal(localChan.CustomChannelData, &assetData)
576-
if err != nil {
577-
log.Warnf("Unable to unmarshal channel asset data: %v",
578-
err)
579-
continue
580-
}
563+
return err
564+
}
581565

582-
match, err := m.ChannelMatchesFully(
583-
ctxb, assetData, assetSpecifier,
566+
// As a fallback, if we didn't find any compatible channels with the
567+
// peer, let's pick any channel that is available with this peer. This
568+
// is okay, because non-strict forwarding will ask each channel if the
569+
// bandwidth matches the provided specifier.
570+
if len(peerChans[peer]) == 0 {
571+
peerChans, err = m.FetchChannel(
572+
ctxb, asset.Specifier{}, &peer, NoIntention,
584573
)
585574
if err != nil {
586575
return err
587576
}
588-
589-
// TODO(george): Instead of returning the first result,
590-
// try to pick the best channel for what we're trying to
591-
// do (receive/send). Binding a baseSCID means we're
592-
// also binding the asset liquidity on that channel.
593-
if match {
594-
baseSCID = localChan.ChannelID
595-
break
596-
}
597577
}
598578

599-
// As a fallback, if the base SCID is not found and there's only one
600-
// channel with the target peer, assume that the base SCID corresponds
601-
// to that channel.
602-
if baseSCID == 0 && len(peerChannels) == 1 {
603-
baseSCID = peerChannels[0].ChannelID
579+
if len(peerChans[peer]) == 0 {
580+
return fmt.Errorf("cannot add scid alias with peer=%v, no "+
581+
"compatible channels found for %s", peer,
582+
&assetSpecifier)
604583
}
605584

585+
baseSCID := peerChans[peer][0].ChannelInfo.ChannelID
586+
606587
// At this point, if the base SCID is still not found, we return an
607588
// error. We can't map the SCID alias to a base SCID.
608589
if baseSCID == 0 {
@@ -1052,6 +1033,250 @@ func (m *Manager) ChannelMatchesFully(ctx context.Context,
10521033
return true, nil
10531034
}
10541035

1036+
// TapChannel is a helper struct that combines the information of an asset
1037+
// specifier that is satisfied by a channel with the channels' general
1038+
// information.
1039+
type TapChannel struct {
1040+
// Specifier is the asset Specifier that is satisfied by this channels'
1041+
// assets.
1042+
Specifier asset.Specifier
1043+
1044+
// ChannelInfo is the information about the channel the asset is
1045+
// committed to.
1046+
ChannelInfo lndclient.ChannelInfo
1047+
1048+
// AssetInfo contains the asset related info of the channel.
1049+
AssetInfo rfqmsg.JsonAssetChannel
1050+
}
1051+
1052+
// PeerChanMap is a structure that maps peers to channels. This is used for
1053+
// filtering asset channels against an asset specifier.
1054+
type PeerChanMap map[route.Vertex][]TapChannel
1055+
1056+
// ComputeChannelAssetBalance computes the total local and remote balance for
1057+
// each asset channel that matches the provided asset specifier.
1058+
func (m *Manager) ComputeChannelAssetBalance(ctx context.Context,
1059+
activeChannels []lndclient.ChannelInfo,
1060+
specifier asset.Specifier) (PeerChanMap, bool, error) {
1061+
1062+
var (
1063+
peerChanMap = make(PeerChanMap)
1064+
haveGroupedAssetChannels bool
1065+
)
1066+
for chanIdx := range activeChannels {
1067+
var (
1068+
pass bool
1069+
assetData rfqmsg.JsonAssetChannel
1070+
)
1071+
1072+
openChan := activeChannels[chanIdx]
1073+
1074+
// If there specifier is empty, we skip all asset balance
1075+
// related checks.
1076+
if specifier.IsSome() {
1077+
if len(openChan.CustomChannelData) == 0 {
1078+
continue
1079+
}
1080+
1081+
err := json.Unmarshal(
1082+
openChan.CustomChannelData, &assetData,
1083+
)
1084+
if err != nil {
1085+
return nil, false, fmt.Errorf("unable to "+
1086+
"unmarshal asset data: %w", err)
1087+
}
1088+
1089+
if len(assetData.GroupKey) > 0 {
1090+
haveGroupedAssetChannels = true
1091+
}
1092+
1093+
// Check if the assets of this channel match the
1094+
// provided specifier.
1095+
pass, err = m.ChannelMatchesFully(
1096+
ctx, assetData, specifier,
1097+
)
1098+
if err != nil {
1099+
return nil, false, err
1100+
}
1101+
}
1102+
1103+
// We also append the channel in the case where the specifier is
1104+
// empty. This means that the caller doesn't really care about
1105+
// the type of balance.
1106+
if pass || !specifier.IsSome() {
1107+
peerChanMap[openChan.PubKeyBytes] = append(
1108+
peerChanMap[openChan.PubKeyBytes],
1109+
TapChannel{
1110+
Specifier: specifier,
1111+
ChannelInfo: openChan,
1112+
AssetInfo: assetData,
1113+
},
1114+
)
1115+
}
1116+
}
1117+
1118+
return peerChanMap, haveGroupedAssetChannels, nil
1119+
}
1120+
1121+
// ChanIntention defines the intention of calling rfqChannel. This helps with
1122+
// returning the channel that is most suitable for what we want to do.
1123+
type ChanIntention uint8
1124+
1125+
const (
1126+
// NoIntention defines the absence of any intention, signalling that we
1127+
// don't really care which channel is returned.
1128+
NoIntention ChanIntention = iota
1129+
1130+
// SendIntention defines the intention to send over an asset channel.
1131+
SendIntention
1132+
1133+
// ReceiveIntention defines the intention to receive over an asset
1134+
// channel.
1135+
ReceiveIntention
1136+
)
1137+
1138+
// FetchChannel returns the channel to use for RFQ operations. It returns a map
1139+
// of peers and their eligible channels. If a peerPubKey is specified then the
1140+
// map will only contain one entry for that peer.
1141+
func (m *Manager) FetchChannel(ctx context.Context, specifier asset.Specifier,
1142+
peerPubKey *route.Vertex,
1143+
intention ChanIntention) (PeerChanMap, error) {
1144+
1145+
activeChannels, err := m.cfg.ChannelLister.ListChannels(
1146+
ctx, true, false,
1147+
)
1148+
if err != nil {
1149+
return nil, err
1150+
}
1151+
1152+
balancesMap, haveGroupChans, err := m.ComputeChannelAssetBalance(
1153+
ctx, activeChannels, specifier,
1154+
)
1155+
if err != nil {
1156+
return nil, fmt.Errorf("error computing available asset "+
1157+
"channel balance: %w", err)
1158+
}
1159+
1160+
// If the user uses the asset ID to specify what asset to use, that will
1161+
// not work for asset channels that have multiple UTXOs of grouped
1162+
// assets. The result wouldn't be deterministic anyway (meaning, it's
1163+
// not guaranteed that a specific asset ID is moved within a channel
1164+
// when an HTLC is sent, the allocation logic decides which actual UTXO
1165+
// is used). So we tell the user to use the group key instead, at least
1166+
// for channels that have multiple UTXOs of grouped assets.
1167+
if specifier.HasId() && len(balancesMap) == 0 && haveGroupChans {
1168+
return nil, fmt.Errorf("no compatible asset channel found for "+
1169+
"%s, make sure to use group key for grouped asset "+
1170+
"channels", &specifier)
1171+
}
1172+
1173+
if len(balancesMap) == 0 {
1174+
return nil, fmt.Errorf("no asset channel balance found for %s",
1175+
&specifier)
1176+
}
1177+
1178+
switch intention {
1179+
case SendIntention:
1180+
// When sending we care about the volume of our local balances,
1181+
// so we sort by local balances in descending order.
1182+
for k, v := range balancesMap {
1183+
sort.Slice(v, func(i, j int) bool {
1184+
return v[i].AssetInfo.LocalBalance >
1185+
v[j].AssetInfo.LocalBalance
1186+
})
1187+
1188+
balancesMap[k] = v
1189+
}
1190+
case ReceiveIntention:
1191+
// When sending we care about the volume of the remote balances,
1192+
// so we sort by remote balances in descending order.
1193+
for k, v := range balancesMap {
1194+
sort.Slice(v, func(i, j int) bool {
1195+
return v[i].AssetInfo.RemoteBalance >
1196+
v[j].AssetInfo.RemoteBalance
1197+
})
1198+
1199+
balancesMap[k] = v
1200+
}
1201+
case NoIntention:
1202+
// We don't care about sending or receiving, this means that
1203+
// the method was called as a dry check. Do nothing.
1204+
}
1205+
1206+
// If a peer public key was specified, we always want to use that to
1207+
// filter the asset channels.
1208+
if peerPubKey != nil {
1209+
_, ok := balancesMap[*peerPubKey]
1210+
if !ok {
1211+
return nil, fmt.Errorf("no asset channels found for "+
1212+
"%s and peer=%s", &specifier, peerPubKey)
1213+
}
1214+
1215+
filteredRes := make(PeerChanMap)
1216+
filteredRes[*peerPubKey] = balancesMap[*peerPubKey]
1217+
1218+
balancesMap = filteredRes
1219+
}
1220+
1221+
return balancesMap, nil
1222+
}
1223+
1224+
// InboundPolicyFetcher is a helper that fetches the inbound policy of a channel
1225+
// based on its chanID.
1226+
type InboundPolicyFetcher func(ctx context.Context, chanID uint64,
1227+
remotePubStr string) (*lnrpc.RoutingPolicy, error)
1228+
1229+
// RfqToHopHint creates the hop hint representation which encapsulates certain
1230+
// quote information along with some other data required by the payment to
1231+
// succeed.
1232+
// Depending on whether the hold flag is set, we either return the lnrpc version
1233+
// of a hop hint or the zpay32 version. This is because we use the lndclient
1234+
// wrapper for hold invoices while we use the raw lnrpc endpoint for simple
1235+
// invoices.
1236+
func (m *Manager) RfqToHopHint(ctx context.Context,
1237+
policyFetcher InboundPolicyFetcher, channelID uint64,
1238+
peerPubKey route.Vertex, quote *rfqrpc.PeerAcceptedBuyQuote,
1239+
hold bool) (*zpay32.HopHint, error) {
1240+
1241+
inboundPolicy, err := policyFetcher(ctx, channelID, peerPubKey.String())
1242+
if err != nil {
1243+
return nil, fmt.Errorf("unable to get inbound channel "+
1244+
"policy for channel with ID %d: %w", channelID, err)
1245+
}
1246+
1247+
peerPub, err := btcec.ParsePubKey(peerPubKey[:])
1248+
if err != nil {
1249+
return nil, fmt.Errorf("error parsing peer "+
1250+
"pubkey: %w", err)
1251+
}
1252+
hopHint := &zpay32.HopHint{
1253+
NodeID: peerPub,
1254+
ChannelID: quote.Scid,
1255+
FeeBaseMSat: uint32(inboundPolicy.FeeBaseMsat),
1256+
FeeProportionalMillionths: uint32(
1257+
inboundPolicy.FeeRateMilliMsat,
1258+
),
1259+
CLTVExpiryDelta: uint16(
1260+
inboundPolicy.TimeLockDelta,
1261+
),
1262+
}
1263+
1264+
return hopHint, nil
1265+
}
1266+
1267+
// Zpay32HopHintToLnrpc converts a zpay32 hop hint to the lnrpc representation.
1268+
func Zpay32HopHintToLnrpc(h *zpay32.HopHint) *lnrpc.HopHint {
1269+
return &lnrpc.HopHint{
1270+
NodeId: fmt.Sprintf(
1271+
"%x", h.NodeID.SerializeCompressed(),
1272+
),
1273+
ChanId: h.ChannelID,
1274+
FeeBaseMsat: h.FeeBaseMSat,
1275+
FeeProportionalMillionths: h.FeeProportionalMillionths,
1276+
CltvExpiryDelta: uint32(h.CLTVExpiryDelta),
1277+
}
1278+
}
1279+
10551280
// publishSubscriberEvent publishes an event to all subscribers.
10561281
func (m *Manager) publishSubscriberEvent(event fn.Event) {
10571282
// Iterate over the subscribers and deliver the event to each one.

0 commit comments

Comments
 (0)