From 343c70eb9e0c135f7b7df18fde33a774893f1ab0 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Wed, 22 Oct 2025 11:40:44 +0200 Subject: [PATCH 1/2] remove DasLongPollCustody --- .../datacolumns/DasLongPollCustody.java | 190 ------------- .../datacolumns/DasLongPollCustodyTest.java | 265 ------------------ .../beaconchain/BeaconChainController.java | 15 +- 3 files changed, 1 insertion(+), 469 deletions(-) delete mode 100644 ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DasLongPollCustody.java delete mode 100644 ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DasLongPollCustodyTest.java diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DasLongPollCustody.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DasLongPollCustody.java deleted file mode 100644 index 6fb06f34904..00000000000 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/datacolumns/DasLongPollCustody.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2024 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.statetransition.datacolumns; - -import com.google.common.annotations.VisibleForTesting; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.NavigableMap; -import java.util.Optional; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.CompletableFuture; -import tech.pegasys.teku.ethereum.events.SlotEventsChannel; -import tech.pegasys.teku.infrastructure.async.AsyncRunner; -import tech.pegasys.teku.infrastructure.async.SafeFuture; -import tech.pegasys.teku.infrastructure.async.stream.AsyncStream; -import tech.pegasys.teku.infrastructure.unsigned.UInt64; -import tech.pegasys.teku.spec.datastructures.blobs.DataColumnSidecar; -import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier; -import tech.pegasys.teku.statetransition.blobs.RemoteOrigin; - -public class DasLongPollCustody implements DataColumnSidecarCustody, SlotEventsChannel { - - public interface GossipWaitTimeoutCalculator { - /** Returns the duration to wait for a column to be gossiped */ - Duration getGossipWaitTimeout(UInt64 slot); - } - - private final DataColumnSidecarCustody delegate; - private final AsyncRunner asyncRunner; - private final GossipWaitTimeoutCalculator gossipWaitTimeoutCalculator; - - @VisibleForTesting final PendingRequests pendingRequests = new PendingRequests(); - - public DasLongPollCustody( - final DataColumnSidecarCustody delegate, - final AsyncRunner asyncRunner, - GossipWaitTimeoutCalculator gossipWaitTimeoutCalculator) { - this.delegate = delegate; - this.asyncRunner = asyncRunner; - this.gossipWaitTimeoutCalculator = gossipWaitTimeoutCalculator; - } - - @Override - public SafeFuture onNewValidatedDataColumnSidecar( - final DataColumnSidecar dataColumnSidecar, final RemoteOrigin remoteOrigin) { - return delegate - .onNewValidatedDataColumnSidecar(dataColumnSidecar, remoteOrigin) - .thenRun( - () -> { - final List>> pendingRequests = - this.pendingRequests.remove( - DataColumnSlotAndIdentifier.fromDataColumn(dataColumnSidecar)); - for (SafeFuture> pendingRequest : pendingRequests) { - pendingRequest.complete(Optional.of(dataColumnSidecar)); - } - }); - } - - @Override - public SafeFuture> getCustodyDataColumnSidecar( - final DataColumnSlotAndIdentifier columnId) { - final SafeFuture> pendingFuture = addPendingRequest(columnId); - final SafeFuture> existingFuture = - delegate.getCustodyDataColumnSidecar(columnId); - return anyNonEmpty(pendingFuture, existingFuture); - } - - @Override - public SafeFuture hasCustodyDataColumnSidecar( - final DataColumnSlotAndIdentifier columnId) { - final SafeFuture> pendingFuture = - addPendingRequest(columnId).thenApply(maybeSidecar -> maybeSidecar.map(__ -> true)); - final SafeFuture> existingFuture = - delegate - .hasCustodyDataColumnSidecar(columnId) - .thenApply(doesExist -> doesExist ? Optional.of(true) : Optional.empty()); - return anyNonEmpty(pendingFuture, existingFuture) - .thenApply(maybeResult -> maybeResult.orElse(false)); - } - - @Override - public AsyncStream retrieveMissingColumns() { - return delegate.retrieveMissingColumns(); - } - - private SafeFuture> addPendingRequest( - final DataColumnSlotAndIdentifier columnId) { - final SafeFuture> promise = new SafeFuture<>(); - pendingRequests.add(columnId, promise); - return promise; - } - - @Override - public void onSlot(final UInt64 slot) { - final Duration waitPeriodForCurrentSlot = - gossipWaitTimeoutCalculator.getGossipWaitTimeout(slot); - asyncRunner - .runAfterDelay( - () -> pendingRequests.setNoWaitSlot(slot.increment()), waitPeriodForCurrentSlot) - .finishStackTrace(); - } - - private static SafeFuture> anyNonEmpty( - final SafeFuture> future1, final SafeFuture> future2) { - return SafeFuture.anyOf(future1, future2) - .thenCompose( - __ -> { - if (future1.isCompletedNormally()) { - if (future1.getImmediately().isPresent()) { - return future1; - } else { - return future2; - } - } else if (future2.isCompletedNormally()) { - if (future2.getImmediately().isPresent()) { - return future2; - } else { - return future1; - } - } else { - throw new IllegalStateException("Unexpected: None of futures is complete"); - } - }); - } - - @VisibleForTesting - static class PendingRequests { - final NavigableMap>>> - requests = new TreeMap<>(); - private UInt64 noWaitSlot = UInt64.ZERO; - - void add( - final DataColumnSlotAndIdentifier columnId, - final SafeFuture> promise) { - final boolean cancelImmediately; - synchronized (this) { - if (columnId.slot().isLessThan(noWaitSlot)) { - cancelImmediately = true; - } else { - clearCancelledPendingRequests(); - requests.computeIfAbsent(columnId, __ -> new ArrayList<>()).add(promise); - cancelImmediately = false; - } - } - if (cancelImmediately) { - promise.complete(Optional.empty()); - } - } - - synchronized List>> remove( - final DataColumnSlotAndIdentifier columnId) { - final List>> ret = requests.remove(columnId); - return ret == null ? Collections.emptyList() : ret; - } - - void setNoWaitSlot(final UInt64 tillSlotExclusive) { - final List>> toCancel; - synchronized (this) { - this.noWaitSlot = tillSlotExclusive; - final SortedMap>>> - toRemove = - requests.headMap( - DataColumnSlotAndIdentifier.minimalComparableForSlot(tillSlotExclusive)); - toCancel = toRemove.values().stream().flatMap(Collection::stream).toList(); - toRemove.clear(); - } - toCancel.forEach(future -> future.complete(Optional.empty())); - } - - private void clearCancelledPendingRequests() { - requests.values().forEach(promises -> promises.removeIf(CompletableFuture::isDone)); - requests.entrySet().removeIf(e -> e.getValue().isEmpty()); - } - } -} diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DasLongPollCustodyTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DasLongPollCustodyTest.java deleted file mode 100644 index 0ff192c7907..00000000000 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/datacolumns/DasLongPollCustodyTest.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2024 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.statetransition.datacolumns; - -import static java.time.Duration.ofMillis; -import static java.time.Duration.ofSeconds; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.time.Duration; -import java.util.List; -import java.util.Optional; -import java.util.function.Supplier; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import tech.pegasys.teku.infrastructure.async.SafeFuture; -import tech.pegasys.teku.infrastructure.async.StubAsyncRunner; -import tech.pegasys.teku.infrastructure.events.FutureValueObserver; -import tech.pegasys.teku.infrastructure.time.StubTimeProvider; -import tech.pegasys.teku.infrastructure.unsigned.UInt64; -import tech.pegasys.teku.spec.Spec; -import tech.pegasys.teku.spec.SpecMilestone; -import tech.pegasys.teku.spec.TestSpecFactory; -import tech.pegasys.teku.spec.config.SpecConfigFulu; -import tech.pegasys.teku.spec.datastructures.blobs.DataColumnSidecar; -import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock; -import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlockHeader; -import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier; -import tech.pegasys.teku.spec.util.DataStructureUtil; -import tech.pegasys.teku.statetransition.blobs.RemoteOrigin; -import tech.pegasys.teku.statetransition.datacolumns.db.DataColumnSidecarDB; -import tech.pegasys.teku.statetransition.datacolumns.db.DataColumnSidecarDbAccessor; -import tech.pegasys.teku.statetransition.datacolumns.db.DelayedDasDb; - -@SuppressWarnings({"JavaCase", "FutureReturnValueIgnored"}) -public class DasLongPollCustodyTest { - final StubTimeProvider stubTimeProvider = StubTimeProvider.withTimeInSeconds(0); - final StubAsyncRunner stubAsyncRunner = new StubAsyncRunner(stubTimeProvider); - - static final Spec spec = TestSpecFactory.createMinimalFulu(); - final DataColumnSidecarDB db = new DataColumnSidecarDBStub(); - final Duration dbDelay = ofMillis(5); - final DelayedDasDb delayedDb = new DelayedDasDb(db, stubAsyncRunner, dbDelay); - final DataColumnSidecarDbAccessor dbAccessor = - DataColumnSidecarDbAccessor.builder(delayedDb).spec(spec).build(); - final CanonicalBlockResolverStub blockResolver = new CanonicalBlockResolverStub(spec); - final CustodyGroupCountManager custodyGroupCountManager = mock(CustodyGroupCountManager.class); - final Supplier custodyGroupCountManagerSupplier = - () -> custodyGroupCountManager; - static final SpecConfigFulu config = - SpecConfigFulu.required(spec.forMilestone(SpecMilestone.FULU).getConfig()); - static final int groupCount = config.getNumberOfCustodyGroups(); - static final FutureValueObserver custodyGroupCountProvider = new FutureValueObserver<>(); - - final DataColumnSidecarCustodyImpl custodyImpl = - new DataColumnSidecarCustodyImpl( - spec, - blockResolver, - dbAccessor, - MinCustodyPeriodSlotCalculator.createFromSpec(spec), - custodyGroupCountManagerSupplier, - custodyGroupCountProvider); - - private final DataStructureUtil dataStructureUtil = new DataStructureUtil(0, spec); - private final Duration currentSlotTimeout = ofSeconds(3); - private final DasLongPollCustody custody = - new DasLongPollCustody(custodyImpl, stubAsyncRunner, __ -> currentSlotTimeout); - - private final BeaconBlock block10 = blockResolver.addBlock(10, true); - private final DataColumnSidecar sidecar10_0 = createSidecar(block10, 0); - private final DataColumnSlotAndIdentifier columnId10_0 = - DataColumnSlotAndIdentifier.fromDataColumn(sidecar10_0); - private final DataColumnSidecar sidecar10_1 = createSidecar(block10, 1); - private final DataColumnSlotAndIdentifier columnId10_1 = - DataColumnSlotAndIdentifier.fromDataColumn(sidecar10_1); - - private DataColumnSidecar createSidecar(final BeaconBlock block, final int column) { - return dataStructureUtil.randomDataColumnSidecar(createSigned(block), UInt64.valueOf(column)); - } - - private SignedBeaconBlockHeader createSigned(final BeaconBlock block) { - return dataStructureUtil.signedBlock(block).asHeader(); - } - - private void advanceTimeGradually(final Duration delta) { - for (long i = 0; i < delta.toMillis(); i++) { - stubTimeProvider.advanceTimeBy(ofMillis(1)); - stubAsyncRunner.executeDueActionsRepeatedly(); - } - } - - @BeforeAll - public static void setup() { - custodyGroupCountProvider.complete(consumer -> consumer.onValueChanged(groupCount)); - } - - @Test - void testLongPollingColumnRequest() throws Exception { - SafeFuture> fRet0 = - custody.getCustodyDataColumnSidecar(columnId10_0); - SafeFuture fHas0 = custody.hasCustodyDataColumnSidecar(columnId10_0); - SafeFuture> fRet0_1 = - custody.getCustodyDataColumnSidecar(columnId10_0); - SafeFuture fHas0_1 = custody.hasCustodyDataColumnSidecar(columnId10_0); - SafeFuture> fRet1 = - custody.getCustodyDataColumnSidecar(columnId10_1); - SafeFuture fHas1 = custody.hasCustodyDataColumnSidecar(columnId10_1); - - advanceTimeGradually(currentSlotTimeout.minus(dbDelay).minus(ofMillis(1))); - - custody.onSlot(UInt64.valueOf(10)); - - assertThat(fRet0).isNotDone(); - assertThat(fRet0_1).isNotDone(); - assertThat(fRet1).isNotDone(); - - custody.onNewValidatedDataColumnSidecar(sidecar10_0, RemoteOrigin.GOSSIP); - - advanceTimeGradually(dbDelay); - - assertThat(fRet0).isCompletedWithValue(Optional.of(sidecar10_0)); - assertThat(fHas0).isCompletedWithValue(true); - assertThat(fRet0_1).isCompletedWithValue(Optional.of(sidecar10_0)); - assertThat(fHas0_1).isCompletedWithValue(true); - assertThat(fRet1).isNotDone(); - assertThat(fHas1).isNotDone(); - - advanceTimeGradually(currentSlotTimeout); - - assertThat(fRet1).isCompletedWithValue(Optional.empty()); - assertThat(fHas1).isCompletedWithValue(false); - } - - @Test - void testPendingRequestIsExecutedWhenLongReadQuickWrite() throws Exception { - // long DB read - delayedDb.setDelay(ofMillis(10)); - SafeFuture> fRet0 = - custody.getCustodyDataColumnSidecar(columnId10_0); - SafeFuture fHas0 = custody.hasCustodyDataColumnSidecar(columnId10_0); - advanceTimeGradually(ofMillis(1)); - - // quicker DB write - delayedDb.setDelay(ofMillis(5)); - custody.onNewValidatedDataColumnSidecar(sidecar10_0, RemoteOrigin.GOSSIP); - - advanceTimeGradually(ofMillis(10)); - assertThat(fRet0).isCompletedWithValue(Optional.ofNullable(sidecar10_0)); - assertThat(fHas0).isCompletedWithValue(true); - } - - @Test - void testPendingRequestIsExecutedWhenLongWriteQuickRead() throws Exception { - // long DB write - delayedDb.setDelay(ofMillis(10)); - when(custodyGroupCountManager.getCustodyColumnIndices()) - .thenReturn( - List.of(UInt64.valueOf(0), UInt64.valueOf(2), UInt64.valueOf(3), UInt64.valueOf(4))); - custody.onNewValidatedDataColumnSidecar(sidecar10_0, RemoteOrigin.GOSSIP); - - advanceTimeGradually(ofMillis(1)); - // quicker DB read - delayedDb.setDelay(ofMillis(5)); - SafeFuture> fRet0 = - custody.getCustodyDataColumnSidecar(columnId10_0); - SafeFuture fHas0 = custody.hasCustodyDataColumnSidecar(columnId10_0); - - advanceTimeGradually(ofMillis(50)); - assertThat(fRet0).isCompletedWithValue(Optional.ofNullable(sidecar10_0)); - assertThat(fHas0).isCompletedWithValue(true); - } - - @Test - void testEmptyIsNotReturnedImmediatelyAtBeginningOfCurrentSlot() { - custody.onSlot(UInt64.valueOf(9)); - advanceTimeGradually(currentSlotTimeout.plusMillis(100)); - - custody.onSlot(UInt64.valueOf(10)); - SafeFuture> fRet0 = - custody.getCustodyDataColumnSidecar(columnId10_0); - SafeFuture fHas0 = custody.hasCustodyDataColumnSidecar(columnId10_0); - - advanceTimeGradually(ofMillis(100)); - assertThat(fRet0).isNotDone(); - assertThat(fHas0).isNotDone(); - - advanceTimeGradually(currentSlotTimeout); - assertThat(fRet0).isCompletedWithValue(Optional.empty()); - assertThat(fHas0).isCompletedWithValue(false); - } - - @Test - void testOptionalEmptyIsReturnedOnTimeout() { - SafeFuture> fRet0 = - custody.getCustodyDataColumnSidecar(columnId10_0); - SafeFuture fHas0 = custody.hasCustodyDataColumnSidecar(columnId10_0); - - custody.onSlot(UInt64.valueOf(9)); - advanceTimeGradually(currentSlotTimeout.plusMillis(100)); - - assertThat(fRet0).isNotDone(); - assertThat(fHas0).isNotDone(); - - custody.onSlot(UInt64.valueOf(10)); - - advanceTimeGradually(currentSlotTimeout.minusMillis(10)); - assertThat(fRet0).isNotDone(); - assertThat(fHas0).isNotDone(); - - advanceTimeGradually(Duration.ofMillis(110)); - assertThat(fRet0).isCompletedWithValue(Optional.empty()); - assertThat(fHas0).isCompletedWithValue(false); - } - - @Test - void testOptionalEmptyIsReturnedImmediatelyForPastSlot() { - custody.onSlot(UInt64.valueOf(10)); - advanceTimeGradually(currentSlotTimeout); - - SafeFuture> fRet = - custody.getCustodyDataColumnSidecar(columnId10_0); - SafeFuture fHas0 = custody.hasCustodyDataColumnSidecar(columnId10_0); - advanceTimeGradually(Duration.ofMillis(20)); // more than db delay - assertThat(fRet).isCompletedWithValue(Optional.empty()); - assertThat(fHas0).isCompletedWithValue(false); - } - - @Test - void testTimeoutOccursForAll() { - SafeFuture> fRet0 = - custody.getCustodyDataColumnSidecar(columnId10_0); - SafeFuture fHas0 = custody.hasCustodyDataColumnSidecar(columnId10_0); - SafeFuture> fRet1 = - custody.getCustodyDataColumnSidecar(columnId10_0); - SafeFuture fHas1 = custody.hasCustodyDataColumnSidecar(columnId10_1); - - custody.onSlot(UInt64.valueOf(9)); - advanceTimeGradually(currentSlotTimeout.plusMillis(100)); - custody.onSlot(UInt64.valueOf(10)); - advanceTimeGradually(currentSlotTimeout.minusMillis(10)); - - assertThat(fRet0).isNotDone(); - assertThat(fHas0).isNotDone(); - assertThat(fRet1).isNotDone(); - assertThat(fHas1).isNotDone(); - - advanceTimeGradually(Duration.ofMillis(110)); - assertThat(fRet0).isCompletedWithValue(Optional.empty()); - assertThat(fHas0).isCompletedWithValue(false); - assertThat(fRet1).isCompletedWithValue(Optional.empty()); - assertThat(fHas1).isCompletedWithValue(false); - } -} diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 53fb1bc42a2..e9644fc9e25 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -164,7 +164,6 @@ import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManager; import tech.pegasys.teku.statetransition.datacolumns.CustodyGroupCountManagerImpl; import tech.pegasys.teku.statetransition.datacolumns.DasCustodySync; -import tech.pegasys.teku.statetransition.datacolumns.DasLongPollCustody; import tech.pegasys.teku.statetransition.datacolumns.DasPreSampler; import tech.pegasys.teku.statetransition.datacolumns.DasSamplerBasic; import tech.pegasys.teku.statetransition.datacolumns.DasSamplerManager; @@ -895,21 +894,9 @@ protected void initDasCustody() { eventChannels.subscribe(SlotEventsChannel.class, dataColumnSidecarCustodyImpl); eventChannels.subscribe(FinalizedCheckpointChannel.class, dataColumnSidecarCustodyImpl); - final DasLongPollCustody.GossipWaitTimeoutCalculator gossipWaitTimeoutCalculator = - slot -> { - Duration slotDuration = - Duration.ofSeconds(spec.atSlot(slot).getConfig().getSecondsPerSlot()); - return slotDuration.dividedBy(3); - }; - - final DasLongPollCustody dasLongPollCustody = - new DasLongPollCustody( - dataColumnSidecarCustodyImpl, dasAsyncRunner, gossipWaitTimeoutCalculator); - eventChannels.subscribe(SlotEventsChannel.class, dasLongPollCustody); - final DataColumnSidecarByRootCustody dataColumnSidecarByRootCustody = new DataColumnSidecarByRootCustodyImpl( - dasLongPollCustody, + dataColumnSidecarCustodyImpl, combinedChainDataClient, UInt64.valueOf(slotsPerEpoch) .times(DataColumnSidecarByRootCustodyImpl.DEFAULT_MAX_CACHE_SIZE_EPOCHS)); From 90e4116c1f55c88677e084c68523f02994a03c9c Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Wed, 22 Oct 2025 18:01:00 +0200 Subject: [PATCH 2/2] spotlessly --- .../teku/services/beaconchain/BeaconChainController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index e9644fc9e25..c9ba4555cf7 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -896,7 +896,7 @@ protected void initDasCustody() { final DataColumnSidecarByRootCustody dataColumnSidecarByRootCustody = new DataColumnSidecarByRootCustodyImpl( - dataColumnSidecarCustodyImpl, + dataColumnSidecarCustodyImpl, combinedChainDataClient, UInt64.valueOf(slotsPerEpoch) .times(DataColumnSidecarByRootCustodyImpl.DEFAULT_MAX_CACHE_SIZE_EPOCHS));