Skip to content
This repository was archived by the owner on Oct 20, 2024. It is now read-only.

Commit 059f533

Browse files
authored
Optimize CPU utilization on Bundler go-routine (#96)
1 parent 72655eb commit 059f533

File tree

4 files changed

+55
-5
lines changed

4 files changed

+55
-5
lines changed

internal/config/values.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ func GetValues() *Values {
5656
viper.SetDefault("erc4337_bundler_supported_entry_points", "0x0F46c65C17AA6b4102046935F33301f0510B163A")
5757
viper.SetDefault("erc4337_bundler_max_verification_gas", 1500000)
5858
viper.SetDefault("erc4337_bundler_max_ops_for_unstaked_sender", 4)
59-
viper.SetDefault("erc4337_bundler_debug_mode", false)
6059
viper.SetDefault("erc4337_bundler_blocks_in_the_future", 25)
60+
viper.SetDefault("erc4337_bundler_debug_mode", false)
6161
viper.SetDefault("erc4337_bundler_gin_mode", gin.ReleaseMode)
6262

6363
// Read in from .env file if available
@@ -84,6 +84,7 @@ func GetValues() *Values {
8484
_ = viper.BindEnv("erc4337_bundler_max_ops_for_unstaked_sender")
8585
_ = viper.BindEnv("erc4337_bundler_eth_builder_url")
8686
_ = viper.BindEnv("erc4337_bundler_blocks_in_the_future")
87+
_ = viper.BindEnv("erc4337_bundler_debug_mode")
8788
_ = viper.BindEnv("erc4337_bundler_gin_mode")
8889

8990
// Validate required variables

pkg/bundler/bundler.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ type Bundler struct {
2424
logger logr.Logger
2525
isRunning bool
2626
stop chan bool
27+
watch chan bool
28+
onStop func()
2729
}
2830

2931
// New initializes a new EIP-4337 bundler which can be extended with modules for validating batches and
@@ -37,6 +39,8 @@ func New(mempool *mempool.Mempool, chainID *big.Int, supportedEntryPoints []comm
3739
logger: logger.NewZeroLogr().WithName("bundler"),
3840
isRunning: false,
3941
stop: make(chan bool),
42+
watch: make(chan bool),
43+
onStop: func() {},
4044
}
4145
}
4246

@@ -63,7 +67,7 @@ func (i *Bundler) Run() error {
6367
select {
6468
case <-i.stop:
6569
return
66-
default:
70+
case <-i.watch:
6771
for _, ep := range i.supportedEntryPoints {
6872
start := time.Now()
6973
l := logger.
@@ -115,6 +119,7 @@ func (i *Bundler) Run() error {
115119
}(i)
116120

117121
i.isRunning = true
122+
i.onStop = i.mempool.OnAdd(i.watch)
118123
return nil
119124
}
120125

@@ -126,4 +131,5 @@ func (i *Bundler) Stop() {
126131

127132
i.isRunning = false
128133
i.stop <- true
134+
i.onStop()
129135
}

pkg/mempool/instance.go

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,40 @@ package mempool
55
import (
66
badger "github.com/dgraph-io/badger/v3"
77
"github.com/ethereum/go-ethereum/common"
8+
"github.com/google/uuid"
89
"github.com/stackup-wallet/stackup-bundler/pkg/userop"
910
)
1011

12+
type watch struct {
13+
key string
14+
sig chan bool
15+
}
16+
1117
// Mempool provides read and write access to a pool of pending UserOperations which have passed all Client
1218
// checks.
1319
type Mempool struct {
14-
db *badger.DB
15-
queue *userOpQueues
20+
db *badger.DB
21+
queue *userOpQueues
22+
watches []watch
1623
}
1724

1825
// New creates an instance of a mempool that uses an embedded DB to persist and load UserOperations from disk
1926
// incase of a reset.
2027
func New(db *badger.DB) (*Mempool, error) {
2128
queue := newUserOpQueue()
29+
watches := []watch{}
2230
err := loadFromDisk(db, queue)
2331
if err != nil {
2432
return nil, err
2533
}
2634

27-
return &Mempool{db, queue}, nil
35+
return &Mempool{db, queue, watches}, nil
36+
}
37+
38+
func (m *Mempool) pushSignals() {
39+
for _, watch := range m.watches {
40+
watch.sig <- true
41+
}
2842
}
2943

3044
// GetOps returns all the UserOperations associated with an EntryPoint and Sender address.
@@ -49,6 +63,7 @@ func (m *Mempool) AddOp(entryPoint common.Address, op *userop.UserOperation) err
4963
}
5064

5165
m.queue.AddOp(entryPoint, op)
66+
m.pushSignals()
5267
return nil
5368
}
5469

@@ -77,6 +92,29 @@ func (m *Mempool) RemoveOps(entryPoint common.Address, ops ...*userop.UserOperat
7792
return nil
7893
}
7994

95+
// OnAdd allows entities to register a channel that will receive a signal every time a UserOperation is added.
96+
// It returns a function to stop receiving.
97+
func (m *Mempool) OnAdd(sig chan bool) func() {
98+
key := uuid.New().String()
99+
w := watch{key, sig}
100+
m.watches = append(m.watches, w)
101+
102+
// initial push if queue is non-empty
103+
if m.queue.opCount > 0 {
104+
w.sig <- true
105+
}
106+
107+
return func() {
108+
f := []watch{}
109+
for _, w := range m.watches {
110+
if w.key != key {
111+
f = append(f, w)
112+
}
113+
}
114+
m.watches = f
115+
}
116+
}
117+
80118
// Dump will return a list of UserOperations from the mempool by EntryPoint in the order it arrived.
81119
func (m *Mempool) Dump(entryPoint common.Address) ([]*userop.UserOperation, error) {
82120
return m.queue.All(entryPoint), nil

pkg/mempool/queue.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package mempool
22

33
import (
4+
"sync/atomic"
5+
46
"github.com/ethereum/go-ethereum/common"
57
"github.com/stackup-wallet/stackup-bundler/pkg/userop"
68
"github.com/wangjia184/sortedset"
@@ -24,6 +26,7 @@ func (s *set) getSenderSortedSet(sender common.Address) *sortedset.SortedSet {
2426

2527
type userOpQueues struct {
2628
maxBatchSize int
29+
opCount uint64
2730
setsByEntryPoint map[common.Address]*set
2831
}
2932

@@ -47,6 +50,7 @@ func (q *userOpQueues) AddOp(entryPoint common.Address, op *userop.UserOperation
4750
eps.all.AddOrUpdate(key, sortedset.SCORE(op.MaxPriorityFeePerGas.Int64()), op)
4851
eps.arrival.AddOrUpdate(key, sortedset.SCORE(eps.all.GetCount()), op)
4952
sss.AddOrUpdate(key, sortedset.SCORE(op.Nonce.Int64()), op)
53+
atomic.AddUint64(&q.opCount, 1)
5054
}
5155

5256
func (q *userOpQueues) GetOps(entryPoint common.Address, sender common.Address) []*userop.UserOperation {
@@ -101,6 +105,7 @@ func (q *userOpQueues) RemoveOps(entryPoint common.Address, ops ...*userop.UserO
101105
eps.arrival.Remove(key)
102106
sss.Remove(key)
103107
}
108+
atomic.AddUint64(&q.opCount, ^uint64(0))
104109
}
105110

106111
func newUserOpQueue() *userOpQueues {

0 commit comments

Comments
 (0)