From 5c84a3bcbd6c7988ab703b331495d5926e87eb73 Mon Sep 17 00:00:00 2001 From: Paul Harris Date: Tue, 21 Oct 2025 17:12:58 +1000 Subject: [PATCH 1/2] POC --- .../CustodyGroupCountManagerImpl.java | 4 +- .../DataColumnSidecarCustodyImpl.java | 6 +-- .../datacolumns/DasLongPollCustodyTest.java | 4 +- .../DataColumnSidecarCustodyImplTest.java | 20 ++++----- .../datacolumns/DasCustodyStand.java | 3 +- .../networking/eth2/ActiveEth2P2PNetwork.java | 5 --- .../eth2/Eth2P2PNetworkBuilder.java | 15 ++----- .../eth2/peers/Eth2PeerManager.java | 4 -- .../methods/MetadataMessagesFactory.java | 1 + .../eth2/ActiveEth2P2PNetworkTest.java | 3 -- .../eth2/Eth2P2PNetworkFactory.java | 16 ++++---- .../beaconchain/BeaconChainController.java | 41 +++++++++++++++---- 12 files changed, 60 insertions(+), 62 deletions(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerImpl.java index 20d739b9ace..21653b8483f 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerImpl.java @@ -251,9 +251,7 @@ private void updateCustodyGroupCount( newCustodyGroupCount); combinedChainDataClient.updateCustodyGroupCount(newCustodyGroupCount); } - if (custodyGroupCount.get() >= newCustodyGroupCount) { - return; - } + custodyGroupCount.set(newCustodyGroupCount); custodyGroupCountChannel.onGroupCountUpdate(newCustodyGroupCount, getSamplingGroupCount()); custodyGroupCountGauge.set(newCustodyGroupCount); diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarCustodyImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarCustodyImpl.java index d7b2fa4b402..248a3aa71d4 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarCustodyImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarCustodyImpl.java @@ -99,8 +99,7 @@ public DataColumnSidecarCustodyImpl( final CanonicalBlockResolver blockResolver, final DataColumnSidecarDbAccessor db, final MinCustodyPeriodSlotCalculator minCustodyPeriodSlotCalculator, - final Supplier custodyGroupCountManagerSupplier, - final int totalCustodyGroupCount) { + final Supplier custodyGroupCountManagerSupplier) { checkNotNull(spec); checkNotNull(blockResolver); checkNotNull(minCustodyPeriodSlotCalculator); @@ -111,7 +110,8 @@ public DataColumnSidecarCustodyImpl( this.blockResolver = blockResolver; this.minCustodyPeriodSlotCalculator = minCustodyPeriodSlotCalculator; this.custodyGroupCountManagerSupplier = custodyGroupCountManagerSupplier; - this.totalCustodyGroupCount = new AtomicInteger(totalCustodyGroupCount); + this.totalCustodyGroupCount = + new AtomicInteger(custodyGroupCountManagerSupplier.get().getCustodyGroupCount()); LOG.debug( "Initialized DataColumnSidecar Custody with custody group count {}", totalCustodyGroupCount); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DasLongPollCustodyTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DasLongPollCustodyTest.java index 9d29409948e..b8650ae1f60 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DasLongPollCustodyTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DasLongPollCustodyTest.java @@ -59,7 +59,6 @@ public class DasLongPollCustodyTest { () -> custodyGroupCountManager; final SpecConfigFulu config = SpecConfigFulu.required(spec.forMilestone(SpecMilestone.FULU).getConfig()); - final int groupCount = config.getNumberOfCustodyGroups(); final DataColumnSidecarCustodyImpl custodyImpl = new DataColumnSidecarCustodyImpl( @@ -67,8 +66,7 @@ public class DasLongPollCustodyTest { blockResolver, dbAccessor, MinCustodyPeriodSlotCalculator.createFromSpec(spec), - custodyGroupCountManagerSupplier, - groupCount); + custodyGroupCountManagerSupplier); private final DataStructureUtil dataStructureUtil = new DataStructureUtil(0, spec); private final Duration currentSlotTimeout = ofSeconds(3); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarCustodyImplTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarCustodyImplTest.java index 094d03ba980..e02f7777e9b 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarCustodyImplTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DataColumnSidecarCustodyImplTest.java @@ -37,6 +37,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; @@ -86,21 +87,24 @@ public static Stream minCustodyScenarios() { @BeforeEach public void setup() { + when(custodyGroupCountManager.getCustodyGroupCount()).thenReturn(groupCount); custody = new DataColumnSidecarCustodyImpl( spec, blockResolver, dbAccessor, minCustodyPeriodSlotCalculator, - custodyGroupCountManagerSupplier, - groupCount); + custodyGroupCountManagerSupplier); when(custodyGroupCountManager.getCustodyColumnIndices()) .thenReturn( List.of(UInt64.valueOf(0), UInt64.valueOf(1), UInt64.valueOf(2), UInt64.valueOf(3))); + verify(custodyGroupCountManager).getCustodyGroupCount(); + Mockito.clearInvocations(custodyGroupCountManager); } private void initWithMockDb(final int initialGroupCount, final int updatedGroupCount) { final DataColumnSidecarDbAccessor dbAccessorMock = mock(DataColumnSidecarDbAccessor.class); + when(custodyGroupCountManager.getCustodyGroupCount()).thenReturn(initialGroupCount); when(dbAccessorMock.setFirstCustodyIncompleteSlot(any())).thenReturn(SafeFuture.COMPLETE); custody = new DataColumnSidecarCustodyImpl( @@ -108,8 +112,7 @@ private void initWithMockDb(final int initialGroupCount, final int updatedGroupC blockResolver, dbAccessorMock, minCustodyPeriodSlotCalculator, - custodyGroupCountManagerSupplier, - initialGroupCount); + custodyGroupCountManagerSupplier); when(custodyGroupCountManager.getCustodyGroupCount()).thenReturn(updatedGroupCount); } @@ -179,8 +182,7 @@ public void getBlockRootWithBlobs_emptySlot() { resolver, sidecarDb, minCustodyPeriodSlotCalculator, - custodyGroupCountManagerSupplier, - groupCount); + custodyGroupCountManagerSupplier); final SafeFuture> futureResult = custody.getBlockRootWithBlobs(ONE); verifyNoMoreInteractions(sidecarDb); @@ -200,8 +202,7 @@ public void getBlockRootWithBlobs_hasBlock() { resolver, sidecarDb, minCustodyPeriodSlotCalculator, - custodyGroupCountManagerSupplier, - groupCount); + custodyGroupCountManagerSupplier); final SafeFuture> futureResult = custody.getBlockRootWithBlobs(ONE); @@ -235,8 +236,7 @@ public void retrieveSlotCustody_insufficientCustody() resolver, sidecarDb, minCustodyPeriodSlotCalculator, - custodyGroupCountManagerSupplier, - 2); + custodyGroupCountManagerSupplier); SafeFuture future = custody.retrieveSlotCustody(ONE); verify(sidecarDb).getColumnIdentifiers(ONE); diff --git a/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/datacolumns/DasCustodyStand.java b/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/datacolumns/DasCustodyStand.java index e0007bd4575..06f850b4f61 100644 --- a/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/datacolumns/DasCustodyStand.java +++ b/ethereum/statetransition/src/testFixtures/java/tech/pegasys/teku/statetransition/datacolumns/DasCustodyStand.java @@ -105,8 +105,7 @@ private DasCustodyStand( asyncBlockResolver, dbAccessor, minCustodyPeriodSlotCalculator, - () -> custodyGroupCountManager, - totalCustodyGroupCount); + () -> custodyGroupCountManager); subscribeToSlotEvents(this.custody); subscribeToFinalizedEvents(this.custody); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java index 7fbcaa62107..dbf7f71822f 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java @@ -76,7 +76,6 @@ public class ActiveEth2P2PNetwork extends DelegatingP2PNetwork impleme private final SubnetSubscriptionService executionProofSubnetService; private final ProcessedAttestationSubscriptionProvider processedAttestationSubscriptionProvider; private final AtomicBoolean gossipStarted = new AtomicBoolean(false); - private final int dasTotalCustodySubnetCount; private final GossipForkManager gossipForkManager; @@ -103,7 +102,6 @@ public ActiveEth2P2PNetwork( final GossipEncoding gossipEncoding, final GossipConfigurator gossipConfigurator, final ProcessedAttestationSubscriptionProvider processedAttestationSubscriptionProvider, - final int dasTotalCustodySubnetCount, final boolean allTopicsFilterEnabled) { super(discoveryNetwork); this.spec = spec; @@ -120,7 +118,6 @@ public ActiveEth2P2PNetwork( this.dataColumnSidecarSubnetService = dataColumnSidecarSubnetService; this.executionProofSubnetService = executionProofSubnetService; this.processedAttestationSubscriptionProvider = processedAttestationSubscriptionProvider; - this.dasTotalCustodySubnetCount = dasTotalCustodySubnetCount; this.allTopicsFilterEnabled = allTopicsFilterEnabled; } @@ -181,8 +178,6 @@ private synchronized void startGossip() { discoveryNetwork::setSyncCommitteeSubnetSubscriptions); final UInt64 currentEpoch = recentChainData.getCurrentEpoch().orElseThrow(); if (spec.isMilestoneSupported(SpecMilestone.FULU)) { - LOG.info("Using custody sidecar subnets count: {}", dasTotalCustodySubnetCount); - discoveryNetwork.setDASTotalCustodySubnetCount(dasTotalCustodySubnetCount); recentChainData .getNextForkDigest(currentEpoch) .ifPresent(discoveryNetwork::setNextForkDigest); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java index c91808ecdc6..451dcd068c8 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkBuilder.java @@ -32,7 +32,6 @@ import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge; import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; import tech.pegasys.teku.infrastructure.time.TimeProvider; -import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; import tech.pegasys.teku.networking.eth2.gossip.forks.GossipForkManager; import tech.pegasys.teku.networking.eth2.gossip.forks.GossipForkSubscriptions; @@ -99,6 +98,7 @@ import tech.pegasys.teku.spec.datastructures.util.ForkAndSpecMilestone; import tech.pegasys.teku.spec.logic.versions.fulu.helpers.BlobParameters; import tech.pegasys.teku.spec.schemas.SchemaDefinitionsSupplier; +import tech.pegasys.teku.statetransition.CustodyGroupCountChannel; import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManager; import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarByRootCustody; import tech.pegasys.teku.statetransition.datacolumns.log.gossip.DasGossipLogger; @@ -181,14 +181,9 @@ public Eth2P2PNetwork build() { statusMessageFactory = new StatusMessageFactory(spec, combinedChainDataClient); eventChannels.subscribe(SlotEventsChannel.class, statusMessageFactory); } - - final Optional dasTotalCustodyGroupCount = - spec.isMilestoneSupported(SpecMilestone.FULU) - ? Optional.of( - UInt64.valueOf( - config.getTotalCustodyGroupCount(spec.forMilestone(SpecMilestone.FULU)))) - : Optional.empty(); - + if (metadataMessagesFactory != null && spec.isMilestoneSupported(SpecMilestone.FULU)) { + eventChannels.subscribe(CustodyGroupCountChannel.class, metadataMessagesFactory); + } final Eth2PeerManager eth2PeerManager = Eth2PeerManager.create( asyncRunner, @@ -211,7 +206,6 @@ public Eth2P2PNetwork build() { config.getPeerRequestLimit(), spec, discoveryNodeIdExtractor, - dasTotalCustodyGroupCount, dasReqRespLogger); final Collection> eth2RpcMethods = eth2PeerManager.getBeaconChainMethods().all(); @@ -240,7 +234,6 @@ public Eth2P2PNetwork build() { gossipEncoding, config.getGossipConfigurator(), processedAttestationSubscriptionProvider, - dasTotalCustodyGroupCount.orElse(UInt64.ZERO).intValue(), config.isAllTopicsFilterEnabled()); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java index 61cea72229c..d47b68497a4 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/Eth2PeerManager.java @@ -29,7 +29,6 @@ import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.subscribers.Subscribers; import tech.pegasys.teku.infrastructure.time.TimeProvider; -import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.SubnetSubscriptionService; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.BeaconChainMethods; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.MetadataMessagesFactory; @@ -132,10 +131,7 @@ public static Eth2PeerManager create( final int peerRequestLimit, final Spec spec, final DiscoveryNodeIdExtractor discoveryNodeIdExtractor, - final Optional custodyGroupCount, final DasReqRespLogger dasLogger) { - - custodyGroupCount.ifPresent(metadataMessagesFactory::updateCustodyGroupCount); attestationSubnetService.subscribeToUpdates( metadataMessagesFactory::updateAttestationSubnetIds); syncCommitteeSubnetService.subscribeToUpdates( diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/MetadataMessagesFactory.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/MetadataMessagesFactory.java index 475e7921f60..3b6a860848e 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/MetadataMessagesFactory.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/MetadataMessagesFactory.java @@ -46,6 +46,7 @@ public synchronized void updateSyncCommitteeSubnetIds( public synchronized void updateCustodyGroupCount(final UInt64 custodyGroupCount) { this.custodyGroupCount = Optional.of(custodyGroupCount); + LOG.info("Updating custody group count {}", custodyGroupCount); handleUpdate(); } diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java index 54102c73c3f..9e0376104e6 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java @@ -15,7 +15,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -162,7 +161,6 @@ public void onEpoch_shouldUpdateDiscoveryNetworkForkInfo() { network.onEpoch(UInt64.ONE); asyncRunner.executeDueActions(); verify(discoveryNetwork).updateGossipTopicScoring(any()); - verify(discoveryNetwork).setDASTotalCustodySubnetCount(anyInt()); verify(discoveryNetwork).setNextForkDigest(altairForkDigest); verifyNoMoreInteractions(discoveryNetwork); @@ -457,7 +455,6 @@ ActiveEth2P2PNetwork createNetwork() { gossipEncoding, gossipConfigurator, processedAttestationSubscriptionProvider, - 0, true); } } diff --git a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java index 4b57306f951..84f7aadd9c0 100644 --- a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java +++ b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetworkFactory.java @@ -112,6 +112,7 @@ import tech.pegasys.teku.spec.datastructures.util.ForkAndSpecMilestone; import tech.pegasys.teku.spec.schemas.SchemaDefinitionsSupplier; import tech.pegasys.teku.statetransition.BeaconChainUtil; +import tech.pegasys.teku.statetransition.CustodyGroupCountChannel; import tech.pegasys.teku.statetransition.block.VerifiedBlockOperationsListener; import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManager; import tech.pegasys.teku.statetransition.datacolumns.DataColumnSidecarByRootCustody; @@ -236,16 +237,15 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) { new DataColumnSidecarSubnetTopicProvider( combinedChainDataClient.getRecentChainData(), gossipEncoding); + final MetadataMessagesFactory metadataMessagesFactory = new MetadataMessagesFactory(); + if (spec.isMilestoneSupported(SpecMilestone.FULU)) { + eventChannels.subscribe(CustodyGroupCountChannel.class, metadataMessagesFactory); + } + if (rpcEncoding == null) { rpcEncoding = RpcEncoding.createSszSnappyEncoding(spec.getNetworkingConfig().getMaxPayloadSize()); } - final Optional dasTotalCustodySubnetCount = - spec.isMilestoneSupported(SpecMilestone.FULU) - ? Optional.of( - UInt64.valueOf( - config.getTotalCustodyGroupCount(spec.forMilestone(SpecMilestone.FULU)))) - : Optional.empty(); final UInt256 discoveryNodeId = DISCOVERY_NODE_ID_GENERATOR.next(); final Eth2PeerManager eth2PeerManager = Eth2PeerManager.create( @@ -253,7 +253,7 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) { combinedChainDataClient, () -> DataColumnSidecarByRootCustody.NOOP, () -> CustodyGroupCountManager.NOOP, - new MetadataMessagesFactory(), + metadataMessagesFactory, METRICS_SYSTEM, attestationSubnetService, syncCommitteeSubnetService, @@ -269,7 +269,6 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) { P2PConfig.DEFAULT_PEER_REQUEST_LIMIT, spec, __ -> Optional.of(discoveryNodeId), - dasTotalCustodySubnetCount, DasReqRespLogger.NOOP); List> rpcMethods = @@ -392,7 +391,6 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) { gossipEncoding, GossipConfigurator.NOOP, processedAttestationSubscriptionProvider, - 0, config.isAllTopicsFilterEnabled()); } } diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 2ac9f0a8bb0..304ef308ce5 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -674,6 +674,7 @@ public void initAll() { initActiveValidatorTracker(); initSubnetSubscriber(); initDataColumnSidecarSubnetBackboneSubscriber(); + initFuluCustodyEventChannel(); initSlashingEventsSubscriptions(); initPerformanceTracker(); initBlobSidecarReconstructionProvider(); @@ -858,12 +859,6 @@ protected void initDasCustody() { final int minCustodyGroupRequirement = specConfigFulu.getCustodyRequirement(); final int maxGroups = specConfigFulu.getNumberOfCustodyGroups(); - final int totalMyCustodyGroups = - beaconConfig.p2pConfig().getTotalCustodyGroupCount(specVersionFulu); - eventChannels - .getPublisher(CustodyGroupCountChannel.class) - .onGroupCountUpdate( - totalMyCustodyGroups, miscHelpersFulu.getSamplingGroupCount(totalMyCustodyGroups)); final DataColumnSidecarCustodyImpl dataColumnSidecarCustodyImpl = new DataColumnSidecarCustodyImpl( @@ -871,8 +866,7 @@ protected void initDasCustody() { canonicalBlockResolver, dbAccessor, minCustodyPeriodSlotCalculator, - this::getCustodyGroupCountManager, - totalMyCustodyGroups); + this::getCustodyGroupCountManager); eventChannels.subscribe(SlotEventsChannel.class, dataColumnSidecarCustodyImpl); eventChannels.subscribe(FinalizedCheckpointChannel.class, dataColumnSidecarCustodyImpl); @@ -1008,7 +1002,9 @@ protected void initDasCustody() { recoveringSidecarRetriever, this::getCustodyGroupCountManager, recentChainData); - LOG.info("DAS Basic Sampler initialized with {} groups to sample", totalMyCustodyGroups); + LOG.info( + "DAS Basic Sampler initialized with {} groups to sample", + getCustodyGroupCountManager().getSamplingGroupCount()); eventChannels.subscribe(SlotEventsChannel.class, dasSampler); dataColumnSidecarManager.subscribeToValidDataColumnSidecars( dasSampler::onNewValidatedDataColumnSidecar); @@ -1024,6 +1020,33 @@ protected void initDasSyncPreSampler() { this.dataColumnSidecarCustodyRef.get(), this::getCustodyGroupCountManager); eventChannels.subscribe(SyncPreImportBlockChannel.class, dasPreSampler::onNewPreImportBlocks); + + final int totalMyCustodyGroups = custodyGroupCountManagerRef.get().getCustodyGroupCount(); + + final MiscHelpersFulu miscHelpersFulu = + MiscHelpersFulu.required(spec.forMilestone(SpecMilestone.FULU).miscHelpers()); + + eventChannels + .getPublisher(CustodyGroupCountChannel.class) + .onGroupCountUpdate( + totalMyCustodyGroups, miscHelpersFulu.getSamplingGroupCount(totalMyCustodyGroups)); + } + } + + // final das initialisation to broadcast the correct custody count + protected void initFuluCustodyEventChannel() { + if (spec.isMilestoneSupported(SpecMilestone.FULU)) { + final int totalMyCustodyGroups = custodyGroupCountManagerRef.get().getCustodyGroupCount(); + final MiscHelpersFulu miscHelpersFulu = + MiscHelpersFulu.required(spec.forMilestone(SpecMilestone.FULU).miscHelpers()); + final int samplingCount = miscHelpersFulu.getSamplingGroupCount(totalMyCustodyGroups); + LOG.info( + "Initializing fulu custody group count to {}, sampling count to {}", + totalMyCustodyGroups, + samplingCount); + eventChannels + .getPublisher(CustodyGroupCountChannel.class) + .onGroupCountUpdate(totalMyCustodyGroups, samplingCount); } } From db1ef9ec45b1caf1a1c1d4e3d6d83cadb369007d Mon Sep 17 00:00:00 2001 From: Paul Harris Date: Wed, 22 Oct 2025 09:15:01 +1000 Subject: [PATCH 2/2] addressed startup issue --- .../CustodyGroupCountManagerImpl.java | 8 ++++-- .../CustodyGroupCountManagerImplTest.java | 2 +- .../beaconchain/BeaconChainController.java | 28 ------------------- 3 files changed, 6 insertions(+), 32 deletions(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerImpl.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerImpl.java index 21653b8483f..11809e33af3 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerImpl.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerImpl.java @@ -103,6 +103,7 @@ public CustodyGroupCountManagerImpl( this.proposersDataManager = proposersDataManager; this.combinedChainDataClient = combinedChainDataClient; this.custodyGroupCountChannel = custodyGroupCountChannel; + this.custodyGroupSyncedCount = new AtomicInteger(0); final Optional maybeCustodyCount = combinedChainDataClient.getCustodyGroupCount().map(UInt64::intValue); if (maybeCustodyCount.isEmpty() || maybeCustodyCount.get() < initCustodyGroupCount) { @@ -113,7 +114,6 @@ public CustodyGroupCountManagerImpl( updateCustodyGroupCount(maybeCustodyCount.get(), maybeCustodyCount); } - this.custodyGroupSyncedCount = new AtomicInteger(0); this.nodeId = nodeId; } @@ -251,8 +251,10 @@ private void updateCustodyGroupCount( newCustodyGroupCount); combinedChainDataClient.updateCustodyGroupCount(newCustodyGroupCount); } - - custodyGroupCount.set(newCustodyGroupCount); + final int oldValue = custodyGroupCount.getAndSet(newCustodyGroupCount); + if (oldValue == INITIAL_VALUE) { + setCustodyGroupSyncedCount(newCustodyGroupCount); + } custodyGroupCountChannel.onGroupCountUpdate(newCustodyGroupCount, getSamplingGroupCount()); custodyGroupCountGauge.set(newCustodyGroupCount); isMaxCustodyGroups = newCustodyGroupCount == specConfigFulu.getNumberOfCustodyGroups(); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerImplTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerImplTest.java index 6135a116865..3bbf27dab1b 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerImplTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/CustodyGroupCountManagerImplTest.java @@ -168,7 +168,7 @@ public void onSlot_shouldUpdateCustodyAtGenesis() { custodyGroupCountManager.onSlot(UInt64.ZERO); assertThat(custodyGroupCountManager.getCustodyGroupCount()).isEqualTo(4); - assertThat(custodyGroupCountManager.getCustodyGroupSyncedCount()).isZero(); + assertThat(custodyGroupCountManager.getCustodyGroupSyncedCount()).isEqualTo(4); // prepare a validator when(proposersDataManager.getPreparedProposerInfo()) diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 304ef308ce5..cf074ac4350 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -674,7 +674,6 @@ public void initAll() { initActiveValidatorTracker(); initSubnetSubscriber(); initDataColumnSidecarSubnetBackboneSubscriber(); - initFuluCustodyEventChannel(); initSlashingEventsSubscriptions(); initPerformanceTracker(); initBlobSidecarReconstructionProvider(); @@ -1020,33 +1019,6 @@ protected void initDasSyncPreSampler() { this.dataColumnSidecarCustodyRef.get(), this::getCustodyGroupCountManager); eventChannels.subscribe(SyncPreImportBlockChannel.class, dasPreSampler::onNewPreImportBlocks); - - final int totalMyCustodyGroups = custodyGroupCountManagerRef.get().getCustodyGroupCount(); - - final MiscHelpersFulu miscHelpersFulu = - MiscHelpersFulu.required(spec.forMilestone(SpecMilestone.FULU).miscHelpers()); - - eventChannels - .getPublisher(CustodyGroupCountChannel.class) - .onGroupCountUpdate( - totalMyCustodyGroups, miscHelpersFulu.getSamplingGroupCount(totalMyCustodyGroups)); - } - } - - // final das initialisation to broadcast the correct custody count - protected void initFuluCustodyEventChannel() { - if (spec.isMilestoneSupported(SpecMilestone.FULU)) { - final int totalMyCustodyGroups = custodyGroupCountManagerRef.get().getCustodyGroupCount(); - final MiscHelpersFulu miscHelpersFulu = - MiscHelpersFulu.required(spec.forMilestone(SpecMilestone.FULU).miscHelpers()); - final int samplingCount = miscHelpersFulu.getSamplingGroupCount(totalMyCustodyGroups); - LOG.info( - "Initializing fulu custody group count to {}, sampling count to {}", - totalMyCustodyGroups, - samplingCount); - eventChannels - .getPublisher(CustodyGroupCountChannel.class) - .onGroupCountUpdate(totalMyCustodyGroups, samplingCount); } }