Skip to content

Commit b9272ce

Browse files
committed
Merge branch 'develop' into PLEX-1439
2 parents c4a5590 + e81644e commit b9272ce

File tree

23 files changed

+653
-425
lines changed

23 files changed

+653
-425
lines changed

.changeset/small-foxes-roll.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"chainlink": patch
3+
---
4+
5+
moves cre/engine scripts to main module adds cron example #internal
Lines changed: 353 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,353 @@
1+
package fakes
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"time"
9+
10+
"github.com/go-co-op/gocron/v2"
11+
"github.com/jonboulle/clockwork"
12+
13+
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
14+
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers/cron"
15+
crontypedapi "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/triggers/cron"
16+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
17+
"github.com/smartcontractkit/chainlink-common/pkg/services"
18+
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
19+
)
20+
21+
// TODO(CAPPL-866): remove this copy of cron trigger implementation from fakes
22+
const ID = "cron-trigger@1.0.0"
23+
const ServiceName = "CronCapabilities"
24+
25+
const (
26+
defaultSendChannelBufferSize = 1
27+
defaultFastestScheduleIntervalSeconds = 30
28+
)
29+
30+
var cronTriggerInfo = capabilities.MustNewCapabilityInfo(
31+
ID,
32+
capabilities.CapabilityTypeTrigger,
33+
"A trigger that uses a cron schedule to run periodically at fixed times, dates, or intervals.",
34+
)
35+
36+
type Config struct {
37+
FastestScheduleIntervalSeconds int `json:"fastestScheduleIntervalSeconds"`
38+
}
39+
40+
type Response struct {
41+
capabilities.TriggerEvent
42+
Payload cron.Payload
43+
}
44+
45+
type cronTrigger struct {
46+
ch chan<- capabilities.TriggerAndId[*crontypedapi.Payload]
47+
job gocron.Job
48+
nextRun time.Time
49+
}
50+
51+
type Service struct {
52+
capabilities.CapabilityInfo
53+
config Config
54+
clock clockwork.Clock
55+
lggr logger.Logger
56+
scheduler gocron.Scheduler
57+
triggers *cronStore
58+
}
59+
60+
var _ services.Service = &Service{}
61+
62+
// NewTriggerService creates a new trigger service. Optionally, a clock can be passed in for testing, if nil
63+
// the system clock will be used.
64+
func NewTriggerService(parentLggr logger.Logger, clock clockwork.Clock) *Service {
65+
lggr := logger.Named(parentLggr, "Service")
66+
67+
var options []gocron.SchedulerOption
68+
// Set scheduler location to UTC for consistency across nodes.
69+
options = append(options, gocron.WithLocation(time.UTC))
70+
71+
// Allow injecting a clock for testing. Otherwise use system clock.
72+
if clock != nil {
73+
options = append(options, gocron.WithClock(clock))
74+
} else {
75+
clock = clockwork.NewRealClock()
76+
}
77+
78+
scheduler, err := gocron.NewScheduler(options...)
79+
if err != nil {
80+
return nil
81+
}
82+
83+
return &Service{
84+
lggr: lggr,
85+
CapabilityInfo: cronTriggerInfo,
86+
triggers: NewCronStore(),
87+
scheduler: scheduler,
88+
clock: clock,
89+
}
90+
}
91+
92+
func (s *Service) Initialise(ctx context.Context, config string, _ core.TelemetryService,
93+
_ core.KeyValueStore,
94+
_ core.ErrorLog,
95+
_ core.PipelineRunnerService,
96+
_ core.RelayerSet,
97+
_ core.OracleFactory) error {
98+
s.lggr.Debugf("Initialising %s", ServiceName)
99+
100+
var cronConfig Config
101+
if len(config) > 0 {
102+
err := json.Unmarshal([]byte(config), &cronConfig)
103+
if err != nil {
104+
return fmt.Errorf("failed to unmarshal config: %s %w", config, err)
105+
}
106+
}
107+
108+
if cronConfig.FastestScheduleIntervalSeconds == 0 {
109+
cronConfig.FastestScheduleIntervalSeconds = defaultFastestScheduleIntervalSeconds
110+
}
111+
112+
s.config = cronConfig
113+
114+
err := s.Start(ctx)
115+
if err != nil {
116+
return fmt.Errorf("error when starting trigger service: %w", err)
117+
}
118+
119+
return nil
120+
}
121+
122+
func (s *Service) RegisterTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *crontypedapi.Config) (<-chan capabilities.TriggerAndId[*crontypedapi.Payload], error) {
123+
_, ok := s.triggers.Read(triggerID)
124+
if ok {
125+
return nil, fmt.Errorf("triggerId %s already registered", triggerID)
126+
}
127+
128+
var job gocron.Job
129+
callbackCh := make(chan capabilities.TriggerAndId[*crontypedapi.Payload], defaultSendChannelBufferSize)
130+
131+
allowSeconds := true
132+
jobDef := gocron.CronJob(input.Schedule, allowSeconds)
133+
134+
err := enforceFastestSchedule(s.lggr, s.clock, jobDef, time.Second*time.Duration(s.config.FastestScheduleIntervalSeconds))
135+
if err != nil {
136+
return nil, err
137+
}
138+
139+
task := gocron.NewTask(
140+
// Task callback, executed at next run time
141+
func() {
142+
trigger, ok := s.triggers.Read(triggerID)
143+
if !ok {
144+
// Invariant: The trigger should always exist, as unregistering the trigger removes the job
145+
s.lggr.Errorw("task callback invariant: trigger no longer exists", "triggerID", triggerID)
146+
return
147+
}
148+
149+
scheduledExecutionTimeUTC := trigger.nextRun.UTC()
150+
currentTimeUTC := s.clock.Now().UTC()
151+
152+
response := createTriggerResponse(scheduledExecutionTimeUTC)
153+
154+
s.lggr.Debugw("task callback sending trigger response", "executionID", metadata.WorkflowExecutionID, "triggerID", triggerID, "scheduledExecTimeUTC", scheduledExecutionTimeUTC.Format(time.RFC3339Nano), "actualExecTimeUTC", currentTimeUTC.Format(time.RFC3339Nano))
155+
156+
nextExecutionTime, nextRunErr := job.NextRun()
157+
if nextRunErr != nil {
158+
// .NextRun() will error if the job no longer exists
159+
// or if there is no next run to schedule, which shouldn't happen with cron jobs
160+
s.lggr.Errorw("task callback failed to schedule next run", "executionID", metadata.WorkflowExecutionID, "triggerID", triggerID)
161+
}
162+
163+
s.triggers.Write(triggerID, cronTrigger{
164+
ch: callbackCh,
165+
job: job,
166+
nextRun: nextExecutionTime,
167+
})
168+
169+
select {
170+
case callbackCh <- response:
171+
default:
172+
s.lggr.Errorw("callback channel full, dropping event", "executionID", metadata.WorkflowExecutionID, "triggerID", triggerID, "eventID", response.Id)
173+
}
174+
})
175+
176+
if s.scheduler == nil {
177+
return nil, errors.New("cannot register a new trigger, service has been closed")
178+
}
179+
180+
// If service has already started, job will be scheduled immediately
181+
job, err = s.scheduler.NewJob(jobDef, task, gocron.WithName(triggerID))
182+
if err != nil {
183+
s.lggr.Errorw("failed to create new job", "err", err)
184+
return nil, err
185+
}
186+
187+
firstRunTime, err := job.NextRun()
188+
if err != nil {
189+
// errors if job no longer exists on scheduler
190+
s.lggr.Errorw("failed to get next run time", "err", err)
191+
// ensure that it is out of scheduler
192+
err := s.scheduler.RemoveJob(job.ID())
193+
return nil, err
194+
}
195+
196+
s.triggers.Write(triggerID, cronTrigger{
197+
ch: callbackCh,
198+
job: job,
199+
nextRun: firstRunTime,
200+
})
201+
202+
s.lggr.Debugw("Trigger registered", "workflowId", metadata.WorkflowID, "triggerId", triggerID, "jobId", job.ID())
203+
204+
return callbackCh, nil
205+
}
206+
207+
func createTriggerResponse(scheduledExecutionTime time.Time) capabilities.TriggerAndId[*crontypedapi.Payload] {
208+
// Ensure UTC time is used for consistency across nodes.
209+
scheduledExecutionTimeUTC := scheduledExecutionTime.UTC()
210+
211+
// Use the scheduled execution time as a deterministic identifier.
212+
// Since cron schedules only go to second granularity this should never have ms.
213+
// Just in case, truncate on seconds by formatting to ensure consistency across nodes.
214+
scheduledExecutionTimeFormatted := scheduledExecutionTimeUTC.Format(time.RFC3339)
215+
triggerEventID := scheduledExecutionTimeFormatted
216+
217+
return capabilities.TriggerAndId[*crontypedapi.Payload]{
218+
Trigger: &crontypedapi.Payload{
219+
ScheduledExecutionTime: scheduledExecutionTimeUTC.Format(time.RFC3339Nano),
220+
},
221+
Id: triggerEventID,
222+
}
223+
}
224+
225+
func (s *Service) UnregisterTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *crontypedapi.Config) error {
226+
s.lggr.Debug("got call to unregister trigger")
227+
trigger, ok := s.triggers.Read(triggerID)
228+
if !ok {
229+
return fmt.Errorf("triggerId %s not found", triggerID)
230+
}
231+
232+
jobID := trigger.job.ID()
233+
234+
// Remove job from scheduler
235+
if s.scheduler == nil {
236+
return errors.New("cannot unregister a new trigger, service has been closed")
237+
}
238+
err := s.scheduler.RemoveJob(jobID)
239+
if err != nil {
240+
return fmt.Errorf("UnregisterTrigger failed to remove job from scheduler: %w", err)
241+
}
242+
243+
// Close callback channel
244+
s.lggr.Debug("closing event channel")
245+
close(trigger.ch)
246+
247+
// Remove from triggers context
248+
s.triggers.Delete(triggerID)
249+
250+
s.lggr.Debugw("UnregisterTrigger", "triggerId", triggerID, "jobId", jobID)
251+
return nil
252+
}
253+
254+
// Start the service.
255+
func (s *Service) Start(ctx context.Context) error {
256+
if s.scheduler == nil {
257+
return errors.New("service has shutdown, it must be built again to restart")
258+
}
259+
260+
s.scheduler.Start()
261+
262+
for triggerID, trigger := range s.triggers.ReadAll() {
263+
nextExecutionTime, err := trigger.job.NextRun()
264+
s.triggers.Write(triggerID, cronTrigger{
265+
ch: trigger.ch,
266+
job: trigger.job,
267+
nextRun: nextExecutionTime,
268+
})
269+
if err != nil {
270+
s.lggr.Errorw("Unable to get next run time", "err", err, "triggerID", triggerID)
271+
}
272+
}
273+
274+
s.lggr.Info(s.Name() + " started")
275+
276+
return nil
277+
}
278+
279+
// Close stops the Service.
280+
// After this call the Service cannot be started again,
281+
// The service will need to be re-built to start scheduling again.
282+
func (s *Service) Close() error {
283+
if s.scheduler == nil {
284+
return errors.New("service has shutdown, it must be built again to restart")
285+
}
286+
287+
err := s.scheduler.Shutdown()
288+
if err != nil {
289+
return fmt.Errorf("scheduler shutdown encountered a problem: %w", err)
290+
}
291+
292+
// After .Shutdown() the scheduler cannot be started again,
293+
// but calling .Start() on it will not error. Set to nil to mark closed.
294+
s.scheduler = nil
295+
296+
s.lggr.Info(s.Name() + " closed")
297+
298+
return nil
299+
}
300+
301+
func (s *Service) Ready() error {
302+
return nil
303+
}
304+
305+
func (s *Service) HealthReport() map[string]error {
306+
return map[string]error{s.Name(): nil}
307+
}
308+
309+
func (s *Service) Name() string {
310+
return s.lggr.Name()
311+
}
312+
313+
func (s *Service) Description() string {
314+
return "Cron Trigger Capability"
315+
}
316+
317+
func enforceFastestSchedule(lggr logger.Logger, clock clockwork.Clock, jobDef gocron.JobDefinition, maximumFastest time.Duration) error {
318+
var options []gocron.SchedulerOption
319+
// Set scheduler location to UTC for consistency across nodes.
320+
options = append(options, gocron.WithLocation(time.UTC))
321+
// Use passed in clock
322+
options = append(options, gocron.WithClock(clock))
323+
324+
tempScheduler, err := gocron.NewScheduler(options...)
325+
if err != nil {
326+
return err
327+
}
328+
tempJob, err := tempScheduler.NewJob(jobDef, gocron.NewTask(func() {}))
329+
if err != nil {
330+
return err
331+
}
332+
tempScheduler.Start()
333+
defer func() {
334+
if err = tempScheduler.Shutdown(); err != nil {
335+
lggr.Errorw("error shutting down enforceFastestSchedule temporary scheduler")
336+
}
337+
}()
338+
339+
nextRuns, err := tempJob.NextRuns(2)
340+
if err != nil {
341+
return err
342+
}
343+
344+
if len(nextRuns) != 2 {
345+
return errors.New("could not determine next two scheduled runs")
346+
}
347+
348+
if nextRuns[1].Before(nextRuns[0].Add(maximumFastest)) {
349+
return fmt.Errorf("maximum fastest cron schedule is %s", maximumFastest.String())
350+
}
351+
352+
return nil
353+
}

0 commit comments

Comments
 (0)