Skip to content

Load testing investigation #640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions execute/optimizers/type_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (op ObservationOptimizer) TruncateObservation(observation exectypes.Observa
op.lggr.Errorw("missing message", "seqNum", seqNum, "chain", chain)
continue
}
op.lggr.Debugw("truncating message", "seqNum", seqNum, "chain", chain)
obs.Messages[chain][seqNum] = cciptypes.Message{}
obs.TokenData[chain][seqNum] = exectypes.NewMessageTokenData()
// Subtract the removed message and token size
Expand All @@ -113,11 +114,13 @@ func (op ObservationOptimizer) TruncateObservation(observation exectypes.Observa

var bytesTruncated int
// Reaching here means that all messages in the report are truncated, truncate the last commit
op.lggr.Debugw("truncating last commit for", "chain", chain)
obs, bytesTruncated = op.truncateLastCommit(obs, chain)

encodedObsSize -= bytesTruncated

if len(obs.CommitReports[chain]) == 0 {
op.lggr.Debugw("truncating chain", "chain", chain)
// If the last commit report was truncated, truncate the chain
obs = op.truncateChain(obs, chain)
}
Expand Down
103 changes: 1 addition & 102 deletions execute/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package execute

import (
"bytes"
"cmp"
"context"
"encoding/hex"
"errors"
"fmt"
"slices"
"sort"
"time"

Expand Down Expand Up @@ -42,10 +39,6 @@ import (
"github.com/smartcontractkit/chainlink-ccip/pluginconfig"
)

var (
errAlreadyExecuted = errors.New("messages already executed")
)

type ContractDiscoveryInterface plugincommon.PluginProcessor[dt.Query, dt.Observation, dt.Outcome]

type inflightMessageCache interface {
Expand Down Expand Up @@ -400,6 +393,7 @@ func selectReport(
for i, commitReport := range commitReports {
// handle incomplete observations.
if len(commitReport.Messages) == 0 {
lggr.Debugw("selectReport: skipping commit report with no messages")
pendingReports++
continue
}
Expand Down Expand Up @@ -546,104 +540,9 @@ func (p *Plugin) validateReport(
return false, cciptypes.ExecutePluginReport{}, nil
}

// check that the messages in the report are not already executed onchain.
// note that this involves a set of DB queries, hence why its last in the checks.
seqNrRangesBySource := getSnRangeSetPairsBySource(decodedReport.ChainReports)
err = p.checkAlreadyExecuted(ctx, lggr, seqNrRangesBySource)
if errors.Is(err, errAlreadyExecuted) {
// Some messages in the report have already
// been executed, so we don't want to re-execute them.
// This gives the exec plugin a chance to remedy the situation
// by selecting a different set of messages.
return false, decodedReport, nil
}
if err != nil {
// TODO: should we return true here if we couldn't check for already executed messages?
return false, decodedReport, fmt.Errorf("checking for already executed messages failed: %w", err)
}

return true, decodedReport, nil
}

// checkAlreadyExecuted checks if the messages in the report have already been executed
// on the destination chain. It queries the DB for executed messages in the given sequence
// number range for each source chain in the report.
func (p *Plugin) checkAlreadyExecuted(
ctx context.Context,
lggr logger.Logger,
seqNrRangesBySource map[cciptypes.ChainSelector]snRangeSetPair,
) error {
// TODO: batch these queries? these are all DB reads.
// maybe some alternative queries exist.
for sourceChainSelector, seqNrRange := range seqNrRangesBySource {
executed, err := p.ccipReader.ExecutedMessages(ctx, sourceChainSelector, seqNrRange.snRange)
if err != nil {
return fmt.Errorf("couldn't check if messages already executed: %w", err)
}

if intersection := mapset.NewSet(executed...).Intersect(seqNrRange.set); !intersection.IsEmpty() {
// Some messages in the report have been executed, don't accept it.
reportSeqNrsSlice := seqNrRange.set.ToSlice()
lggr.Warnw("some messages in report already executed",
"alreadyExecuted", executed,
"reportSeqNrs", reportSeqNrsSlice,
)
return fmt.Errorf("%w: already executed messages %+v report seq nrs %+v",
errAlreadyExecuted, executed, reportSeqNrsSlice)
}
}

return nil
}

// snRangeSetPair is an internal data structure used to store a sequence number range
// and a set of sequence numbers simultaneously.
type snRangeSetPair struct {
// snRange is a range of [min, max] sequence numbers of the messages in the report for a particular source chain.
// it is used to query the CCIPReader for executed messages.
snRange cciptypes.SeqNumRange
// set is the sequence numbers of the messages in the report for a particular source chain.
// it is used to check whether the returned array from CCIPReader has a non-empty intersection
// with the set of sequence numbers in the report.
set mapset.Set[cciptypes.SeqNum]
}

// getSnRangeSetPairsBySource returns a map of source chain selector to
// the sequence number range of the messages in the report.
func getSnRangeSetPairsBySource(
chainReports []cciptypes.ExecutePluginReportSingleChain,
) map[cciptypes.ChainSelector]snRangeSetPair {
seqNrRangesBySource := make(map[cciptypes.ChainSelector]snRangeSetPair)
for _, chainReport := range chainReports {
// This should never happen, indicates a bug in the report building and accepting process.
// But we sanity check since slices.Min/Max will panic on empty slices.
if len(chainReport.Messages) == 0 {
continue
}

cmpr := func(a, b cciptypes.Message) int {
return cmp.Compare(a.Header.SequenceNumber, b.Header.SequenceNumber)
}
minMsg := slices.MinFunc(chainReport.Messages, cmpr)
maxMsg := slices.MaxFunc(chainReport.Messages, cmpr)
seqNrSet := mapset.NewSet(
slicelib.Map(
chainReport.Messages,
func(msg cciptypes.Message) cciptypes.SeqNum {
return msg.Header.SequenceNumber
},
)...,
)
seqNrRangesBySource[chainReport.SourceChainSelector] = snRangeSetPair{
snRange: cciptypes.NewSeqNumRange(
minMsg.Header.SequenceNumber,
maxMsg.Header.SequenceNumber),
set: seqNrSet,
}
}
return seqNrRangesBySource
}

func (p *Plugin) ShouldAcceptAttestedReport(
ctx context.Context, seqNr uint64, r ocr3types.ReportWithInfo[[]byte],
) (bool, error) {
Expand Down
Loading
Loading