From 5a0915ee9fcf71d23418a7055bd1a92dbc4175b8 Mon Sep 17 00:00:00 2001 From: Jeroen Ost Date: Tue, 29 Jul 2025 13:31:22 +0200 Subject: [PATCH 1/3] First version of an updated peer selection strategy that tries to cover custody column subnet coverage from randomly selected peers. --- .../subnets/PeerSubnetSubscriptions.java | 4 ++ .../eth2/peers/Eth2PeerSelectionStrategy.java | 59 +++++++++++++++++-- 2 files changed, 59 insertions(+), 4 deletions(-) diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/PeerSubnetSubscriptions.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/PeerSubnetSubscriptions.java index b951ec0cf18..7c6b97be586 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/PeerSubnetSubscriptions.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/subnets/PeerSubnetSubscriptions.java @@ -138,6 +138,10 @@ public static PeerSubnetSubscriptions create( return subscriptions; } + public IntStream streamRelevantSubnets() { + return dataColumnSidecarSubnetSubscriptions.streamRelevantSubnets(); + } + private static void updateMetrics( final SchemaDefinitionsSupplier currentSchemaDefinitions, final SettableLabelledGauge subnetPeerCountGauge, diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerSelectionStrategy.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerSelectionStrategy.java index 2f3f92b7343..6ffcb1b8ff5 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerSelectionStrategy.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerSelectionStrategy.java @@ -24,12 +24,12 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.tuweni.units.bigints.UInt256; import tech.pegasys.teku.networking.eth2.gossip.subnets.PeerSubnetSubscriptions; import tech.pegasys.teku.networking.p2p.connection.PeerConnectionType; import tech.pegasys.teku.networking.p2p.connection.PeerPools; @@ -118,17 +118,68 @@ private List selectAndRemoveRandomPeers( return selectedPeers; } + private record AvailableDiscoveryPeer( + DiscoveryPeer discoveryPeer, PeerAddress peerAddress) {} + private List selectPeersByScore( final P2PNetwork network, final PeerSubnetSubscriptions peerSubnetSubscriptions, final int scoreBasedPeersToAdd, final List allCandidatePeers) { + + // Summary of the peer selection algorithm + // First peers are scored like pre-Fusaka: a score is calculated per peer individually depending on the relevant + // subnets for attestations, sync committee and data columns needed. This leads to an overall peer score + // which takes into account the already connected peer count for each subnet and this list is sorted by highest + // scoring peers. + // Then from this list we select the top peers that satisfy the datacolumn subnet needs, up to minPeersRequired. + // This list is then the list of 'must have' candidates, and the other peers are 'optional extra candidates' + // We combine both lists and select the top ones up to scoreBasedPeersToAdd + final PeerScorer peerScorer = peerSubnetSubscriptions.createScorer(); - return allCandidatePeers.stream() + final Map neededPeersPerSubnetId = + peerSubnetSubscriptions + .streamRelevantSubnets() + .boxed() + .collect( + Collectors.toMap( + subnetId -> subnetId, + peerSubnetSubscriptions::getSubscriberCountForDataColumnSidecarSubnet)); + + final List sortedMustHaveCandidates = new ArrayList<>(); + final List sortedOptionalExtraCandidates = new ArrayList<>(); + + allCandidatePeers.stream() + .map( + candidate -> + checkCandidate(candidate, network) + .map(addr -> new AvailableDiscoveryPeer(candidate, addr))) + .flatMap(Optional::stream) .sorted( - Comparator.comparing((Function) peerScorer::scoreCandidatePeer) + Comparator.comparing( + (AvailableDiscoveryPeer p) -> peerScorer.scoreCandidatePeer(p.discoveryPeer())) .reversed()) - .flatMap(candidate -> checkCandidate(candidate, network).stream()) + .forEach( + availableDiscoveryPeer -> { + peerSubnetSubscriptions + .getDataColumnSidecarSubnetSubscriptionsByNodeId( + UInt256.fromBytes(availableDiscoveryPeer.discoveryPeer.getNodeId()), + availableDiscoveryPeer.discoveryPeer.getDasCustodySubnetCount()) + .streamAllSetBits() + .forEach( + subnetId -> { + Integer neededPeerCountForSubnetId = neededPeersPerSubnetId.get(subnetId); + if ((neededPeerCountForSubnetId != null) + && (neededPeerCountForSubnetId > 0)) { + sortedMustHaveCandidates.add(availableDiscoveryPeer.peerAddress); + neededPeersPerSubnetId.put(subnetId, neededPeerCountForSubnetId - 1); + } else { + sortedOptionalExtraCandidates.add(availableDiscoveryPeer.peerAddress); + } + }); + }); + + return Stream.concat(sortedMustHaveCandidates.stream(), sortedOptionalExtraCandidates.stream()) .limit(scoreBasedPeersToAdd) .toList(); } From 27b2659c86553454b2a3f42b0541522fd22b9880 Mon Sep 17 00:00:00 2001 From: Jeroen Ost Date: Tue, 29 Jul 2025 15:19:28 +0200 Subject: [PATCH 2/3] First version of an updated peer selection strategy that tries to cover custody column subnet coverage from randomly selected peers. --- .../eth2/peers/Eth2PeerSelectionStrategy.java | 100 ++++++++++-------- 1 file changed, 56 insertions(+), 44 deletions(-) diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerSelectionStrategy.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerSelectionStrategy.java index 6ffcb1b8ff5..0feaae0369c 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerSelectionStrategy.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerSelectionStrategy.java @@ -118,8 +118,7 @@ private List selectAndRemoveRandomPeers( return selectedPeers; } - private record AvailableDiscoveryPeer( - DiscoveryPeer discoveryPeer, PeerAddress peerAddress) {} + private record AvailableDiscoveryPeer(DiscoveryPeer discoveryPeer, PeerAddress peerAddress) {} private List selectPeersByScore( final P2PNetwork network, @@ -128,16 +127,18 @@ private List selectPeersByScore( final List allCandidatePeers) { // Summary of the peer selection algorithm - // First peers are scored like pre-Fusaka: a score is calculated per peer individually depending on the relevant - // subnets for attestations, sync committee and data columns needed. This leads to an overall peer score - // which takes into account the already connected peer count for each subnet and this list is sorted by highest - // scoring peers. - // Then from this list we select the top peers that satisfy the datacolumn subnet needs, up to minPeersRequired. - // This list is then the list of 'must have' candidates, and the other peers are 'optional extra candidates' - // We combine both lists and select the top ones up to scoreBasedPeersToAdd + // First peers are scored like pre-Fusaka: a score is calculated per peer individually depending + // on the relevant subnets for attestations, sync committee and data columns needed. This leads + // to an overall peer score which takes into account the already connected peer count for each + // subnet + // and this list is sorted by highest scoring peers. + // Then from this list we select the top peers that satisfy the datacolumn subnet needs, up to + // minPeersRequired. + // This list is then the list of 'must have' candidates, and the other peers are 'optional extra + // candidates'. We combine both lists and select the top ones up to scoreBasedPeersToAdd final PeerScorer peerScorer = peerSubnetSubscriptions.createScorer(); - final Map neededPeersPerSubnetId = + final Map requiredExtraPeerCountPerSubnetId = peerSubnetSubscriptions .streamRelevantSubnets() .boxed() @@ -146,41 +147,52 @@ private List selectPeersByScore( subnetId -> subnetId, peerSubnetSubscriptions::getSubscriberCountForDataColumnSidecarSubnet)); - final List sortedMustHaveCandidates = new ArrayList<>(); - final List sortedOptionalExtraCandidates = new ArrayList<>(); - - allCandidatePeers.stream() - .map( - candidate -> - checkCandidate(candidate, network) - .map(addr -> new AvailableDiscoveryPeer(candidate, addr))) - .flatMap(Optional::stream) - .sorted( - Comparator.comparing( - (AvailableDiscoveryPeer p) -> peerScorer.scoreCandidatePeer(p.discoveryPeer())) - .reversed()) - .forEach( - availableDiscoveryPeer -> { - peerSubnetSubscriptions - .getDataColumnSidecarSubnetSubscriptionsByNodeId( - UInt256.fromBytes(availableDiscoveryPeer.discoveryPeer.getNodeId()), - availableDiscoveryPeer.discoveryPeer.getDasCustodySubnetCount()) - .streamAllSetBits() - .forEach( - subnetId -> { - Integer neededPeerCountForSubnetId = neededPeersPerSubnetId.get(subnetId); - if ((neededPeerCountForSubnetId != null) - && (neededPeerCountForSubnetId > 0)) { - sortedMustHaveCandidates.add(availableDiscoveryPeer.peerAddress); - neededPeersPerSubnetId.put(subnetId, neededPeerCountForSubnetId - 1); - } else { - sortedOptionalExtraCandidates.add(availableDiscoveryPeer.peerAddress); - } - }); - }); - - return Stream.concat(sortedMustHaveCandidates.stream(), sortedOptionalExtraCandidates.stream()) + // Step 1: Convert candidates to AvailableDiscoveryPeer if they pass filtering + final List availablePeers = + allCandidatePeers.stream() + .map( + candidate -> + checkCandidate(candidate, network) + .map(addr -> new AvailableDiscoveryPeer(candidate, addr))) + .flatMap(Optional::stream) + .sorted( + Comparator.comparingInt( + (AvailableDiscoveryPeer p) -> + peerScorer.scoreCandidatePeer(p.discoveryPeer())) + .reversed()) + .toList(); + + // Step 2: Partition into must-have and optional + List mustHavePeers = new ArrayList<>(); + List optionalPeers = new ArrayList<>(); + + for (AvailableDiscoveryPeer peer : availablePeers) { + boolean isMustHave = false; + + var subnets = + peerSubnetSubscriptions.getDataColumnSidecarSubnetSubscriptionsByNodeId( + UInt256.fromBytes(peer.discoveryPeer().getNodeId()), + peer.discoveryPeer().getDasCustodySubnetCount()); + + for (int subnetId : subnets.streamAllSetBits().toArray()) { + Integer remaining = requiredExtraPeerCountPerSubnetId.get(subnetId); + if (remaining != null && remaining > 0) { + requiredExtraPeerCountPerSubnetId.put(subnetId, remaining - 1); + isMustHave = true; + } + } + + if (isMustHave) { + mustHavePeers.add(peer); + } else { + optionalPeers.add(peer); + } + } + + // Step 3: Combine and limit the total result + return Stream.concat(mustHavePeers.stream(), optionalPeers.stream()) .limit(scoreBasedPeersToAdd) + .map(AvailableDiscoveryPeer::peerAddress) .toList(); } From be9201bf489d260e9e6ea7c988fbd99f4dd73e22 Mon Sep 17 00:00:00 2001 From: Jeroen Ost Date: Tue, 29 Jul 2025 15:53:55 +0200 Subject: [PATCH 3/3] First version of an updated peer selection strategy that tries to cover custody column subnet coverage from randomly selected peers. --- .../eth2/peers/Eth2PeerSelectionStrategy.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerSelectionStrategy.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerSelectionStrategy.java index 0feaae0369c..ea9e95c97fe 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerSelectionStrategy.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerSelectionStrategy.java @@ -42,6 +42,8 @@ import tech.pegasys.teku.networking.p2p.reputation.ReputationManager; public class Eth2PeerSelectionStrategy implements PeerSelectionStrategy { + + public static final int MIN_PEERS_PER_DATA_COLUMN_SUBNET = 1; private static final Logger LOG = LogManager.getLogger(); private final TargetPeerRange targetPeerCountRange; @@ -132,8 +134,8 @@ private List selectPeersByScore( // to an overall peer score which takes into account the already connected peer count for each // subnet // and this list is sorted by highest scoring peers. - // Then from this list we select the top peers that satisfy the datacolumn subnet needs, up to - // minPeersRequired. + // Then from this list, we select the top peers that satisfy the datacolumn subnet requirements + // to make sure we have at least one peer per subnet. // This list is then the list of 'must have' candidates, and the other peers are 'optional extra // candidates'. We combine both lists and select the top ones up to scoreBasedPeersToAdd @@ -145,7 +147,12 @@ private List selectPeersByScore( .collect( Collectors.toMap( subnetId -> subnetId, - peerSubnetSubscriptions::getSubscriberCountForDataColumnSidecarSubnet)); + subnetId -> + peerSubnetSubscriptions.getSubscriberCountForDataColumnSidecarSubnet( + subnetId) + >= MIN_PEERS_PER_DATA_COLUMN_SUBNET + ? 0 + : MIN_PEERS_PER_DATA_COLUMN_SUBNET)); // Step 1: Convert candidates to AvailableDiscoveryPeer if they pass filtering final List availablePeers =