Skip to content

Commit 6aac2ab

Browse files
tac0turtleMarko BaricevicMarko Baricevic
authored
feat: add CAT mempool (#1708)
Closes #1711 ## Description This pr adds the cat mempool --------- Co-authored-by: Marko Baricevic <markobaricevic@Markos-MacBook-Pro.local> Co-authored-by: Marko Baricevic <markobaricevic@MacBookPro.localdomain>
1 parent e95ebaa commit 6aac2ab

File tree

21 files changed

+4004
-45
lines changed

21 files changed

+4004
-45
lines changed

config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -819,7 +819,7 @@ type MempoolConfig struct {
819819
// DefaultMempoolConfig returns a default configuration for the CometBFT mempool
820820
func DefaultMempoolConfig() *MempoolConfig {
821821
return &MempoolConfig{
822-
Type: MempoolTypeFlood,
822+
Type: MempoolTypePriority,
823823
Recheck: true,
824824
RecheckTimeout: 1000 * time.Millisecond,
825825
Broadcast: true,
@@ -858,7 +858,7 @@ func (cfg *MempoolConfig) WalEnabled() bool {
858858
// returns an error if any check fails.
859859
func (cfg *MempoolConfig) ValidateBasic() error {
860860
switch cfg.Type {
861-
case MempoolTypeFlood, MempoolTypeNop, MempoolTypePriority, LegacyMempoolTypePriority, LegacyMempoolTypeFlood:
861+
case MempoolTypeFlood, MempoolTypeNop, MempoolTypePriority, LegacyMempoolTypePriority, LegacyMempoolTypeFlood, MempoolTypeCAT, LegacyMempoolTypeCAT:
862862
case "": // allow empty string to be backwards compatible
863863
default:
864864
return fmt.Errorf("unknown mempool type: %q", cfg.Type)

config/toml.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,12 +343,12 @@ dial_timeout = "{{ .P2P.DialTimeout }}"
343343
#
344344
# Possible types:
345345
# - "flood" : concurrent linked list mempool with flooding gossip protocol
346-
# (default)
347346
# - "nop" : nop-mempool (short for no operation; the ABCI app is responsible
348347
# for storing, disseminating and proposing txs). "create_empty_blocks=false" is
349348
# not supported.
349+
# - "priority" : concurrent linked list mempool with priority queue (default)
350350
# - "cat" : content addressable mempool
351-
type = "flood"
351+
type = "priority"
352352
353353
# Recheck (default: true) defines whether CometBFT should recheck the
354354
# validity for all remaining transaction in the mempool after a block.
@@ -406,6 +406,22 @@ max_tx_bytes = {{ .Mempool.MaxTxBytes }}
406406
# XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796
407407
max_batch_bytes = {{ .Mempool.MaxBatchBytes }}
408408
409+
# ttl-duration, if non-zero, defines the maximum amount of time a transaction
410+
# can exist for in the mempool.
411+
#
412+
# Note, if ttl-num-blocks is also defined, a transaction will be removed if it
413+
# has existed in the mempool at least ttl-num-blocks number of blocks or if it's
414+
# insertion time into the mempool is beyond ttl-duration.
415+
ttl-duration = "{{ .Mempool.TTLDuration }}"
416+
417+
# ttl-num-blocks, if non-zero, defines the maximum number of blocks a transaction
418+
# can exist for in the mempool.
419+
#
420+
# Note, if ttl-duration is also defined, a transaction will be removed if it
421+
# has existed in the mempool at least ttl-num-blocks number of blocks or if
422+
# it's insertion time into the mempool is beyond ttl-duration.
423+
ttl-num-blocks = {{ .Mempool.TTLNumBlocks }}
424+
409425
# Experimental parameters to limit gossiping txs to up to the specified number of peers.
410426
# We use two independent upper values for persistent and non-persistent peers.
411427
# Unconditional peers are not affected by this feature.

mempool/cat/cache.go

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
package cat
2+
3+
import (
4+
"container/list"
5+
"time"
6+
7+
tmsync "github.com/cometbft/cometbft/libs/sync"
8+
"github.com/cometbft/cometbft/types"
9+
)
10+
11+
// LRUTxCache maintains a thread-safe LRU cache of raw transactions. The cache
12+
// only stores the hash of the raw transaction.
13+
// NOTE: This has been copied from mempool/cache with the main diffence of using
14+
// tx keys instead of raw transactions.
15+
type LRUTxCache struct {
16+
staticSize int
17+
18+
mtx tmsync.Mutex
19+
// cacheMap is used as a quick look up table
20+
cacheMap map[types.TxKey]*list.Element
21+
// list is a doubly linked list used to capture the FIFO nature of the cache
22+
list *list.List
23+
}
24+
25+
func NewLRUTxCache(cacheSize int) *LRUTxCache {
26+
return &LRUTxCache{
27+
staticSize: cacheSize,
28+
cacheMap: make(map[types.TxKey]*list.Element, cacheSize),
29+
list: list.New(),
30+
}
31+
}
32+
33+
func (c *LRUTxCache) Reset() {
34+
c.mtx.Lock()
35+
defer c.mtx.Unlock()
36+
37+
c.cacheMap = make(map[types.TxKey]*list.Element, c.staticSize)
38+
c.list.Init()
39+
}
40+
41+
func (c *LRUTxCache) Push(txKey types.TxKey) bool {
42+
if c.staticSize == 0 {
43+
return true
44+
}
45+
46+
c.mtx.Lock()
47+
defer c.mtx.Unlock()
48+
49+
moved, ok := c.cacheMap[txKey]
50+
if ok {
51+
c.list.MoveToBack(moved)
52+
return false
53+
}
54+
55+
if c.list.Len() >= c.staticSize {
56+
front := c.list.Front()
57+
if front != nil {
58+
frontKey := front.Value.(types.TxKey)
59+
delete(c.cacheMap, frontKey)
60+
c.list.Remove(front)
61+
}
62+
}
63+
64+
e := c.list.PushBack(txKey)
65+
c.cacheMap[txKey] = e
66+
67+
return true
68+
}
69+
70+
func (c *LRUTxCache) Remove(txKey types.TxKey) {
71+
if c.staticSize == 0 {
72+
return
73+
}
74+
75+
c.mtx.Lock()
76+
defer c.mtx.Unlock()
77+
78+
e := c.cacheMap[txKey]
79+
delete(c.cacheMap, txKey)
80+
81+
if e != nil {
82+
c.list.Remove(e)
83+
}
84+
}
85+
86+
func (c *LRUTxCache) Has(txKey types.TxKey) bool {
87+
if c.staticSize == 0 {
88+
return false
89+
}
90+
91+
c.mtx.Lock()
92+
defer c.mtx.Unlock()
93+
94+
_, ok := c.cacheMap[txKey]
95+
return ok
96+
}
97+
98+
// SeenTxSet records transactions that have been
99+
// seen by other peers but not yet by us
100+
type SeenTxSet struct {
101+
mtx tmsync.Mutex
102+
set map[types.TxKey]timestampedPeerSet
103+
}
104+
105+
type timestampedPeerSet struct {
106+
peers map[uint16]struct{}
107+
time time.Time
108+
}
109+
110+
func NewSeenTxSet() *SeenTxSet {
111+
return &SeenTxSet{
112+
set: make(map[types.TxKey]timestampedPeerSet),
113+
}
114+
}
115+
116+
func (s *SeenTxSet) Add(txKey types.TxKey, peer uint16) {
117+
if peer == 0 {
118+
return
119+
}
120+
s.mtx.Lock()
121+
defer s.mtx.Unlock()
122+
seenSet, exists := s.set[txKey]
123+
if !exists {
124+
s.set[txKey] = timestampedPeerSet{
125+
peers: map[uint16]struct{}{peer: {}},
126+
time: time.Now().UTC(),
127+
}
128+
} else {
129+
seenSet.peers[peer] = struct{}{}
130+
}
131+
}
132+
133+
func (s *SeenTxSet) RemoveKey(txKey types.TxKey) {
134+
s.mtx.Lock()
135+
defer s.mtx.Unlock()
136+
delete(s.set, txKey)
137+
}
138+
139+
func (s *SeenTxSet) Remove(txKey types.TxKey, peer uint16) {
140+
s.mtx.Lock()
141+
defer s.mtx.Unlock()
142+
set, exists := s.set[txKey]
143+
if exists {
144+
if len(set.peers) == 1 {
145+
delete(s.set, txKey)
146+
} else {
147+
delete(set.peers, peer)
148+
}
149+
}
150+
}
151+
152+
func (s *SeenTxSet) RemovePeer(peer uint16) {
153+
s.mtx.Lock()
154+
defer s.mtx.Unlock()
155+
for key, seenSet := range s.set {
156+
delete(seenSet.peers, peer)
157+
if len(seenSet.peers) == 0 {
158+
delete(s.set, key)
159+
}
160+
}
161+
}
162+
163+
func (s *SeenTxSet) Prune(limit time.Time) {
164+
s.mtx.Lock()
165+
defer s.mtx.Unlock()
166+
for key, seenSet := range s.set {
167+
if seenSet.time.Before(limit) {
168+
delete(s.set, key)
169+
}
170+
}
171+
}
172+
173+
func (s *SeenTxSet) Has(txKey types.TxKey, peer uint16) bool {
174+
s.mtx.Lock()
175+
defer s.mtx.Unlock()
176+
seenSet, exists := s.set[txKey]
177+
if !exists {
178+
return false
179+
}
180+
_, has := seenSet.peers[peer]
181+
return has
182+
}
183+
184+
func (s *SeenTxSet) Get(txKey types.TxKey) map[uint16]struct{} {
185+
s.mtx.Lock()
186+
defer s.mtx.Unlock()
187+
seenSet, exists := s.set[txKey]
188+
if !exists {
189+
return nil
190+
}
191+
// make a copy of the struct to avoid concurrency issues
192+
peers := make(map[uint16]struct{}, len(seenSet.peers))
193+
for peer := range seenSet.peers {
194+
peers[peer] = struct{}{}
195+
}
196+
return peers
197+
}
198+
199+
// Len returns the amount of cached items. Mostly used for testing.
200+
func (s *SeenTxSet) Len() int {
201+
s.mtx.Lock()
202+
defer s.mtx.Unlock()
203+
return len(s.set)
204+
}
205+
206+
func (s *SeenTxSet) Reset() {
207+
s.mtx.Lock()
208+
defer s.mtx.Unlock()
209+
s.set = make(map[types.TxKey]timestampedPeerSet)
210+
}

0 commit comments

Comments
 (0)