Skip to content

Commit b1d52fd

Browse files
chore: backport tx status (#1405)
## Description backport TxStatus work from main --------- Co-authored-by: CHAMI Rachid <chamirachid1@gmail.com>
1 parent 2e81d61 commit b1d52fd

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1279
-51
lines changed

consensus/replay_file.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
330330
}
331331

332332
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
333-
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
333+
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, sm.WithBlockStore(blockStore))
334334

335335
consensusState := NewState(csConfig, state.Copy(), blockExec,
336336
blockStore, mempool, evpool)

consensus/replay_stubs.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,10 @@ func (emptyMempool) TxsBytes() int64 { return 0 }
4747
func (emptyMempool) TxsFront() *clist.CElement { return nil }
4848
func (emptyMempool) TxsWaitChan() <-chan struct{} { return nil }
4949

50-
func (emptyMempool) InitWAL() error { return nil }
51-
func (emptyMempool) CloseWAL() {}
50+
func (emptyMempool) InitWAL() error { return nil }
51+
func (emptyMempool) CloseWAL() {}
52+
func (emptyMempool) GetTxByKey(types.TxKey) (types.Tx, bool) { return nil, false }
53+
func (emptyMempool) WasRecentlyEvicted(types.TxKey) bool { return false }
5254

5355
//-----------------------------------------------------------------------------
5456
// mockProxyApp uses ABCIResponses to give the right results.

