-
Notifications
You must be signed in to change notification settings - Fork 350
Allow reorg during sync #9268
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Allow reorg during sync #9268
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,16 +21,18 @@ | |
import org.apache.logging.log4j.Logger; | ||
import tech.pegasys.teku.beacon.sync.events.SyncingStatus; | ||
import tech.pegasys.teku.beacon.sync.forward.ForwardSync.SyncSubscriber; | ||
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber; | ||
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.SyncProgress; | ||
import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain; | ||
import tech.pegasys.teku.infrastructure.async.SafeFuture; | ||
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread; | ||
import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil; | ||
import tech.pegasys.teku.infrastructure.subscribers.Subscribers; | ||
import tech.pegasys.teku.infrastructure.unsigned.UInt64; | ||
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; | ||
import tech.pegasys.teku.storage.client.RecentChainData; | ||
|
||
public class SyncController { | ||
public class SyncController implements BlocksImportedSubscriber { | ||
private static final Logger LOG = LogManager.getLogger(); | ||
|
||
private final Subscribers<SyncSubscriber> subscribers = Subscribers.create(true); | ||
|
@@ -40,6 +42,7 @@ public class SyncController { | |
private final RecentChainData recentChainData; | ||
private final SyncTargetSelector syncTargetSelector; | ||
private final Sync sync; | ||
private final SyncReorgManager syncReorgManager; | ||
|
||
/** | ||
* The current sync. When empty, no sync has started, otherwise contains the details of the last | ||
|
@@ -55,12 +58,26 @@ public SyncController( | |
final Executor subscriberExecutor, | ||
final RecentChainData recentChainData, | ||
final SyncTargetSelector syncTargetSelector, | ||
final SyncReorgManager syncReorgManager, | ||
final Sync sync) { | ||
this.eventThread = eventThread; | ||
this.subscriberExecutor = subscriberExecutor; | ||
this.recentChainData = recentChainData; | ||
this.syncTargetSelector = syncTargetSelector; | ||
this.syncReorgManager = syncReorgManager; | ||
this.sync = sync; | ||
sync.subscribeToBlocksImportedEvent(this); | ||
} | ||
|
||
@Override | ||
public void onBlocksImported(final SignedBeaconBlock lastImportedBlock) { | ||
eventThread.execute( | ||
() -> { | ||
if (isSyncSpeculative()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we skip this case? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. speculative syncing is when we are on sync but we "hear" about a chain that we speculatively sync. So we don't want the sync reorg logic to be applied in that case (we are actually in sync in that scenario) |
||
return; | ||
} | ||
syncReorgManager.onBlocksImported(lastImportedBlock); | ||
}); | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
* Copyright Consensys Software Inc., 2025 | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package tech.pegasys.teku.beacon.sync.forward.multipeer; | ||
|
||
import java.util.Optional; | ||
import tech.pegasys.teku.beacon.sync.forward.multipeer.Sync.BlocksImportedSubscriber; | ||
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; | ||
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; | ||
import tech.pegasys.teku.storage.client.ChainHead; | ||
import tech.pegasys.teku.storage.client.RecentChainData; | ||
|
||
public class SyncReorgManager implements BlocksImportedSubscriber { | ||
static final int REORG_SLOT_THRESHOLD = 10; | ||
|
||
private final RecentChainData recentChainData; | ||
private final ForkChoiceTrigger forkChoiceTrigger; | ||
|
||
public SyncReorgManager( | ||
final RecentChainData recentChainData, final ForkChoiceTrigger forkChoiceTrigger) { | ||
this.recentChainData = recentChainData; | ||
this.forkChoiceTrigger = forkChoiceTrigger; | ||
} | ||
|
||
@Override | ||
public void onBlocksImported(final SignedBeaconBlock lastImportedBlock) { | ||
|
||
final Optional<ChainHead> currentHead = recentChainData.getChainHead(); | ||
|
||
if (currentHead.isEmpty()) { | ||
return; | ||
} | ||
|
||
if (lastImportedBlock.getRoot().equals(currentHead.get().getRoot())) { | ||
return; | ||
} | ||
|
||
if (currentHead | ||
.get() | ||
.getSlot() | ||
.plus(REORG_SLOT_THRESHOLD) | ||
.isGreaterThan(lastImportedBlock.getSlot())) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure about this rule, we determinate best chain by votes, two chains could have similar length having a big difference in votes. I guess we don't want ot switch back and forth and it could help. Is this a purpose? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as discussed, the sync reorg logic only applies during initial syncing when all protoarray votes are 0
and it is actually ok to switch back and forth, as long as we keep as canonical the longest chain we are syncing to |
||
return; | ||
} | ||
|
||
forkChoiceTrigger.reorgWhileSyncing(currentHead.get().getRoot(), lastImportedBlock.getRoot()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why 's, plural?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well cose I don't want this to look like you can subscribe to it and pretend to get all blocks.
Maybe I should rename everything to be
onBatchImported(SignedBeaconBlock lastImportedBlock)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, problem is that I put that on
Sync
interface, and notBatchSync
interface. So the concept of "batch" is not part of that...Suggestions? :)