From c67ede4c4b38c791f79991790bf1cd43f59d37b3 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 24 Mar 2025 14:47:15 +0100 Subject: [PATCH 1/7] allow reorg during sync --- .../sync/DefaultSyncServiceFactory.java | 5 ++ .../sync/forward/multipeer/BatchSync.java | 12 ++++ .../multipeer/MultipeerSyncService.java | 2 + .../beacon/sync/forward/multipeer/Sync.java | 7 +++ .../forward/multipeer/SyncController.java | 19 +++++- .../forward/multipeer/SyncReorgManager.java | 59 +++++++++++++++++++ .../forward/multipeer/SyncControllerTest.java | 8 ++- .../forkchoice/ForkChoice.java | 16 +++++ .../forkchoice/ForkChoiceTrigger.java | 5 ++ .../beaconchain/BeaconChainController.java | 1 + .../protoarray/ForkChoiceStrategy.java | 10 ++++ .../teku/storage/protoarray/ProtoArray.java | 43 ++++++++++++++ 12 files changed, 185 insertions(+), 2 deletions(-) create mode 100644 beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManager.java diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java index a98f47f0fd8..5fd126c5381 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java @@ -26,6 +26,7 @@ import tech.pegasys.teku.beacon.sync.forward.ForwardSync; import tech.pegasys.teku.beacon.sync.forward.ForwardSyncService; import tech.pegasys.teku.beacon.sync.forward.multipeer.MultipeerSyncService; +import tech.pegasys.teku.beacon.sync.forward.multipeer.SyncReorgManager; import tech.pegasys.teku.beacon.sync.forward.singlepeer.SinglePeerSyncServiceFactory; import tech.pegasys.teku.beacon.sync.gossip.blobs.RecentBlobSidecarsFetcher; import tech.pegasys.teku.beacon.sync.gossip.blocks.RecentBlocksFetchService; @@ -72,6 +73,7 @@ public class DefaultSyncServiceFactory implements SyncServiceFactory { private final PendingPool pendingBlocks; private final PendingPool pendingAttestations; private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool; + private final SyncReorgManager syncReorgManager; private final int getStartupTargetPeerCount; private final AsyncBLSSignatureVerifier signatureVerifier; private final Duration startupTimeout; @@ -94,6 +96,7 @@ public DefaultSyncServiceFactory( final PendingPool pendingBlocks, final PendingPool pendingAttestations, final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, + final SyncReorgManager syncReorgManager, final int getStartupTargetPeerCount, final SignatureVerificationService signatureVerifier, final Duration startupTimeout, @@ -114,6 +117,7 @@ public DefaultSyncServiceFactory( this.pendingBlocks = pendingBlocks; this.pendingAttestations = pendingAttestations; this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool; + this.syncReorgManager = syncReorgManager; this.getStartupTargetPeerCount = getStartupTargetPeerCount; this.signatureVerifier = signatureVerifier; this.startupTimeout = startupTimeout; @@ -198,6 +202,7 @@ protected ForwardSyncService createForwardSyncService() { blobSidecarManager, blockBlobSidecarsTrackersPool, syncPreImportBlockChannel, + syncReorgManager, syncConfig.getForwardSyncBatchSize(), syncConfig.getForwardSyncMaxPendingBatches(), syncConfig.getForwardSyncMaxBlocksPerMinute(), diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSync.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSync.java index da4d48a2ad3..96e509190c1 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSync.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSync.java @@ -34,6 +34,7 @@ import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.eventthread.EventThread; +import tech.pegasys.teku.infrastructure.subscribers.Subscribers; import tech.pegasys.teku.infrastructure.time.TimeProvider; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.datastructures.blocks.MinimalBeaconBlockSummary; @@ -45,6 +46,8 @@ public class BatchSync implements Sync { private static final Logger LOG = LogManager.getLogger(); private static final Duration PAUSE_ON_SERVICE_OFFLINE_OR_DAS_CHECK = Duration.ofSeconds(5); + private final Subscribers subscribers = Subscribers.create(true); + private final EventThread eventThread; private final AsyncRunner asyncRunner; private final RecentChainData recentChainData; @@ -122,6 +125,11 @@ public static BatchSync create( syncPreImportBlockChannel); } + @Override + public long subscribeToBlocksImportedEvent(final BlocksImportedSubscriber subscriber) { + return subscribers.subscribe(subscriber); + } + /** * Begin a sync to the specified target chain. If a sync was previously in progress to a different * chain, the sync will switch to this new chain. @@ -412,6 +420,10 @@ private void onImportComplete( isCurrentlyImportingBatch(importedBatch), "Received import complete for batch that shouldn't have been importing"); importingBatch = Optional.empty(); + importedBatch + .getLastBlock() + .ifPresent( + lastBlock -> subscribers.deliver(subscriber -> subscriber.onBlocksImported(lastBlock))); if (switchingBranches) { // We switched to a different chain while this was importing. Can't infer anything about other // batches from this result but should still penalise the peer that sent it to us. diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java index 6801723827f..f7c3b1dda2d 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java @@ -78,6 +78,7 @@ public static MultipeerSyncService create( final BlobSidecarManager blobSidecarManager, final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool, final SyncPreImportBlockChannel syncPreImportBlockChannel, + final SyncReorgManager syncReorgManager, final int batchSize, final int maxPendingBatches, final int maxBlocksPerMinute, @@ -120,6 +121,7 @@ eventThread, blobSidecarManager, new PeerScoringConflictResolutionStrategy()), finalizedTargetChains, nonfinalizedTargetChains, spec.getSlotsPerEpoch(recentChainData.getCurrentSlot().orElse(UInt64.ZERO))), + syncReorgManager, batchSync); final PeerChainTracker peerChainTracker = new PeerChainTracker( diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java index 464758d7d49..d59c0f76832 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java @@ -18,6 +18,7 @@ import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; public interface Sync { @@ -43,4 +44,10 @@ record SyncProgress( boolean importing, SlotAndBlockRoot targetChainHead, int targetChainPeers) {} + + long subscribeToBlocksImportedEvent(BlocksImportedSubscriber subscriber); + + interface BlocksImportedSubscriber { + void onBlocksImported(SignedBeaconBlock lastImportedBlock); + } } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncController.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncController.java index b18c305a999..311fb803a76 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncController.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncController.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.Logger; import tech.pegasys.teku.beacon.sync.events.SyncingStatus; import tech.pegasys.teku.beacon.sync.forward.ForwardSync.SyncSubscriber; +import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber; import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.SyncProgress; import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain; import tech.pegasys.teku.infrastructure.async.SafeFuture; @@ -28,9 +29,10 @@ import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil; import tech.pegasys.teku.infrastructure.subscribers.Subscribers; import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.storage.client.RecentChainData; -public class SyncController { +public class SyncController implements BlocksImportedSubscriber { private static final Logger LOG = LogManager.getLogger(); private final Subscribers subscribers = Subscribers.create(true); @@ -40,6 +42,7 @@ public class SyncController { private final RecentChainData recentChainData; private final SyncTargetSelector syncTargetSelector; private final Sync sync; + private final SyncReorgManager syncReorgManager; /** * The current sync. When empty, no sync has started, otherwise contains the details of the last @@ -55,12 +58,26 @@ public SyncController( final Executor subscriberExecutor, final RecentChainData recentChainData, final SyncTargetSelector syncTargetSelector, + final SyncReorgManager syncReorgManager, final Sync sync) { this.eventThread = eventThread; this.subscriberExecutor = subscriberExecutor; this.recentChainData = recentChainData; this.syncTargetSelector = syncTargetSelector; + this.syncReorgManager = syncReorgManager; this.sync = sync; + sync.subscribeToBlocksImportedEvent(this); + } + + @Override + public void onBlocksImported(final SignedBeaconBlock lastImportedBlock) { + eventThread.execute( + () -> { + if (isSyncSpeculative()) { + return; + } + syncReorgManager.onBlocksImported(lastImportedBlock); + }); } /** diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManager.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManager.java new file mode 100644 index 00000000000..075aeb7836a --- /dev/null +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManager.java @@ -0,0 +1,59 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.beacon.sync.forward.multipeer; + +import java.util.Optional; +import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; +import tech.pegasys.teku.storage.client.ChainHead; +import tech.pegasys.teku.storage.client.RecentChainData; + +public class SyncReorgManager implements BlocksImportedSubscriber { + static final int SLOT_REORG_THRESHOLD = 10; + + private final RecentChainData recentChainData; + private final ForkChoiceTrigger forkChoiceTrigger; + + public SyncReorgManager( + final RecentChainData recentChainData, final ForkChoiceTrigger forkChoiceTrigger) { + this.recentChainData = recentChainData; + this.forkChoiceTrigger = forkChoiceTrigger; + } + + @Override + public void onBlocksImported(final SignedBeaconBlock lastImportedBlock) { + + final Optional currentHead = recentChainData.getChainHead(); + + if (currentHead.isEmpty()) { + return; + } + + if (lastImportedBlock.getRoot().equals(currentHead.get().getRoot())) { + return; + } + + if (currentHead + .get() + .getSlot() + .plus(SLOT_REORG_THRESHOLD) + .isGreaterThan(lastImportedBlock.getSlot())) { + return; + } + + forkChoiceTrigger.reorgToHeadWhileSyncing( + currentHead.get().getRoot(), lastImportedBlock.getRoot()); + } +} diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncControllerTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncControllerTest.java index 1cec9ca109b..8cb2ffb7988 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncControllerTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncControllerTest.java @@ -48,12 +48,18 @@ class SyncControllerTest { private final SyncTargetSelector syncTargetSelector = mock(SyncTargetSelector.class); private final RecentChainData recentChainData = mock(RecentChainData.class); private final Executor subscriberExecutor = mock(Executor.class); + private final SyncReorgManager syncReorgManager = mock(SyncReorgManager.class); private final TargetChain targetChain = chainWith(dataStructureUtil.randomSlotAndBlockRoot()); private final SyncController syncController = new SyncController( - eventThread, subscriberExecutor, recentChainData, syncTargetSelector, sync); + eventThread, + subscriberExecutor, + recentChainData, + syncTargetSelector, + syncReorgManager, + sync); private static final UInt64 HEAD_SLOT = UInt64.valueOf(2338); @BeforeEach diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java index 2af8992a81b..51fe73a62da 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java @@ -320,6 +320,22 @@ public void onTick( performanceRecord.ifPresent(TickProcessingPerformance::deferredAttestationsApplied); } + public void reorgToHeadWhileSyncing(final Bytes32 oldHeadRoot, final Bytes32 newHeadRoot) { + onForkChoiceThread( + () -> { + final ForkChoiceStrategy forkChoiceStrategy = getForkChoiceStrategy(); + final Optional commonAncestor = + forkChoiceStrategy.findCommonAncestor(oldHeadRoot, newHeadRoot); + if (commonAncestor.isEmpty()) { + return; + } + + forkChoiceStrategy.reorgToHeadWhileSyncing( + oldHeadRoot, newHeadRoot, commonAncestor.get().getBlockRoot()); + }) + .ifExceptionGetsHereRaiseABug(); + } + private void initializeProtoArrayForkChoice() { processHead().join(); } diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java index 65c20a8a677..7b39f49954b 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java @@ -13,6 +13,7 @@ package tech.pegasys.teku.statetransition.forkchoice; +import org.apache.tuweni.bytes.Bytes32; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -84,4 +85,8 @@ public boolean isForkChoiceOverrideLateBlockEnabled() { public SafeFuture prepareForAttestationProduction(final UInt64 slot) { return forkChoiceRatchet.ensureForkChoiceCompleteForSlot(slot); } + + public void reorgToHeadWhileSyncing(final Bytes32 oldHeadRoot, final Bytes32 newHeadRoot) { + forkChoice.reorgToHeadWhileSyncing(oldHeadRoot, newHeadRoot); + } } diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 3d90131430f..00186083400 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -1825,6 +1825,7 @@ protected SyncServiceFactory createSyncServiceFactory() { pendingBlocks, pendingAttestations, blockBlobSidecarsTrackersPool, + new SyncReorgManager(recentChainData, forkChoiceTrigger), beaconConfig.eth2NetworkConfig().getStartupTargetPeerCount(), signatureVerificationService, Duration.ofSeconds(beaconConfig.eth2NetworkConfig().getStartupTimeoutSeconds()), diff --git a/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ForkChoiceStrategy.java b/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ForkChoiceStrategy.java index 8b8810ce716..30d7c679680 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ForkChoiceStrategy.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ForkChoiceStrategy.java @@ -136,6 +136,16 @@ public Bytes32 applyPendingVotes( } } + public void reorgToHeadWhileSyncing( + final Bytes32 oldHeadRoot, final Bytes32 newHeadRoot, final Bytes32 commonAncestorRoot) { + protoArrayLock.writeLock().lock(); + try { + protoArray.reorgToHeadWhileSyncing(oldHeadRoot, newHeadRoot, commonAncestorRoot); + } finally { + protoArrayLock.writeLock().unlock(); + } + } + public void onAttestation(final VoteUpdater voteUpdater, final IndexedAttestation attestation) { votesLock.writeLock().lock(); try { diff --git a/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ProtoArray.java b/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ProtoArray.java index 0284e1f2a5b..2fe4c843305 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ProtoArray.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ProtoArray.java @@ -189,6 +189,49 @@ public void setInitialCanonicalBlockRoot(final Bytes32 initialCanonicalBlockRoot applyToNodes(this::updateBestDescendantOfParent); } + public void reorgToHeadWhileSyncing( + final Bytes32 oldHeadRoot, final Bytes32 newHeadRoot, final Bytes32 commonAncestorRoot) { + final Optional oldHead = getProtoNode(oldHeadRoot); + final Optional newHead = getProtoNode(newHeadRoot); + final Optional commonAncestor = getProtoNode(commonAncestorRoot); + + if (oldHead.isEmpty() || newHead.isEmpty() || commonAncestor.isEmpty()) { + return; + } + + final ProtoNode oldHeadNode = oldHead.get(); + final ProtoNode newHeadNode = newHead.get(); + final ProtoNode commonAncestorNode = commonAncestor.get(); + + // check we are in syncing mode where all nodes have a max weight of 1 + if (commonAncestorNode.getWeight().isGreaterThan(1)) { + return; + } + + // let's set weight to 0 for all nodes from old head to common ancestor + ProtoNode node = oldHeadNode; + while (!node.getBlockRoot().equals(commonAncestorRoot)) { + node.adjustWeight(-node.getWeight().longValue()); + if (node.getParentIndex().isEmpty()) { + break; + } + node = getNodeByIndex(node.getParentIndex().get()); + } + + // let's set weight to 1 from the best descendant of the new head up to the first non-zero + // weight node + node = newHeadNode.getBestDescendantIndex().map(this::getNodeByIndex).orElse(newHeadNode); + while (node.getWeight().isZero()) { + node.adjustWeight(1); + if (node.getParentIndex().isEmpty()) { + break; + } + node = getNodeByIndex(node.getParentIndex().get()); + } + + applyToNodes(this::updateBestDescendantOfParent); + } + /** * Follows the best-descendant links to find the best-block (i.e. head-block), including any * optimistic nodes which have not yet been fully validated. From a58138f76ca4c0cfff79805c36750423f5f4c7c8 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 24 Mar 2025 20:52:15 +0100 Subject: [PATCH 2/7] refactor and tests --- .../forward/multipeer/SyncReorgManager.java | 7 +-- .../sync/forward/multipeer/BatchSyncTest.java | 22 +++++++ .../forward/multipeer/SyncControllerTest.java | 33 ++++++++++ .../forkchoice/ForkChoice.java | 4 +- .../forkchoice/ForkChoiceTrigger.java | 4 +- .../protoarray/ForkChoiceStrategy.java | 4 +- .../teku/storage/protoarray/ProtoArray.java | 13 +++- .../storage/protoarray/ProtoArrayTest.java | 62 +++++++++++++++++++ 8 files changed, 138 insertions(+), 11 deletions(-) diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManager.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManager.java index 075aeb7836a..f0f4f1659b9 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManager.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManager.java @@ -21,7 +21,7 @@ import tech.pegasys.teku.storage.client.RecentChainData; public class SyncReorgManager implements BlocksImportedSubscriber { - static final int SLOT_REORG_THRESHOLD = 10; + static final int REORG_SLOT_THRESHOLD = 10; private final RecentChainData recentChainData; private final ForkChoiceTrigger forkChoiceTrigger; @@ -48,12 +48,11 @@ public void onBlocksImported(final SignedBeaconBlock lastImportedBlock) { if (currentHead .get() .getSlot() - .plus(SLOT_REORG_THRESHOLD) + .plus(REORG_SLOT_THRESHOLD) .isGreaterThan(lastImportedBlock.getSlot())) { return; } - forkChoiceTrigger.reorgToHeadWhileSyncing( - currentHead.get().getRoot(), lastImportedBlock.getRoot()); + forkChoiceTrigger.reorgWhileSyncing(currentHead.get().getRoot(), lastImportedBlock.getRoot()); } } diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSyncTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSyncTest.java index 65fbfa62582..d8e44dae764 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSyncTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchSyncTest.java @@ -35,6 +35,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import tech.pegasys.teku.beacon.sync.events.SyncPreImportBlockChannel; +import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber; import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.SyncProgress; import tech.pegasys.teku.beacon.sync.forward.multipeer.batches.Batch; import tech.pegasys.teku.beacon.sync.forward.multipeer.batches.StubBatchFactory; @@ -662,6 +663,27 @@ void shouldRemoveBatchFromActiveSetWhenImportCompletesSuccessfully() { assertBatchNotActive(batch0); } + @Test + void shouldNotifyOnBlocksImported() { + assertThat(sync.syncToChain(targetChain)).isNotDone(); + final BlocksImportedSubscriber subscriber = mock(BlocksImportedSubscriber.class); + + sync.subscribeToBlocksImportedEvent(subscriber); + + final Batch batch0 = batches.get(0); + final Batch batch1 = batches.get(1); + batches.receiveBlocks(batch0, chainBuilder.generateBlockAtSlot(1).getBlock()); + batches.receiveBlocks( + batch1, chainBuilder.generateBlockAtSlot(batch1.getFirstSlot()).getBlock()); + + assertBatchImported(batch0); + verifyNoInteractions(subscriber); + batches.getImportResult(batch0).complete(IMPORTED_ALL_BLOCKS); + + verify(subscriber).onBlocksImported(batch0.getLastBlock().orElseThrow()); + verifyNoMoreInteractions(subscriber); + } + @Test void shouldSwitchChains() { // Start sync to first chain diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncControllerTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncControllerTest.java index 8cb2ffb7988..226a1c83481 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncControllerTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncControllerTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChainTestUtil.chainWith; @@ -37,6 +38,7 @@ import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.util.DataStructureUtil; import tech.pegasys.teku.storage.client.RecentChainData; @@ -65,6 +67,7 @@ class SyncControllerTest { @BeforeEach void setUp() { when(recentChainData.getHeadSlot()).thenReturn(HEAD_SLOT); + verify(sync).subscribeToBlocksImportedEvent(any()); } @Test @@ -248,6 +251,36 @@ void shouldNotNotifySubscribersWhenRunningSpeculativeTarget() { verify(subscriberExecutor, never()).execute(any()); } + @Test + void shouldForwardOnBlocksImportedWhenNonSpeculativeSync() { + final SafeFuture syncResult = new SafeFuture<>(); + when(syncTargetSelector.selectSyncTarget(any())) + .thenReturn(Optional.of(SyncTarget.speculativeTarget(targetChain))); + when(sync.syncToChain(targetChain)).thenReturn(syncResult); + + onTargetChainsUpdated(); + + syncController.onBlocksImported(dataStructureUtil.randomSignedBeaconBlock()); + + verifyNoInteractions(syncReorgManager); + } + + @Test + void shouldForwardOnBlocksImported() { + when(syncTargetSelector.selectSyncTarget(Optional.empty())) + .thenReturn(Optional.of(SyncTarget.nonfinalizedTarget(targetChain))); + + when(sync.syncToChain(targetChain)).thenReturn(new SafeFuture<>()); + + onTargetChainsUpdated(); + + final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(); + + syncController.onBlocksImported(block); + + verify(syncReorgManager).onBlocksImported(block); + } + private void assertSyncSubscriberNotified( final SyncSubscriber subscriber, final boolean syncing) { // Shouldn't notify on the event thread diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java index 51fe73a62da..01301fb9281 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java @@ -320,7 +320,7 @@ public void onTick( performanceRecord.ifPresent(TickProcessingPerformance::deferredAttestationsApplied); } - public void reorgToHeadWhileSyncing(final Bytes32 oldHeadRoot, final Bytes32 newHeadRoot) { + public void reorgWhileSyncing(final Bytes32 oldHeadRoot, final Bytes32 newHeadRoot) { onForkChoiceThread( () -> { final ForkChoiceStrategy forkChoiceStrategy = getForkChoiceStrategy(); @@ -330,7 +330,7 @@ public void reorgToHeadWhileSyncing(final Bytes32 oldHeadRoot, final Bytes32 new return; } - forkChoiceStrategy.reorgToHeadWhileSyncing( + forkChoiceStrategy.reorgWhileSyncing( oldHeadRoot, newHeadRoot, commonAncestor.get().getBlockRoot()); }) .ifExceptionGetsHereRaiseABug(); diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java index 7b39f49954b..f82c92ef372 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java @@ -86,7 +86,7 @@ public SafeFuture prepareForAttestationProduction(final UInt64 slot) { return forkChoiceRatchet.ensureForkChoiceCompleteForSlot(slot); } - public void reorgToHeadWhileSyncing(final Bytes32 oldHeadRoot, final Bytes32 newHeadRoot) { - forkChoice.reorgToHeadWhileSyncing(oldHeadRoot, newHeadRoot); + public void reorgWhileSyncing(final Bytes32 oldHeadRoot, final Bytes32 newHeadRoot) { + forkChoice.reorgWhileSyncing(oldHeadRoot, newHeadRoot); } } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ForkChoiceStrategy.java b/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ForkChoiceStrategy.java index 30d7c679680..e6fe93df0ad 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ForkChoiceStrategy.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ForkChoiceStrategy.java @@ -136,11 +136,11 @@ public Bytes32 applyPendingVotes( } } - public void reorgToHeadWhileSyncing( + public void reorgWhileSyncing( final Bytes32 oldHeadRoot, final Bytes32 newHeadRoot, final Bytes32 commonAncestorRoot) { protoArrayLock.writeLock().lock(); try { - protoArray.reorgToHeadWhileSyncing(oldHeadRoot, newHeadRoot, commonAncestorRoot); + protoArray.reorgWhileSyncing(oldHeadRoot, newHeadRoot, commonAncestorRoot); } finally { protoArrayLock.writeLock().unlock(); } diff --git a/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ProtoArray.java b/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ProtoArray.java index 2fe4c843305..c9b61a17dcf 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ProtoArray.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/protoarray/ProtoArray.java @@ -189,7 +189,18 @@ public void setInitialCanonicalBlockRoot(final Bytes32 initialCanonicalBlockRoot applyToNodes(this::updateBestDescendantOfParent); } - public void reorgToHeadWhileSyncing( + /** + * This function is supposed to be called while syncing. It assumes the protoarray to be in a + * state where all nodes have been just initialized (see {@link #setInitialCanonicalBlockRoot}) + * and have a weight of 1 on the canonical chain and 0 on all other chains. + * + *

It may also do the reorg if we enter syncing mode and, while syncing, we decide to sync to a + * new chain so that the common ancestor has not received any votes yet. + * + *

The function set all weights to 0 to the reorged chain and set all weights to 1 to the new + * chain. + */ + public void reorgWhileSyncing( final Bytes32 oldHeadRoot, final Bytes32 newHeadRoot, final Bytes32 commonAncestorRoot) { final Optional oldHead = getProtoNode(oldHeadRoot); final Optional newHead = getProtoNode(newHeadRoot); diff --git a/storage/src/test/java/tech/pegasys/teku/storage/protoarray/ProtoArrayTest.java b/storage/src/test/java/tech/pegasys/teku/storage/protoarray/ProtoArrayTest.java index 04f46faf929..ad0e6f64b5d 100644 --- a/storage/src/test/java/tech/pegasys/teku/storage/protoarray/ProtoArrayTest.java +++ b/storage/src/test/java/tech/pegasys/teku/storage/protoarray/ProtoArrayTest.java @@ -19,6 +19,7 @@ import static org.mockito.Mockito.verifyNoInteractions; import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ZERO; +import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.longs.LongList; import java.util.Collections; import java.util.List; @@ -552,6 +553,67 @@ void setInitialCanonicalBlockRoot_shouldEnsureCanonicalHeadIsSetWhenBlockRootIsN assertHead(block2a); } + @Test + void reorgWhileSyncing_shouldReorgIfWeightsAreCompatibleWithSyncingModeWhenNotInitialized() { + final Bytes32 block1 = dataStructureUtil.randomBytes32(); + addValidBlock(1, block1, GENESIS_CHECKPOINT.getRoot()); + addValidBlock(2, block2a, block1); + addValidBlock(2, block2b, block1); + addValidBlock(3, block3a, block2a); + addValidBlock(4, block4a, block3a); + + protoArray.applyScoreChanges(computeDeltas(), ZERO, GENESIS_CHECKPOINT, GENESIS_CHECKPOINT); + + // due to tie breaking block4a is the head + assertHead(block4a); + + // setting chain a as the canonical chain via non-tip block + protoArray.reorgWhileSyncing(block2b, block2a, block1); + + // block2a is now the head due to weight + assertHead(block2a); + } + + @Test + void reorgWhileSyncing_shouldReorgIfWeightsAreCompatibleWithSyncingModeWhenInitialized() { + final Bytes32 block1 = dataStructureUtil.randomBytes32(); + addValidBlock(1, block1, GENESIS_CHECKPOINT.getRoot()); + addValidBlock(2, block2a, block1); + addValidBlock(2, block2b, block1); + addValidBlock(3, block3a, block2a); + addValidBlock(4, block4a, block3a); + + protoArray.setInitialCanonicalBlockRoot(block2b); + + assertHead(block2b); + + protoArray.reorgWhileSyncing(block2b, block4a, GENESIS_CHECKPOINT.getRoot()); + + assertHead(block4a); + } + + @Test + void reorgWhileSyncing_shouldNotReorgIfWeightsAreNotCompatibleWithSyncingMode() { + final Bytes32 block1 = dataStructureUtil.randomBytes32(); + addValidBlock(1, block1, GENESIS_CHECKPOINT.getRoot()); + addValidBlock(2, block2a, block1); + addValidBlock(2, block2b, block1); + addValidBlock(3, block3a, block2a); + addValidBlock(4, block4a, block3a); + + // let's add 5 votes to block4a tip, so common ancestor at block1 will accumulate 5 votes too + protoArray.applyScoreChanges( + LongArrayList.of(0, 0, 0, 0, 0, 5), ZERO, GENESIS_CHECKPOINT, GENESIS_CHECKPOINT); + + assertHead(block4a); + assertThat(protoArray.getProtoNode(block1).orElseThrow().getWeight()) + .isEqualTo(UInt64.valueOf(5)); + + protoArray.reorgWhileSyncing(block4a, block2b, block1); + + assertHead(block4a); + } + private void assertHead(final Bytes32 expectedBlockHash) { final ProtoNode node = protoArray.getProtoNode(expectedBlockHash).orElseThrow(); assertThat( From f63355f955ecf597fbc2d65bc5002610714ba479 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 24 Mar 2025 21:07:43 +0100 Subject: [PATCH 3/7] missing unit test --- .../multipeer/SyncReorgManagerTest.java | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManagerTest.java diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManagerTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManagerTest.java new file mode 100644 index 00000000000..c4dbb7e8ba2 --- /dev/null +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/SyncReorgManagerTest.java @@ -0,0 +1,81 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.beacon.sync.forward.multipeer; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import java.util.Optional; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState; +import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; +import tech.pegasys.teku.storage.client.ChainHead; +import tech.pegasys.teku.storage.client.RecentChainData; + +public class SyncReorgManagerTest { + private final Spec spec = TestSpecFactory.createMinimalPhase0(); + private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); + + private final RecentChainData recentChainData = mock(RecentChainData.class); + private final ForkChoiceTrigger forkChoiceTrigger = mock(ForkChoiceTrigger.class); + + private final SyncReorgManager syncReorgManager = + new SyncReorgManager(recentChainData, forkChoiceTrigger); + + @Test + public void onBlocksImported_shouldDoNothingIfNoCurrentHead() { + when(recentChainData.getChainHead()).thenReturn(Optional.empty()); + syncReorgManager.onBlocksImported(dataStructureUtil.randomSignedBeaconBlock()); + + verifyNoInteractions(forkChoiceTrigger); + } + + @Test + public void onBlocksImported_shouldDoNothingIfLastImportedBlockIsCurrentHead() { + final SignedBlockAndState headBlock = dataStructureUtil.randomSignedBlockAndState(UInt64.ONE); + when(recentChainData.getChainHead()).thenReturn(Optional.of(ChainHead.create(headBlock))); + syncReorgManager.onBlocksImported(headBlock.getBlock()); + + verifyNoInteractions(forkChoiceTrigger); + } + + @Test + public void onBlocksImported_shouldDoNothingIfLastImportedBlockIsWithinReorgThreshold() { + final SignedBlockAndState headBlock = dataStructureUtil.randomSignedBlockAndState(UInt64.ONE); + final SignedBeaconBlock lastImportedBlock = + dataStructureUtil.randomSignedBeaconBlock(UInt64.valueOf(10)); + when(recentChainData.getChainHead()).thenReturn(Optional.of(ChainHead.create(headBlock))); + syncReorgManager.onBlocksImported(lastImportedBlock); + + verifyNoInteractions(forkChoiceTrigger); + } + + @Test + public void onBlocksImported_shouldTriggerReorgWhenLastImportedBlockIsOutsideReorgThreshold() { + final SignedBlockAndState headBlock = dataStructureUtil.randomSignedBlockAndState(UInt64.ONE); + final SignedBeaconBlock lastImportedBlock = + dataStructureUtil.randomSignedBeaconBlock(UInt64.valueOf(11)); + when(recentChainData.getChainHead()).thenReturn(Optional.of(ChainHead.create(headBlock))); + syncReorgManager.onBlocksImported(lastImportedBlock); + + verify(forkChoiceTrigger).reorgWhileSyncing(headBlock.getRoot(), lastImportedBlock.getRoot()); + } +} From 3a6844d9dae3f8eff8cbd7402b75d38447246b80 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Mon, 24 Mar 2025 22:22:48 +0100 Subject: [PATCH 4/7] fix test --- .../tech/pegasys/teku/storage/protoarray/ProtoArrayTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/src/test/java/tech/pegasys/teku/storage/protoarray/ProtoArrayTest.java b/storage/src/test/java/tech/pegasys/teku/storage/protoarray/ProtoArrayTest.java index ad0e6f64b5d..e54efada053 100644 --- a/storage/src/test/java/tech/pegasys/teku/storage/protoarray/ProtoArrayTest.java +++ b/storage/src/test/java/tech/pegasys/teku/storage/protoarray/ProtoArrayTest.java @@ -568,10 +568,10 @@ void reorgWhileSyncing_shouldReorgIfWeightsAreCompatibleWithSyncingModeWhenNotIn assertHead(block4a); // setting chain a as the canonical chain via non-tip block - protoArray.reorgWhileSyncing(block2b, block2a, block1); + protoArray.reorgWhileSyncing(block4a, block2b, block1); // block2a is now the head due to weight - assertHead(block2a); + assertHead(block2b); } @Test From 3e58b8bf3389f90faf809166a3716157f17303d1 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Tue, 25 Mar 2025 13:40:21 +0100 Subject: [PATCH 5/7] merge --- .../tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java index d59c0f76832..d5a4410138a 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java @@ -17,8 +17,8 @@ import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.unsigned.UInt64; -import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; public interface Sync { From 1805942826cae3a3a49cbd02468f02600129f171 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Tue, 25 Mar 2025 13:47:24 +0100 Subject: [PATCH 6/7] fix compilation --- .../teku/beacon/sync/forward/multipeer/Sync.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java index d5a4410138a..a4cb3ad8bd7 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/Sync.java @@ -33,6 +33,12 @@ public interface Sync { SafeFuture> getSyncProgress(); + long subscribeToBlocksImportedEvent(BlocksImportedSubscriber subscriber); + + interface BlocksImportedSubscriber { + void onBlocksImported(SignedBeaconBlock lastImportedBlock); + } + record SyncProgress( UInt64 fromSlot, UInt64 toSlot, @@ -44,10 +50,4 @@ record SyncProgress( boolean importing, SlotAndBlockRoot targetChainHead, int targetChainPeers) {} - - long subscribeToBlocksImportedEvent(BlocksImportedSubscriber subscriber); - - interface BlocksImportedSubscriber { - void onBlocksImported(SignedBeaconBlock lastImportedBlock); - } } From 6a904960daa744f25826a4dfab7f6b493330c559 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Fri, 5 Sep 2025 17:16:37 +0200 Subject: [PATCH 7/7] rebase fixes --- .../pegasys/teku/statetransition/forkchoice/ForkChoice.java | 2 +- .../teku/statetransition/forkchoice/ForkChoiceTrigger.java | 2 +- .../teku/services/beaconchain/BeaconChainController.java | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java index 01301fb9281..48dbfb6992b 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoice.java @@ -333,7 +333,7 @@ public void reorgWhileSyncing(final Bytes32 oldHeadRoot, final Bytes32 newHeadRo forkChoiceStrategy.reorgWhileSyncing( oldHeadRoot, newHeadRoot, commonAncestor.get().getBlockRoot()); }) - .ifExceptionGetsHereRaiseABug(); + .finishStackTrace(); } private void initializeProtoArrayForkChoice() { diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java index f82c92ef372..51047005347 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceTrigger.java @@ -13,12 +13,12 @@ package tech.pegasys.teku.statetransition.forkchoice; -import org.apache.tuweni.bytes.Bytes32; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionPerformance; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.time.TimeProvider; diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 00186083400..978388d89d3 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -55,6 +55,7 @@ import tech.pegasys.teku.beacon.sync.SyncServiceFactory; import tech.pegasys.teku.beacon.sync.events.CoalescingChainHeadChannel; import tech.pegasys.teku.beacon.sync.events.SyncPreImportBlockChannel; +import tech.pegasys.teku.beacon.sync.forward.multipeer.SyncReorgManager; import tech.pegasys.teku.beacon.sync.gossip.blobs.RecentBlobSidecarsFetcher; import tech.pegasys.teku.beacon.sync.gossip.blocks.RecentBlocksFetcher; import tech.pegasys.teku.beaconrestapi.BeaconRestApi;