Skip to content

eth: add logic to drop peers randomly when saturated #31476

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Apr 14, 2025
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
136d32d
p2p/connmanager: add connection manager to create some churn
cskiraly Mar 23, 2025
7a49bf0
p2p/connmanager: only drop from dialed peers
cskiraly Mar 24, 2025
91c1c30
p2p/connmanager: avoid dropping trusted peers
cskiraly Mar 24, 2025
75e4c26
p2p/connmanager: avoid dropping peers too early
cskiraly Mar 24, 2025
4635dac
p2p/connmanager: set meaningful defaults
cskiraly Mar 24, 2025
23cda63
p2p/peer: expose conn flags through getter functions
cskiraly Mar 25, 2025
ea8d05a
p2p/server: expose MaxInboundConns and MaxDialedConns
cskiraly Mar 25, 2025
e0b0189
eth/connmanager: move Connection Manager to package eth
cskiraly Mar 25, 2025
c41569d
eth/connmanager: use slices.DeleteFunc to filter in place
cskiraly Mar 25, 2025
61b26a9
eth/connman: fixup log levels
cskiraly Mar 25, 2025
628c5e5
eth/connmanager: get sync status
cskiraly Mar 25, 2025
cb5d672
eth/connmanager: no need to store srv
cskiraly Mar 25, 2025
301b396
eth/connmanager: monitor sync status
cskiraly Mar 26, 2025
77d634c
eth/connmanager: handle inbound and dialed peers separately
cskiraly Mar 26, 2025
d46ef40
fixing newlines
cskiraly Mar 28, 2025
8bb7f1e
eth/connmanager: randomize peer drop timers
cskiraly Mar 29, 2025
42d2c9b
eth: renaming Connection Manager to Dropper
cskiraly Apr 7, 2025
e9065ac
simplify rand usage
cskiraly Apr 10, 2025
5da26a9
eth/dropper: simplify cfg
cskiraly Apr 10, 2025
75c8ee1
eth/dropper: simplify code
cskiraly Apr 10, 2025
1647f51
eth/dropper: add metrics
cskiraly Apr 11, 2025
7a76bdd
eth/dropper: simplify sync status query
cskiraly Apr 11, 2025
4a69bf9
eth/dropper: fixing logs
cskiraly Apr 11, 2025
ff66b1c
eth/dropper: remove unused peerEvent channel
cskiraly Apr 11, 2025
2a9372e
eth/dropper: changing error code to DiscUselessPeer
cskiraly Apr 11, 2025
976e039
set doNotDropBefore to 10 minutes
cskiraly Apr 14, 2025
4961445
Update dropper.go
fjl Apr 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Ethereum struct {

handler *handler
discmix *enode.FairMix
connman *connManager

// DB interfaces
chainDb ethdb.Database // Block chain database
Expand Down Expand Up @@ -289,6 +290,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err
}

eth.connman = newConnManager(&connmanConfig{
maxDialPeers: eth.p2pServer.MaxDialedConns(),
maxInboundPeers: eth.p2pServer.MaxInboundConns(),
})

