Skip to content

Commit 650cf20

Browse files
committed
sweepbatcher: re-add sweeps after fully confirmed
In case of a reorg sweeps should not go to another batch but stay in the current batch until it is reorg-safely confirmed. Only after that the remaining sweeps are re-added to another batch. Field sweep.completed is now set to true only for reorg-safely confirmed sweeps. In handleConf we now use batch.persist() (i.e. store.UpdateSweepBatch) instead of ConfirmBatch, because we set not only Confirmed flag, but also batchTxid.
1 parent d2c1689 commit 650cf20

File tree

5 files changed

+249
-159
lines changed

5 files changed

+249
-159
lines changed

sweepbatcher/store.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ type dbBatch struct {
201201
// ID is the unique identifier of the batch.
202202
ID int32
203203

204-
// Confirmed is set when the batch is fully confirmed.
204+
// Confirmed is set when the batch is reorg-safely confirmed.
205205
Confirmed bool
206206

207207
// BatchTxid is the txid of the batch transaction.
@@ -236,7 +236,7 @@ type dbSweep struct {
236236
// Amount is the amount of the sweep.
237237
Amount btcutil.Amount
238238

239-
// Completed indicates whether this sweep is completed.
239+
// Completed indicates whether this sweep is fully-confirmed.
240240
Completed bool
241241
}
242242

sweepbatcher/sweep_batch.go

Lines changed: 130 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -1943,7 +1943,6 @@ func getFeePortionPaidBySweep(spendTx *wire.MsgTx, feePortionPerSweep,
19431943
func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
19441944
var (
19451945
txHash = spendTx.TxHash()
1946-
purgeList = make([]SweepRequest, 0, len(b.sweeps))
19471946
notifyList = make([]sweep, 0, len(b.sweeps))
19481947
)
19491948
b.batchTxid = &txHash
@@ -1953,7 +1952,105 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
19531952
b.Warnf("transaction %v has no outputs", txHash)
19541953
}
19551954

1956-
// Determine if we should use presigned mode for the batch.
1955+
// Make a set of confirmed sweeps.
1956+
confirmedSet := make(map[wire.OutPoint]struct{}, len(spendTx.TxIn))
1957+
for _, txIn := range spendTx.TxIn {
1958+
confirmedSet[txIn.PreviousOutPoint] = struct{}{}
1959+
}
1960+
1961+
// As a previous version of the batch transaction may get confirmed,
1962+
// which does not contain the latest sweeps, we need to detect which
1963+
// sweeps are in the transaction to correctly calculate fee portions
1964+
// and notify proper sweeps.
1965+
var (
1966+
totalSweptAmt btcutil.Amount
1967+
confirmedSweeps = []wire.OutPoint{}
1968+
)
1969+
for _, sweep := range b.sweeps {
1970+
// Skip sweeps that were not included into the confirmed tx.
1971+
_, found := confirmedSet[sweep.outpoint]
1972+
if !found {
1973+
continue
1974+
}
1975+
1976+
totalSweptAmt += sweep.value
1977+
notifyList = append(notifyList, sweep)
1978+
confirmedSweeps = append(confirmedSweeps, sweep.outpoint)
1979+
}
1980+
1981+
// Calculate the fee portion that each sweep should pay for the batch.
1982+
feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep(
1983+
spendTx, len(notifyList), totalSweptAmt,
1984+
)
1985+
1986+
for _, sweep := range notifyList {
1987+
// If the sweep's notifier is empty then this means that a swap
1988+
// is not waiting to read an update from it, so we can skip
1989+
// the notification part.
1990+
if sweep.notifier == nil ||
1991+
*sweep.notifier == (SpendNotifier{}) {
1992+
1993+
continue
1994+
}
1995+
1996+
spendDetail := SpendDetail{
1997+
Tx: spendTx,
1998+
OnChainFeePortion: getFeePortionPaidBySweep(
1999+
spendTx, feePortionPaidPerSweep,
2000+
roundingDifference, &sweep,
2001+
),
2002+
}
2003+
2004+
// Dispatch the sweep notifier, we don't care about the outcome
2005+
// of this action so we don't wait for it.
2006+
go func() {
2007+
// Make sure this context doesn't expire so we
2008+
// successfully notify the caller.
2009+
ctx := context.WithoutCancel(ctx)
2010+
2011+
sweep.notifySweepSpend(ctx, &spendDetail)
2012+
}()
2013+
}
2014+
2015+
b.Infof("spent, confirmed sweeps: %v", confirmedSweeps)
2016+
2017+
// We are no longer able to accept new sweeps, so we mark the batch as
2018+
// closed and persist on storage.
2019+
b.state = Closed
2020+
2021+
if err := b.persist(ctx); err != nil {
2022+
return fmt.Errorf("saving batch failed: %w", err)
2023+
}
2024+
2025+
if err := b.monitorConfirmations(ctx); err != nil {
2026+
return fmt.Errorf("monitorConfirmations failed: %w", err)
2027+
}
2028+
2029+
return nil
2030+
}
2031+
2032+
// handleConf handles a confirmation notification. This is the final step of the
2033+
// batch. Here we signal to the batcher that this batch was completed.
2034+
func (b *batch) handleConf(ctx context.Context,
2035+
conf *chainntnfs.TxConfirmation) error {
2036+
2037+
spendTx := conf.Tx
2038+
txHash := spendTx.TxHash()
2039+
if b.batchTxid == nil || *b.batchTxid != txHash {
2040+
b.Warnf("Mismatch of batch txid: tx in spend notification had "+
2041+
"txid %v, but confirmation notification has txif %v. "+
2042+
"Using the later.", b.batchTxid, txHash)
2043+
}
2044+
b.batchTxid = &txHash
2045+
2046+
b.Infof("confirmed in txid %s", b.batchTxid)
2047+
b.state = Confirmed
2048+
2049+
if err := b.persist(ctx); err != nil {
2050+
return fmt.Errorf("saving batch failed: %w", err)
2051+
}
2052+
2053+
// If the batch is in presigned mode, cleanup presignedHelper.
19572054
presigned, err := b.isPresigned()
19582055
if err != nil {
19592056
return fmt.Errorf("failed to determine if the batch %d uses "+
@@ -1971,40 +2068,46 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
19712068
b.id, err)
19722069
}
19732070

2071+
// Make a set of confirmed sweeps.
2072+
confirmedSet := make(map[wire.OutPoint]struct{}, len(spendTx.TxIn))
2073+
for _, txIn := range spendTx.TxIn {
2074+
confirmedSet[txIn.PreviousOutPoint] = struct{}{}
2075+
}
2076+
19742077
// As a previous version of the batch transaction may get confirmed,
19752078
// which does not contain the latest sweeps, we need to detect the
19762079
// sweeps that did not make it to the confirmed transaction and feed
19772080
// them back to the batcher. This will ensure that the sweeps will enter
19782081
// a new batch instead of remaining dangling.
19792082
var (
1980-
totalSweptAmt btcutil.Amount
19812083
confirmedSweeps = []wire.OutPoint{}
1982-
purgedSweeps = []wire.OutPoint{}
1983-
purgedSwaps = []lntypes.Hash{}
2084+
purgeList = make([]SweepRequest, 0, len(b.sweeps))
2085+
totalSweptAmt btcutil.Amount
19842086
)
19852087
for _, sweep := range allSweeps {
1986-
found := false
1987-
1988-
for _, txIn := range spendTx.TxIn {
1989-
if txIn.PreviousOutPoint == sweep.outpoint {
1990-
found = true
1991-
totalSweptAmt += sweep.value
1992-
notifyList = append(notifyList, sweep)
1993-
confirmedSweeps = append(
1994-
confirmedSweeps, sweep.outpoint,
1995-
)
1996-
1997-
break
2088+
_, found := confirmedSet[sweep.outpoint]
2089+
if found {
2090+
// Save the sweep as completed. Note that sweeps are
2091+
// marked completed after the batch is marked confirmed
2092+
// because the check in handleSweeps checks sweep's
2093+
// status first and then checks the batch status.
2094+
err := b.persistSweep(ctx, sweep, true)
2095+
if err != nil {
2096+
return err
19982097
}
2098+
2099+
confirmedSweeps = append(
2100+
confirmedSweeps, sweep.outpoint,
2101+
)
2102+
2103+
totalSweptAmt += sweep.value
2104+
2105+
continue
19992106
}
20002107

20012108
// If the sweep's outpoint was not found in the transaction's
20022109
// inputs this means it was left out. So we delete it from this
20032110
// batch and feed it back to the batcher.
2004-
if found {
2005-
continue
2006-
}
2007-
20082111
newSweep := sweep
20092112
delete(b.sweeps, sweep.outpoint)
20102113

@@ -2036,52 +2139,19 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
20362139
})
20372140
}
20382141
}
2142+
var (
2143+
purgedSweeps = []wire.OutPoint{}
2144+
purgedSwaps = []lntypes.Hash{}
2145+
)
20392146
for _, sweepReq := range purgeList {
20402147
purgedSwaps = append(purgedSwaps, sweepReq.SwapHash)
20412148
for _, input := range sweepReq.Inputs {
20422149
purgedSweeps = append(purgedSweeps, input.Outpoint)
20432150
}
20442151
}
20452152

