Skip to content
Draft

POC das #10037

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public CustodyGroupCountManagerImpl(
this.proposersDataManager = proposersDataManager;
this.combinedChainDataClient = combinedChainDataClient;
this.custodyGroupCountChannel = custodyGroupCountChannel;
this.custodyGroupSyncedCount = new AtomicInteger(0);
final Optional<Integer> maybeCustodyCount =
combinedChainDataClient.getCustodyGroupCount().map(UInt64::intValue);
if (maybeCustodyCount.isEmpty() || maybeCustodyCount.get() < initCustodyGroupCount) {
Expand All @@ -113,7 +114,6 @@ public CustodyGroupCountManagerImpl(
updateCustodyGroupCount(maybeCustodyCount.get(), maybeCustodyCount);
}

this.custodyGroupSyncedCount = new AtomicInteger(0);
this.nodeId = nodeId;
}

Expand Down Expand Up @@ -251,10 +251,10 @@ private void updateCustodyGroupCount(
newCustodyGroupCount);
combinedChainDataClient.updateCustodyGroupCount(newCustodyGroupCount);
}
if (custodyGroupCount.get() >= newCustodyGroupCount) {
return;
final int oldValue = custodyGroupCount.getAndSet(newCustodyGroupCount);
if (oldValue == INITIAL_VALUE) {
setCustodyGroupSyncedCount(newCustodyGroupCount);
}
custodyGroupCount.set(newCustodyGroupCount);
custodyGroupCountChannel.onGroupCountUpdate(newCustodyGroupCount, getSamplingGroupCount());
custodyGroupCountGauge.set(newCustodyGroupCount);
isMaxCustodyGroups = newCustodyGroupCount == specConfigFulu.getNumberOfCustodyGroups();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ public DataColumnSidecarCustodyImpl(
final CanonicalBlockResolver blockResolver,
final DataColumnSidecarDbAccessor db,
final MinCustodyPeriodSlotCalculator minCustodyPeriodSlotCalculator,
final Supplier<CustodyGroupCountManager> custodyGroupCountManagerSupplier,
final int totalCustodyGroupCount) {
final Supplier<CustodyGroupCountManager> custodyGroupCountManagerSupplier) {
checkNotNull(spec);
checkNotNull(blockResolver);
checkNotNull(minCustodyPeriodSlotCalculator);
Expand All @@ -111,7 +110,8 @@ public DataColumnSidecarCustodyImpl(
this.blockResolver = blockResolver;
this.minCustodyPeriodSlotCalculator = minCustodyPeriodSlotCalculator;
this.custodyGroupCountManagerSupplier = custodyGroupCountManagerSupplier;
this.totalCustodyGroupCount = new AtomicInteger(totalCustodyGroupCount);
this.totalCustodyGroupCount =
new AtomicInteger(custodyGroupCountManagerSupplier.get().getCustodyGroupCount());
LOG.debug(
"Initialized DataColumnSidecar Custody with custody group count {}",
totalCustodyGroupCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void onSlot_shouldUpdateCustodyAtGenesis() {
custodyGroupCountManager.onSlot(UInt64.ZERO);

assertThat(custodyGroupCountManager.getCustodyGroupCount()).isEqualTo(4);
assertThat(custodyGroupCountManager.getCustodyGroupSyncedCount()).isZero();
assertThat(custodyGroupCountManager.getCustodyGroupSyncedCount()).isEqualTo(4);

// prepare a validator
when(proposersDataManager.getPreparedProposerInfo())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,14 @@ public class DasLongPollCustodyTest {
() -> custodyGroupCountManager;
final SpecConfigFulu config =
SpecConfigFulu.required(spec.forMilestone(SpecMilestone.FULU).getConfig());
final int groupCount = config.getNumberOfCustodyGroups();

final DataColumnSidecarCustodyImpl custodyImpl =
new DataColumnSidecarCustodyImpl(
spec,
blockResolver,
dbAccessor,
MinCustodyPeriodSlotCalculator.createFromSpec(spec),
custodyGroupCountManagerSupplier,
groupCount);
custodyGroupCountManagerSupplier);

private final DataStructureUtil dataStructureUtil = new DataStructureUtil(0, spec);
private final Duration currentSlotTimeout = ofSeconds(3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
Expand Down Expand Up @@ -86,30 +87,32 @@ public static Stream<Arguments> minCustodyScenarios() {

@BeforeEach
public void setup() {
when(custodyGroupCountManager.getCustodyGroupCount()).thenReturn(groupCount);
custody =
new DataColumnSidecarCustodyImpl(
spec,
blockResolver,
dbAccessor,
minCustodyPeriodSlotCalculator,
custodyGroupCountManagerSupplier,
groupCount);
custodyGroupCountManagerSupplier);
when(custodyGroupCountManager.getCustodyColumnIndices())
.thenReturn(
List.of(UInt64.valueOf(0), UInt64.valueOf(1), UInt64.valueOf(2), UInt64.valueOf(3)));
verify(custodyGroupCountManager).getCustodyGroupCount();
Mockito.clearInvocations(custodyGroupCountManager);
}

private void initWithMockDb(final int initialGroupCount, final int updatedGroupCount) {
final DataColumnSidecarDbAccessor dbAccessorMock = mock(DataColumnSidecarDbAccessor.class);
when(custodyGroupCountManager.getCustodyGroupCount()).thenReturn(initialGroupCount);
when(dbAccessorMock.setFirstCustodyIncompleteSlot(any())).thenReturn(SafeFuture.COMPLETE);
custody =
new DataColumnSidecarCustodyImpl(
spec,
blockResolver,
dbAccessorMock,
minCustodyPeriodSlotCalculator,
custodyGroupCountManagerSupplier,
initialGroupCount);
custodyGroupCountManagerSupplier);
when(custodyGroupCountManager.getCustodyGroupCount()).thenReturn(updatedGroupCount);
}

Expand Down Expand Up @@ -179,8 +182,7 @@ public void getBlockRootWithBlobs_emptySlot() {
resolver,
sidecarDb,
minCustodyPeriodSlotCalculator,
custodyGroupCountManagerSupplier,
groupCount);
custodyGroupCountManagerSupplier);

final SafeFuture<Optional<Bytes32>> futureResult = custody.getBlockRootWithBlobs(ONE);
verifyNoMoreInteractions(sidecarDb);
Expand All @@ -200,8 +202,7 @@ public void getBlockRootWithBlobs_hasBlock() {
resolver,
sidecarDb,
minCustodyPeriodSlotCalculator,
custodyGroupCountManagerSupplier,
groupCount);
custodyGroupCountManagerSupplier);

final SafeFuture<Optional<Bytes32>> futureResult = custody.getBlockRootWithBlobs(ONE);

Expand Down Expand Up @@ -235,8 +236,7 @@ public void retrieveSlotCustody_insufficientCustody()
resolver,
sidecarDb,
minCustodyPeriodSlotCalculator,
custodyGroupCountManagerSupplier,
2);
custodyGroupCountManagerSupplier);

SafeFuture<DataColumnSidecarCustodyImpl.SlotCustody> future = custody.retrieveSlotCustody(ONE);
verify(sidecarDb).getColumnIdentifiers(ONE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ private DasCustodyStand(
asyncBlockResolver,
dbAccessor,
minCustodyPeriodSlotCalculator,
() -> custodyGroupCountManager,
totalCustodyGroupCount);
() -> custodyGroupCountManager);
subscribeToSlotEvents(this.custody);
subscribeToFinalizedEvents(this.custody);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ public class ActiveEth2P2PNetwork extends DelegatingP2PNetwork<Eth2Peer> impleme
private final SubnetSubscriptionService executionProofSubnetService;
private final ProcessedAttestationSubscriptionProvider processedAttestationSubscriptionProvider;
private final AtomicBoolean gossipStarted = new AtomicBoolean(false);
private final int dasTotalCustodySubnetCount;

private final GossipForkManager gossipForkManager;

Expand All @@ -103,7 +102,6 @@ public ActiveEth2P2PNetwork(
final GossipEncoding gossipEncoding,
final GossipConfigurator gossipConfigurator,
final ProcessedAttestationSubscriptionProvider processedAttestationSubscriptionProvider,
final int dasTotalCustodySubnetCount,
final boolean allTopicsFilterEnabled) {
super(discoveryNetwork);
this.spec = spec;
Expand All @@ -120,7 +118,6 @@ public ActiveEth2P2PNetwork(
this.dataColumnSidecarSubnetService = dataColumnSidecarSubnetService;
this.executionProofSubnetService = executionProofSubnetService;
this.processedAttestationSubscriptionProvider = processedAttestationSubscriptionProvider;
this.dasTotalCustodySubnetCount = dasTotalCustodySubnetCount;
this.allTopicsFilterEnabled = allTopicsFilterEnabled;
}

Expand Down Expand Up @@ -181,8 +178,6 @@ private synchronized void startGossip() {
discoveryNetwork::setSyncCommitteeSubnetSubscriptions);
final UInt64 currentEpoch = recentChainData.getCurrentEpoch().orElseThrow();
if (spec.isMilestoneSupported(SpecMilestone.FULU)) {
LOG.info("Using custody sidecar subnets count: {}", dasTotalCustodySubnetCount);
discoveryNetwork.setDASTotalCustodySubnetCount(dasTotalCustodySubnetCount);
recentChainData
.getNextForkDigest(currentEpoch)
.ifPresent(discoveryNetwork::setNextForkDigest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
import tech.pegasys.teku.networking.eth2.gossip.forks.GossipForkManager;
import tech.pegasys.teku.networking.eth2.gossip.forks.GossipForkSubscriptions;
Expand Down Expand Up @@ -99,6 +98,7 @@
import tech.pegasys.teku.spec.datastructures.util.ForkAndSpecMilestone;
import tech.pegasys.teku.spec.logic.versions.fulu.helpers.BlobParameters;
import tech.pegasys.teku.spec.schemas.SchemaDefinitionsSupplier;
import tech.pegasys.teku.statetransition.CustodyGroupCountChannel;
import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManager;
import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarByRootCustody;
import tech.pegasys.teku.statetransition.datacolumns.log.gossip.DasGossipLogger;
Expand Down Expand Up @@ -181,14 +181,9 @@ public Eth2P2PNetwork build() {
statusMessageFactory = new StatusMessageFactory(spec, combinedChainDataClient);
eventChannels.subscribe(SlotEventsChannel.class, statusMessageFactory);
}

final Optional<UInt64> dasTotalCustodyGroupCount =
spec.isMilestoneSupported(SpecMilestone.FULU)
? Optional.of(
UInt64.valueOf(
config.getTotalCustodyGroupCount(spec.forMilestone(SpecMilestone.FULU))))
: Optional.empty();

if (metadataMessagesFactory != null && spec.isMilestoneSupported(SpecMilestone.FULU)) {
eventChannels.subscribe(CustodyGroupCountChannel.class, metadataMessagesFactory);
}
final Eth2PeerManager eth2PeerManager =
Eth2PeerManager.create(
asyncRunner,
Expand All @@ -211,7 +206,6 @@ public Eth2P2PNetwork build() {
config.getPeerRequestLimit(),
spec,
discoveryNodeIdExtractor,
dasTotalCustodyGroupCount,
dasReqRespLogger);
final Collection<RpcMethod<?, ?, ?>> eth2RpcMethods =
eth2PeerManager.getBeaconChainMethods().all();
Expand Down Expand Up @@ -240,7 +234,6 @@ public Eth2P2PNetwork build() {
gossipEncoding,
config.getGossipConfigurator(),
processedAttestationSubscriptionProvider,
dasTotalCustodyGroupCount.orElse(UInt64.ZERO).intValue(),
config.isAllTopicsFilterEnabled());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.SubnetSubscriptionService;
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.BeaconChainMethods;
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.MetadataMessagesFactory;
Expand Down Expand Up @@ -132,10 +131,7 @@ public static Eth2PeerManager create(
final int peerRequestLimit,
final Spec spec,
final DiscoveryNodeIdExtractor discoveryNodeIdExtractor,
final Optional<UInt64> custodyGroupCount,
final DasReqRespLogger dasLogger) {

custodyGroupCount.ifPresent(metadataMessagesFactory::updateCustodyGroupCount);
attestationSubnetService.subscribeToUpdates(
metadataMessagesFactory::updateAttestationSubnetIds);
syncCommitteeSubnetService.subscribeToUpdates(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public synchronized void updateSyncCommitteeSubnetIds(

public synchronized void updateCustodyGroupCount(final UInt64 custodyGroupCount) {
this.custodyGroupCount = Optional.of(custodyGroupCount);
LOG.info("Updating custody group count {}", custodyGroupCount);
handleUpdate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -162,7 +161,6 @@ public void onEpoch_shouldUpdateDiscoveryNetworkForkInfo() {
network.onEpoch(UInt64.ONE);
asyncRunner.executeDueActions();
verify(discoveryNetwork).updateGossipTopicScoring(any());
verify(discoveryNetwork).setDASTotalCustodySubnetCount(anyInt());
verify(discoveryNetwork).setNextForkDigest(altairForkDigest);
verifyNoMoreInteractions(discoveryNetwork);

Expand Down Expand Up @@ -457,7 +455,6 @@ ActiveEth2P2PNetwork createNetwork() {
gossipEncoding,
gossipConfigurator,
processedAttestationSubscriptionProvider,
0,
true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import tech.pegasys.teku.spec.datastructures.util.ForkAndSpecMilestone;
import tech.pegasys.teku.spec.schemas.SchemaDefinitionsSupplier;
import tech.pegasys.teku.statetransition.BeaconChainUtil;
import tech.pegasys.teku.statetransition.CustodyGroupCountChannel;
import tech.pegasys.teku.statetransition.block.VerifiedBlockOperationsListener;
import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManager;
import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarByRootCustody;
Expand Down Expand Up @@ -236,24 +237,23 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) {
new DataColumnSidecarSubnetTopicProvider(
combinedChainDataClient.getRecentChainData(), gossipEncoding);

final MetadataMessagesFactory metadataMessagesFactory = new MetadataMessagesFactory();
if (spec.isMilestoneSupported(SpecMilestone.FULU)) {
eventChannels.subscribe(CustodyGroupCountChannel.class, metadataMessagesFactory);
}

if (rpcEncoding == null) {
rpcEncoding =
RpcEncoding.createSszSnappyEncoding(spec.getNetworkingConfig().getMaxPayloadSize());
}
final Optional<UInt64> dasTotalCustodySubnetCount =
spec.isMilestoneSupported(SpecMilestone.FULU)
? Optional.of(
UInt64.valueOf(
config.getTotalCustodyGroupCount(spec.forMilestone(SpecMilestone.FULU))))
: Optional.empty();
final UInt256 discoveryNodeId = DISCOVERY_NODE_ID_GENERATOR.next();
final Eth2PeerManager eth2PeerManager =
Eth2PeerManager.create(
asyncRunner,
combinedChainDataClient,
() -> DataColumnSidecarByRootCustody.NOOP,
() -> CustodyGroupCountManager.NOOP,
new MetadataMessagesFactory(),
metadataMessagesFactory,
METRICS_SYSTEM,
attestationSubnetService,
syncCommitteeSubnetService,
Expand All @@ -269,7 +269,6 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) {
P2PConfig.DEFAULT_PEER_REQUEST_LIMIT,
spec,
__ -> Optional.of(discoveryNodeId),
dasTotalCustodySubnetCount,
DasReqRespLogger.NOOP);

List<RpcMethod<?, ?, ?>> rpcMethods =
Expand Down Expand Up @@ -392,7 +391,6 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) {
gossipEncoding,
GossipConfigurator.NOOP,
processedAttestationSubscriptionProvider,
0,
config.isAllTopicsFilterEnabled());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -858,21 +858,14 @@ protected void initDasCustody() {

final int minCustodyGroupRequirement = specConfigFulu.getCustodyRequirement();
final int maxGroups = specConfigFulu.getNumberOfCustodyGroups();
final int totalMyCustodyGroups =
beaconConfig.p2pConfig().getTotalCustodyGroupCount(specVersionFulu);
eventChannels
.getPublisher(CustodyGroupCountChannel.class)
.onGroupCountUpdate(
totalMyCustodyGroups, miscHelpersFulu.getSamplingGroupCount(totalMyCustodyGroups));

final DataColumnSidecarCustodyImpl dataColumnSidecarCustodyImpl =
new DataColumnSidecarCustodyImpl(
spec,
canonicalBlockResolver,
dbAccessor,
minCustodyPeriodSlotCalculator,
this::getCustodyGroupCountManager,
totalMyCustodyGroups);
this::getCustodyGroupCountManager);
eventChannels.subscribe(SlotEventsChannel.class, dataColumnSidecarCustodyImpl);
eventChannels.subscribe(FinalizedCheckpointChannel.class, dataColumnSidecarCustodyImpl);

Expand Down Expand Up @@ -1008,7 +1001,9 @@ protected void initDasCustody() {
recoveringSidecarRetriever,
this::getCustodyGroupCountManager,
recentChainData);
LOG.info("DAS Basic Sampler initialized with {} groups to sample", totalMyCustodyGroups);
LOG.info(
"DAS Basic Sampler initialized with {} groups to sample",
getCustodyGroupCountManager().getSamplingGroupCount());
eventChannels.subscribe(SlotEventsChannel.class, dasSampler);
dataColumnSidecarManager.subscribeToValidDataColumnSidecars(
dasSampler::onNewValidatedDataColumnSidecar);
Expand Down