consensus/replay_test.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
mempl "github.com/tendermint/tendermint/mempool"
2828
"github.com/tendermint/tendermint/privval"
2929
cmtstate "github.com/tendermint/tendermint/proto/tendermint/state"
30+
cmtstore "github.com/tendermint/tendermint/proto/tendermint/store"
3031
cmtproto "github.com/tendermint/tendermint/proto/tendermint/types"
3132
"github.com/tendermint/tendermint/proxy"
3233
sm "github.com/tendermint/tendermint/state"
@@ -58,7 +59,7 @@ func TestMain(m *testing.M) {
5859
// the `Handshake Tests` are for failures in applying the block.
5960
// With the help of the WAL, we can recover from it all!
6061

61-
//------------------------------------------------------------------------------------------
62+
// ------------------------------------------------------------------------------------------
6263
// WAL Tests
6364

6465
// TODO: It would be better to verify explicitly which states we can recover from without the wal
@@ -320,7 +321,7 @@ var (
320321
sim testSim
321322
)
322323

323-
//---------------------------------------
324+
// ---------------------------------------
324325
// Test handshake/replay
325326

326327
// 0 - all synced up
@@ -1041,7 +1042,7 @@ func (app *badApp) Commit() abci.ResponseCommit {
10411042
panic("either allHashesAreWrong or onlyLastHashIsWrong must be set")
10421043
}
10431044

1044-
//--------------------------
1045+
// --------------------------
10451046
// utils for making blocks
10461047

10471048
func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) {
@@ -1187,8 +1188,9 @@ func stateAndStore(
11871188
return stateDB, state, store
11881189
}
11891190

1190-
//----------------------------------
1191+
// ----------------------------------
11911192
// mock block store
1193+
var _ sm.BlockStore = &mockBlockStore{}
11921194

11931195
type mockBlockStore struct {
11941196
config *cfg.Config
@@ -1222,6 +1224,10 @@ func (bs *mockBlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
12221224
func (bs *mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { return nil }
12231225
func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
12241226
}
1227+
func (bs *mockBlockStore) SaveTxInfo(block *types.Block, txResponseCode []uint32) error {
1228+
return nil
1229+
}
1230+
func (bs *mockBlockStore) LoadTxInfo(hash []byte) *cmtstore.TxInfo { return &cmtstore.TxInfo{} }
12251231

12261232
func (bs *mockBlockStore) LoadBlockCommit(height int64) *types.Commit {
12271233
return bs.commits[height-1]
@@ -1242,7 +1248,7 @@ func (bs *mockBlockStore) PruneBlocks(height int64) (uint64, error) {
12421248
return pruned, nil
12431249
}
12441250

1245-
//---------------------------------------
1251+
// ---------------------------------------
12461252
// Test handshake/init chain
12471253

12481254
func TestHandshakeUpdatesValidators(t *testing.T) {

consensus/wal_generator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
8484
})
8585
mempool := emptyMempool{}
8686
evpool := sm.EmptyEvidencePool{}
87-
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
87+
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, sm.WithBlockStore(blockStore))
8888
consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
8989
consensusState.SetLogger(logger)
9090
consensusState.SetEventBus(eventBus)

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ require (
2222
github.com/go-logfmt/logfmt v0.6.0
2323
github.com/gofrs/uuid v4.4.0+incompatible
2424
github.com/gogo/protobuf v1.3.2
25+
github.com/golang/mock v1.4.4
2526
github.com/golang/protobuf v1.5.3
2627
github.com/golangci/golangci-lint v1.52.0
2728
github.com/google/orderedcode v0.0.1

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFU
370370
github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
371371
github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
372372
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
373+
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
373374
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
374375
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
375376
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=

light/proxy/routes.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ func RPCRoutes(c *lrpc.Client) map[string]*rpcserver.RPCFunc {
3939
"consensus_params": rpcserver.NewRPCFunc(makeConsensusParamsFunc(c), "height", rpcserver.Cacheable("height")),
4040
"unconfirmed_txs": rpcserver.NewRPCFunc(makeUnconfirmedTxsFunc(c), "limit"),
4141
"num_unconfirmed_txs": rpcserver.NewRPCFunc(makeNumUnconfirmedTxsFunc(c), ""),
42+
"tx_status": rpcserver.NewRPCFunc(makeTxStatusFunc(c), "hash"),
4243

4344
// tx broadcast API
4445
"broadcast_tx_commit": rpcserver.NewRPCFunc(makeBroadcastTxCommitFunc(c), "tx"),
@@ -143,6 +144,14 @@ func makeBlockResultsFunc(c *lrpc.Client) rpcBlockResultsFunc {
143144
}
144145
}
145146

147+
type rpcTxStatusFunc func(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultTxStatus, error)
148+
149+
func makeTxStatusFunc(c *lrpc.Client) rpcTxStatusFunc {
150+
return func(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultTxStatus, error) {
151+
return c.TxStatus(ctx.Context(), hash)
152+
}
153+
}
154+
146155
type rpcCommitFunc func(ctx *rpctypes.Context, height *int64) (*ctypes.ResultCommit, error)
147156

148157
func makeCommitFunc(c *lrpc.Client) rpcCommitFunc {

light/rpc/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,11 @@ func (c *Client) BlockResults(ctx context.Context, height *int64) (*ctypes.Resul
487487
return res, nil
488488
}
489489

490+
// TxStatus retrieves the status of the transaction given its hash.
491+
func (c *Client) TxStatus(ctx context.Context, hash []byte) (*ctypes.ResultTxStatus, error) {
492+
return c.next.TxStatus(ctx, hash)
493+
}
494+
490495
// Header fetches and verifies the header directly via the light client
491496
func (c *Client) Header(ctx context.Context, height *int64) (*ctypes.ResultHeader, error) {
492497
lb, err := c.updateLightClientIfNeededTo(ctx, height)

mempool/cache.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ type TxCache interface {
2626
// Has reports whether tx is present in the cache. Checking for presence is
2727
// not treated as an access of the value.
2828
Has(tx types.Tx) bool
29+
30+
// HasKey reports whether the given key is present in the cache.
31+
HasKey(key types.TxKey) bool
2932
}
3033

3134
var _ TxCache = (*LRUTxCache)(nil)
@@ -113,12 +116,21 @@ func (c *LRUTxCache) Has(tx types.Tx) bool {
113116
return ok
114117
}
115118

119+
func (c *LRUTxCache) HasKey(key types.TxKey) bool {
120+
c.mtx.Lock()
121+
defer c.mtx.Unlock()
122+
123+
_, ok := c.cacheMap[key]
124+
return ok
125+
}
126+
116127
// NopTxCache defines a no-op raw transaction cache.
117128
type NopTxCache struct{}
118129

119130
var _ TxCache = (*NopTxCache)(nil)
120131

121-
func (NopTxCache) Reset() {}
122-
func (NopTxCache) Push(types.Tx) bool { return true }
123-
func (NopTxCache) Remove(types.Tx) {}
124-
func (NopTxCache) Has(types.Tx) bool { return false }
132+
func (NopTxCache) Reset() {}
133+
func (NopTxCache) Push(types.Tx) bool { return true }
134+
func (NopTxCache) Remove(types.Tx) {}
135+
func (NopTxCache) Has(types.Tx) bool { return false }
136+
func (NopTxCache) HasKey(types.TxKey) bool { return false }

mempool/cat/pool.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ type TxPool struct {
6464

6565
// Thread-safe cache of rejected transactions for quick look-up
6666
rejectedTxCache *LRUTxCache
67+
// Thread-safe cache of evicted transactions for quick look-up
68+
evictedTxCache *LRUTxCache
6769
// Thread-safe list of transactions peers have seen that we have not yet seen
6870
seenByPeersSet *SeenTxSet
6971

@@ -92,6 +94,7 @@ func NewTxPool(
9294
proxyAppConn: proxyAppConn,
9395
metrics: mempool.NopMetrics(),
9496
rejectedTxCache: NewLRUTxCache(cfg.CacheSize),
97+
evictedTxCache: NewLRUTxCache(cfg.CacheSize / 5),
9598
seenByPeersSet: NewSeenTxSet(),
9699
height: height,
97100
preCheckFn: func(_ types.Tx) error { return nil },
@@ -171,16 +174,28 @@ func (txmp *TxPool) Has(txKey types.TxKey) bool {
171174
return txmp.store.has(txKey)
172175
}
173176

174-
// Get retrieves a transaction based on the key. It returns a bool
175-
// if the transaction exists or not
177+
// Get retrieves a transaction based on the key.
178+
// Deprecated: use GetTxByKey instead.
176179
func (txmp *TxPool) Get(txKey types.TxKey) (types.Tx, bool) {
180+
return txmp.GetTxByKey(txKey)
181+
}
182+
183+
// GetTxByKey retrieves a transaction based on the key. It returns a bool
184+
// indicating whether transaction was found in the cache.
185+
func (txmp *TxPool) GetTxByKey(txKey types.TxKey) (types.Tx, bool) {
177186
wtx := txmp.store.get(txKey)
178187
if wtx != nil {
179188
return wtx.tx, true
180189
}
181190
return types.Tx{}, false
182191
}
183192

193+
// WasRecentlyEvicted returns a bool indicating whether the transaction with
194+
// the specified key was recently evicted and is currently within the cache.
195+
func (txmp *TxPool) WasRecentlyEvicted(txKey types.TxKey) bool {
196+
return txmp.evictedTxCache.Has(txKey)
197+
}
198+
184199
// IsRejectedTx returns true if the transaction was recently rejected and is
185200
// currently within the cache
186201
func (txmp *TxPool) IsRejectedTx(txKey types.TxKey) bool {
@@ -195,9 +210,13 @@ func (txmp *TxPool) CheckToPurgeExpiredTxs() {
195210
defer txmp.updateMtx.Unlock()
196211
if txmp.config.TTLDuration > 0 && time.Since(txmp.lastPurgeTime) > txmp.config.TTLDuration {
197212
expirationAge := time.Now().Add(-txmp.config.TTLDuration)
198-
// a height of 0 means no transactions will be removed because of height
213+
// A height of 0 means no transactions will be removed because of height
199214
// (in other words, no transaction has a height less than 0)
200-
numExpired := txmp.store.purgeExpiredTxs(0, expirationAge)
215+
purgedTxs, numExpired := txmp.store.purgeExpiredTxs(0, expirationAge)
216+
// Add the purged transactions to the evicted cache
217+
for _, tx := range purgedTxs {
218+
txmp.evictedTxCache.Push(tx.key)
219+
}
201220
txmp.metrics.EvictedTxs.Add(float64(numExpired))
202221
txmp.lastPurgeTime = time.Now()
203222
}
@@ -373,6 +392,7 @@ func (txmp *TxPool) Flush() {
373392
txmp.store.reset()
374393
txmp.seenByPeersSet.Reset()
375394
txmp.rejectedTxCache.Reset()
395+
txmp.evictedTxCache.Reset()
376396
txmp.metrics.EvictedTxs.Add(float64(size))
377397
txmp.broadcastMtx.Lock()
378398
defer txmp.broadcastMtx.Unlock()
@@ -537,6 +557,7 @@ func (txmp *TxPool) addNewTransaction(wtx *wrappedTx, checkTxRes *abci.ResponseC
537557
// drop the new one.
538558
if len(victims) == 0 || victimBytes < wtx.size() {
539559
txmp.metrics.EvictedTxs.Add(1)
560+
txmp.evictedTxCache.Push(wtx.key)
540561
checkTxRes.MempoolError = fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)",
541562
wtx.key)
542563
return fmt.Errorf("rejected valid incoming transaction; mempool is full (%X). Size: (%d:%d)",
@@ -591,6 +612,7 @@ func (txmp *TxPool) addNewTransaction(wtx *wrappedTx, checkTxRes *abci.ResponseC
591612

592613
func (txmp *TxPool) evictTx(wtx *wrappedTx) {
593614
txmp.store.remove(wtx.key)
615+
txmp.evictedTxCache.Push(wtx.key)
594616
txmp.metrics.EvictedTxs.Add(1)
595617
txmp.logger.Debug(
596618
"evicted valid existing transaction; mempool full",
@@ -720,7 +742,11 @@ func (txmp *TxPool) purgeExpiredTxs(blockHeight int64) {
720742
expirationAge = time.Time{}
721743
}
722744

723-
numExpired := txmp.store.purgeExpiredTxs(expirationHeight, expirationAge)
745+
purgedTxs, numExpired := txmp.store.purgeExpiredTxs(expirationHeight, expirationAge)
746+
// Add the purged transactions to the evicted cache
747+
for _, tx := range purgedTxs {
748+
txmp.evictedTxCache.Push(tx.key)
749+
}
724750
txmp.metrics.EvictedTxs.Add(float64(numExpired))
725751

726752
// purge old evicted and seen transactions

mempool/cat/pool_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ func TestTxPool_Eviction(t *testing.T) {
244244
mustCheckTx(t, txmp, "key1=0000=25")
245245
require.True(t, txExists("key1=0000=25"))
246246
require.False(t, txExists(bigTx))
247+
require.True(t, txmp.WasRecentlyEvicted(types.Tx(bigTx).Key()))
247248
require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes())
248249

249250
// Now fill up the rest of the slots with other transactions.
@@ -257,23 +258,27 @@ func TestTxPool_Eviction(t *testing.T) {
257258
require.Error(t, err)
258259
require.Contains(t, err.Error(), "mempool is full")
259260
require.False(t, txExists("key6=0005=1"))
261+
require.True(t, txmp.WasRecentlyEvicted(types.Tx("key6=0005=1").Key()))
260262

261263
// A new transaction with higher priority should evict key5, which is the
262264
// newest of the two transactions with lowest priority.
263265
mustCheckTx(t, txmp, "key7=0006=7")
264266
require.True(t, txExists("key7=0006=7")) // new transaction added
265267
require.False(t, txExists("key5=0004=3")) // newest low-priority tx evicted
266-
require.True(t, txExists("key4=0003=3")) // older low-priority tx retained
268+
require.True(t, txmp.WasRecentlyEvicted(types.Tx("key5=0004=3").Key()))
269+
require.True(t, txExists("key4=0003=3")) // older low-priority tx retained
267270

268271
// Another new transaction evicts the other low-priority element.
269272
mustCheckTx(t, txmp, "key8=0007=20")
270273
require.True(t, txExists("key8=0007=20"))
271274
require.False(t, txExists("key4=0003=3"))
275+
require.True(t, txmp.WasRecentlyEvicted(types.Tx("key4=0003=3").Key()))
272276

273277
// Now the lowest-priority tx is 5, so that should be the next to go.
274278
mustCheckTx(t, txmp, "key9=0008=9")
275279
require.True(t, txExists("key9=0008=9"))
276280
require.False(t, txExists("key2=0001=5"))
281+
require.True(t, txmp.WasRecentlyEvicted(types.Tx("key2=0001=5").Key()))
277282

278283
// Add a transaction that requires eviction of multiple lower-priority
279284
// entries, in order to fit the size of the element.
@@ -282,8 +287,11 @@ func TestTxPool_Eviction(t *testing.T) {
282287
require.True(t, txExists("key8=0007=20"))
283288
require.True(t, txExists("key10=0123456789abcdef=11"))
284289
require.False(t, txExists("key3=0002=10"))
290+
require.True(t, txmp.WasRecentlyEvicted(types.Tx("key3=0002=10").Key()))
285291
require.False(t, txExists("key9=0008=9"))
292+
require.True(t, txmp.WasRecentlyEvicted(types.Tx("key9=0008=9").Key()))
286293
require.False(t, txExists("key7=0006=7"))
294+
require.True(t, txmp.WasRecentlyEvicted(types.Tx("key7=0006=7").Key()))
287295

288296
// Free up some space so we can add back previously evicted txs
289297
err = txmp.Update(1, types.Txs{types.Tx("key10=0123456789abcdef=11")}, []*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}}, nil, nil)
@@ -296,6 +304,7 @@ func TestTxPool_Eviction(t *testing.T) {
296304
// space for the previously evicted tx
297305
require.NoError(t, txmp.RemoveTxByKey(types.Tx("key8=0007=20").Key()))
298306
require.False(t, txExists("key8=0007=20"))
307+
require.False(t, txmp.WasRecentlyEvicted(types.Tx("key8=0007=20").Key()))
299308
}
300309

301310
func TestTxPool_Flush(t *testing.T) {
@@ -567,6 +576,10 @@ func TestTxPool_ExpiredTxs_Timestamp(t *testing.T) {
567576

568577
// All the transactions in the original set should have been purged.
569578
for _, tx := range added1 {
579+
// Check that it was added to the evictedTxCache
580+
evicted := txmp.WasRecentlyEvicted(tx.tx.Key())
581+
require.True(t, evicted)
582+
570583
if txmp.store.has(tx.tx.Key()) {
571584
t.Errorf("Transaction %X should have been purged for TTL", tx.tx.Key())
572585
}

mempool/cat/reactor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
318318
txKey[:],
319319
schema.Download,
320320
)
321-
tx, has := memR.mempool.Get(txKey)
321+
tx, has := memR.mempool.GetTxByKey(txKey)
322322
if has && !memR.opts.ListenOnly {
323323
peerID := memR.ids.GetIDForPeer(e.Src.ID())
324324
memR.Logger.Debug("sending a tx in response to a want msg", "peer", peerID)

mempool/cat/store.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,19 +141,23 @@ func (s *store) getTxsBelowPriority(priority int64) ([]*wrappedTx, int64) {
141141
}
142142

143143
// purgeExpiredTxs removes all transactions that are older than the given height
144-
// and time. Returns the amount of transactions that were removed
145-
func (s *store) purgeExpiredTxs(expirationHeight int64, expirationAge time.Time) int {
144+
// and time. Returns the purged txs and amount of transactions that were purged.
145+
func (s *store) purgeExpiredTxs(expirationHeight int64, expirationAge time.Time) ([]*wrappedTx, int) {
146146
s.mtx.Lock()
147147
defer s.mtx.Unlock()
148+
149+
var purgedTxs []*wrappedTx
148150
counter := 0
151+
149152
for key, tx := range s.txs {
150153
if tx.height < expirationHeight || tx.timestamp.Before(expirationAge) {
151154
s.bytes -= tx.size()
152155
delete(s.txs, key)
156+
purgedTxs = append(purgedTxs, tx)
153157
counter++
154158
}
155159
}
156-
return counter
160+
return purgedTxs, counter
157161
}
158162

159163
func (s *store) reset() {

0 commit comments

Comments
 (0)