Skip to content

Commit ed94e5f

Browse files
cskiralyfjl
authored andcommitted
eth: add logic to drop peers randomly when saturated (ethereum#31476)
As of now, Geth disconnects peers only on protocol error or timeout, meaning once connection slots are filled, the peerset is largely fixed. As mentioned in ethereum#31321, Geth should occasionally disconnect peers to ensure some churn. What/when to disconnect could depend on: - the state of geth (e.g. sync or not) - current number of peers - peer level metrics This PR adds a very slow churn using a random drop. --------- Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com> Co-authored-by: Felix Lange <fjl@twurst.com>
1 parent ca29804 commit ed94e5f

File tree

4 files changed

+204
-6
lines changed

4 files changed

+204
-6
lines changed

eth/backend.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ type Ethereum struct {
7676

7777
handler *handler
7878
discmix *enode.FairMix
79+
dropper *dropper
7980

8081
// DB interfaces
8182
chainDb ethdb.Database // Block chain database
@@ -300,6 +301,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
300301
return nil, err
301302
}
302303

304+
eth.dropper = newDropper(eth.p2pServer.MaxDialedConns(), eth.p2pServer.MaxInboundConns())
305+
303306
eth.miner = miner.New(eth, config.Miner, eth.engine)
304307
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
305308
eth.miner.SetPrioAddresses(config.TxPool.Locals)
@@ -410,6 +413,9 @@ func (s *Ethereum) Start() error {
410413
// Start the networking layer
411414
s.handler.Start(s.p2pServer.MaxPeers)
412415

416+
// Start the connection manager
417+
s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() })
418+
413419
// start log indexer
414420
s.filterMaps.Start()
415421
go s.updateFilterMapsHeads()
@@ -511,6 +517,7 @@ func (s *Ethereum) setupDiscovery() error {
511517
func (s *Ethereum) Stop() error {
512518
// Stop all the peer-related stuff first.
513519
s.discmix.Close()
520+
s.dropper.Stop()
514521
s.handler.Stop()
515522

516523
// Then stop everything else.

eth/dropper.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Copyright 2025 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package eth
18+
19+
import (
20+
mrand "math/rand"
21+
"slices"
22+
"sync"
23+
"time"
24+
25+
"github.com/ethereum/go-ethereum/common"
26+
"github.com/ethereum/go-ethereum/common/mclock"
27+
"github.com/ethereum/go-ethereum/log"
28+
"github.com/ethereum/go-ethereum/metrics"
29+
"github.com/ethereum/go-ethereum/p2p"
30+
)
31+
32+
const (
33+
// Interval between peer drop events (uniform between min and max)
34+
peerDropIntervalMin = 3 * time.Minute
35+
// Interval between peer drop events (uniform between min and max)
36+
peerDropIntervalMax = 7 * time.Minute
37+
// Avoid dropping peers for some time after connection
38+
doNotDropBefore = 10 * time.Minute
39+
// How close to max should we initiate the drop timer. O should be fine,
40+
// dropping when no more peers can be added. Larger numbers result in more
41+
// aggressive drop behavior.
42+
peerDropThreshold = 0
43+
)
44+
45+
var (
46+
// droppedInbound is the number of inbound peers dropped
47+
droppedInbound = metrics.NewRegisteredMeter("eth/dropper/inbound", nil)
48+
// droppedOutbound is the number of outbound peers dropped
49+
droppedOutbound = metrics.NewRegisteredMeter("eth/dropper/outbound", nil)
50+
)
51+
52+
// dropper monitors the state of the peer pool and makes changes as follows:
53+
// - during sync the Downloader handles peer connections, so dropper is disabled
54+
// - if not syncing and the peer count is close to the limit, it drops peers
55+
// randomly every peerDropInterval to make space for new peers
56+
// - peers are dropped separately from the inboud pool and from the dialed pool
57+
type dropper struct {
58+
maxDialPeers int // maximum number of dialed peers
59+
maxInboundPeers int // maximum number of inbound peers
60+
peersFunc getPeersFunc
61+
syncingFunc getSyncingFunc
62+
63+
// peerDropTimer introduces churn if we are close to limit capacity.
64+
// We handle Dialed and Inbound connections separately
65+
peerDropTimer *time.Timer
66+
67+
wg sync.WaitGroup // wg for graceful shutdown
68+
shutdownCh chan struct{}
69+
}
70+
71+
// Callback type to get the list of connected peers.
72+
type getPeersFunc func() []*p2p.Peer
73+
74+
// Callback type to get syncing status.
75+
// Returns true while syncing, false when synced.
76+
type getSyncingFunc func() bool
77+
78+
func newDropper(maxDialPeers, maxInboundPeers int) *dropper {
79+
cm := &dropper{
80+
maxDialPeers: maxDialPeers,
81+
maxInboundPeers: maxInboundPeers,
82+
peerDropTimer: time.NewTimer(randomDuration(peerDropIntervalMin, peerDropIntervalMax)),
83+
shutdownCh: make(chan struct{}),
84+
}
85+
if peerDropIntervalMin > peerDropIntervalMax {
86+
panic("peerDropIntervalMin duration must be less than or equal to peerDropIntervalMax duration")
87+
}
88+
return cm
89+
}
90+
91+
// Start the dropper.
92+
func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc) {
93+
cm.peersFunc = srv.Peers
94+
cm.syncingFunc = syncingFunc
95+
cm.wg.Add(1)
96+
go cm.loop()
97+
}
98+
99+
// Stop the dropper.
100+
func (cm *dropper) Stop() {
101+
cm.peerDropTimer.Stop()
102+
close(cm.shutdownCh)
103+
cm.wg.Wait()
104+
}
105+
106+
// dropRandomPeer selects one of the peers randomly and drops it from the peer pool.
107+
func (cm *dropper) dropRandomPeer() bool {
108+
peers := cm.peersFunc()
109+
var numInbound int
110+
for _, p := range peers {
111+
if p.Inbound() {
112+
numInbound++
113+
}
114+
}
115+
numDialed := len(peers) - numInbound
116+
117+
selectDoNotDrop := func(p *p2p.Peer) bool {
118+
// Avoid dropping trusted and static peers, or recent peers.
119+
// Only drop peers if their respective category (dialed/inbound)
120+
// is close to limit capacity.
121+
return p.Trusted() || p.StaticDialed() ||
122+
p.Lifetime() < mclock.AbsTime(doNotDropBefore) ||
123+
(p.DynDialed() && cm.maxDialPeers-numDialed > peerDropThreshold) ||
124+
(p.Inbound() && cm.maxInboundPeers-numInbound > peerDropThreshold)
125+
}
126+
127+
droppable := slices.DeleteFunc(peers, selectDoNotDrop)
128+
if len(droppable) > 0 {
129+
p := droppable[mrand.Intn(len(droppable))]
130+
log.Debug("Dropping random peer", "inbound", p.Inbound(),
131+
"id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers))
132+
p.Disconnect(p2p.DiscUselessPeer)
133+
if p.Inbound() {
134+
droppedInbound.Mark(1)
135+
} else {
136+
droppedOutbound.Mark(1)
137+
}
138+
return true
139+
}
140+
return false
141+
}
142+
143+
// randomDuration generates a random duration between min and max.
144+
func randomDuration(min, max time.Duration) time.Duration {
145+
if min > max {
146+
panic("min duration must be less than or equal to max duration")
147+
}
148+
return time.Duration(mrand.Int63n(int64(max-min)) + int64(min))
149+
}
150+
151+
// loop is the main loop of the connection dropper.
152+
func (cm *dropper) loop() {
153+
defer cm.wg.Done()
154+
155+
for {
156+
select {
157+
case <-cm.peerDropTimer.C:
158+
// Drop a random peer if we are not syncing and the peer count is close to the limit.
159+
if !cm.syncingFunc() {
160+
cm.dropRandomPeer()
161+
}
162+
cm.peerDropTimer.Reset(randomDuration(peerDropIntervalMin, peerDropIntervalMax))
163+
case <-cm.shutdownCh:
164+
return
165+
}
166+
}
167+
}

p2p/peer.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,11 +220,35 @@ func (p *Peer) String() string {
220220
return fmt.Sprintf("Peer %x %v", id[:8], p.RemoteAddr())
221221
}
222222

223-
// Inbound returns true if the peer is an inbound connection
223+
// Inbound returns true if the peer is an inbound (not dialed) connection.
224224
func (p *Peer) Inbound() bool {
225225
return p.rw.is(inboundConn)
226226
}
227227

228+
// Trusted returns true if the peer is configured as trusted.
229+
// Trusted peers are accepted in above the MaxInboundConns limit.
230+
// The peer can be either inbound or dialed.
231+
func (p *Peer) Trusted() bool {
232+
return p.rw.is(trustedConn)
233+
}
234+
235+
// DynDialed returns true if the peer was dialed successfully (passed handshake) and
236+
// it is not configured as static.
237+
func (p *Peer) DynDialed() bool {
238+
return p.rw.is(dynDialedConn)
239+
}
240+
241+
// StaticDialed returns true if the peer was dialed successfully (passed handshake) and
242+
// it is configured as static.
243+
func (p *Peer) StaticDialed() bool {
244+
return p.rw.is(staticDialedConn)
245+
}
246+
247+
// Lifetime returns the time since peer creation.
248+
func (p *Peer) Lifetime() mclock.AbsTime {
249+
return mclock.Now() - p.created
250+
}
251+
228252
func newPeer(log log.Logger, conn *conn, protocols []Protocol) *Peer {
229253
protomap := matchProtocols(protocols, conn.caps, conn)
230254
p := &Peer{

p2p/server.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ func (srv *Server) setupDiscovery() error {
508508
func (srv *Server) setupDialScheduler() {
509509
config := dialConfig{
510510
self: srv.localnode.ID(),
511-
maxDialPeers: srv.maxDialedConns(),
511+
maxDialPeers: srv.MaxDialedConns(),
512512
maxActiveDials: srv.MaxPendingPeers,
513513
log: srv.Logger,
514514
netRestrict: srv.NetRestrict,
@@ -527,11 +527,11 @@ func (srv *Server) setupDialScheduler() {
527527
}
528528
}
529529

530-
func (srv *Server) maxInboundConns() int {
531-
return srv.MaxPeers - srv.maxDialedConns()
530+
func (srv *Server) MaxInboundConns() int {
531+
return srv.MaxPeers - srv.MaxDialedConns()
532532
}
533533

534-
func (srv *Server) maxDialedConns() (limit int) {
534+
func (srv *Server) MaxDialedConns() (limit int) {
535535
if srv.NoDial || srv.MaxPeers == 0 {
536536
return 0
537537
}
@@ -736,7 +736,7 @@ func (srv *Server) postHandshakeChecks(peers map[enode.ID]*Peer, inboundCount in
736736
switch {
737737
case !c.is(trustedConn) && len(peers) >= srv.MaxPeers:
738738
return DiscTooManyPeers
739-
case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
739+
case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.MaxInboundConns():
740740
return DiscTooManyPeers
741741
case peers[c.node.ID()] != nil:
742742
return DiscAlreadyConnected

0 commit comments

Comments
 (0)