Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import tech.pegasys.teku.beacon.sync.forward.ForwardSync;
import tech.pegasys.teku.beacon.sync.forward.ForwardSyncService;
import tech.pegasys.teku.beacon.sync.forward.multipeer.MultipeerSyncService;
import tech.pegasys.teku.beacon.sync.forward.multipeer.SyncReorgManager;
import tech.pegasys.teku.beacon.sync.forward.singlepeer.SinglePeerSyncServiceFactory;
import tech.pegasys.teku.beacon.sync.gossip.blobs.RecentBlobSidecarsFetcher;
import tech.pegasys.teku.beacon.sync.gossip.blocks.RecentBlocksFetchService;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class DefaultSyncServiceFactory implements SyncServiceFactory {
private final PendingPool<SignedBeaconBlock> pendingBlocks;
private final PendingPool<ValidatableAttestation> pendingAttestations;
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private final SyncReorgManager syncReorgManager;
private final int getStartupTargetPeerCount;
private final AsyncBLSSignatureVerifier signatureVerifier;
private final Duration startupTimeout;
Expand All @@ -94,6 +96,7 @@ public DefaultSyncServiceFactory(
final PendingPool<SignedBeaconBlock> pendingBlocks,
final PendingPool<ValidatableAttestation> pendingAttestations,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final SyncReorgManager syncReorgManager,
final int getStartupTargetPeerCount,
final SignatureVerificationService signatureVerifier,
final Duration startupTimeout,
Expand All @@ -114,6 +117,7 @@ public DefaultSyncServiceFactory(
this.pendingBlocks = pendingBlocks;
this.pendingAttestations = pendingAttestations;
this.blockBlobSidecarsTrackersPool = blockBlobSidecarsTrackersPool;
this.syncReorgManager = syncReorgManager;
this.getStartupTargetPeerCount = getStartupTargetPeerCount;
this.signatureVerifier = signatureVerifier;
this.startupTimeout = startupTimeout;
Expand Down Expand Up @@ -198,6 +202,7 @@ protected ForwardSyncService createForwardSyncService() {
blobSidecarManager,
blockBlobSidecarsTrackersPool,
syncPreImportBlockChannel,
syncReorgManager,
syncConfig.getForwardSyncBatchSize(),
syncConfig.getForwardSyncMaxPendingBatches(),
syncConfig.getForwardSyncMaxBlocksPerMinute(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.MinimalBeaconBlockSummary;
Expand All @@ -45,6 +46,8 @@ public class BatchSync implements Sync {
private static final Logger LOG = LogManager.getLogger();
private static final Duration PAUSE_ON_SERVICE_OFFLINE_OR_DAS_CHECK = Duration.ofSeconds(5);

private final Subscribers<BlocksImportedSubscriber> subscribers = Subscribers.create(true);

private final EventThread eventThread;
private final AsyncRunner asyncRunner;
private final RecentChainData recentChainData;
Expand Down Expand Up @@ -122,6 +125,11 @@ public static BatchSync create(
syncPreImportBlockChannel);
}

@Override
public long subscribeToBlocksImportedEvent(final BlocksImportedSubscriber subscriber) {
return subscribers.subscribe(subscriber);
}

/**
* Begin a sync to the specified target chain. If a sync was previously in progress to a different
* chain, the sync will switch to this new chain.
Expand Down Expand Up @@ -412,6 +420,10 @@ private void onImportComplete(
isCurrentlyImportingBatch(importedBatch),
"Received import complete for batch that shouldn't have been importing");
importingBatch = Optional.empty();
importedBatch
.getLastBlock()
.ifPresent(
lastBlock -> subscribers.deliver(subscriber -> subscriber.onBlocksImported(lastBlock)));
if (switchingBranches) {
// We switched to a different chain while this was importing. Can't infer anything about other
// batches from this result but should still penalise the peer that sent it to us.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public static MultipeerSyncService create(
final BlobSidecarManager blobSidecarManager,
final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool,
final SyncPreImportBlockChannel syncPreImportBlockChannel,
final SyncReorgManager syncReorgManager,
final int batchSize,
final int maxPendingBatches,
final int maxBlocksPerMinute,
Expand Down Expand Up @@ -120,6 +121,7 @@ eventThread, blobSidecarManager, new PeerScoringConflictResolutionStrategy()),
finalizedTargetChains,
nonfinalizedTargetChains,
spec.getSlotsPerEpoch(recentChainData.getCurrentSlot().orElse(UInt64.ZERO))),
syncReorgManager,
batchSync);
final PeerChainTracker peerChainTracker =
new PeerChainTracker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;

public interface Sync {
Expand All @@ -32,6 +33,12 @@ public interface Sync {

SafeFuture<Optional<SyncProgress>> getSyncProgress();

long subscribeToBlocksImportedEvent(BlocksImportedSubscriber subscriber);

interface BlocksImportedSubscriber {
void onBlocksImported(SignedBeaconBlock lastImportedBlock);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why 's, plural?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well cose I don't want this to look like you can subscribe to it and pretend to get all blocks.
Maybe I should rename everything to be onBatchImported(SignedBeaconBlock lastImportedBlock)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, problem is that I put that on Sync interface, and not BatchSync interface. So the concept of "batch" is not part of that...
Suggestions? :)

}

record SyncProgress(
UInt64 fromSlot,
UInt64 toSlot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.beacon.sync.events.SyncingStatus;
import tech.pegasys.teku.beacon.sync.forward.ForwardSync.SyncSubscriber;
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber;
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.SyncProgress;
import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.storage.client.RecentChainData;

public class SyncController {
public class SyncController implements BlocksImportedSubscriber {
private static final Logger LOG = LogManager.getLogger();

private final Subscribers<SyncSubscriber> subscribers = Subscribers.create(true);
Expand All @@ -40,6 +42,7 @@ public class SyncController {
private final RecentChainData recentChainData;
private final SyncTargetSelector syncTargetSelector;
private final Sync sync;
private final SyncReorgManager syncReorgManager;

/**
* The current sync. When empty, no sync has started, otherwise contains the details of the last
Expand All @@ -55,12 +58,26 @@ public SyncController(
final Executor subscriberExecutor,
final RecentChainData recentChainData,
final SyncTargetSelector syncTargetSelector,
final SyncReorgManager syncReorgManager,
final Sync sync) {
this.eventThread = eventThread;
this.subscriberExecutor = subscriberExecutor;
this.recentChainData = recentChainData;
this.syncTargetSelector = syncTargetSelector;
this.syncReorgManager = syncReorgManager;
this.sync = sync;
sync.subscribeToBlocksImportedEvent(this);
}

@Override
public void onBlocksImported(final SignedBeaconBlock lastImportedBlock) {
eventThread.execute(
() -> {
if (isSyncSpeculative()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we skip this case?
(sorry, I just don't understand what is speculative sync)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

speculative syncing is when we are on sync but we "hear" about a chain that we speculatively sync. So we don't want the sync reorg logic to be applied in that case (we are actually in sync in that scenario)

return;
}
syncReorgManager.onBlocksImported(lastImportedBlock);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Consensys Software Inc., 2025
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.beacon.sync.forward.multipeer;

import java.util.Optional;
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger;
import tech.pegasys.teku.storage.client.ChainHead;
import tech.pegasys.teku.storage.client.RecentChainData;

public class SyncReorgManager implements BlocksImportedSubscriber {
static final int REORG_SLOT_THRESHOLD = 10;

private final RecentChainData recentChainData;
private final ForkChoiceTrigger forkChoiceTrigger;

public SyncReorgManager(
final RecentChainData recentChainData, final ForkChoiceTrigger forkChoiceTrigger) {
this.recentChainData = recentChainData;
this.forkChoiceTrigger = forkChoiceTrigger;
}

@Override
public void onBlocksImported(final SignedBeaconBlock lastImportedBlock) {

final Optional<ChainHead> currentHead = recentChainData.getChainHead();

if (currentHead.isEmpty()) {
return;
}

if (lastImportedBlock.getRoot().equals(currentHead.get().getRoot())) {
return;
}

if (currentHead
.get()
.getSlot()
.plus(REORG_SLOT_THRESHOLD)
.isGreaterThan(lastImportedBlock.getSlot())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this rule, we determinate best chain by votes, two chains could have similar length having a big difference in votes. I guess we don't want ot switch back and forth and it could help. Is this a purpose?

Copy link
Contributor Author

@tbenr tbenr Sep 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed, the sync reorg logic only applies during initial syncing when all protoarray votes are 0

ProtoArray::reorgWhileSyncing

    // check we are in syncing mode where all nodes have a max weight of 1
    if (commonAncestorNode.getWeight().isGreaterThan(1)) {
      return;
    }

and it is actually ok to switch back and forth, as long as we keep as canonical the longest chain we are syncing to

return;
}

forkChoiceTrigger.reorgWhileSyncing(currentHead.get().getRoot(), lastImportedBlock.getRoot());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.beacon.sync.events.SyncPreImportBlockChannel;
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber;
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.SyncProgress;
import tech.pegasys.teku.beacon.sync.forward.multipeer.batches.Batch;
import tech.pegasys.teku.beacon.sync.forward.multipeer.batches.StubBatchFactory;
Expand Down Expand Up @@ -662,6 +663,27 @@ void shouldRemoveBatchFromActiveSetWhenImportCompletesSuccessfully() {
assertBatchNotActive(batch0);
}

@Test
void shouldNotifyOnBlocksImported() {
assertThat(sync.syncToChain(targetChain)).isNotDone();
final BlocksImportedSubscriber subscriber = mock(BlocksImportedSubscriber.class);

sync.subscribeToBlocksImportedEvent(subscriber);

final Batch batch0 = batches.get(0);
final Batch batch1 = batches.get(1);
batches.receiveBlocks(batch0, chainBuilder.generateBlockAtSlot(1).getBlock());
batches.receiveBlocks(
batch1, chainBuilder.generateBlockAtSlot(batch1.getFirstSlot()).getBlock());

assertBatchImported(batch0);
verifyNoInteractions(subscriber);
batches.getImportResult(batch0).complete(IMPORTED_ALL_BLOCKS);

verify(subscriber).onBlocksImported(batch0.getLastBlock().orElseThrow());
verifyNoMoreInteractions(subscriber);
}

@Test
void shouldSwitchChains() {
// Start sync to first chain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChainTestUtil.chainWith;
Expand All @@ -37,6 +38,7 @@
import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.storage.client.RecentChainData;

Expand All @@ -48,17 +50,24 @@ class SyncControllerTest {
private final SyncTargetSelector syncTargetSelector = mock(SyncTargetSelector.class);
private final RecentChainData recentChainData = mock(RecentChainData.class);
private final Executor subscriberExecutor = mock(Executor.class);
private final SyncReorgManager syncReorgManager = mock(SyncReorgManager.class);

private final TargetChain targetChain = chainWith(dataStructureUtil.randomSlotAndBlockRoot());

private final SyncController syncController =
new SyncController(
eventThread, subscriberExecutor, recentChainData, syncTargetSelector, sync);
eventThread,
subscriberExecutor,
recentChainData,
syncTargetSelector,
syncReorgManager,
sync);
private static final UInt64 HEAD_SLOT = UInt64.valueOf(2338);

@BeforeEach
void setUp() {
when(recentChainData.getHeadSlot()).thenReturn(HEAD_SLOT);
verify(sync).subscribeToBlocksImportedEvent(any());
}

@Test
Expand Down Expand Up @@ -242,6 +251,36 @@ void shouldNotNotifySubscribersWhenRunningSpeculativeTarget() {
verify(subscriberExecutor, never()).execute(any());
}

@Test
void shouldForwardOnBlocksImportedWhenNonSpeculativeSync() {
final SafeFuture<SyncResult> syncResult = new SafeFuture<>();
when(syncTargetSelector.selectSyncTarget(any()))
.thenReturn(Optional.of(SyncTarget.speculativeTarget(targetChain)));
when(sync.syncToChain(targetChain)).thenReturn(syncResult);

onTargetChainsUpdated();

syncController.onBlocksImported(dataStructureUtil.randomSignedBeaconBlock());

verifyNoInteractions(syncReorgManager);
}

@Test
void shouldForwardOnBlocksImported() {
when(syncTargetSelector.selectSyncTarget(Optional.empty()))
.thenReturn(Optional.of(SyncTarget.nonfinalizedTarget(targetChain)));

when(sync.syncToChain(targetChain)).thenReturn(new SafeFuture<>());

onTargetChainsUpdated();

final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock();

syncController.onBlocksImported(block);

verify(syncReorgManager).onBlocksImported(block);
}

private void assertSyncSubscriberNotified(
final SyncSubscriber subscriber, final boolean syncing) {
// Shouldn't notify on the event thread
Expand Down
Loading