eth.miner = miner.New(eth, config.Miner, eth.engine)
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
eth.miner.SetPrioAddresses(config.TxPool.Locals)
Expand Down Expand Up @@ -399,6 +405,9 @@ func (s *Ethereum) Start() error {
// Start the networking layer
s.handler.Start(s.p2pServer.MaxPeers)

// Start the connection manager
s.connman.Start(s.p2pServer, func() bool { return !s.Synced() })

// start log indexer
s.filterMaps.Start()
go s.updateFilterMapsHeads()
Expand Down Expand Up @@ -500,6 +509,7 @@ func (s *Ethereum) setupDiscovery() error {
func (s *Ethereum) Stop() error {
// Stop all the peer-related stuff first.
s.discmix.Close()
s.connman.Stop()
s.handler.Stop()

// Then stop everything else.
Expand Down
254 changes: 254 additions & 0 deletions eth/connmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package eth

import (
crand "crypto/rand"
"encoding/binary"
mrand "math/rand"
"slices"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
)

const (
	// Minimum interval between peer drop events (the actual interval is drawn
	// uniformly between peerDropIntervalMin and peerDropIntervalMax).
	peerDropIntervalMin = 3 * time.Minute
	// Maximum interval between peer drop events.
	peerDropIntervalMax = 7 * time.Minute
	// Grace period: avoid dropping peers for this long after connection.
	doNotDropBefore = 2 * peerDropIntervalMax
	// How close to max should we initiate the drop timer. 0 should be fine,
	// dropping when no more peers can be added. Larger numbers result in more
	// aggressive drop behavior.
	peerDropThreshold = 0
	// Sync status poll interval (no need to be too reactive here).
	syncCheckInterval = 60 * time.Second
)

// connManager monitors the state of the peer pool and makes changes as follows:
//   - during sync the Downloader handles peer connections so connManager is disabled
//   - if not syncing and the peer count is close to the limit, it drops peers
//     randomly every peerDropInterval to make space for new peers
//   - peers are dropped separately from the inbound pool and from the dialed pool
type connManager struct {
	connmanConfig
	peersFunc   getPeersFunc   // callback returning the currently connected peers
	syncingFunc getSyncingFunc // callback reporting whether the node is syncing

	// The peerDrop timers introduce churn if we are close to limit capacity.
	// We handle Dialed and Inbound connections separately.
	peerDropDialedTimer  *mclock.Alarm
	peerDropInboundTimer *mclock.Alarm

	peerEventCh chan *p2p.PeerEvent // channel for peer event changes
	sub         event.Subscription  // subscription to peerEventCh

	wg         sync.WaitGroup // wg for graceful shutdown
	shutdownCh chan struct{}  // closed by Stop to terminate the main loop
}

// Callback type to get the list of connected peers.
type getPeersFunc func() []*p2p.Peer

// Callback type to get syncing status.
// Returns true while syncing, false when synced.
type getSyncingFunc func() bool

// connmanConfig holds the parameters of a connManager. The log, clock and
// rand fields are optional; withDefaults fills them in when left unset.
type connmanConfig struct {
	maxDialPeers    int // maximum number of dialed peers
	maxInboundPeers int // maximum number of inbound peers
	log             log.Logger
	clock           mclock.Clock
	rand            *mrand.Rand
}

// withDefaults returns a copy of the config in which every unset optional
// field (logger, clock, random source) is filled with a sensible default.
func (cfg connmanConfig) withDefaults() connmanConfig {
	if cfg.log == nil {
		cfg.log = log.Root()
	}
	if cfg.clock == nil {
		cfg.clock = mclock.System{}
	}
	if cfg.rand == nil {
		// Seed a math/rand generator from crypto/rand.
		var seedBytes [8]byte
		crand.Read(seedBytes[:])
		cfg.rand = mrand.New(mrand.NewSource(int64(binary.BigEndian.Uint64(seedBytes[:]))))
	}
	return cfg
}

// newConnManager creates a connection manager from the given configuration,
// applying defaults for any optional fields that were left unset.
func newConnManager(config *connmanConfig) *connManager {
	// Sanity-check the compile-time interval constants before doing any work.
	if peerDropIntervalMin > peerDropIntervalMax {
		panic("peerDropIntervalMin duration must be less than or equal to peerDropIntervalMax duration")
	}
	cfg := config.withDefaults()
	cm := &connManager{
		connmanConfig:        cfg,
		peerDropDialedTimer:  mclock.NewAlarm(cfg.clock),
		peerDropInboundTimer: mclock.NewAlarm(cfg.clock),
		peerEventCh:          make(chan *p2p.PeerEvent),
		shutdownCh:           make(chan struct{}),
	}
	cm.log.Info("New Connection Manager", "maxDialPeers", cm.maxDialPeers, "threshold", peerDropThreshold,
		"intervalMin", peerDropIntervalMin, "intervalMax", peerDropIntervalMax)
	return cm
}

// Start the connection manager.
// srv supplies the peer list and the peer-event subscription; syncingFunc
// reports whether the node is still syncing (peer dropping is disabled
// while syncing). Spawns the main loop goroutine, which runs until Stop.
func (cm *connManager) Start(srv *p2p.Server, syncingFunc getSyncingFunc) {
	cm.wg.Add(1)
	cm.peersFunc = srv.Peers
	cm.syncingFunc = syncingFunc
	cm.sub = srv.SubscribeEvents(cm.peerEventCh)
	go cm.loop()
}

// Stop the connection manager.
// Unsubscribes from peer events and disarms the drop timers first, then
// signals the main loop to exit and waits for it to finish.
func (cm *connManager) Stop() {
	cm.sub.Unsubscribe()
	cm.peerDropInboundTimer.Stop()
	cm.peerDropDialedTimer.Stop()
	close(cm.shutdownCh)
	cm.wg.Wait()
}

// numPeers returns the current number of peers and its breakdown (dialed or inbound).
func (cm *connManager) numPeers() (numPeers int, numDialed int, numInbound int) {
	for _, p := range cm.peersFunc() {
		if p.Inbound() {
			numInbound++
		} else {
			numDialed++
		}
	}
	return numDialed + numInbound, numDialed, numInbound
}

// dropRandomPeer selects one of the peers randomly and drops it from the peer pool.
// When dialed is true, only dynamically dialed peers are considered; otherwise
// only inbound peers are. Trusted peers and peers connected for less than
// doNotDropBefore are never dropped. Returns true if a peer was dropped.
func (cm *connManager) dropRandomPeer(dialed bool) bool {
	peers := cm.peersFunc()

	// protected reports whether p must NOT be considered for a drop.
	protected := func(p *p2p.Peer) bool {
		// Avoid dropping trusted peers, and give peers some time after
		// connection before considering them for a drop.
		if p.Trusted() || p.Lifetime() < mclock.AbsTime(doNotDropBefore) {
			return true
		}
		if dialed {
			// Only drop from dyndialed peers.
			return !p.DynDialed()
		}
		// Only drop from inbound peers.
		return p.DynDialed() || p.StaticDialed()
	}
	droppable := slices.DeleteFunc(peers, protected)
	if len(droppable) == 0 {
		return false
	}
	victim := droppable[cm.rand.Intn(len(droppable))]
	cm.log.Debug("dropping random peer", "id", victim.ID(), "duration", common.PrettyDuration(victim.Lifetime()),
		"dialed", dialed, "peercountbefore", len(peers))
	victim.Disconnect(p2p.DiscTooManyPeers)
	return true
}

// randomDuration generates a random duration between min and max.
// TODO: maybe we should move this to a common utlity package.
// TODO: panic might be too harsh, maybe return an error.
func randomDuration(rand *mrand.Rand, min, max time.Duration) time.Duration {
if min > max {
panic("min duration must be less than or equal to max duration")
}
nanos := rand.Int63n(max.Nanoseconds()-min.Nanoseconds()) + min.Nanoseconds()
return time.Duration(nanos)
}

// updatePeerDropTimers checks and starts/stops the timer for peer drop.
// While syncing both timers are disarmed; otherwise a timer is armed for each
// pool (dialed/inbound) whose free capacity is at or below peerDropThreshold.
func (cm *connManager) updatePeerDropTimers(syncing bool) {
	total, dialed, inbound := cm.numPeers()
	cm.log.Trace("ConnManager status", "syncing", syncing,
		"peers", total, "out", dialed, "in", inbound,
		"maxout", cm.maxDialPeers, "maxin", cm.maxInboundPeers)

	if syncing {
		// Downloader is managing connections while syncing.
		cm.peerDropDialedTimer.Stop()
		cm.peerDropInboundTimer.Stop()
		return
	}

	// If a drop was already scheduled, Schedule does nothing.
	if cm.maxDialPeers-dialed <= peerDropThreshold {
		delay := randomDuration(cm.rand, peerDropIntervalMin, peerDropIntervalMax)
		cm.peerDropDialedTimer.Schedule(cm.clock.Now().Add(delay))
	} else {
		cm.peerDropDialedTimer.Stop()
	}
	if cm.maxInboundPeers-inbound <= peerDropThreshold {
		delay := randomDuration(cm.rand, peerDropIntervalMin, peerDropIntervalMax)
		cm.peerDropInboundTimer.Schedule(cm.clock.Now().Add(delay))
	} else {
		cm.peerDropInboundTimer.Stop()
	}
}

// loop is the main loop of the connection manager.
// It runs in its own goroutine until shutdownCh is closed (see Stop),
// reacting to sync-status polls, peer add/drop events, and the two
// peer-drop timers.
func (cm *connManager) loop() {
	defer cm.wg.Done()

	// Set up periodic timer to pull syncing status.
	// We could get syncing status in a few ways:
	// - poll the sync status (we use this for now)
	// - subscribe to Downloader.mux
	// - subscribe to DownloaderAPI (which itself polls the sync status)
	syncing := cm.syncingFunc()
	cm.log.Trace("Sync status", "syncing", syncing)
	syncCheckTimer := mclock.NewAlarm(cm.connmanConfig.clock)
	syncCheckTimer.Schedule(cm.clock.Now().Add(syncCheckInterval))
	defer syncCheckTimer.Stop()

	for {
		select {
		case <-syncCheckTimer.C():
			// Update info about syncing status, and rearm the timers.
			syncingNew := cm.syncingFunc()
			if syncing != syncingNew {
				// Syncing status changed, we might need to update the timers.
				cm.log.Trace("Sync status changed", "syncing", syncingNew)
				syncing = syncingNew
				cm.updatePeerDropTimers(syncing)
			}
			syncCheckTimer.Schedule(cm.clock.Now().Add(syncCheckInterval))
		case ev := <-cm.peerEventCh:
			if ev.Type == p2p.PeerEventTypeAdd || ev.Type == p2p.PeerEventTypeDrop {
				// Number of peers changed, we might need to start the timers.
				cm.updatePeerDropTimers(syncing)
			}
		case <-cm.peerDropDialedTimer.C():
			// The timer is not rearmed here directly: dropping a peer causes
			// a drop event, which is handled above via updatePeerDropTimers.
			cm.dropRandomPeer(true)
		case <-cm.peerDropInboundTimer.C():
			cm.dropRandomPeer(false)
		case <-cm.shutdownCh:
			return
		}
	}
}
26 changes: 25 additions & 1 deletion p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,35 @@ func (p *Peer) String() string {
return fmt.Sprintf("Peer %x %v", id[:8], p.RemoteAddr())
}

// Inbound returns true if the peer is an inbound (not dialed) connection.
func (p *Peer) Inbound() bool {
	return p.rw.is(inboundConn)
}

// Trusted returns true if the peer is configured as trusted.
// Trusted peers are accepted even above the MaxInboundConns limit.
// The peer can be either inbound or dialed.
func (p *Peer) Trusted() bool {
	return p.rw.is(trustedConn)
}

// DynDialed returns true if the peer was dialed successfully (passed handshake) and
// it is not configured as static.
func (p *Peer) DynDialed() bool {
	return p.rw.is(dynDialedConn)
}

// StaticDialed returns true if the peer was dialed successfully (passed handshake) and
// it is configured as static.
func (p *Peer) StaticDialed() bool {
	return p.rw.is(staticDialedConn)
}

// Lifetime returns the time elapsed since peer creation, measured on the
// monotonic mclock timescale.
func (p *Peer) Lifetime() mclock.AbsTime {
	return mclock.Now() - p.created
}

func newPeer(log log.Logger, conn *conn, protocols []Protocol) *Peer {
protomap := matchProtocols(protocols, conn.caps, conn)
p := &Peer{
Expand Down
10 changes: 5 additions & 5 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func (srv *Server) setupDiscovery() error {
func (srv *Server) setupDialScheduler() {
config := dialConfig{
self: srv.localnode.ID(),
maxDialPeers: srv.maxDialedConns(),
maxDialPeers: srv.MaxDialedConns(),
maxActiveDials: srv.MaxPendingPeers,
log: srv.Logger,
netRestrict: srv.NetRestrict,
Expand All @@ -527,11 +527,11 @@ func (srv *Server) setupDialScheduler() {
}
}

func (srv *Server) maxInboundConns() int {
return srv.MaxPeers - srv.maxDialedConns()
func (srv *Server) MaxInboundConns() int {
return srv.MaxPeers - srv.MaxDialedConns()
}

func (srv *Server) maxDialedConns() (limit int) {
func (srv *Server) MaxDialedConns() (limit int) {
if srv.NoDial || srv.MaxPeers == 0 {
return 0
}
Expand Down Expand Up @@ -736,7 +736,7 @@ func (srv *Server) postHandshakeChecks(peers map[enode.ID]*Peer, inboundCount in
switch {
case !c.is(trustedConn) && len(peers) >= srv.MaxPeers:
return DiscTooManyPeers
case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.MaxInboundConns():
return DiscTooManyPeers
case peers[c.node.ID()] != nil:
return DiscAlreadyConnected
Expand Down