Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 39 additions & 18 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,9 @@ func (e *Epoch) persistFinalization(finalization Finalization) error {
// or otherwise write it to the WAL in order to commit it later.
startRound := e.round
nextSeqToCommit := e.nextSeqToCommit()

e.sched.ExecuteDependents(finalization.Finalization.Digest)

if finalization.Finalization.Seq == nextSeqToCommit {
if err := e.indexFinalizations(finalization.Finalization.Round); err != nil {
e.Logger.Error("Failed to index finalizations", zap.Error(err))
Expand Down Expand Up @@ -1327,6 +1330,8 @@ func (e *Epoch) persistNotarization(notarization Notarization) error {
return err
}

e.sched.ExecuteDependents(notarization.Vote.Digest)

round := notarization.Vote.Round
for _, signer := range notarization.QC.Signers() {
if signerIndex := e.nodes.IndexOf(signer); signerIndex != -1 {
Expand Down Expand Up @@ -1559,7 +1564,10 @@ func (e *Epoch) handleBlockMessage(message *BlockMessage, from NodeID) error {

// Schedule the block to be verified once its direct predecessor have been verified,
// or if it can be verified immediately.
e.Logger.Debug("Scheduling block verification", zap.Uint64("round", md.Round))
e.Logger.Debug("Scheduling block verification",
zap.Uint64("round", md.Round),
zap.Uint64("seq", md.Seq),
zap.Bool("ready", canBeImmediatelyVerified))
e.sched.Schedule(task, md.Prev, canBeImmediatelyVerified)

return nil
Expand Down Expand Up @@ -1881,12 +1889,9 @@ func (e *Epoch) createNotarizedBlockVerificationTask(block Block, notarization N

func (e *Epoch) isBlockReadyToBeScheduled(seq uint64, prev Digest) bool {
if seq > 0 {
// A block can be scheduled if its predecessor either exists in storage,
// or there exists a round object for it.
// Since we only create a round object after we verify the block,
// it means we have verified this block in the past.
_, ok := e.locateBlock(seq-1, prev[:])
return ok
// A block can be scheduled if its predecessor is either notarized or finalized.
_, notarizedOrFinalized, _ := e.locateBlock(seq-1, prev[:])
return notarizedOrFinalized != nil
}
// The first block is always ready to be scheduled
return true
Expand Down Expand Up @@ -1918,7 +1923,7 @@ func (e *Epoch) verifyProposalMetadataAndBlacklist(block Block) bool {
// If it's the former, we need to find the parent of the block and ensure it is correct.
prevBlacklist := NewBlacklist(uint16(len(e.nodes)))
if bh.Seq > 0 {
prevBlock, found := e.locateBlock(bh.Seq-1, bh.Prev[:])
prevBlock, _, found := e.locateBlock(bh.Seq-1, bh.Prev[:])
if !found {
e.Logger.Debug("Could not find parent block with given digest",
zap.Uint64("blockSeq", bh.Seq-1),
Expand Down Expand Up @@ -1974,46 +1979,57 @@ func (e *Epoch) verifyProposalMetadataAndBlacklist(block Block) bool {
// 2) Else, on storage.
// Compares to the given digest, and if it's the same, returns it.
// Otherwise, returns false.
func (e *Epoch) locateBlock(seq uint64, digest []byte) (VerifiedBlock, bool) {
func (e *Epoch) locateBlock(seq uint64, digest []byte) (VerifiedBlock, *notarizationOrFinalization, bool) {
Copy link
Collaborator

@samliok samliok Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use locateQuorumRecord which finds a notarized or finalized block for the given sequence. Even though it is a "replication" function it would reduce code duplication & remove the notarizationOrFinalization struct.

// TODO index rounds by digest too to make it quicker
// TODO: optimize this by building an index from digest to round
for _, round := range e.rounds {
dig := round.block.BlockHeader().Digest
if bytes.Equal(dig[:], digest) {
return round.block, true
nof := &notarizationOrFinalization{
Notarization: round.notarization,
Finalization: round.finalization,
}
if nof.Notarization == nil && nof.Finalization == nil {
return nil, nil, false
}
return round.block, nof, true
}
}

height := e.nextSeqToCommit()
// Not in memory, and no block resides in storage.
if height == 0 {
return nil, false
return nil, nil, false
}

// If the given block has a sequence that is higher than the last block we committed to storage,
// we don't have the block in our storage.
maxSeq := height - 1
if maxSeq < seq {
return nil, false
return nil, nil, false
}

if seq >= e.nextSeqToCommit() {
e.Logger.Debug("Requested block sequence we have not yet committed to storage",
zap.Uint64("requestedSeq", seq), zap.Uint64("numBlocks", e.nextSeqToCommit()))
return nil, false
return nil, nil, false
}

block, _, ok := e.retrieveBlockOrHalt(seq)
block, finalization, ok := e.retrieveBlockOrHalt(seq)
if !ok {
return nil, false
return nil, nil, false
}

nof := &notarizationOrFinalization{
Finalization: &finalization,
}

dig := block.BlockHeader().Digest
if bytes.Equal(dig[:], digest) {
return block, true
return block, nof, true
}

return nil, false
return nil, nil, false
}

func (e *Epoch) buildBlock() {
Expand Down Expand Up @@ -2071,7 +2087,7 @@ func (e *Epoch) buildBlock() {
func (e *Epoch) retrieveBlacklistOfParentBlock(metadata ProtocolMetadata) (Blacklist, bool) {
var blacklist Blacklist
if metadata.Seq > 0 {
prevBlock, ok := e.locateBlock(metadata.Seq-1, metadata.Prev[:])
prevBlock, _, ok := e.locateBlock(metadata.Seq-1, metadata.Prev[:])
if !ok {
e.Logger.Error("Failed locating previous block",
zap.Uint64("round", metadata.Round),
Expand Down Expand Up @@ -3020,3 +3036,8 @@ type messagesForRound struct {
finalization *Finalization
notarization *Notarization
}

type notarizationOrFinalization struct {
*Notarization
*Finalization
}
66 changes: 66 additions & 0 deletions epoch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
rand2 "math/rand"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -32,6 +33,71 @@ var (
}
)

func TestBlockNotVerifiedIfParentNotNotarized(t *testing.T) {
bb := &testutil.TestBlockBuilder{Out: make(chan *testutil.TestBlock, 1)}

nodes := []NodeID{{1}, {2}, {3}, {4}}

comm := testutil.NewNoopComm(nodes)
conf, _, _ := testutil.DefaultTestNodeEpochConfig(t, nodes[3], comm, bb)

e, err := NewEpoch(conf)
require.NoError(t, err)

require.NoError(t, e.Start())

blocks := createBlocks(t, nodes, 2)

var block1Verified atomic.Bool

var wg sync.WaitGroup
wg.Add(1)

block0 := blocks[0].VerifiedBlock.(*testutil.TestBlock)
block0.OnVerify = func() {
wg.Done()
}
block1 := blocks[1].VerifiedBlock.(*testutil.TestBlock)
block1.OnVerify = func() {
block1Verified.Store(true)
}

v0, err := testutil.NewTestVote(block0, nodes[0])
require.NoError(t, err)

v1, err := testutil.NewTestVote(block1, nodes[1])
require.NoError(t, err)

emptyNotarization := testutil.NewEmptyNotarization(nodes, 0)

err = e.HandleMessage(&Message{
BlockMessage: &BlockMessage{
Vote: *v0,
Block: block0,
},
}, nodes[0])
require.NoError(t, err)

wg.Wait()

err = e.HandleMessage(&Message{
BlockMessage: &BlockMessage{
Vote: *v1,
Block: block1,
},
}, nodes[1])
require.NoError(t, err)

err = e.HandleMessage(&Message{
EmptyNotarization: emptyNotarization,
}, nodes[1])
require.NoError(t, err)

require.Never(t, func() bool {
return block1Verified.Load()
}, time.Second, 100*time.Millisecond)
}

func TestEpochHandleNotarizationFutureRound(t *testing.T) {
bb := &testutil.TestBlockBuilder{}
nodes := []NodeID{{1}, {2}, {3}, {4}}
Expand Down
17 changes: 17 additions & 0 deletions sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,23 @@ func (as *Scheduler) Schedule(f func() Digest, prev Digest, ready bool) {
as.signal.Broadcast() // (11)
}

func (as *Scheduler) ExecuteDependents(dep Digest) {
as.lock.Lock()
defer as.lock.Unlock()

if as.close {
return
}

newlyReadyTasks := as.pending.Remove(dep)
if len(newlyReadyTasks) == 0 {
return
}
as.ready = append(as.ready, newlyReadyTasks...)

as.signal.Broadcast()
}

type Task struct {
F func() Digest
Parent Digest
Expand Down
Loading