diff --git a/.changeset/violet-coins-play.md b/.changeset/violet-coins-play.md new file mode 100644 index 00000000000..c280fb42e1b --- /dev/null +++ b/.changeset/violet-coins-play.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#bugfix #internal Fix remote trigger event expiry logic diff --git a/core/capabilities/remote/trigger_subscriber.go b/core/capabilities/remote/trigger_subscriber.go index 7edcbf5eba7..90596b4e48e 100644 --- a/core/capabilities/remote/trigger_subscriber.go +++ b/core/capabilities/remote/trigger_subscriber.go @@ -219,12 +219,8 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { creationTs := s.messageCache.Insert(key, sender, nowMs, msg.Payload) ready, payloads := s.messageCache.Ready(key, s.config.MinResponsesToAggregate, nowMs-s.config.MessageExpiry.Milliseconds(), true) s.mu.Unlock() - if nowMs-creationTs > s.config.RegistrationExpiry.Milliseconds() { - s.lggr.Warnw("received trigger event for an expired ID", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID, "sender", sender) - continue - } + s.lggr.Debugw("trigger event received", "triggerEventId", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID, "sender", sender, "ready", ready, "nowTs", nowMs, "creationTs", creationTs, "minResponsesToAggregate", s.config.MinResponsesToAggregate) if ready { - s.lggr.Debugw("trigger event ready to aggregate", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID) aggregatedResponse, err := s.aggregator.Aggregate(meta.TriggerEventId, payloads) if err != nil { s.lggr.Errorw("failed to aggregate responses", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID, "err", err) diff --git a/core/capabilities/remote/trigger_subscriber_test.go b/core/capabilities/remote/trigger_subscriber_test.go index d5b48bc1dc8..d122ee6547b 100644 --- a/core/capabilities/remote/trigger_subscriber_test.go +++ b/core/capabilities/remote/trigger_subscriber_test.go @@ -13,9 +13,9 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" + "github.com/smartcontractkit/chainlink/v2/core/utils" ) const ( @@ -29,29 +29,10 @@ var ( ) func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { + t.Parallel() lggr := logger.TestLogger(t) - ctx := testutils.Context(t) - capInfo := commoncap.CapabilityInfo{ - ID: "cap_id@1", - CapabilityType: commoncap.CapabilityTypeTrigger, - Description: "Remote Trigger", - } - p1 := p2ptypes.PeerID{} - require.NoError(t, p1.UnmarshalText([]byte(peerID1))) - p2 := p2ptypes.PeerID{} - require.NoError(t, p2.UnmarshalText([]byte(peerID2))) - capDonInfo := commoncap.DON{ - ID: 1, - Members: []p2ptypes.PeerID{p1}, - F: 0, - } - workflowDonInfo := commoncap.DON{ - ID: 2, - Members: []p2ptypes.PeerID{p2}, - F: 0, - } + capInfo, capDon, workflowDon := buildTwoTestDONs(t, 1, 1) dispatcher := remoteMocks.NewDispatcher(t) - awaitRegistrationMessageCh := make(chan struct{}) dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { select { @@ -67,19 +48,130 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { MinResponsesToAggregate: 1, MessageExpiry: 100 * time.Second, } - subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, workflowDonInfo, dispatcher, nil, lggr) - require.NoError(t, subscriber.Start(ctx)) + subscriber := remote.NewTriggerSubscriber(config, capInfo, capDon, workflowDon, dispatcher, nil, lggr) + require.NoError(t, subscriber.Start(t.Context())) req := commoncap.TriggerRegistrationRequest{ Metadata: commoncap.RequestMetadata{ WorkflowID: workflowID1, }, } - triggerEventCallbackCh, err := subscriber.RegisterTrigger(ctx, req) + triggerEventCallbackCh, err := subscriber.RegisterTrigger(t.Context(), req) require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, subscriber.UnregisterTrigger(t.Context(), req)) + // calling UnregisterTrigger repeatedly is safe + require.NoError(t, subscriber.UnregisterTrigger(t.Context(), req)) + require.NoError(t, subscriber.Close()) + }) <-awaitRegistrationMessageCh // receive trigger event + triggerEventValue, err := values.NewMap(triggerEvent1) + require.NoError(t, err) + triggerEvent := buildTriggerEvent(t, capDon.Members[0][:]) + subscriber.Receive(t.Context(), triggerEvent) + response := <-triggerEventCallbackCh + require.Equal(t, response.Event.Outputs, triggerEventValue) +} + +func TestTriggerSubscriber_CorrectEventExpiryCheck(t *testing.T) { + t.Parallel() + lggr := logger.TestLogger(t) + capInfo, capDon, workflowDon := buildTwoTestDONs(t, 3, 1) + awaitRegistrationMessageCh := make(chan struct{}) + dispatcher := remoteMocks.NewDispatcher(t) + dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { + select { + case awaitRegistrationMessageCh <- struct{}{}: + default: + } + }) + + // register trigger + config := &commoncap.RemoteTriggerConfig{ + RegistrationRefresh: 100 * time.Millisecond, + RegistrationExpiry: 10 * time.Second, + MinResponsesToAggregate: 2, + MessageExpiry: 10 * time.Second, + } + subscriber := remote.NewTriggerSubscriber(config, capInfo, capDon, workflowDon, dispatcher, nil, lggr) + + require.NoError(t, subscriber.Start(t.Context())) + regReq := commoncap.TriggerRegistrationRequest{ + Metadata: commoncap.RequestMetadata{ + WorkflowID: workflowID1, + }, + } + triggerEventCallbackCh, err := subscriber.RegisterTrigger(t.Context(), regReq) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, subscriber.UnregisterTrigger(t.Context(), regReq)) + require.NoError(t, subscriber.Close()) + }) + <-awaitRegistrationMessageCh + + // receive trigger events: + // cleanup loop happens every 10 seconds, at 0:00, 0:10, 0:20, etc. + // send the event from the first node around 0:02 (this is a bad node + // that sends it too early) + triggerEvent := buildTriggerEvent(t, capDon.Members[0][:]) + time.Sleep(2 * time.Second) + subscriber.Receive(t.Context(), triggerEvent) + + // send events from nodes 2 & 3 (the good ones) around 0:15 so that + // the diff between 0:02 and 0:15 exceeds the expiry threshold but + // we don't hit the cleanup loop yet + time.Sleep(13 * time.Second) + triggerEvent.Sender = capDon.Members[1][:] + subscriber.Receive(t.Context(), triggerEvent) + // the aggregation shouldn't happen after events 1 and 2 as they + // were received too far apart in time + require.Empty(t, triggerEventCallbackCh) + triggerEvent.Sender = capDon.Members[2][:] + subscriber.Receive(t.Context(), triggerEvent) + + // event should be processed + response := <-triggerEventCallbackCh + triggerEventValue, err := values.NewMap(triggerEvent1) + require.NoError(t, err) + require.Equal(t, response.Event.Outputs, triggerEventValue) +} + +func buildTwoTestDONs(t *testing.T, capDonSize int, workflowDonSize int) (commoncap.CapabilityInfo, commoncap.DON, commoncap.DON) { + capInfo := commoncap.CapabilityInfo{ + ID: "cap_id@1", + CapabilityType: commoncap.CapabilityTypeTrigger, + Description: "Remote Trigger", + } + + capDon := commoncap.DON{ + ID: 1, + Members: []p2ptypes.PeerID{}, + F: 0, + } + for range capDonSize { + pid := utils.MustNewPeerID() + peer := p2ptypes.PeerID{} + require.NoError(t, peer.UnmarshalText([]byte(pid))) + capDon.Members = append(capDon.Members, peer) + } + + workflowDon := commoncap.DON{ + ID: 2, + Members: []p2ptypes.PeerID{}, + F: 0, + } + for range workflowDonSize { + pid := utils.MustNewPeerID() + peer := p2ptypes.PeerID{} + require.NoError(t, peer.UnmarshalText([]byte(pid))) + workflowDon.Members = append(workflowDon.Members, peer) + } + return capInfo, capDon, workflowDon +} + +func buildTriggerEvent(t *testing.T, sender []byte) *remotetypes.MessageBody { triggerEventValue, err := values.NewMap(triggerEvent1) require.NoError(t, err) capResponse := commoncap.TriggerResponse{ @@ -90,8 +182,9 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { } marshaled, err := pb.MarshalTriggerResponse(capResponse) require.NoError(t, err) - triggerEvent := &remotetypes.MessageBody{ - Sender: p1[:], + + return &remotetypes.MessageBody{ + Sender: sender, Method: remotetypes.MethodTriggerEvent, Metadata: &remotetypes.MessageBody_TriggerEventMetadata{ TriggerEventMetadata: &remotetypes.TriggerEventMetadata{ @@ -100,11 +193,4 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { }, Payload: marshaled, } - subscriber.Receive(ctx, triggerEvent) - response := <-triggerEventCallbackCh - require.Equal(t, response.Event.Outputs, triggerEventValue) - - require.NoError(t, subscriber.UnregisterTrigger(ctx, req)) - require.NoError(t, subscriber.UnregisterTrigger(ctx, req)) - require.NoError(t, subscriber.Close()) }