Parallelize reading messages from reports in exec observation #709


Draft · wants to merge 3 commits into base: main
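The change replaces the sequential per-report read loop in readAllMessages with a fan-out/fan-in pattern: one goroutine per report sends its outcome on a result channel, a separate goroutine closes that channel once the sync.WaitGroup drains, and a single collector loop folds the results back into the observation maps, so the maps are only ever written from one goroutine. Below is a minimal, self-contained sketch of that pattern; it assumes nothing from this repository, and fetch, ids, and out are illustrative stand-ins rather than the PR's types.

package main

import (
    "fmt"
    "sync"
)

type result struct {
    id  int
    val string
    err error
}

// fetch stands in for the per-report read (ccipReader.MsgsBetweenSeqNums in the PR).
func fetch(id int) (string, error) {
    return fmt.Sprintf("payload-%d", id), nil
}

func main() {
    ids := []int{1, 2, 3, 4}

    resultChan := make(chan result)
    var wg sync.WaitGroup

    // Fan out: one worker per item, each sends exactly one result.
    for _, id := range ids {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            val, err := fetch(id)
            resultChan <- result{id: id, val: val, err: err}
        }(id)
    }

    // Close the channel once every worker has sent, so the collector loop can terminate.
    go func() {
        wg.Wait()
        close(resultChan)
    }()

    // Fan in: a single goroutine aggregates, so the map needs no locking.
    out := make(map[int]string)
    for r := range resultChan {
        if r.err != nil {
            continue // skip failed items, mirroring the PR's error handling
        }
        out[r.id] = r.val
    }
    fmt.Println(out)
}

Because only the collector touches the shared map, no mutex is needed; the WaitGroup plus the closer goroutine is what guarantees the range over the channel ends.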
92 changes: 70 additions & 22 deletions execute/observation.go
@@ -3,6 +3,8 @@
import (
"context"
"fmt"
"sort"
"sync"
"time"

"golang.org/x/exp/maps"
@@ -283,7 +285,7 @@
return groupedCommits
}

func readAllMessages(

[Check failure on line 288 in execute/observation.go · GitHub Actions / build-lint-test: cyclomatic complexity 14 of func `readAllMessages` is high (> 13) (gocyclo) · see the extraction sketch after the diff]
ctx context.Context,
lggr logger.Logger,
ccipReader reader.CCIPReader,
@@ -295,42 +297,88 @@

commitObs := regroup(commitData)

type result struct {
srcChain cciptypes.ChainSelector
msgs []cciptypes.Message
report exectypes.CommitData
err error
conformsToRange bool
}

resultChan := make(chan result)
var wg sync.WaitGroup
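// Note: resultChan is unbuffered, so each worker blocks on its send until the
// collector loop below receives it; wg only gates when the channel gets closed.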

// Process each chain's reports concurrently
for srcChain, reports := range commitObs {
if len(reports) == 0 {
continue
}

messageObs[srcChain] = make(map[cciptypes.SeqNum]cciptypes.Message)
// Read messages for each range.

// Process each report within a chain concurrently
for _, report := range reports {
msgs, err := ccipReader.MsgsBetweenSeqNums(ctx, srcChain, report.SequenceNumberRange)
if err != nil {
lggr.Errorw("unable to read all messages for report",
"srcChain", srcChain,
"seqRange", report.SequenceNumberRange,
"merkleRoot", report.MerkleRoot,
"err", err,
)
continue
}
wg.Add(1)
go func(srcChain cciptypes.ChainSelector, report exectypes.CommitData) {
defer wg.Done()

msgs, err := ccipReader.MsgsBetweenSeqNums(ctx, srcChain, report.SequenceNumberRange)
conformsToRange := err == nil && msgsConformToSeqRange(msgs, report.SequenceNumberRange)

if err != nil {
lggr.Errorw("unable to read all messages for report",
"srcChain", srcChain,
"seqRange", report.SequenceNumberRange,
"merkleRoot", report.MerkleRoot,
"err", err,
)
} else if !conformsToRange {
lggr.Errorw("missing messages in range",
"srcChain", srcChain, "seqRange", report.SequenceNumberRange)
}

resultChan <- result{
srcChain: srcChain,
msgs: msgs,
report: report,
err: err,
conformsToRange: conformsToRange,
}
}(srcChain, report)
}
}

if !msgsConformToSeqRange(msgs, report.SequenceNumberRange) {
lggr.Errorw("missing messages in range",
"srcChain", srcChain, "seqRange", report.SequenceNumberRange)
continue
}
// Close result channel when all goroutines complete
go func() {
wg.Wait()
close(resultChan)
}()

for _, msg := range msgs {
messageObs[srcChain][msg.Header.SequenceNumber] = msg
messageTimestamps[msg.Header.MessageID] = report.Timestamp
}
availableReports[srcChain] = append(availableReports[srcChain], report)
// Collect results
for r := range resultChan {
if r.err != nil || !r.conformsToRange {
continue
}
// Remove empty chains.

for _, msg := range r.msgs {
messageObs[r.srcChain][msg.Header.SequenceNumber] = msg
messageTimestamps[msg.Header.MessageID] = r.report.Timestamp
}
availableReports[r.srcChain] = append(availableReports[r.srcChain], r.report)
}
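// Results arrive in nondeterministic order from the worker goroutines, so
// restore a stable ordering before returning.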
// sort available reports for each selector by timestamp
for _, reports := range availableReports {
sort.Slice(reports, func(i, j int) bool {
return reports[i].Timestamp.Before(reports[j].Timestamp)
})
}
// Remove empty chains
for srcChain := range messageObs {
if len(messageObs[srcChain]) == 0 {
delete(messageObs, srcChain)
}
}

return messageObs, availableReports, messageTimestamps
}
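The gocyclo failure flagged in the annotation above counts the branches in readAllMessages. One common way to bring such a count down, shown here only as an illustration and not as the author's intended fix, is to extract the goroutine body into a helper so the outer function keeps just the fan-out/fan-in plumbing. The helper name readReportMessages is hypothetical, and the sketch assumes the result struct is lifted to package scope.

// readReportMessages performs the read and the sequence-range check for a
// single report and returns everything the collector needs. (Hypothetical
// helper; assumes the result struct is declared at package scope.)
func readReportMessages(
    ctx context.Context,
    lggr logger.Logger,
    ccipReader reader.CCIPReader,
    srcChain cciptypes.ChainSelector,
    report exectypes.CommitData,
) result {
    msgs, err := ccipReader.MsgsBetweenSeqNums(ctx, srcChain, report.SequenceNumberRange)
    conformsToRange := err == nil && msgsConformToSeqRange(msgs, report.SequenceNumberRange)
    switch {
    case err != nil:
        lggr.Errorw("unable to read all messages for report",
            "srcChain", srcChain, "seqRange", report.SequenceNumberRange,
            "merkleRoot", report.MerkleRoot, "err", err)
    case !conformsToRange:
        lggr.Errorw("missing messages in range",
            "srcChain", srcChain, "seqRange", report.SequenceNumberRange)
    }
    return result{
        srcChain:        srcChain,
        msgs:            msgs,
        report:          report,
        err:             err,
        conformsToRange: conformsToRange,
    }
}

With that helper, the inner loop of readAllMessages would shrink to:

for _, report := range reports {
    wg.Add(1)
    go func(srcChain cciptypes.ChainSelector, report exectypes.CommitData) {
        defer wg.Done()
        resultChan <- readReportMessages(ctx, lggr, ccipReader, srcChain, report)
    }(srcChain, report)
}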

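A related design note: the diff starts one goroutine per report with no upper bound on concurrency. If the number of pending reports can be large, a bounded variant is a common alternative. The sketch below uses golang.org/x/sync/errgroup with SetLimit, which this PR does not use; it is shown only as an option, again with illustrative names, and it trades the result channel for a mutex-protected map.

package main

import (
    "fmt"
    "sync"

    "golang.org/x/sync/errgroup"
)

// fetch again stands in for the per-report read.
func fetch(id int) (string, error) {
    return fmt.Sprintf("payload-%d", id), nil
}

func main() {
    ids := []int{1, 2, 3, 4, 5, 6, 7, 8}

    var g errgroup.Group
    g.SetLimit(4) // at most 4 reads in flight at once

    var mu sync.Mutex
    out := make(map[int]string)

    for _, id := range ids {
        id := id // capture loop variable (needed before Go 1.22)
        g.Go(func() error {
            val, err := fetch(id)
            if err != nil {
                return nil // skip failed items, mirroring the PR's behaviour
            }
            mu.Lock()
            out[id] = val
            mu.Unlock()
            return nil
        })
    }

    _ = g.Wait() // no worker returns a non-nil error in this sketch
    fmt.Println(out)
}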