-
Notifications
You must be signed in to change notification settings - Fork 20.9k
eth: add logic to drop peers randomly when saturated #31476
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
27 commits
Select commit
Hold shift + click to select a range
136d32d
p2p/connmanager: add connection manager to create some churn
cskiraly 7a49bf0
p2p/connmanager: only drop from dialed peers
cskiraly 91c1c30
p2p/connmanager: avoid dropping trusted peers
cskiraly 75e4c26
p2p/connmanager: avoid dropping peers too early
cskiraly 4635dac
p2p/connmanager: set meaningful defaults
cskiraly 23cda63
p2p/peer: expose conn flags through getter functions
cskiraly ea8d05a
p2p/server: expose MaxInboundConns and MaxDialedConns
cskiraly e0b0189
eth/connmanager: move Connection Manager to package eth
cskiraly c41569d
eth/connmanager: use slices.DeleteFunc to filter in place
cskiraly 61b26a9
eth/connman: fixup log levels
cskiraly 628c5e5
eth/connmanager: get sync status
cskiraly cb5d672
eth/connmanager: no need to store srv
cskiraly 301b396
eth/connmanager: monitor sync status
cskiraly 77d634c
eth/connmanager: handle inbound and dialed peers separately
cskiraly d46ef40
fixing newlines
cskiraly 8bb7f1e
eth/connmanager: randomize peer drop timers
cskiraly 42d2c9b
eth: renaming Connection Manager to Dropper
cskiraly e9065ac
simplify rand usage
cskiraly 5da26a9
eth/dropper: simplify cfg
cskiraly 75c8ee1
eth/dropper: simplify code
cskiraly 1647f51
eth/dropper: add metrics
cskiraly 7a76bdd
eth/dropper: simplify sync status query
cskiraly 4a69bf9
eth/dropper: fixing logs
cskiraly ff66b1c
eth/dropper: remove unused peerEvent channel
cskiraly 2a9372e
eth/dropper: changing error code to DiscUselessPeer
cskiraly 976e039
set doNotDropBefore to 10 minutes
cskiraly 4961445
Update dropper.go
fjl File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
// Copyright 2025 The go-ethereum Authors | ||
// This file is part of the go-ethereum library. | ||
// | ||
// The go-ethereum library is free software: you can redistribute it and/or modify | ||
// it under the terms of the GNU Lesser General Public License as published by | ||
// the Free Software Foundation, either version 3 of the License, or | ||
// (at your option) any later version. | ||
// | ||
// The go-ethereum library is distributed in the hope that it will be useful, | ||
// but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
// GNU Lesser General Public License for more details. | ||
// | ||
// You should have received a copy of the GNU Lesser General Public License | ||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
package eth | ||
|
||
import ( | ||
mrand "math/rand" | ||
"slices" | ||
"sync" | ||
"time" | ||
|
||
"github.com/ethereum/go-ethereum/common" | ||
"github.com/ethereum/go-ethereum/common/mclock" | ||
"github.com/ethereum/go-ethereum/log" | ||
"github.com/ethereum/go-ethereum/metrics" | ||
"github.com/ethereum/go-ethereum/p2p" | ||
) | ||
|
||
const ( | ||
// Interval between peer drop events (uniform between min and max) | ||
peerDropIntervalMin = 3 * time.Minute | ||
// Interval between peer drop events (uniform between min and max) | ||
peerDropIntervalMax = 7 * time.Minute | ||
// Avoid dropping peers for some time after connection | ||
doNotDropBefore = 10 * time.Minute | ||
// How close to max should we initiate the drop timer. O should be fine, | ||
// dropping when no more peers can be added. Larger numbers result in more | ||
// aggressive drop behavior. | ||
peerDropThreshold = 0 | ||
) | ||
|
||
var ( | ||
// droppedInbound is the number of inbound peers dropped | ||
droppedInbound = metrics.NewRegisteredMeter("eth/dropper/inbound", nil) | ||
// droppedOutbound is the number of outbound peers dropped | ||
droppedOutbound = metrics.NewRegisteredMeter("eth/dropper/outbound", nil) | ||
) | ||
|
||
// dropper monitors the state of the peer pool and makes changes as follows: | ||
// - during sync the Downloader handles peer connections, so dropper is disabled | ||
// - if not syncing and the peer count is close to the limit, it drops peers | ||
// randomly every peerDropInterval to make space for new peers | ||
// - peers are dropped separately from the inboud pool and from the dialed pool | ||
type dropper struct { | ||
maxDialPeers int // maximum number of dialed peers | ||
maxInboundPeers int // maximum number of inbound peers | ||
peersFunc getPeersFunc | ||
syncingFunc getSyncingFunc | ||
|
||
// peerDropTimer introduces churn if we are close to limit capacity. | ||
// We handle Dialed and Inbound connections separately | ||
peerDropTimer *time.Timer | ||
|
||
wg sync.WaitGroup // wg for graceful shutdown | ||
shutdownCh chan struct{} | ||
} | ||
|
||
// Callback type to get the list of connected peers. | ||
type getPeersFunc func() []*p2p.Peer | ||
|
||
// Callback type to get syncing status. | ||
// Returns true while syncing, false when synced. | ||
type getSyncingFunc func() bool | ||
|
||
func newDropper(maxDialPeers, maxInboundPeers int) *dropper { | ||
cm := &dropper{ | ||
maxDialPeers: maxDialPeers, | ||
maxInboundPeers: maxInboundPeers, | ||
peerDropTimer: time.NewTimer(randomDuration(peerDropIntervalMin, peerDropIntervalMax)), | ||
shutdownCh: make(chan struct{}), | ||
} | ||
if peerDropIntervalMin > peerDropIntervalMax { | ||
panic("peerDropIntervalMin duration must be less than or equal to peerDropIntervalMax duration") | ||
} | ||
return cm | ||
} | ||
|
||
// Start the dropper. | ||
func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc) { | ||
cm.peersFunc = srv.Peers | ||
cm.syncingFunc = syncingFunc | ||
cm.wg.Add(1) | ||
go cm.loop() | ||
} | ||
|
||
// Stop the dropper. | ||
func (cm *dropper) Stop() { | ||
cm.peerDropTimer.Stop() | ||
close(cm.shutdownCh) | ||
cm.wg.Wait() | ||
} | ||
|
||
// dropRandomPeer selects one of the peers randomly and drops it from the peer pool. | ||
func (cm *dropper) dropRandomPeer() bool { | ||
peers := cm.peersFunc() | ||
var numInbound int | ||
for _, p := range peers { | ||
if p.Inbound() { | ||
numInbound++ | ||
} | ||
} | ||
numDialed := len(peers) - numInbound | ||
|
||
selectDoNotDrop := func(p *p2p.Peer) bool { | ||
// Avoid dropping trusted and static peers, or recent peers. | ||
// Only drop peers if their respective category (dialed/inbound) | ||
// is close to limit capacity. | ||
return p.Trusted() || p.StaticDialed() || | ||
p.Lifetime() < mclock.AbsTime(doNotDropBefore) || | ||
(p.DynDialed() && cm.maxDialPeers-numDialed > peerDropThreshold) || | ||
(p.Inbound() && cm.maxInboundPeers-numInbound > peerDropThreshold) | ||
} | ||
|
||
droppable := slices.DeleteFunc(peers, selectDoNotDrop) | ||
if len(droppable) > 0 { | ||
p := droppable[mrand.Intn(len(droppable))] | ||
log.Debug("Dropping random peer", "inbound", p.Inbound(), | ||
"id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers)) | ||
p.Disconnect(p2p.DiscUselessPeer) | ||
if p.Inbound() { | ||
droppedInbound.Mark(1) | ||
} else { | ||
droppedOutbound.Mark(1) | ||
} | ||
return true | ||
} | ||
return false | ||
} | ||
|
||
// randomDuration generates a random duration between min and max. | ||
func randomDuration(min, max time.Duration) time.Duration { | ||
if min > max { | ||
panic("min duration must be less than or equal to max duration") | ||
} | ||
return time.Duration(mrand.Int63n(int64(max-min)) + int64(min)) | ||
} | ||
|
||
// loop is the main loop of the connection dropper. | ||
func (cm *dropper) loop() { | ||
defer cm.wg.Done() | ||
|
||
for { | ||
select { | ||
case <-cm.peerDropTimer.C: | ||
// Drop a random peer if we are not syncing and the peer count is close to the limit. | ||
if !cm.syncingFunc() { | ||
cm.dropRandomPeer() | ||
} | ||
cm.peerDropTimer.Reset(randomDuration(peerDropIntervalMin, peerDropIntervalMax)) | ||
case <-cm.shutdownCh: | ||
return | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe best to remove this setting. I wouldn't want people to tinker with this, since a non-zero setting here can lead to weird behavior if peer finding efficiency is bad.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changing this will not change the drop interval, which ultimately drives churn in the system.
Changing this is somewhat similar to changing maxpeercount, which we do allow people to change (even on command line).
The effect is different on the dialed and on the served pool.
So I would prefer keeping it to simplify experimenting with it for us.