2046-
// Calculate the fee portion that each sweep should pay for the batch.
2047-
feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep(
2048-
spendTx, len(notifyList), totalSweptAmt,
2049-
)
2050-
2051-
for _, sweep := range notifyList {
2052-
// Save the sweep as completed.
2053-
err := b.persistSweep(ctx, sweep, true)
2054-
if err != nil {
2055-
return err
2056-
}
2057-
2058-
// If the sweep's notifier is empty then this means that a swap
2059-
// is not waiting to read an update from it, so we can skip
2060-
// the notification part.
2061-
if sweep.notifier == nil ||
2062-
*sweep.notifier == (SpendNotifier{}) {
2063-
2064-
continue
2065-
}
2066-
2067-
spendDetail := SpendDetail{
2068-
Tx: spendTx,
2069-
OnChainFeePortion: getFeePortionPaidBySweep(
2070-
spendTx, feePortionPaidPerSweep,
2071-
roundingDifference, &sweep,
2072-
),
2073-
}
2074-
2075-
// Dispatch the sweep notifier, we don't care about the outcome
2076-
// of this action so we don't wait for it.
2077-
go func() {
2078-
// Make sure this context doesn't expire so we
2079-
// successfully notify the caller.
2080-
ctx := context.WithoutCancel(ctx)
2081-
2082-
sweep.notifySweepSpend(ctx, &spendDetail)
2083-
}()
2084-
}
2153+
b.Infof("fully confirmed sweeps: %v, purged sweeps: %v, "+
2154+
"purged swaps: %v", confirmedSweeps, purgedSweeps, purgedSwaps)
20852155

