diff --git a/core/taskengine/engine.go b/core/taskengine/engine.go index 0de615e7..a91e3ecd 100644 --- a/core/taskengine/engine.go +++ b/core/taskengine/engine.go @@ -593,11 +593,9 @@ func (n *Engine) TriggerTask(user *model.User, payload *avsproto.UserTriggerTask ExecutionID: ulid.Make().String(), } - fmt.Println("task", task) if payload.IsBlocking { // Run the task inline, by pass the queue system executor := NewExecutor(n.smartWalletConfig, n.db, n.logger) - fmt.Println("queue task Data", queueTaskData) execution, err := executor.RunTask(task, &queueTaskData) if err == nil { return &avsproto.UserTriggerTaskResp{ diff --git a/core/taskengine/trigger/event.go b/core/taskengine/trigger/event.go index 41678945..2bec2711 100644 --- a/core/taskengine/trigger/event.go +++ b/core/taskengine/trigger/event.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" "sync" + "time" "github.com/AvaProtocol/ap-avs/core/taskengine/macros" sdklogging "github.com/Layr-Labs/eigensdk-go/logging" @@ -24,9 +25,9 @@ var ( // a better idea is to only subscribe to what we need and re-load when new trigger is added whitelistTopics = [][]common.Hash{ []common.Hash{ - common.HexToHash("0x49628fd1471006c1482da88028e9ce4dbb080b815c9b0344d39e5a8e6ec1419f"), // UserOp common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"), // erc20 transfer - common.HexToHash("0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925"), // approve + //common.HexToHash("0x49628fd1471006c1482da88028e9ce4dbb080b815c9b0344d39e5a8e6ec1419f"), // UserOp + //common.HexToHash("0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925"), // approve }, } ) @@ -107,7 +108,7 @@ func (evtTrigger *EventTrigger) Run(ctx context.Context) error { Topics: whitelistTopics, } - sub, err := evtTrigger.wsEthClient.SubscribeFilterLogs(context.Background(), ethereum.FilterQuery{Topics: whitelistTopics}, logs) + sub, err := evtTrigger.wsEthClient.SubscribeFilterLogs(context.Background(), query, logs) evtTrigger.logger.Info("subscribing with filter", "topics", whitelistTopics) if err != nil { @@ -138,12 +139,15 @@ func (evtTrigger *EventTrigger) Run(ctx context.Context) error { evtTrigger.retryConnectToRpc() sub, err = evtTrigger.wsEthClient.SubscribeFilterLogs(context.Background(), query, logs) case event := <-logs: - evtTrigger.logger.Debug("detect new event, evaluate checks", "event", event.Topics[0], "contract", event.Address) + evtTrigger.logger.Debug("detect new event, evaluate checks", "event", event.Topics, "contract", event.Address, "tx", event.TxHash) // TODO: implement hint to avoid scan all checks toRemove := []string{} evtTrigger.progress += 1 + startTime := time.Now() + checksCount := 0 evtTrigger.checks.Range(func(key any, value any) bool { + checksCount++ if evtTrigger.shutdown { return false } @@ -160,6 +164,11 @@ func (evtTrigger *EventTrigger) Run(ctx context.Context) error { }, } + evtTrigger.logger.Debug("check hit", + "check", key, + "tx_hash", event.TxHash, + ) + // if check.metadata.Remain >= 0 { // if check.metadata.Remain == 1 { // toRemove = append(toRemove, key.(string)) @@ -172,6 +181,12 @@ func (evtTrigger *EventTrigger) Run(ctx context.Context) error { return true }) + duration := time.Since(startTime) + evtTrigger.logger.Info("completed check evaluations", + "checks_count", checksCount, + "duration_ms", duration.Milliseconds(), + "checks_per_second", float64(checksCount)/(float64(duration.Nanoseconds())/1e9)) + if len(toRemove) > 0 { for _, v := range toRemove { evtTrigger.checks.Delete(v) @@ -185,6 +200,43 @@ func (evtTrigger *EventTrigger) Run(ctx context.Context) error { } func (evt *EventTrigger) Evaluate(event *types.Log, check *Check) (bool, error) { + var err error = nil + if len(check.Matcher) > 0 { + // This is the simpler trigger. It's essentially an anyof + return lo.SomeBy(check.Matcher, func(x *avsproto.EventCondition_Matcher) bool { + if len(x.Value) == 0 { + err = fmt.Errorf("matcher value is empty") + return false + } + + switch x.Type { + case "topics": + // Matching based on topic of transaction + topics := lo.Map[common.Hash, string](event.Topics, func(topic common.Hash, _ int) string { + return "0x" + strings.ToLower(strings.TrimLeft(topic.String(), "0x0")) + }) + + match := true + // In Topics matching, this will be the array of topics. an element that is empty is skip + for i, v := range x.Value { + if v == "" || v == "0x" { + continue + } + + match = match && strings.EqualFold(topics[i], v) + } + return match + case "address": + // Matching base on token contract that emit the event + return strings.EqualFold(event.Address.String(), x.Value[0]) + } + + // Unsupport type + err = fmt.Errorf("unsupport matcher type: %s", x.Type) + return false + }), err + } + if check.Program != "" { // This is the advance trigger with js evaluation based on trigger data triggerVarName := check.TaskMetadata.GetTrigger().GetName() @@ -220,40 +272,6 @@ func (evt *EventTrigger) Evaluate(event *types.Log, check *Check) (bool, error) } - var err error = nil - if len(check.Matcher) > 0 { - // This is the simpler trigger. It's essentially an anyof - return lo.SomeBy(check.Matcher, func(x *avsproto.EventCondition_Matcher) bool { - if len(x.Value) == 0 { - err = fmt.Errorf("matcher value is empty") - return false - } - - switch x.Type { - case "topics": - // Matching based on topic of transaction - topics := lo.Map[common.Hash, string](event.Topics, func(topic common.Hash, _ int) string { - return "0x" + strings.ToLower(strings.TrimLeft(topic.String(), "0x0")) - }) - match := true - for i, v := range x.Value { - match = match && (v == "" || strings.EqualFold(topics[i], v)) - if !match { - return false - } - } - return match - case "address": - // Matching base on token contract that emit the event - return strings.EqualFold(event.Address.String(), x.Value[0]) - } - - // Unsupport type - err = fmt.Errorf("unsupport matcher type: %s", x.Type) - return false - }), err - } - err = fmt.Errorf("invalid event trigger check: both matcher or expression are missing or empty") return false, err } diff --git a/core/taskengine/trigger/event_test.go b/core/taskengine/trigger/event_test.go index 64137241..74392f24 100644 --- a/core/taskengine/trigger/event_test.go +++ b/core/taskengine/trigger/event_test.go @@ -22,6 +22,10 @@ func TestTriggerTopicMatch(t *testing.T) { testutil.GetLogger()) result, err := eventTrigger.Evaluate(event, &Check{ + TaskMetadata: &avsproto.SyncMessagesResp_TaskMetadata{ + TaskId: "123", + }, + Matcher: []*avsproto.EventCondition_Matcher{ &avsproto.EventCondition_Matcher{ Type: "topics", @@ -84,6 +88,7 @@ func TestTriggerTopicMulti(t *testing.T) { testutil.GetLogger()) result, err := eventTrigger.Evaluate(event, &Check{ + Matcher: []*avsproto.EventCondition_Matcher{ &avsproto.EventCondition_Matcher{ Type: "topics", diff --git a/core/taskengine/trigger/time.go b/core/taskengine/trigger/time.go index dee4fe7a..e5b29f73 100644 --- a/core/taskengine/trigger/time.go +++ b/core/taskengine/trigger/time.go @@ -1 +1,156 @@ package trigger + +import ( + "context" + "fmt" + "sync" + "time" + + avsproto "github.com/AvaProtocol/ap-avs/protobuf" + sdklogging "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/go-co-op/gocron/v2" +) + +type TimeTrigger struct { + *CommonTrigger + + scheduler gocron.Scheduler + jobs map[string]gocron.Job // map taskID to job for removal + + // channel that we will push the trigger information back + triggerCh chan TriggerMetadata[uint64] +} + +func NewTimeTrigger(triggerCh chan TriggerMetadata[uint64], logger sdklogging.Logger) *TimeTrigger { + scheduler, _ := gocron.NewScheduler(gocron.WithLocation(time.UTC)) + + if scheduler == nil { + panic("fail to create scheduler") + } + + t := TimeTrigger{ + CommonTrigger: &CommonTrigger{ + done: make(chan bool), + shutdown: false, + logger: logger, + mu: sync.Mutex{}, + }, + scheduler: scheduler, + jobs: make(map[string]gocron.Job), + triggerCh: triggerCh, + } + + return &t +} + +func (t *TimeTrigger) epochToCron(epoch int64) string { + // Convert epoch to time + tm := time.Unix(epoch, 0) + // Create cron expression for specific time + return fmt.Sprintf("%d %d %d %d %d *", tm.Minute(), tm.Hour(), tm.Day(), tm.Month(), tm.Weekday()) +} + +func (t *TimeTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) error { + t.mu.Lock() + defer t.mu.Unlock() + + taskID := check.TaskId + + // Function to be executed when trigger fires + triggerFunc := func() { + currentTime := time.Now().Unix() + t.logger.Info("time trigger fired", "task_id", taskID, "time", currentTime) + t.triggerCh <- TriggerMetadata[uint64]{ + TaskID: taskID, + Marker: uint64(currentTime), + } + } + + var job gocron.Job + var err error + + if fixedTime := check.GetTrigger().GetFixedTime(); fixedTime != nil { + // Handle epoch-based scheduling + epochs := fixedTime.GetEpochs() + if len(epochs) == 0 { + return fmt.Errorf("no epochs provided") + } + + // Schedule a job for each epoch + for _, epoch := range epochs { + // If epoch is in the past, skip scheduling + if epoch < time.Now().Unix() { + t.logger.Info("skipping past epoch", "task_id", taskID, "epoch", epoch) + continue + } + + cronExpr := t.epochToCron(epoch) + job, err = t.scheduler.NewJob( + gocron.CronJob(cronExpr, false), + gocron.NewTask(triggerFunc), + // In FixedTime, we want to run the job only once + gocron.WithLimitedRuns(1), + ) + if err != nil { + return fmt.Errorf("failed to schedule epoch job: %w", err) + } + t.jobs[taskID] = job + } + } else if cronTrigger := check.GetTrigger().GetCron(); cronTrigger != nil { + // Handle cron-based scheduling + schedules := cronTrigger.GetSchedule() + if len(schedules) == 0 { + return fmt.Errorf("no cron expressions provided") + } + + // Schedule a job for each cron expression + for _, cronExpr := range schedules { + if cronExpr == "" { + continue + } + + job, err = t.scheduler.NewJob( + gocron.CronJob(cronExpr, false), + gocron.NewTask(triggerFunc), + ) + if err != nil { + return fmt.Errorf("failed to schedule cron job: %w", err) + } + t.jobs[taskID] = job + } + } + + return nil +} + +func (t *TimeTrigger) Remove(check *avsproto.SyncMessagesResp_TaskMetadata) error { + t.mu.Lock() + defer t.mu.Unlock() + + taskID := check.TaskId + if job, exists := t.jobs[taskID]; exists { + t.scheduler.RemoveJob(job.ID()) + delete(t.jobs, taskID) + } + + return nil +} + +func (t *TimeTrigger) Run(ctx context.Context) error { + t.scheduler.Start() + + go func() { + for { + select { + case <-ctx.Done(): + t.scheduler.Shutdown() + return + case <-t.done: + t.scheduler.Shutdown() + return + } + } + }() + + return nil +} diff --git a/core/taskengine/vm.go b/core/taskengine/vm.go index 15945004..58bcec4d 100644 --- a/core/taskengine/vm.go +++ b/core/taskengine/vm.go @@ -601,6 +601,9 @@ func (v *VM) preprocessText(text string) string { jsvm := goja.New() for key, value := range v.vars { + if v.logger != nil { + v.logger.Debug("evaludate pre-processor bind var", "task_id", v.GetTaskId(), key, value) + } jsvm.Set(key, value) } @@ -641,6 +644,9 @@ func (v *VM) preprocessText(text string) string { script := fmt.Sprintf(`(() => { return %s; })()`, expr) evaluated, err := jsvm.RunString(script) + if v.logger != nil { + v.logger.Debug("evaludate pre-processor script", "task_id", v.GetTaskId(), "script", script, "result", evaluated) + } if err != nil { // If there's an error, move past this opening bracket and continue result = result[:start] + result[end+2:] @@ -675,3 +681,11 @@ func (v *VM) CollectInputs() []string { return inputs } + +func (v *VM) GetTaskId() string { + if v.task != nil && v.task.Task != nil { + return v.task.Id + } + + return "" +} diff --git a/core/taskengine/vm_runner_contract_read.go b/core/taskengine/vm_runner_contract_read.go index dbd17e4e..7a5d71b9 100644 --- a/core/taskengine/vm_runner_contract_read.go +++ b/core/taskengine/vm_runner_contract_read.go @@ -82,6 +82,7 @@ func (r *ContractReadProcessor) Execute(stepID string, node *avsproto.ContractRe return s, err } result, err := parsedABI.Unpack(method.Name, output) + if err != nil { s.Success = false s.Error = fmt.Errorf("error decode result: %w", err).Error() diff --git a/operator/operator.go b/operator/operator.go index 8a94a9d0..d8eccb4d 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -148,6 +148,7 @@ type Operator struct { scheduler gocron.Scheduler eventTrigger *triggerengine.EventTrigger blockTrigger *triggerengine.BlockTrigger + timeTrigger *triggerengine.TimeTrigger } func RunWithConfig(configPath string) { diff --git a/operator/worker_loop.go b/operator/worker_loop.go index 58c8f95c..203e1329 100644 --- a/operator/worker_loop.go +++ b/operator/worker_loop.go @@ -45,24 +45,26 @@ func (o *Operator) runWorkLoop(ctx context.Context) error { } else { metricsErrChan = make(chan error, 1) } - // Register a subscriber on new block event and perform our code such as - // reporting time and perform check result - // TODO: Initialize time based task checking + rpcConfig := triggerengine.RpcOption{ RpcURL: o.config.TargetChain.EthRpcUrl, WsRpcURL: o.config.TargetChain.EthWsUrl, } + blockTriggerCh := make(chan triggerengine.TriggerMetadata[int64], 1000) o.blockTrigger = triggerengine.NewBlockTrigger(&rpcConfig, blockTriggerCh, o.logger) eventTriggerCh := make(chan triggerengine.TriggerMetadata[triggerengine.EventMark], 1000) o.eventTrigger = triggerengine.NewEventTrigger(&rpcConfig, eventTriggerCh, o.logger) + timeTriggerCh := make(chan triggerengine.TriggerMetadata[uint64], 1000) + o.timeTrigger = triggerengine.NewTimeTrigger(timeTriggerCh, o.logger) + o.blockTrigger.Run(ctx) o.eventTrigger.Run(ctx) + o.timeTrigger.Run(ctx) - // Establish a connection with gRPC server where new task will be pushed - // automatically + // Establish a connection with gRPC server where new task will be pushed automatically o.logger.Info("open channel to grpc to receive check") go o.StreamMessages() @@ -70,8 +72,24 @@ func (o *Operator) runWorkLoop(ctx context.Context) error { select { case <-ctx.Done(): return nil + case triggerItem := <-timeTriggerCh: + o.logger.Info("time trigger", "task_id", triggerItem.TaskID, "marker", triggerItem.Marker) + + if _, err := o.nodeRpcClient.NotifyTriggers(context.Background(), &avspb.NotifyTriggersReq{ + Address: o.config.OperatorAddress, + Signature: "pending", + TaskId: triggerItem.TaskID, + Reason: &avspb.TriggerReason{ + Epoch: uint64(triggerItem.Marker), + Type: avspb.TriggerReason_Cron, + }, + }); err == nil { + o.logger.Debug("Succesfully notifiy aggregator for task hit", "taskid", triggerItem.TaskID) + } else { + o.logger.Errorf("task trigger is in alert condition but failed to sync to aggregator", err, "taskid", triggerItem.TaskID) + } case triggerItem := <-blockTriggerCh: - o.logger.Debug("block trigger", "task_id", triggerItem.TaskID, "marker", triggerItem.Marker) + o.logger.Info("block trigger", "task_id", triggerItem.TaskID, "marker", triggerItem.Marker) if _, err := o.nodeRpcClient.NotifyTriggers(context.Background(), &avspb.NotifyTriggersReq{ Address: o.config.OperatorAddress, @@ -79,6 +97,7 @@ func (o *Operator) runWorkLoop(ctx context.Context) error { TaskId: triggerItem.TaskID, Reason: &avspb.TriggerReason{ BlockNumber: uint64(triggerItem.Marker), + Type: avspb.TriggerReason_Block, }, }); err == nil { o.logger.Debug("Succesfully notifiy aggregator for task hit", "taskid", triggerItem.TaskID) @@ -87,7 +106,7 @@ func (o *Operator) runWorkLoop(ctx context.Context) error { } case triggerItem := <-eventTriggerCh: - o.logger.Debug("event trigger", "task_id", triggerItem.TaskID, "marker", triggerItem.Marker) + o.logger.Info("event trigger", "task_id", triggerItem.TaskID, "marker", triggerItem.Marker) if _, err := o.nodeRpcClient.NotifyTriggers(context.Background(), &avspb.NotifyTriggersReq{ Address: o.config.OperatorAddress, @@ -97,6 +116,7 @@ func (o *Operator) runWorkLoop(ctx context.Context) error { BlockNumber: uint64(triggerItem.Marker.BlockNumber), LogIndex: uint64(triggerItem.Marker.LogIndex), TxHash: triggerItem.Marker.TxHash, + Type: avspb.TriggerReason_Event, }, }); err == nil { o.logger.Debug("Succesfully notifiy aggregator for task hit", "taskid", triggerItem.TaskID) @@ -161,6 +181,8 @@ func (o *Operator) StreamMessages() { o.logger.Info("received new event trigger", "id", resp.Id, "type", resp.TaskMetadata.Trigger) if err := o.eventTrigger.AddCheck(resp.TaskMetadata); err != nil { o.logger.Info("add trigger to monitor error", err) + } else { + o.logger.Info("succesfully monitor", "task_id", resp.Id, "component", "eventTrigger") } } else if trigger := resp.TaskMetadata.Trigger.GetBlock(); trigger != nil { o.logger.Info("received new block trigger", "id", resp.Id, "interval", resp.TaskMetadata.Trigger) @@ -169,7 +191,13 @@ func (o *Operator) StreamMessages() { } else { o.logger.Info("succesfully monitor", "task_id", resp.Id, "component", "blockTrigger") } - + } else if trigger := resp.TaskMetadata.Trigger.GetCron(); trigger != nil { + o.logger.Info("received new cron trigger", "id", resp.Id, "cron", resp.TaskMetadata.Trigger) + if err := o.timeTrigger.AddCheck(resp.TaskMetadata); err != nil { + o.logger.Errorf("add trigger to monitor error", err, "task_id", resp.Id) + } else { + o.logger.Info("succesfully monitor", "task_id", resp.Id, "component", "timeTrigger") + } } } }