Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ public static PeerSubnetSubscriptions create(
return subscriptions;
}

public IntStream streamRelevantSubnets() {
return dataColumnSidecarSubnetSubscriptions.streamRelevantSubnets();
}

private static void updateMetrics(
final SchemaDefinitionsSupplier currentSchemaDefinitions,
final SettableLabelledGauge subnetPeerCountGauge,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.units.bigints.UInt256;
import tech.pegasys.teku.networking.eth2.gossip.subnets.PeerSubnetSubscriptions;
import tech.pegasys.teku.networking.p2p.connection.PeerConnectionType;
import tech.pegasys.teku.networking.p2p.connection.PeerPools;
Expand All @@ -42,6 +42,8 @@
import tech.pegasys.teku.networking.p2p.reputation.ReputationManager;

public class Eth2PeerSelectionStrategy implements PeerSelectionStrategy {

public static final int MIN_PEERS_PER_DATA_COLUMN_SUBNET = 1;
private static final Logger LOG = LogManager.getLogger();

private final TargetPeerRange targetPeerCountRange;
Expand Down Expand Up @@ -118,18 +120,86 @@ private List<PeerAddress> selectAndRemoveRandomPeers(
return selectedPeers;
}

private record AvailableDiscoveryPeer(DiscoveryPeer discoveryPeer, PeerAddress peerAddress) {}

private List<PeerAddress> selectPeersByScore(
final P2PNetwork<?> network,
final PeerSubnetSubscriptions peerSubnetSubscriptions,
final int scoreBasedPeersToAdd,
final List<DiscoveryPeer> allCandidatePeers) {

// Summary of the peer selection algorithm
// First peers are scored like pre-Fusaka: a score is calculated per peer individually depending
// on the relevant subnets for attestations, sync committee and data columns needed. This leads
// to an overall peer score which takes into account the already connected peer count for each
// subnet
// and this list is sorted by highest scoring peers.
// Then from this list, we select the top peers that satisfy the datacolumn subnet requirements
// to make sure we have at least one peer per subnet.
// This list is then the list of 'must have' candidates, and the other peers are 'optional extra
// candidates'. We combine both lists and select the top ones up to scoreBasedPeersToAdd

final PeerScorer peerScorer = peerSubnetSubscriptions.createScorer();
return allCandidatePeers.stream()
.sorted(
Comparator.comparing((Function<DiscoveryPeer, Integer>) peerScorer::scoreCandidatePeer)
.reversed())
.flatMap(candidate -> checkCandidate(candidate, network).stream())
final Map<Integer, Integer> requiredExtraPeerCountPerSubnetId =
peerSubnetSubscriptions
.streamRelevantSubnets()
.boxed()
.collect(
Collectors.toMap(
subnetId -> subnetId,
subnetId ->
peerSubnetSubscriptions.getSubscriberCountForDataColumnSidecarSubnet(
subnetId)
>= MIN_PEERS_PER_DATA_COLUMN_SUBNET
? 0
: MIN_PEERS_PER_DATA_COLUMN_SUBNET));

// Step 1: Convert candidates to AvailableDiscoveryPeer if they pass filtering
final List<AvailableDiscoveryPeer> availablePeers =
allCandidatePeers.stream()
.map(
candidate ->
checkCandidate(candidate, network)
.map(addr -> new AvailableDiscoveryPeer(candidate, addr)))
.flatMap(Optional::stream)
.sorted(
Comparator.comparingInt(
(AvailableDiscoveryPeer p) ->
peerScorer.scoreCandidatePeer(p.discoveryPeer()))
.reversed())
.toList();

// Step 2: Partition into must-have and optional
List<AvailableDiscoveryPeer> mustHavePeers = new ArrayList<>();
List<AvailableDiscoveryPeer> optionalPeers = new ArrayList<>();

for (AvailableDiscoveryPeer peer : availablePeers) {
boolean isMustHave = false;

var subnets =
peerSubnetSubscriptions.getDataColumnSidecarSubnetSubscriptionsByNodeId(
UInt256.fromBytes(peer.discoveryPeer().getNodeId()),
peer.discoveryPeer().getDasCustodySubnetCount());

for (int subnetId : subnets.streamAllSetBits().toArray()) {
Integer remaining = requiredExtraPeerCountPerSubnetId.get(subnetId);
if (remaining != null && remaining > 0) {
requiredExtraPeerCountPerSubnetId.put(subnetId, remaining - 1);
isMustHave = true;
}
}

if (isMustHave) {
mustHavePeers.add(peer);
} else {
optionalPeers.add(peer);
}
}

// Step 3: Combine and limit the total result
return Stream.concat(mustHavePeers.stream(), optionalPeers.stream())
.limit(scoreBasedPeersToAdd)
.map(AvailableDiscoveryPeer::peerAddress)
.toList();
}

Expand Down