20862156
// Proceed with purging the sweeps. This will feed the sweeps that
20872157
// didn't make it to the confirmed batch transaction back to the batcher
@@ -2103,49 +2173,6 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
21032173
}
21042174
}()
21052175

2106-
b.Infof("spent, confirmed sweeps: %v, purged sweeps: %v, "+
2107-
"purged swaps: %v, purged groups: %v", confirmedSweeps,
2108-
purgedSweeps, purgedSwaps, len(purgeList))
2109-
2110-
// We are no longer able to accept new sweeps, so we mark the batch as
2111-
// closed and persist on storage.
2112-
b.state = Closed
2113-
2114-
if err = b.persist(ctx); err != nil {
2115-
return fmt.Errorf("saving batch failed: %w", err)
2116-
}
2117-
2118-
if err = b.monitorConfirmations(ctx); err != nil {
2119-
return fmt.Errorf("monitorConfirmations failed: %w", err)
2120-
}
2121-
2122-
return nil
2123-
}
2124-
2125-
// handleConf handles a confirmation notification. This is the final step of the
2126-
// batch. Here we signal to the batcher that this batch was completed. We also
2127-
// cleanup up presigned transactions whose primarySweepID is one of the sweeps
2128-
// that were spent and fully confirmed: such a transaction can't be broadcasted
2129-
// since it is either in a block or double-spends one of spent outputs.
2130-
func (b *batch) handleConf(ctx context.Context,
2131-
conf *chainntnfs.TxConfirmation) error {
2132-
2133-
spendTx := conf.Tx
2134-
txHash := spendTx.TxHash()
2135-
if b.batchTxid == nil || *b.batchTxid != txHash {
2136-
b.Warnf("Mismatch of batch txid: tx in spend notification had "+
2137-
"txid %v, but confirmation notification has txif %v. "+
2138-
"Using the later.", b.batchTxid, txHash)
2139-
}
2140-
b.batchTxid = &txHash
2141-
2142-
// If the batch is in presigned mode, cleanup presignedHelper.
2143-
presigned, err := b.isPresigned()
2144-
if err != nil {
2145-
return fmt.Errorf("failed to determine if the batch %d uses "+
2146-
"presigned mode: %w", b.id, err)
2147-
}
2148-
21492176
if presigned {
21502177
b.Infof("Cleaning up presigned store")
21512178

@@ -2161,19 +2188,7 @@ func (b *batch) handleConf(ctx context.Context,
21612188
}
21622189
}
21632190

2164-
b.Infof("confirmed in txid %s", b.batchTxid)
2165-
b.state = Confirmed
2166-
2167-
if err := b.store.ConfirmBatch(ctx, b.id); err != nil {
2168-
return fmt.Errorf("failed to store confirmed state: %w", err)
2169-
}
2170-
21712191
// Calculate the fee portion that each sweep should pay for the batch.
2172-
// TODO: make sure spendTx matches b.sweeps.
2173-
var totalSweptAmt btcutil.Amount
2174-
for _, s := range b.sweeps {
2175-
totalSweptAmt += s.value
2176-
}
21772192
feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep(
21782193
spendTx, len(b.sweeps), totalSweptAmt,
21792194
)

0 commit comments

Comments
 (0)