44 "context"
55 "fmt"
66 "io"
7+ "sort"
78 "sync"
89 "sync/atomic"
910 "time"
@@ -26,10 +27,12 @@ import (
2627 "github.com/smartcontractkit/chainlink-ccip/commit/metrics"
2728 "github.com/smartcontractkit/chainlink-ccip/commit/tokenprice"
2829 "github.com/smartcontractkit/chainlink-ccip/internal/plugincommon"
30+ "github.com/smartcontractkit/chainlink-ccip/internal/plugincommon/consensus"
2931 "github.com/smartcontractkit/chainlink-ccip/internal/plugincommon/discovery"
3032 dt "github.com/smartcontractkit/chainlink-ccip/internal/plugincommon/discovery/discoverytypes"
3133 "github.com/smartcontractkit/chainlink-ccip/internal/plugintypes"
3234 "github.com/smartcontractkit/chainlink-ccip/internal/reader"
35+ "github.com/smartcontractkit/chainlink-ccip/pkg/consts"
3336 "github.com/smartcontractkit/chainlink-ccip/pkg/logutil"
3437 ocrtypecodec "github.com/smartcontractkit/chainlink-ccip/pkg/ocrtypecodec/v1"
3538 readerpkg "github.com/smartcontractkit/chainlink-ccip/pkg/reader"
@@ -396,15 +399,13 @@ func (p *Plugin) ObserveFChain(lggr logger.Logger) map[cciptypes.ChainSelector]i
396399 return fChain
397400}
398401
402+ //nolint:gocyclo
399403func (p * Plugin ) Outcome (
400404 ctx context.Context , outCtx ocr3types.OutcomeContext , q types.Query , aos []types.AttributedObservation ,
401405) (ocr3types.Outcome , error ) {
402- // TODO: Replace Outcome with OutcomeNext
403- // and remove the condition when every oracle upgraded.
404- // NOTE: If you are making changes to Outcome make sure they are reflected in outcomeNext.
405- if p .offchainCfg .EnableDonBreakingChanges {
406+ if ! p .offchainCfg .EnableDonBreakingChanges {
406407 p .lggr .Info ("running outcome next" )
407- return p .outcomeNext (ctx , outCtx , q , aos )
408+ return p .outcomeOld (ctx , outCtx , q , aos )
408409 }
409410
410411 ctx , lggr := logutil .WithOCRInfo (ctx , p .lggr , outCtx .SeqNr , logutil .PhaseOutcome )
@@ -424,6 +425,8 @@ func (p *Plugin) Outcome(
424425 tokenPricesObservations := make ([]attributedTokenPricesObservation , 0 , len (aos ))
425426 chainFeeObservations := make ([]attributedChainFeeObservation , 0 , len (aos ))
426427 discoveryObservations := make ([]plugincommon.AttributedObservation [dt.Observation ], 0 , len (aos ))
428+ observedOnChainOcrSeqNums := make ([]uint64 , 0 , len (aos ))
429+ fChainObservations := make (map [cciptypes.ChainSelector ][]int )
427430
428431 for _ , ao := range aos {
429432 obs , err := p .ocrTypeCodec .DecodeObservation (ao .Observation )
@@ -446,6 +449,14 @@ func (p *Plugin) Outcome(
446449
447450 discoveryObservations = append (discoveryObservations , plugincommon.AttributedObservation [dt.Observation ]{
448451 OracleID : ao .Observer , Observation : obs .DiscoveryObs })
452+
453+ if obs .OnChainPriceOcrSeqNum > 0 {
454+ observedOnChainOcrSeqNums = append (observedOnChainOcrSeqNums , obs .OnChainPriceOcrSeqNum )
455+ }
456+
457+ for chainSel , f := range obs .FChain {
458+ fChainObservations [chainSel ] = append (fChainObservations [chainSel ], f )
459+ }
449460 }
450461
451462 if p .discoveryProcessor != nil {
@@ -471,6 +482,13 @@ func (p *Plugin) Outcome(
471482 lggr .Errorw ("failed to get merkle roots outcome" , "err" , err )
472483 }
473484
485+ mainOutcome , invalidatePriceCache , err := p .getMainOutcomeAndCacheInvalidation (
486+ lggr , prevOutcome , observedOnChainOcrSeqNums , fChainObservations )
487+ if err != nil {
488+ lggr .Errorw ("failed to get main outcome and cache invalidation" , "err" , err )
489+ }
490+ ctx = context .WithValue (ctx , consts .InvalidateCacheKey , invalidatePriceCache )
491+
474492 tokenPriceOutcome , err := p .tokenPriceProcessor .Outcome (
475493 ctx ,
476494 prevOutcome .TokenPriceOutcome ,
@@ -488,56 +506,81 @@ func (p *Plugin) Outcome(
488506 chainFeeObservations ,
489507 )
490508 if err != nil {
491- lggr .Warnw ("failed to get gas prices outcome" , "err" , err )
509+ lggr .Warnw ("failed to get chain fee prices outcome" , "err" , err )
510+ }
511+
512+ if len (tokenPriceOutcome .TokenPrices ) > 0 || len (chainFeeOutcome .GasPrices ) > 0 {
513+ if prevOutcome .MainOutcome .InflightPriceOcrSequenceNumber > 0 {
514+ lggr .Errorw ("something is wrong since prices were observed and agreed while previous prices were inflight" ,
515+ "prevMainOutcome" , prevOutcome .MainOutcome ,
516+ "tokenPrices" , tokenPriceOutcome .TokenPrices ,
517+ "gasPrices" , chainFeeOutcome .GasPrices ,
518+ )
519+ }
520+ mainOutcome .InflightPriceOcrSequenceNumber = cciptypes .SeqNum (outCtx .SeqNr )
521+ mainOutcome .RemainingPriceChecks = p .offchainCfg .InflightPriceCheckRetries
492522 }
493523
494524 out := committypes.Outcome {
495525 MerkleRootOutcome : merkleRootOutcome ,
496526 TokenPriceOutcome : tokenPriceOutcome ,
497527 ChainFeeOutcome : chainFeeOutcome ,
498- MainOutcome : p . getMainOutcome ( outCtx , prevOutcome , tokenPriceOutcome , chainFeeOutcome ) ,
528+ MainOutcome : mainOutcome ,
499529 }
500530 p .metricsReporter .TrackOutcome (out )
501531
502532 lggr .Infow ("Commit plugin finished outcome" , "outcome" , out )
503-
504533 return p .ocrTypeCodec .EncodeOutcome (out )
505534}
506535
507- func (p * Plugin ) getMainOutcome (
508- outCtx ocr3types. OutcomeContext ,
536+ func (p * Plugin ) getMainOutcomeAndCacheInvalidation (
537+ lggr logger. Logger ,
509538 prevOutcome committypes.Outcome ,
510- tokenPriceOutcome tokenprice.Outcome ,
511- chainFeeOutcome chainfee.Outcome ,
512- ) committypes.MainOutcome {
513- pricesObservedInThisRound := len (tokenPriceOutcome .TokenPrices ) > 0 || len (chainFeeOutcome .GasPrices ) > 0
514- if pricesObservedInThisRound {
515- return committypes.MainOutcome {
516- InflightPriceOcrSequenceNumber : cciptypes .SeqNum (outCtx .SeqNr ),
517- RemainingPriceChecks : p .offchainCfg .InflightPriceCheckRetries ,
539+ observedOnChainOcrSeqNums []uint64 ,
540+ fChainObservations map [cciptypes.ChainSelector ][]int ,
541+ ) (committypes.MainOutcome , bool , error ) {
542+ // if we didn't have prices inflight or if the inflight prices did not make it on-chain
543+ // return an empty outcome indicating that nothing is inflight now
544+ if prevOutcome .MainOutcome .InflightPriceOcrSequenceNumber == 0 ||
545+ prevOutcome .MainOutcome .RemainingPriceChecks == 0 {
546+ return committypes.MainOutcome {}, false , nil
547+ }
548+
549+ // check if the inflight prices made it on-chain
550+ // first validate and agree on fDestChain and current onChainOcrSeqNum
551+ for _ , v := range observedOnChainOcrSeqNums {
552+ if v == 0 {
553+ return committypes.MainOutcome {}, false , fmt .Errorf ("observed ocr seq num cannot be zero at this point" )
518554 }
519555 }
520556
521- waitingForPriceUpdatesToMakeItOnchain := prevOutcome .MainOutcome .InflightPriceOcrSequenceNumber > 0
522- if waitingForPriceUpdatesToMakeItOnchain {
523- remainingPriceChecks := prevOutcome .MainOutcome .RemainingPriceChecks - 1
524- inflightOcrSeqNum := prevOutcome .MainOutcome .InflightPriceOcrSequenceNumber
557+ donThresh := consensus.MakeConstantThreshold [cciptypes.ChainSelector ](consensus .TwoFPlus1 (p .reportingCfg .F ))
558+ fChainConsensus := consensus .GetConsensusMap (lggr , "mainFChain" , fChainObservations , donThresh )
559+ fDestChain , ok := fChainConsensus [p .destChain ]
560+ if ! ok {
561+ return committypes.MainOutcome {}, false , fmt .Errorf ("no fDestChain for %d: %v" , p .destChain , fChainObservations )
562+ }
525563
526- if remainingPriceChecks < 0 {
527- remainingPriceChecks = 0
528- inflightOcrSeqNum = 0
529- }
564+ if consensus . LtFPlusOne ( fDestChain , len ( observedOnChainOcrSeqNums )) {
565+ return committypes. MainOutcome {}, false , fmt . Errorf ( "onChainOcrSeqNums no consensus requiredMinimum=%d got=%d %v" ,
566+ fDestChain + 1 , len ( observedOnChainOcrSeqNums ), observedOnChainOcrSeqNums )
567+ }
530568
531- return committypes.MainOutcome {
532- InflightPriceOcrSequenceNumber : inflightOcrSeqNum ,
533- RemainingPriceChecks : remainingPriceChecks ,
534- }
569+ sort .Slice (observedOnChainOcrSeqNums , func (i , j int ) bool {
570+ return observedOnChainOcrSeqNums [i ] < observedOnChainOcrSeqNums [j ]
571+ })
572+ consensusOnChainOcrSeqNum := observedOnChainOcrSeqNums [fDestChain ]
573+
574+ // make the actual checks and send the corresponding outcome
575+ pricesTransmitted := consensusOnChainOcrSeqNum >= uint64 (prevOutcome .MainOutcome .InflightPriceOcrSequenceNumber )
576+ if pricesTransmitted {
577+ return committypes.MainOutcome {}, true , nil
535578 }
536579
537580 return committypes.MainOutcome {
538- InflightPriceOcrSequenceNumber : 0 ,
539- RemainingPriceChecks : 0 ,
540- }
581+ InflightPriceOcrSequenceNumber : prevOutcome . MainOutcome . InflightPriceOcrSequenceNumber ,
582+ RemainingPriceChecks : prevOutcome . MainOutcome . RemainingPriceChecks - 1 ,
583+ }, false , nil
541584}
542585
543586func (p * Plugin ) Close () error {
0 commit comments