Skip to content

Commit 55ec76c

Browse files
committed
Fix remote trigger event expiry logic
1 parent d79decc commit 55ec76c

File tree

3 files changed

+126
-39
lines changed

3 files changed

+126
-39
lines changed

.changeset/violet-coins-play.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"chainlink": patch
3+
---
4+
5+
#bugfix #internal Fix remote trigger event expiry logic

core/capabilities/remote/trigger_subscriber.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -219,12 +219,8 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) {
219219
creationTs := s.messageCache.Insert(key, sender, nowMs, msg.Payload)
220220
ready, payloads := s.messageCache.Ready(key, s.config.MinResponsesToAggregate, nowMs-s.config.MessageExpiry.Milliseconds(), true)
221221
s.mu.Unlock()
222-
if nowMs-creationTs > s.config.RegistrationExpiry.Milliseconds() {
223-
s.lggr.Warnw("received trigger event for an expired ID", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID, "sender", sender)
224-
continue
225-
}
222+
s.lggr.Debugw("trigger event received", "triggerEventId", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID, "sender", sender, "ready", ready, "nowTs", nowMs, "creationTs", creationTs, "minResponsesToAggregate", s.config.MinResponsesToAggregate)
226223
if ready {
227-
s.lggr.Debugw("trigger event ready to aggregate", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID)
228224
aggregatedResponse, err := s.aggregator.Aggregate(meta.TriggerEventId, payloads)
229225
if err != nil {
230226
s.lggr.Errorw("failed to aggregate responses", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID, "err", err)

core/capabilities/remote/trigger_subscriber_test.go

Lines changed: 120 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ import (
1313
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
1414
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
1515
remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks"
16-
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
1716
"github.com/smartcontractkit/chainlink/v2/core/logger"
1817
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
18+
"github.com/smartcontractkit/chainlink/v2/core/utils"
1919
)
2020

2121
const (
@@ -29,29 +29,10 @@ var (
2929
)
3030

3131
func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
32+
t.Parallel()
3233
lggr := logger.TestLogger(t)
33-
ctx := testutils.Context(t)
34-
capInfo := commoncap.CapabilityInfo{
35-
ID: "cap_id@1",
36-
CapabilityType: commoncap.CapabilityTypeTrigger,
37-
Description: "Remote Trigger",
38-
}
39-
p1 := p2ptypes.PeerID{}
40-
require.NoError(t, p1.UnmarshalText([]byte(peerID1)))
41-
p2 := p2ptypes.PeerID{}
42-
require.NoError(t, p2.UnmarshalText([]byte(peerID2)))
43-
capDonInfo := commoncap.DON{
44-
ID: 1,
45-
Members: []p2ptypes.PeerID{p1},
46-
F: 0,
47-
}
48-
workflowDonInfo := commoncap.DON{
49-
ID: 2,
50-
Members: []p2ptypes.PeerID{p2},
51-
F: 0,
52-
}
34+
capInfo, capDon, workflowDon := buildTwoTestDONs(t, 1, 1)
5335
dispatcher := remoteMocks.NewDispatcher(t)
54-
5536
awaitRegistrationMessageCh := make(chan struct{})
5637
dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
5738
select {
@@ -67,19 +48,130 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
6748
MinResponsesToAggregate: 1,
6849
MessageExpiry: 100 * time.Second,
6950
}
70-
subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, workflowDonInfo, dispatcher, nil, lggr)
71-
require.NoError(t, subscriber.Start(ctx))
51+
subscriber := remote.NewTriggerSubscriber(config, capInfo, capDon, workflowDon, dispatcher, nil, lggr)
52+
require.NoError(t, subscriber.Start(t.Context()))
7253

7354
req := commoncap.TriggerRegistrationRequest{
7455
Metadata: commoncap.RequestMetadata{
7556
WorkflowID: workflowID1,
7657
},
7758
}
78-
triggerEventCallbackCh, err := subscriber.RegisterTrigger(ctx, req)
59+
triggerEventCallbackCh, err := subscriber.RegisterTrigger(t.Context(), req)
7960
require.NoError(t, err)
61+
t.Cleanup(func() {
62+
require.NoError(t, subscriber.UnregisterTrigger(t.Context(), req))
63+
// calling UnregisterTrigger repeatedly is safe
64+
require.NoError(t, subscriber.UnregisterTrigger(t.Context(), req))
65+
require.NoError(t, subscriber.Close())
66+
})
8067
<-awaitRegistrationMessageCh
8168

8269
// receive trigger event
70+
triggerEventValue, err := values.NewMap(triggerEvent1)
71+
require.NoError(t, err)
72+
triggerEvent := buildTriggerEvent(t, capDon.Members[0][:])
73+
subscriber.Receive(t.Context(), triggerEvent)
74+
response := <-triggerEventCallbackCh
75+
require.Equal(t, response.Event.Outputs, triggerEventValue)
76+
}
77+
78+
func TestTriggerSubscriber_CorrectEventExpiryCheck(t *testing.T) {
79+
t.Parallel()
80+
lggr := logger.TestLogger(t)
81+
capInfo, capDon, workflowDon := buildTwoTestDONs(t, 3, 1)
82+
awaitRegistrationMessageCh := make(chan struct{})
83+
dispatcher := remoteMocks.NewDispatcher(t)
84+
dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
85+
select {
86+
case awaitRegistrationMessageCh <- struct{}{}:
87+
default:
88+
}
89+
})
90+
91+
// register trigger
92+
config := &commoncap.RemoteTriggerConfig{
93+
RegistrationRefresh: 100 * time.Millisecond,
94+
RegistrationExpiry: 10 * time.Second,
95+
MinResponsesToAggregate: 2,
96+
MessageExpiry: 10 * time.Second,
97+
}
98+
subscriber := remote.NewTriggerSubscriber(config, capInfo, capDon, workflowDon, dispatcher, nil, lggr)
99+
100+
require.NoError(t, subscriber.Start(t.Context()))
101+
regReq := commoncap.TriggerRegistrationRequest{
102+
Metadata: commoncap.RequestMetadata{
103+
WorkflowID: workflowID1,
104+
},
105+
}
106+
triggerEventCallbackCh, err := subscriber.RegisterTrigger(t.Context(), regReq)
107+
require.NoError(t, err)
108+
t.Cleanup(func() {
109+
require.NoError(t, subscriber.UnregisterTrigger(t.Context(), regReq))
110+
require.NoError(t, subscriber.Close())
111+
})
112+
<-awaitRegistrationMessageCh
113+
114+
// receive trigger events:
115+
// cleanup loop happens every 10 seconds, at 0:00, 0:10, 0:20, etc.
116+
// send the event from the first node around 0:02 (this is a bad node
117+
// that sends it too early)
118+
triggerEvent := buildTriggerEvent(t, capDon.Members[0][:])
119+
time.Sleep(2 * time.Second)
120+
subscriber.Receive(t.Context(), triggerEvent)
121+
122+
// send events from nodes 2 & 3 (the good ones) around 0:15 so that
123+
// the diff between 0:02 and 0:15 exceeds the expiry threshold but
124+
// we don't hit the cleanup loop yet
125+
time.Sleep(13 * time.Second)
126+
triggerEvent.Sender = capDon.Members[1][:]
127+
subscriber.Receive(t.Context(), triggerEvent)
128+
// the aggregation shouldn't happen after events 1 and 2 as they
129+
// were received too far apart in time
130+
require.Empty(t, triggerEventCallbackCh)
131+
triggerEvent.Sender = capDon.Members[2][:]
132+
subscriber.Receive(t.Context(), triggerEvent)
133+
134+
// event should be processed
135+
response := <-triggerEventCallbackCh
136+
triggerEventValue, err := values.NewMap(triggerEvent1)
137+
require.NoError(t, err)
138+
require.Equal(t, response.Event.Outputs, triggerEventValue)
139+
}
140+
141+
func buildTwoTestDONs(t *testing.T, capDonSize int, workflowDonSize int) (commoncap.CapabilityInfo, commoncap.DON, commoncap.DON) {
142+
capInfo := commoncap.CapabilityInfo{
143+
ID: "cap_id@1",
144+
CapabilityType: commoncap.CapabilityTypeTrigger,
145+
Description: "Remote Trigger",
146+
}
147+
148+
capDon := commoncap.DON{
149+
ID: 1,
150+
Members: []p2ptypes.PeerID{},
151+
F: 0,
152+
}
153+
for range capDonSize {
154+
pid := utils.MustNewPeerID()
155+
peer := p2ptypes.PeerID{}
156+
require.NoError(t, peer.UnmarshalText([]byte(pid)))
157+
capDon.Members = append(capDon.Members, peer)
158+
}
159+
160+
workflowDon := commoncap.DON{
161+
ID: 2,
162+
Members: []p2ptypes.PeerID{},
163+
F: 0,
164+
}
165+
for range workflowDonSize {
166+
pid := utils.MustNewPeerID()
167+
peer := p2ptypes.PeerID{}
168+
require.NoError(t, peer.UnmarshalText([]byte(pid)))
169+
workflowDon.Members = append(workflowDon.Members, peer)
170+
}
171+
return capInfo, capDon, workflowDon
172+
}
173+
174+
func buildTriggerEvent(t *testing.T, sender []byte) *remotetypes.MessageBody {
83175
triggerEventValue, err := values.NewMap(triggerEvent1)
84176
require.NoError(t, err)
85177
capResponse := commoncap.TriggerResponse{
@@ -90,8 +182,9 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
90182
}
91183
marshaled, err := pb.MarshalTriggerResponse(capResponse)
92184
require.NoError(t, err)
93-
triggerEvent := &remotetypes.MessageBody{
94-
Sender: p1[:],
185+
186+
return &remotetypes.MessageBody{
187+
Sender: sender,
95188
Method: remotetypes.MethodTriggerEvent,
96189
Metadata: &remotetypes.MessageBody_TriggerEventMetadata{
97190
TriggerEventMetadata: &remotetypes.TriggerEventMetadata{
@@ -100,11 +193,4 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
100193
},
101194
Payload: marshaled,
102195
}
103-
subscriber.Receive(ctx, triggerEvent)
104-
response := <-triggerEventCallbackCh
105-
require.Equal(t, response.Event.Outputs, triggerEventValue)
106-
107-
require.NoError(t, subscriber.UnregisterTrigger(ctx, req))
108-
require.NoError(t, subscriber.UnregisterTrigger(ctx, req))
109-
require.NoError(t, subscriber.Close())
110196
}

0 commit comments

Comments
 (0)