Skip to content

Commit 960fdf2

Browse files
committed
Fix remote trigger event expiry logic
1 parent 96509f4 commit 960fdf2

File tree

2 files changed

+114
-35
lines changed

2 files changed

+114
-35
lines changed

core/capabilities/remote/trigger_subscriber.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -219,12 +219,8 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) {
219219
creationTs := s.messageCache.Insert(key, sender, nowMs, msg.Payload)
220220
ready, payloads := s.messageCache.Ready(key, s.config.MinResponsesToAggregate, nowMs-s.config.MessageExpiry.Milliseconds(), true)
221221
s.mu.Unlock()
222-
if nowMs-creationTs > s.config.RegistrationExpiry.Milliseconds() {
223-
s.lggr.Warnw("received trigger event for an expired ID", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID, "sender", sender)
224-
continue
225-
}
222+
s.lggr.Debugw("trigger event received", "triggerEventId", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID, "sender", sender, "ready", ready, "nowTs", nowMs, "creationTs", creationTs, "minResponsesToAggregate", s.config.MinResponsesToAggregate)
226223
if ready {
227-
s.lggr.Debugw("trigger event ready to aggregate", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID)
228224
aggregatedResponse, err := s.aggregator.Aggregate(meta.TriggerEventId, payloads)
229225
if err != nil {
230226
s.lggr.Errorw("failed to aggregate responses", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID, "err", err)

core/capabilities/remote/trigger_subscriber_test.go

Lines changed: 113 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
1717
"github.com/smartcontractkit/chainlink/v2/core/logger"
1818
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
19+
"github.com/smartcontractkit/chainlink/v2/core/utils"
1920
)
2021

2122
const (
@@ -31,27 +32,8 @@ var (
3132
func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
3233
lggr := logger.TestLogger(t)
3334
ctx := testutils.Context(t)
34-
capInfo := commoncap.CapabilityInfo{
35-
ID: "cap_id@1",
36-
CapabilityType: commoncap.CapabilityTypeTrigger,
37-
Description: "Remote Trigger",
38-
}
39-
p1 := p2ptypes.PeerID{}
40-
require.NoError(t, p1.UnmarshalText([]byte(peerID1)))
41-
p2 := p2ptypes.PeerID{}
42-
require.NoError(t, p2.UnmarshalText([]byte(peerID2)))
43-
capDonInfo := commoncap.DON{
44-
ID: 1,
45-
Members: []p2ptypes.PeerID{p1},
46-
F: 0,
47-
}
48-
workflowDonInfo := commoncap.DON{
49-
ID: 2,
50-
Members: []p2ptypes.PeerID{p2},
51-
F: 0,
52-
}
35+
capInfo, capDon, workflowDon := buildTwoTestDONs(t, 1, 1)
5336
dispatcher := remoteMocks.NewDispatcher(t)
54-
5537
awaitRegistrationMessageCh := make(chan struct{})
5638
dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
5739
select {
@@ -67,7 +49,7 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
6749
MinResponsesToAggregate: 1,
6850
MessageExpiry: 100 * time.Second,
6951
}
70-
subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, workflowDonInfo, dispatcher, nil, lggr)
52+
subscriber := remote.NewTriggerSubscriber(config, capInfo, capDon, workflowDon, dispatcher, nil, lggr)
7153
require.NoError(t, subscriber.Start(ctx))
7254

7355
req := commoncap.TriggerRegistrationRequest{
@@ -80,6 +62,113 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
8062
<-awaitRegistrationMessageCh
8163

8264
// receive trigger event
65+
triggerEventValue, err := values.NewMap(triggerEvent1)
66+
require.NoError(t, err)
67+
triggerEvent := buildTriggerEvent(t, capDon.Members[0][:])
68+
subscriber.Receive(ctx, triggerEvent)
69+
response := <-triggerEventCallbackCh
70+
require.Equal(t, response.Event.Outputs, triggerEventValue)
71+
72+
require.NoError(t, subscriber.UnregisterTrigger(ctx, req))
73+
require.NoError(t, subscriber.UnregisterTrigger(ctx, req))
74+
require.NoError(t, subscriber.Close())
75+
}
76+
77+
func TestTriggerSubscriber_CorrectEventExpiryCheck(t *testing.T) {
78+
lggr := logger.TestLogger(t)
79+
ctx := testutils.Context(t)
80+
capInfo, capDon, workflowDon := buildTwoTestDONs(t, 3, 1)
81+
awaitRegistrationMessageCh := make(chan struct{})
82+
dispatcher := remoteMocks.NewDispatcher(t)
83+
dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
84+
select {
85+
case awaitRegistrationMessageCh <- struct{}{}:
86+
default:
87+
}
88+
})
89+
90+
// register trigger
91+
config := &commoncap.RemoteTriggerConfig{
92+
RegistrationRefresh: 100 * time.Millisecond,
93+
RegistrationExpiry: 10 * time.Second,
94+
MinResponsesToAggregate: 2,
95+
MessageExpiry: 10 * time.Second,
96+
}
97+
subscriber := remote.NewTriggerSubscriber(config, capInfo, capDon, workflowDon, dispatcher, nil, lggr)
98+
99+
require.NoError(t, subscriber.Start(ctx))
100+
regReq := commoncap.TriggerRegistrationRequest{
101+
Metadata: commoncap.RequestMetadata{
102+
WorkflowID: workflowID1,
103+
},
104+
}
105+
triggerEventCallbackCh, err := subscriber.RegisterTrigger(ctx, regReq)
106+
require.NoError(t, err)
107+
<-awaitRegistrationMessageCh
108+
109+
// receive trigger events:
110+
// cleanup loop happens every 10 seconds, at 0:00, 0:10, 0:20, etc.
111+
// send the event from the first node around 0:02 (this is a bad node
112+
// that sends it too early)
113+
triggerEvent := buildTriggerEvent(t, capDon.Members[0][:])
114+
time.Sleep(2 * time.Second)
115+
subscriber.Receive(ctx, triggerEvent)
116+
117+
// send events from nodes 2 & 3 (the good ones) around 0:15 so that
118+
// the diff between 0:02 and 0:15 exceeds the expiry threshold but
119+
// we don't hit the cleanup loop yet
120+
time.Sleep(13 * time.Second)
121+
triggerEvent.Sender = capDon.Members[1][:]
122+
subscriber.Receive(ctx, triggerEvent)
123+
triggerEvent.Sender = capDon.Members[2][:]
124+
subscriber.Receive(ctx, triggerEvent)
125+
126+
// event should be processed
127+
response := <-triggerEventCallbackCh
128+
triggerEventValue, err := values.NewMap(triggerEvent1)
129+
require.NoError(t, err)
130+
require.Equal(t, response.Event.Outputs, triggerEventValue)
131+
132+
// cleanly unregister and close
133+
require.NoError(t, subscriber.UnregisterTrigger(ctx, regReq))
134+
require.NoError(t, subscriber.UnregisterTrigger(ctx, regReq))
135+
require.NoError(t, subscriber.Close())
136+
}
137+
138+
func buildTwoTestDONs(t *testing.T, capDonSize int, workflowDonSize int) (commoncap.CapabilityInfo, commoncap.DON, commoncap.DON) {
139+
capInfo := commoncap.CapabilityInfo{
140+
ID: "cap_id@1",
141+
CapabilityType: commoncap.CapabilityTypeTrigger,
142+
Description: "Remote Trigger",
143+
}
144+
145+
capDon := commoncap.DON{
146+
ID: 1,
147+
Members: []p2ptypes.PeerID{},
148+
F: 0,
149+
}
150+
for range capDonSize {
151+
pid := utils.MustNewPeerID()
152+
peer := p2ptypes.PeerID{}
153+
require.NoError(t, peer.UnmarshalText([]byte(pid)))
154+
capDon.Members = append(capDon.Members, peer)
155+
}
156+
157+
workflowDon := commoncap.DON{
158+
ID: 2,
159+
Members: []p2ptypes.PeerID{},
160+
F: 0,
161+
}
162+
for range workflowDonSize {
163+
pid := utils.MustNewPeerID()
164+
peer := p2ptypes.PeerID{}
165+
require.NoError(t, peer.UnmarshalText([]byte(pid)))
166+
workflowDon.Members = append(workflowDon.Members, peer)
167+
}
168+
return capInfo, capDon, workflowDon
169+
}
170+
171+
func buildTriggerEvent(t *testing.T, sender []byte) *remotetypes.MessageBody {
83172
triggerEventValue, err := values.NewMap(triggerEvent1)
84173
require.NoError(t, err)
85174
capResponse := commoncap.TriggerResponse{
@@ -90,8 +179,9 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
90179
}
91180
marshaled, err := pb.MarshalTriggerResponse(capResponse)
92181
require.NoError(t, err)
93-
triggerEvent := &remotetypes.MessageBody{
94-
Sender: p1[:],
182+
183+
return &remotetypes.MessageBody{
184+
Sender: sender,
95185
Method: remotetypes.MethodTriggerEvent,
96186
Metadata: &remotetypes.MessageBody_TriggerEventMetadata{
97187
TriggerEventMetadata: &remotetypes.TriggerEventMetadata{
@@ -100,11 +190,4 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
100190
},
101191
Payload: marshaled,
102192
}
103-
subscriber.Receive(ctx, triggerEvent)
104-
response := <-triggerEventCallbackCh
105-
require.Equal(t, response.Event.Outputs, triggerEventValue)
106-
107-
require.NoError(t, subscriber.UnregisterTrigger(ctx, req))
108-
require.NoError(t, subscriber.UnregisterTrigger(ctx, req))
109-
require.NoError(t, subscriber.Close())
110193
}

0 commit comments

Comments
 (0)