@@ -13,9 +13,9 @@ import (
13
13
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
14
14
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
15
15
remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks"
16
- "github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
17
16
"github.com/smartcontractkit/chainlink/v2/core/logger"
18
17
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
18
+ "github.com/smartcontractkit/chainlink/v2/core/utils"
19
19
)
20
20
21
21
const (
@@ -29,29 +29,10 @@ var (
29
29
)
30
30
31
31
func TestTriggerSubscriber_RegisterAndReceive (t * testing.T ) {
32
+ t .Parallel ()
32
33
lggr := logger .TestLogger (t )
33
- ctx := testutils .Context (t )
34
- capInfo := commoncap.CapabilityInfo {
35
- ID : "cap_id@1" ,
36
- CapabilityType : commoncap .CapabilityTypeTrigger ,
37
- Description : "Remote Trigger" ,
38
- }
39
- p1 := p2ptypes.PeerID {}
40
- require .NoError (t , p1 .UnmarshalText ([]byte (peerID1 )))
41
- p2 := p2ptypes.PeerID {}
42
- require .NoError (t , p2 .UnmarshalText ([]byte (peerID2 )))
43
- capDonInfo := commoncap.DON {
44
- ID : 1 ,
45
- Members : []p2ptypes.PeerID {p1 },
46
- F : 0 ,
47
- }
48
- workflowDonInfo := commoncap.DON {
49
- ID : 2 ,
50
- Members : []p2ptypes.PeerID {p2 },
51
- F : 0 ,
52
- }
34
+ capInfo , capDon , workflowDon := buildTwoTestDONs (t , 1 , 1 )
53
35
dispatcher := remoteMocks .NewDispatcher (t )
54
-
55
36
awaitRegistrationMessageCh := make (chan struct {})
56
37
dispatcher .On ("Send" , mock .Anything , mock .Anything ).Return (nil ).Run (func (args mock.Arguments ) {
57
38
select {
@@ -67,19 +48,130 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
67
48
MinResponsesToAggregate : 1 ,
68
49
MessageExpiry : 100 * time .Second ,
69
50
}
70
- subscriber := remote .NewTriggerSubscriber (config , capInfo , capDonInfo , workflowDonInfo , dispatcher , nil , lggr )
71
- require .NoError (t , subscriber .Start (ctx ))
51
+ subscriber := remote .NewTriggerSubscriber (config , capInfo , capDon , workflowDon , dispatcher , nil , lggr )
52
+ require .NoError (t , subscriber .Start (t . Context () ))
72
53
73
54
req := commoncap.TriggerRegistrationRequest {
74
55
Metadata : commoncap.RequestMetadata {
75
56
WorkflowID : workflowID1 ,
76
57
},
77
58
}
78
- triggerEventCallbackCh , err := subscriber .RegisterTrigger (ctx , req )
59
+ triggerEventCallbackCh , err := subscriber .RegisterTrigger (t . Context () , req )
79
60
require .NoError (t , err )
61
+ t .Cleanup (func () {
62
+ require .NoError (t , subscriber .UnregisterTrigger (t .Context (), req ))
63
+ // calling UnregisterTrigger repeatedly is safe
64
+ require .NoError (t , subscriber .UnregisterTrigger (t .Context (), req ))
65
+ require .NoError (t , subscriber .Close ())
66
+ })
80
67
<- awaitRegistrationMessageCh
81
68
82
69
// receive trigger event
70
+ triggerEventValue , err := values .NewMap (triggerEvent1 )
71
+ require .NoError (t , err )
72
+ triggerEvent := buildTriggerEvent (t , capDon .Members [0 ][:])
73
+ subscriber .Receive (t .Context (), triggerEvent )
74
+ response := <- triggerEventCallbackCh
75
+ require .Equal (t , response .Event .Outputs , triggerEventValue )
76
+ }
77
+
78
+ func TestTriggerSubscriber_CorrectEventExpiryCheck (t * testing.T ) {
79
+ t .Parallel ()
80
+ lggr := logger .TestLogger (t )
81
+ capInfo , capDon , workflowDon := buildTwoTestDONs (t , 3 , 1 )
82
+ awaitRegistrationMessageCh := make (chan struct {})
83
+ dispatcher := remoteMocks .NewDispatcher (t )
84
+ dispatcher .On ("Send" , mock .Anything , mock .Anything ).Return (nil ).Run (func (args mock.Arguments ) {
85
+ select {
86
+ case awaitRegistrationMessageCh <- struct {}{}:
87
+ default :
88
+ }
89
+ })
90
+
91
+ // register trigger
92
+ config := & commoncap.RemoteTriggerConfig {
93
+ RegistrationRefresh : 100 * time .Millisecond ,
94
+ RegistrationExpiry : 10 * time .Second ,
95
+ MinResponsesToAggregate : 2 ,
96
+ MessageExpiry : 10 * time .Second ,
97
+ }
98
+ subscriber := remote .NewTriggerSubscriber (config , capInfo , capDon , workflowDon , dispatcher , nil , lggr )
99
+
100
+ require .NoError (t , subscriber .Start (t .Context ()))
101
+ regReq := commoncap.TriggerRegistrationRequest {
102
+ Metadata : commoncap.RequestMetadata {
103
+ WorkflowID : workflowID1 ,
104
+ },
105
+ }
106
+ triggerEventCallbackCh , err := subscriber .RegisterTrigger (t .Context (), regReq )
107
+ require .NoError (t , err )
108
+ t .Cleanup (func () {
109
+ require .NoError (t , subscriber .UnregisterTrigger (t .Context (), regReq ))
110
+ require .NoError (t , subscriber .Close ())
111
+ })
112
+ <- awaitRegistrationMessageCh
113
+
114
+ // receive trigger events:
115
+ // cleanup loop happens every 10 seconds, at 0:00, 0:10, 0:20, etc.
116
+ // send the event from the first node around 0:02 (this is a bad node
117
+ // that sends it too early)
118
+ triggerEvent := buildTriggerEvent (t , capDon .Members [0 ][:])
119
+ time .Sleep (2 * time .Second )
120
+ subscriber .Receive (t .Context (), triggerEvent )
121
+
122
+ // send events from nodes 2 & 3 (the good ones) around 0:15 so that
123
+ // the diff between 0:02 and 0:15 exceeds the expiry threshold but
124
+ // we don't hit the cleanup loop yet
125
+ time .Sleep (13 * time .Second )
126
+ triggerEvent .Sender = capDon .Members [1 ][:]
127
+ subscriber .Receive (t .Context (), triggerEvent )
128
+ // the aggregation shouldn't happen after events 1 and 2 as they
129
+ // were received too far apart in time
130
+ require .Empty (t , triggerEventCallbackCh )
131
+ triggerEvent .Sender = capDon .Members [2 ][:]
132
+ subscriber .Receive (t .Context (), triggerEvent )
133
+
134
+ // event should be processed
135
+ response := <- triggerEventCallbackCh
136
+ triggerEventValue , err := values .NewMap (triggerEvent1 )
137
+ require .NoError (t , err )
138
+ require .Equal (t , response .Event .Outputs , triggerEventValue )
139
+ }
140
+
141
+ func buildTwoTestDONs (t * testing.T , capDonSize int , workflowDonSize int ) (commoncap.CapabilityInfo , commoncap.DON , commoncap.DON ) {
142
+ capInfo := commoncap.CapabilityInfo {
143
+ ID : "cap_id@1" ,
144
+ CapabilityType : commoncap .CapabilityTypeTrigger ,
145
+ Description : "Remote Trigger" ,
146
+ }
147
+
148
+ capDon := commoncap.DON {
149
+ ID : 1 ,
150
+ Members : []p2ptypes.PeerID {},
151
+ F : 0 ,
152
+ }
153
+ for range capDonSize {
154
+ pid := utils .MustNewPeerID ()
155
+ peer := p2ptypes.PeerID {}
156
+ require .NoError (t , peer .UnmarshalText ([]byte (pid )))
157
+ capDon .Members = append (capDon .Members , peer )
158
+ }
159
+
160
+ workflowDon := commoncap.DON {
161
+ ID : 2 ,
162
+ Members : []p2ptypes.PeerID {},
163
+ F : 0 ,
164
+ }
165
+ for range workflowDonSize {
166
+ pid := utils .MustNewPeerID ()
167
+ peer := p2ptypes.PeerID {}
168
+ require .NoError (t , peer .UnmarshalText ([]byte (pid )))
169
+ workflowDon .Members = append (workflowDon .Members , peer )
170
+ }
171
+ return capInfo , capDon , workflowDon
172
+ }
173
+
174
+ func buildTriggerEvent (t * testing.T , sender []byte ) * remotetypes.MessageBody {
83
175
triggerEventValue , err := values .NewMap (triggerEvent1 )
84
176
require .NoError (t , err )
85
177
capResponse := commoncap.TriggerResponse {
@@ -90,8 +182,9 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
90
182
}
91
183
marshaled , err := pb .MarshalTriggerResponse (capResponse )
92
184
require .NoError (t , err )
93
- triggerEvent := & remotetypes.MessageBody {
94
- Sender : p1 [:],
185
+
186
+ return & remotetypes.MessageBody {
187
+ Sender : sender ,
95
188
Method : remotetypes .MethodTriggerEvent ,
96
189
Metadata : & remotetypes.MessageBody_TriggerEventMetadata {
97
190
TriggerEventMetadata : & remotetypes.TriggerEventMetadata {
@@ -100,11 +193,4 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
100
193
},
101
194
Payload : marshaled ,
102
195
}
103
- subscriber .Receive (ctx , triggerEvent )
104
- response := <- triggerEventCallbackCh
105
- require .Equal (t , response .Event .Outputs , triggerEventValue )
106
-
107
- require .NoError (t , subscriber .UnregisterTrigger (ctx , req ))
108
- require .NoError (t , subscriber .UnregisterTrigger (ctx , req ))
109
- require .NoError (t , subscriber .Close ())
110
196
}
0 commit comments