Skip to content

Commit 5eb134b

Browse files
authored
Merge pull request #1530 from lightninglabs/1520-skip-send-transfer-anchor-broadcast
Add support for skipping anchor transaction broadcast during asset transfer
2 parents f6baa2c + db6356f commit 5eb134b

24 files changed

+1208
-964
lines changed

itest/mint_fund_seal_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ func testMintFundSealAssets(t *harnessTest) {
464464

465465
// Finalize and publish the transfer anchor TX.
466466
signedPkt := FinalizePacket(t.t, aliceLnd.RPC, transferPkt)
467-
logResp := LogAndPublish(
467+
logResp := PublishAndLogTransfer(
468468
t.t, aliceTapd, signedPkt, bobVpsbt, passiveVpsbts,
469469
commitResp,
470470
)

itest/multi_send_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ func testAnchorMultipleVirtualTransactions(t *harnessTest) {
374374

375375
btcPacket = signPacket(t.t, aliceLnd, btcPacket)
376376
btcPacket = FinalizePacket(t.t, aliceLnd.RPC, btcPacket)
377-
sendResp = LogAndPublish(
377+
sendResp = PublishAndLogTransfer(
378378
t.t, aliceTapd, btcPacket, activeAssets, passiveAssets,
379379
commitResp,
380380
)

itest/multisig.go

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ func MultiSigTest(t *testing.T, ctx context.Context, aliceTapd,
334334
// transaction.
335335
signedPkt := FinalizePacket(t, bobLnd, btcWithdrawPkt)
336336

337-
logResp := LogAndPublish(
337+
logResp := PublishAndLogTransfer(
338338
t, bobTapd, signedPkt, finalizedWithdrawPackets, nil,
339339
commitResp,
340340
)
@@ -490,10 +490,57 @@ func FinalizePacket(t *testing.T, lnd *rpc.HarnessRPC,
490490
return signedPacket
491491
}
492492

493-
func LogAndPublish(t *testing.T, tapd commands.RpcClientsBundle,
493+
// PublishAndLogTransferOption defines a functional option for
494+
// PublishAndLogTransfer.
495+
type PublishAndLogTransferOption func(*publishAndLogTransferOptions)
496+
497+
// publishAndLogTransferOptions contains the options for PublishAndLogTransfer.
498+
type publishAndLogTransferOptions struct {
499+
// skipAnchorTxBroadcast indicates whether to skip the broadcast of
500+
// the anchor transaction.
501+
skipAnchorTxBroadcast bool
502+
503+
// label is the label to use for the transfer.
504+
label string
505+
}
506+
507+
// defaultPublishAndLogTransferOptions returns the default options for
508+
// PublishAndLogTransfer.
509+
func defaultPublishAndLogTransferOptions() *publishAndLogTransferOptions {
510+
return &publishAndLogTransferOptions{}
511+
}
512+
513+
// withSkipAnchorTxBroadcast is an option for PublishAndLogTransfer that
514+
// indicates whether to skip the broadcast of the anchor transaction.
515+
func withSkipAnchorTxBroadcast() PublishAndLogTransferOption {
516+
return func(opts *publishAndLogTransferOptions) {
517+
opts.skipAnchorTxBroadcast = true
518+
}
519+
}
520+
521+
// withLabel is an option for PublishAndLogTransfer that sets the label for
522+
// the transfer.
523+
func withLabel(label string) PublishAndLogTransferOption {
524+
return func(opts *publishAndLogTransferOptions) {
525+
opts.label = label
526+
}
527+
}
528+
529+
// PublishAndLogTransfer is a helper function that invokes the
530+
// PublishAndLogTransfer RPC endpoint on the specified tapd node. This endpoint
531+
// performs a pre-anchored transfer.
532+
func PublishAndLogTransfer(t *testing.T, tapd commands.RpcClientsBundle,
494533
btcPkt *psbt.Packet, activeAssets []*tappsbt.VPacket,
495534
passiveAssets []*tappsbt.VPacket,
496-
commitResp *wrpc.CommitVirtualPsbtsResponse) *taprpc.SendAssetResponse {
535+
commitResp *wrpc.CommitVirtualPsbtsResponse,
536+
opts ...PublishAndLogTransferOption) *taprpc.SendAssetResponse {
537+
538+
t.Helper()
539+
540+
options := defaultPublishAndLogTransferOptions()
541+
for _, opt := range opts {
542+
opt(options)
543+
}
497544

498545
ctxb := context.Background()
499546
ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout)
@@ -504,11 +551,13 @@ func LogAndPublish(t *testing.T, tapd commands.RpcClientsBundle,
504551
require.NoError(t, err)
505552

506553
request := &wrpc.PublishAndLogRequest{
507-
AnchorPsbt: buf.Bytes(),
508-
VirtualPsbts: make([][]byte, len(activeAssets)),
509-
PassiveAssetPsbts: make([][]byte, len(passiveAssets)),
510-
ChangeOutputIndex: commitResp.ChangeOutputIndex,
511-
LndLockedUtxos: commitResp.LndLockedUtxos,
554+
AnchorPsbt: buf.Bytes(),
555+
VirtualPsbts: make([][]byte, len(activeAssets)),
556+
PassiveAssetPsbts: make([][]byte, len(passiveAssets)),
557+
ChangeOutputIndex: commitResp.ChangeOutputIndex,
558+
LndLockedUtxos: commitResp.LndLockedUtxos,
559+
SkipAnchorTxBroadcast: options.skipAnchorTxBroadcast,
560+
Label: options.label,
512561
}
513562

514563
for idx := range activeAssets {

itest/psbt_test.go

Lines changed: 73 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@ import (
77
"fmt"
88
"net/url"
99
"testing"
10+
"time"
1011

1112
"github.com/btcsuite/btcd/btcec/v2/schnorr"
1213
"github.com/btcsuite/btcd/btcutil"
1314
"github.com/btcsuite/btcd/btcutil/hdkeychain"
1415
"github.com/btcsuite/btcd/btcutil/psbt"
16+
"github.com/btcsuite/btcd/chaincfg/chainhash"
1517
"github.com/btcsuite/btcd/txscript"
18+
"github.com/btcsuite/btcd/wire"
1619
"github.com/btcsuite/btcwallet/waddrmgr"
1720
"github.com/davecgh/go-spew/spew"
1821
"github.com/lightninglabs/taproot-assets/address"
@@ -1124,7 +1127,7 @@ func testPsbtInteractiveAltLeafAnchoring(t *harnessTest) {
11241127

11251128
commitPacket = signPacket(t.t, senderLnd, commitPacket)
11261129
commitPacket = FinalizePacket(t.t, senderLnd.RPC, commitPacket)
1127-
publishResp := LogAndPublish(
1130+
publishResp := PublishAndLogTransfer(
11281131
t.t, sender, commitPacket, []*tappsbt.VPacket{activePacket},
11291132
[]*tappsbt.VPacket{passivePacket}, commitResp,
11301133
)
@@ -2438,7 +2441,7 @@ func testPsbtTrustlessSwap(t *harnessTest) {
24382441
signedPkt := finalizePacket(t.t, lndBob, finalPsbt)
24392442
require.True(t.t, signedPkt.IsComplete())
24402443

2441-
logResp := logAndPublish(
2444+
logResp := PublishAndLogTransfer(
24422445
t.t, alice, signedPkt, []*tappsbt.VPacket{bobVPsbt}, nil, resp,
24432446
)
24442447
t.Logf("Logged transaction: %v", toJSON(t.t, logResp))
@@ -2611,18 +2614,80 @@ func testPsbtExternalCommit(t *harnessTest) {
26112614

26122615
btcPacket = signPacket(t.t, aliceLnd, btcPacket)
26132616
btcPacket = FinalizePacket(t.t, aliceLnd.RPC, btcPacket)
2614-
sendResp := LogAndPublish(
2617+
2618+
transferLabel := "itest-psbt-external-commit"
2619+
2620+
// Subscribe to the send event stream so we can verify the sender's
2621+
// state transitions during this test.
2622+
ctx, streamCancel := context.WithCancel(ctxb)
2623+
stream, err := aliceTapd.SubscribeSendEvents(
2624+
ctx, &taprpc.SubscribeSendEventsRequest{
2625+
FilterLabel: transferLabel,
2626+
},
2627+
)
2628+
require.NoError(t.t, err)
2629+
sendEvents := &EventSubscription[*taprpc.SendEvent]{
2630+
ClientEventStream: stream,
2631+
Cancel: streamCancel,
2632+
}
2633+
2634+
// Execute the transfer but skip anchor transaction broadcast. This
2635+
// simulates a packaging workflow where broadcasting is handled
2636+
// externally.
2637+
sendResp := PublishAndLogTransfer(
26152638
t.t, aliceTapd, btcPacket, activeAssets, passiveAssets,
2616-
commitResp,
2639+
commitResp, withSkipAnchorTxBroadcast(),
2640+
withLabel(transferLabel),
26172641
)
26182642

2643+
// Assert that the state machine transitions directly to waiting for
2644+
// tx confirmation, skipping the broadcast state.
2645+
require.Eventually(t.t, func() bool {
2646+
isMatchingState := func(msg *taprpc.SendEvent) bool {
2647+
lastState := tapfreighter.SendStateStorePreBroadcast
2648+
nextState := tapfreighter.SendStateWaitTxConf
2649+
2650+
return msg.SendState == lastState.String() &&
2651+
msg.NextSendState == nextState.String()
2652+
}
2653+
2654+
for {
2655+
msg, err := sendEvents.Recv()
2656+
if err != nil {
2657+
return false
2658+
}
2659+
2660+
return isMatchingState(msg)
2661+
}
2662+
}, defaultWaitTimeout, time.Second)
2663+
2664+
// Unmarshal the anchor transaction.
2665+
var anchorTx wire.MsgTx
2666+
reader := bytes.NewReader(sendResp.Transfer.AnchorTx)
2667+
err = anchorTx.Deserialize(reader)
2668+
require.NoError(t.t, err)
2669+
2670+
// Ensure that the anchor PSBT matches the returned anchor
2671+
// transaction.
2672+
require.Equal(t.t, anchorTx.TxHash(), btcPacket.UnsignedTx.TxHash())
2673+
2674+
// Assert that the anchor transaction is not in the mempool.
2675+
miner := t.lndHarness.Miner()
2676+
miner.AssertTxnsNotInMempool([]chainhash.Hash{
2677+
anchorTx.TxHash(),
2678+
})
2679+
2680+
// Add the anchor transaction to the mempool and mine.
2681+
miner.MineBlockWithTx(&anchorTx)
2682+
2683+
// Assert that the transfer has the correct number of outputs.
26192684
expectedAmounts := []uint64{
26202685
targetAsset.Amount - assetsToSend, assetsToSend,
26212686
}
2622-
ConfirmAndAssertOutboundTransferWithOutputs(
2687+
AssertAssetOutboundTransferWithOutputs(
26232688
t.t, t.lndHarness.Miner().Client, aliceTapd,
2624-
sendResp, targetAssetGenesis.AssetId, expectedAmounts,
2625-
0, 1, len(expectedAmounts),
2689+
sendResp.Transfer, targetAssetGenesis.AssetId, expectedAmounts,
2690+
0, 1, len(expectedAmounts), false,
26262691
)
26272692

26282693
// And now the event should be completed on both sides.
@@ -3273,7 +3338,7 @@ func testPsbtRelativeLockTimeSendProofFail(t *harnessTest) {
32733338
Cancel: streamCancel,
32743339
}
32753340

3276-
LogAndPublish(t.t, bob, btcPacket, vPackets, nil, commitResp)
3341+
PublishAndLogTransfer(t.t, bob, btcPacket, vPackets, nil, commitResp)
32773342

32783343
MineBlocks(t.t, t.lndHarness.Miner().Client, 1, 1)
32793344

@@ -3424,45 +3489,6 @@ func finalizePacket(t *testing.T, lnd *node.HarnessNode,
34243489
return signedPacket
34253490
}
34263491

3427-
func logAndPublish(t *testing.T, tapd *tapdHarness, btcPkt *psbt.Packet,
3428-
activeAssets []*tappsbt.VPacket, passiveAssets []*tappsbt.VPacket,
3429-
commitResp *wrpc.CommitVirtualPsbtsResponse) *taprpc.SendAssetResponse {
3430-
3431-
ctxb := context.Background()
3432-
ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout)
3433-
defer cancel()
3434-
3435-
var buf bytes.Buffer
3436-
err := btcPkt.Serialize(&buf)
3437-
require.NoError(t, err)
3438-
3439-
request := &wrpc.PublishAndLogRequest{
3440-
AnchorPsbt: buf.Bytes(),
3441-
VirtualPsbts: make([][]byte, len(activeAssets)),
3442-
PassiveAssetPsbts: make([][]byte, len(passiveAssets)),
3443-
ChangeOutputIndex: commitResp.ChangeOutputIndex,
3444-
LndLockedUtxos: commitResp.LndLockedUtxos,
3445-
}
3446-
3447-
for idx := range activeAssets {
3448-
request.VirtualPsbts[idx], err = tappsbt.Encode(
3449-
activeAssets[idx],
3450-
)
3451-
require.NoError(t, err)
3452-
}
3453-
for idx := range passiveAssets {
3454-
request.PassiveAssetPsbts[idx], err = tappsbt.Encode(
3455-
passiveAssets[idx],
3456-
)
3457-
require.NoError(t, err)
3458-
}
3459-
3460-
resp, err := tapd.PublishAndLogTransfer(ctxt, request)
3461-
require.NoError(t, err)
3462-
3463-
return resp
3464-
}
3465-
34663492
// getAddressBip32Derivation returns the PSBT BIP-0032 derivation info of an
34673493
// address.
34683494
func getAddressBip32Derivation(t testing.TB, addr string,

rpcserver.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2994,6 +2994,7 @@ func (r *rpcServer) PublishAndLogTransfer(ctx context.Context,
29942994
resp, err := r.cfg.ChainPorter.RequestShipment(
29952995
tapfreighter.NewPreAnchoredParcel(
29962996
activePackets, passivePackets, anchorTx,
2997+
req.SkipAnchorTxBroadcast, req.Label,
29972998
),
29982999
)
29993000
if err != nil {
@@ -3713,6 +3714,19 @@ func marshalOutboundParcel(
37133714
}
37143715
})
37153716

3717+
// Serialize the anchor transaction if it exists.
3718+
var anchorTxBytes []byte
3719+
if parcel.AnchorTx != nil {
3720+
var b bytes.Buffer
3721+
err := parcel.AnchorTx.Serialize(&b)
3722+
if err != nil {
3723+
return nil, fmt.Errorf("unable to serialize anchor "+
3724+
"tx: %w", err)
3725+
}
3726+
3727+
anchorTxBytes = b.Bytes()
3728+
}
3729+
37163730
return &taprpc.AssetTransfer{
37173731
TransferTimestamp: parcel.TransferTime.Unix(),
37183732
AnchorTxHash: anchorTxHash[:],
@@ -3723,6 +3737,7 @@ func marshalOutboundParcel(
37233737
Inputs: rpcInputs,
37243738
Outputs: rpcOutputs,
37253739
Label: parcel.Label,
3740+
AnchorTx: anchorTxBytes,
37263741
}, nil
37273742
}
37283743

@@ -4242,6 +4257,7 @@ func marshalSendEvent(event fn.Event) (*taprpc.SendEvent, error) {
42424257
VirtualPackets: make([][]byte, len(e.VirtualPackets)),
42434258
PassiveVirtualPackets: make([][]byte, len(e.PassivePackets)),
42444259
TransferLabel: e.TransferLabel,
4260+
NextSendState: e.NextSendState.String(),
42454261
}
42464262

42474263
if e.Error != nil {

tapchannel/aux_closer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,7 @@ func shipChannelTxn(txSender tapfreighter.Porter, chanTx *wire.MsgTx,
645645
FinalTx: chanTx,
646646
}
647647
preSignedParcel := tapfreighter.NewPreAnchoredParcel(
648-
vPkts, nil, closeAnchor,
648+
vPkts, nil, closeAnchor, false, "",
649649
)
650650
_, err = txSender.RequestShipment(preSignedParcel)
651651
if err != nil {

tapchannel/aux_funding_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1392,7 +1392,7 @@ func (f *FundingController) completeChannelFunding(ctx context.Context,
13921392
FinalTx: signedFundingTx,
13931393
}
13941394
preSignedParcel := tapfreighter.NewPreAnchoredParcel(
1395-
activePkts, passivePkts, anchorTx,
1395+
activePkts, passivePkts, anchorTx, false, "",
13961396
)
13971397
_, err = f.cfg.TxSender.RequestShipment(preSignedParcel)
13981398
if err != nil {

tapdb/assets_store.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2442,10 +2442,11 @@ func (a *AssetStore) LogPendingParcel(ctx context.Context,
24422442
// outputs will reference. We'll insert this next, so we can
24432443
// use its ID.
24442444
transferID, err := q.InsertAssetTransfer(ctx, NewAssetTransfer{
2445-
HeightHint: int32(spend.AnchorTxHeightHint),
2446-
AnchorTxid: newAnchorTXID[:],
2447-
TransferTimeUnix: spend.TransferTime,
2448-
Label: sqlStr(spend.Label),
2445+
HeightHint: int32(spend.AnchorTxHeightHint),
2446+
AnchorTxid: newAnchorTXID[:],
2447+
TransferTimeUnix: spend.TransferTime,
2448+
Label: sqlStr(spend.Label),
2449+
SkipAnchorTxBroadcast: spend.SkipAnchorTxBroadcast,
24492450
})
24502451
if err != nil {
24512452
return fmt.Errorf("unable to insert asset transfer: "+
@@ -3545,15 +3546,17 @@ func (a *AssetStore) QueryParcels(ctx context.Context,
35453546
)
35463547
}
35473548

3549+
// nolint:lll
35483550
parcel := &tapfreighter.OutboundParcel{
3549-
AnchorTx: anchorTx,
3550-
AnchorTxHeightHint: uint32(dbT.HeightHint),
3551-
AnchorTxBlockHash: anchorTxBlockHash,
3552-
TransferTime: dbT.TransferTimeUnix.UTC(),
3553-
ChainFees: dbAnchorTx.ChainFees,
3554-
Inputs: inputs,
3555-
Outputs: outputs,
3556-
Label: dbT.Label.String,
3551+
AnchorTx: anchorTx,
3552+
AnchorTxHeightHint: uint32(dbT.HeightHint),
3553+
AnchorTxBlockHash: anchorTxBlockHash,
3554+
TransferTime: dbT.TransferTimeUnix.UTC(),
3555+
ChainFees: dbAnchorTx.ChainFees,
3556+
Inputs: inputs,
3557+
Outputs: outputs,
3558+
Label: dbT.Label.String,
3559+
SkipAnchorTxBroadcast: dbT.SkipAnchorTxBroadcast,
35573560
}
35583561

35593562
// Set the block height if the anchor is marked as

tapdb/migrations.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ const (
2424
// daemon.
2525
//
2626
// NOTE: This MUST be updated when a new migration is added.
27-
LatestMigrationVersion = 34
27+
LatestMigrationVersion = 35
2828
)
2929

3030
// DatabaseBackend is an interface that contains all methods our different
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- Remove the skip_anchor_tx_broadcast flag from asset_transfers table.
2+
ALTER TABLE asset_transfers DROP COLUMN skip_anchor_tx_broadcast;

0 commit comments

Comments
 (0)