Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 61 additions & 9 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,11 @@ func (e *Epoch) persistFinalizationCertificate(fCert FinalizationCertificate) er

// we receive a finalization certificate for a future round
e.Logger.Debug("Received a finalization certificate for a future sequence", zap.Uint64("seq", fCert.Finalization.Seq), zap.Uint64("nextSeqToCommit", nextSeqToCommit))

if err := e.rebroadcastPastFinalizations(); err != nil {
return err
}

e.replicationState.collectFutureFinalizationCertificates(&fCert, e.round, nextSeqToCommit)
}

Expand All @@ -875,6 +880,45 @@ func (e *Epoch) persistFinalizationCertificate(fCert FinalizationCertificate) er
return nil
}

func (e *Epoch) rebroadcastPastFinalizations() error {
r := e.round

for {
if r == 0 {
return nil
}
r--
round, exists := e.rounds[r]
if !exists {
return nil
}

// Already collected a finalization certificate
if round.fCert != nil {
continue
}

// Has notarized this round?
if round.notarization == nil {
continue
}

var finalizationMessage *Message
// Try to re-use finalization we created if possible, else create it.
if finalization, exists := round.finalizations[string(e.ID)]; exists {
finalizationMessage = &Message{Finalization: finalization}
} else {
_, msg, err := e.constructFinalizationMessage(round.notarization.Vote.BlockHeader)
if err != nil {
return err
}
finalizationMessage = msg
}
e.Logger.Debug("Rebroadcasting finalization", zap.Uint64("round", r))
e.Comm.Broadcast(finalizationMessage)
}
}

func (e *Epoch) indexFinalizationCertificates(startRound uint64) {
r := startRound
round, exists := e.rounds[r]
Expand Down Expand Up @@ -1913,13 +1957,26 @@ func (e *Epoch) doNotarized(r uint64) error {

md := block.BlockHeader()

finalization, finalizationMsg, err := e.constructFinalizationMessage(md)
if err != nil {
return err
}
e.Comm.Broadcast(finalizationMsg)

err1 := e.startRound()
err2 := e.handleFinalizationMessage(&finalization, e.ID)

return errors.Join(err1, err2)
}

func (e *Epoch) constructFinalizationMessage(md BlockHeader) (Finalization, *Message, error) {
f := ToBeSignedFinalization{BlockHeader: md}
signature, err := f.Sign(e.Signer)
if err != nil {
return fmt.Errorf("failed signing vote %w", err)
return Finalization{}, nil, fmt.Errorf("failed signing vote %w", err)
}

sf := Finalization{
finalization := Finalization{
Signature: Signature{
Signer: e.ID,
Value: signature,
Expand All @@ -1930,14 +1987,9 @@ func (e *Epoch) doNotarized(r uint64) error {
}

finalizationMsg := &Message{
Finalization: &sf,
Finalization: &finalization,
}
e.Comm.Broadcast(finalizationMsg)

err1 := e.startRound()
err2 := e.handleFinalizationMessage(&sf, e.ID)

return errors.Join(err1, err2)
return finalization, finalizationMsg, nil
}

// stores a notarization in the epoch's memory.
Expand Down
119 changes: 119 additions & 0 deletions epoch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,125 @@ func TestEpochConsecutiveProposalsDoNotGetVerified(t *testing.T) {
}
}

func TestEpochNotarizeTwiceThenFinalize(t *testing.T) {
l := testutil.MakeLogger(t, 1)
bb := &testBlockBuilder{out: make(chan *testBlock, 1)}
storage := newInMemStorage()

wal := newTestWAL(t)

nodes := []NodeID{{1}, {2}, {3}, {4}}

recordedMessages := make(chan *Message, 100)
comm := &recordingComm{Communication: noopComm(nodes), BroadcastMessages: recordedMessages}

conf := EpochConfig{
MaxProposalWait: DefaultMaxProposalWaitTime,
Logger: l,
ID: nodes[0],
Signer: &testSigner{},
WAL: wal,
Verifier: &testVerifier{},
Storage: storage,
Comm: comm,
BlockBuilder: bb,
SignatureAggregator: &testSignatureAggregator{},
}

e, err := NewEpoch(conf)
require.NoError(t, err)

require.NoError(t, e.Start())

// Round 0
block0 := <-bb.out

injectTestVote(t, e, block0, nodes[1])
injectTestVote(t, e, block0, nodes[2])
wal.assertNotarization(0)

// Round 1
md := e.Metadata()
_, ok := bb.BuildBlock(context.Background(), md)
require.True(t, ok)
block1 := <-bb.out

vote, err := newTestVote(block1, nodes[1])
require.NoError(t, err)
err = e.HandleMessage(&Message{
BlockMessage: &BlockMessage{
Vote: *vote,
Block: block1,
},
}, nodes[1])
require.NoError(t, err)

injectTestVote(t, e, block1, nodes[2])

wal.assertNotarization(1)

// Round 2
md = e.Metadata()
_, ok = bb.BuildBlock(context.Background(), md)
require.True(t, ok)
block2 := <-bb.out

vote, err = newTestVote(block2, nodes[2])
require.NoError(t, err)
err = e.HandleMessage(&Message{
BlockMessage: &BlockMessage{
Vote: *vote,
Block: block2,
},
}, nodes[2])
require.NoError(t, err)

injectTestVote(t, e, block2, nodes[1])

wal.assertNotarization(2)

// drain the recorded messages
for len(recordedMessages) > 0 {
<-recordedMessages
}

blocks := []*testBlock{block0, block1}

var wg sync.WaitGroup
wg.Add(1)

finish := make(chan struct{})
// Once the node sends a finalization message, send it finalization messages as a response
go func() {
defer wg.Done()
for {
select {
case <-finish:
return
case msg := <-recordedMessages:
if msg.Finalization != nil {
index := msg.Finalization.Finalization.Round
if index > 1 {
continue
}
injectTestFinalization(t, e, blocks[int(index)], nodes[1])
injectTestFinalization(t, e, blocks[int(index)], nodes[2])
}
}
}
}()

injectTestFinalization(t, e, block2, nodes[1])
injectTestFinalization(t, e, block2, nodes[2])

storage.waitForBlockCommit(0)
storage.waitForBlockCommit(1)
storage.waitForBlockCommit(2)

close(finish)
wg.Wait()
}

func TestEpochFinalizeThenNotarize(t *testing.T) {
l := testutil.MakeLogger(t, 1)
bb := &testBlockBuilder{out: make(chan *testBlock, 1)}
Expand Down