diff --git a/execute/observation.go b/execute/observation.go index 2f525f698b..ef77f15ab4 100644 --- a/execute/observation.go +++ b/execute/observation.go @@ -3,6 +3,8 @@ package execute import ( "context" "fmt" + "sort" + "sync" "time" "golang.org/x/exp/maps" @@ -295,42 +297,88 @@ func readAllMessages( commitObs := regroup(commitData) + type result struct { + srcChain cciptypes.ChainSelector + msgs []cciptypes.Message + report exectypes.CommitData + err error + conformsToRange bool + } + + resultChan := make(chan result) + var wg sync.WaitGroup + + // Process each chain's reports concurrently for srcChain, reports := range commitObs { if len(reports) == 0 { continue } messageObs[srcChain] = make(map[cciptypes.SeqNum]cciptypes.Message) - // Read messages for each range. + + // Process each report within a chain concurrently for _, report := range reports { - msgs, err := ccipReader.MsgsBetweenSeqNums(ctx, srcChain, report.SequenceNumberRange) - if err != nil { - lggr.Errorw("unable to read all messages for report", - "srcChain", srcChain, - "seqRange", report.SequenceNumberRange, - "merkleRoot", report.MerkleRoot, - "err", err, - ) - continue - } + wg.Add(1) + go func(srcChain cciptypes.ChainSelector, report exectypes.CommitData) { + defer wg.Done() + + msgs, err := ccipReader.MsgsBetweenSeqNums(ctx, srcChain, report.SequenceNumberRange) + conformsToRange := err == nil && msgsConformToSeqRange(msgs, report.SequenceNumberRange) + + if err != nil { + lggr.Errorw("unable to read all messages for report", + "srcChain", srcChain, + "seqRange", report.SequenceNumberRange, + "merkleRoot", report.MerkleRoot, + "err", err, + ) + } else if !conformsToRange { + lggr.Errorw("missing messages in range", + "srcChain", srcChain, "seqRange", report.SequenceNumberRange) + } + + resultChan <- result{ + srcChain: srcChain, + msgs: msgs, + report: report, + err: err, + conformsToRange: conformsToRange, + } + }(srcChain, report) + } + } - if !msgsConformToSeqRange(msgs, report.SequenceNumberRange) { - lggr.Errorw("missing messages in range", - "srcChain", srcChain, "seqRange", report.SequenceNumberRange) - continue - } + // Close result channel when all goroutines complete + go func() { + wg.Wait() + close(resultChan) + }() - for _, msg := range msgs { - messageObs[srcChain][msg.Header.SequenceNumber] = msg - messageTimestamps[msg.Header.MessageID] = report.Timestamp - } - availableReports[srcChain] = append(availableReports[srcChain], report) + // Collect results + for r := range resultChan { + if r.err != nil || !r.conformsToRange { + continue } - // Remove empty chains. + + for _, msg := range r.msgs { + messageObs[r.srcChain][msg.Header.SequenceNumber] = msg + messageTimestamps[msg.Header.MessageID] = r.report.Timestamp + } + availableReports[r.srcChain] = append(availableReports[r.srcChain], r.report) + } + // sort available reports for each selector by timestamp + for _, reports := range availableReports { + sort.Slice(reports, func(i, j int) bool { + return reports[i].Timestamp.Before(reports[j].Timestamp) + }) + } + // Remove empty chains + for srcChain := range messageObs { if len(messageObs[srcChain]) == 0 { delete(messageObs, srcChain) } } + return messageObs, availableReports, messageTimestamps }