Skip to content

add time trigger, perf report, debug log #175

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions core/taskengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,11 +593,9 @@ func (n *Engine) TriggerTask(user *model.User, payload *avsproto.UserTriggerTask
ExecutionID: ulid.Make().String(),
}

fmt.Println("task", task)
if payload.IsBlocking {
// Run the task inline, by pass the queue system
executor := NewExecutor(n.smartWalletConfig, n.db, n.logger)
fmt.Println("queue task Data", queueTaskData)
execution, err := executor.RunTask(task, &queueTaskData)
if err == nil {
return &avsproto.UserTriggerTaskResp{
Expand Down
94 changes: 56 additions & 38 deletions core/taskengine/trigger/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/AvaProtocol/ap-avs/core/taskengine/macros"
sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
Expand All @@ -24,9 +25,9 @@ var (
// a better idea is to only subscribe to what we need and re-load when new trigger is added
whitelistTopics = [][]common.Hash{
[]common.Hash{
common.HexToHash("0x49628fd1471006c1482da88028e9ce4dbb080b815c9b0344d39e5a8e6ec1419f"), // UserOp
common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"), // erc20 transfer
common.HexToHash("0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925"), // approve
//common.HexToHash("0x49628fd1471006c1482da88028e9ce4dbb080b815c9b0344d39e5a8e6ec1419f"), // UserOp
//common.HexToHash("0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925"), // approve
},
}
)
Expand Down Expand Up @@ -107,7 +108,7 @@ func (evtTrigger *EventTrigger) Run(ctx context.Context) error {
Topics: whitelistTopics,
}

sub, err := evtTrigger.wsEthClient.SubscribeFilterLogs(context.Background(), ethereum.FilterQuery{Topics: whitelistTopics}, logs)
sub, err := evtTrigger.wsEthClient.SubscribeFilterLogs(context.Background(), query, logs)
evtTrigger.logger.Info("subscribing with filter", "topics", whitelistTopics)

if err != nil {
Expand Down Expand Up @@ -138,12 +139,15 @@ func (evtTrigger *EventTrigger) Run(ctx context.Context) error {
evtTrigger.retryConnectToRpc()
sub, err = evtTrigger.wsEthClient.SubscribeFilterLogs(context.Background(), query, logs)
case event := <-logs:
evtTrigger.logger.Debug("detect new event, evaluate checks", "event", event.Topics[0], "contract", event.Address)
evtTrigger.logger.Debug("detect new event, evaluate checks", "event", event.Topics, "contract", event.Address, "tx", event.TxHash)
// TODO: implement hint to avoid scan all checks
toRemove := []string{}
evtTrigger.progress += 1

startTime := time.Now()
checksCount := 0
evtTrigger.checks.Range(func(key any, value any) bool {
checksCount++
if evtTrigger.shutdown {
return false
}
Expand All @@ -160,6 +164,11 @@ func (evtTrigger *EventTrigger) Run(ctx context.Context) error {
},
}

evtTrigger.logger.Debug("check hit",
"check", key,
"tx_hash", event.TxHash,
)

// if check.metadata.Remain >= 0 {
// if check.metadata.Remain == 1 {
// toRemove = append(toRemove, key.(string))
Expand All @@ -172,6 +181,12 @@ func (evtTrigger *EventTrigger) Run(ctx context.Context) error {
return true
})

duration := time.Since(startTime)
evtTrigger.logger.Info("completed check evaluations",
"checks_count", checksCount,
"duration_ms", duration.Milliseconds(),
"checks_per_second", float64(checksCount)/(float64(duration.Nanoseconds())/1e9))

if len(toRemove) > 0 {
for _, v := range toRemove {
evtTrigger.checks.Delete(v)
Expand All @@ -185,6 +200,43 @@ func (evtTrigger *EventTrigger) Run(ctx context.Context) error {
}

func (evt *EventTrigger) Evaluate(event *types.Log, check *Check) (bool, error) {
var err error = nil
if len(check.Matcher) > 0 {
// This is the simpler trigger. It's essentially an anyof
return lo.SomeBy(check.Matcher, func(x *avsproto.EventCondition_Matcher) bool {
if len(x.Value) == 0 {
err = fmt.Errorf("matcher value is empty")
return false
}

switch x.Type {
case "topics":
// Matching based on topic of transaction
topics := lo.Map[common.Hash, string](event.Topics, func(topic common.Hash, _ int) string {
return "0x" + strings.ToLower(strings.TrimLeft(topic.String(), "0x0"))
})

match := true
// In Topics matching, this will be the array of topics. an element that is empty is skip
for i, v := range x.Value {
if v == "" || v == "0x" {
continue
}

match = match && strings.EqualFold(topics[i], v)
}
return match
case "address":
// Matching base on token contract that emit the event
return strings.EqualFold(event.Address.String(), x.Value[0])
}

// Unsupport type
err = fmt.Errorf("unsupport matcher type: %s", x.Type)
return false
}), err
}

if check.Program != "" {
// This is the advance trigger with js evaluation based on trigger data
triggerVarName := check.TaskMetadata.GetTrigger().GetName()
Expand Down Expand Up @@ -220,40 +272,6 @@ func (evt *EventTrigger) Evaluate(event *types.Log, check *Check) (bool, error)

}

var err error = nil
if len(check.Matcher) > 0 {
// This is the simpler trigger. It's essentially an anyof
return lo.SomeBy(check.Matcher, func(x *avsproto.EventCondition_Matcher) bool {
if len(x.Value) == 0 {
err = fmt.Errorf("matcher value is empty")
return false
}

switch x.Type {
case "topics":
// Matching based on topic of transaction
topics := lo.Map[common.Hash, string](event.Topics, func(topic common.Hash, _ int) string {
return "0x" + strings.ToLower(strings.TrimLeft(topic.String(), "0x0"))
})
match := true
for i, v := range x.Value {
match = match && (v == "" || strings.EqualFold(topics[i], v))
if !match {
return false
}
}
return match
case "address":
// Matching base on token contract that emit the event
return strings.EqualFold(event.Address.String(), x.Value[0])
}

// Unsupport type
err = fmt.Errorf("unsupport matcher type: %s", x.Type)
return false
}), err
}

err = fmt.Errorf("invalid event trigger check: both matcher or expression are missing or empty")
return false, err
}
5 changes: 5 additions & 0 deletions core/taskengine/trigger/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ func TestTriggerTopicMatch(t *testing.T) {
testutil.GetLogger())

result, err := eventTrigger.Evaluate(event, &Check{
TaskMetadata: &avsproto.SyncMessagesResp_TaskMetadata{
TaskId: "123",
},

Matcher: []*avsproto.EventCondition_Matcher{
&avsproto.EventCondition_Matcher{
Type: "topics",
Expand Down Expand Up @@ -84,6 +88,7 @@ func TestTriggerTopicMulti(t *testing.T) {
testutil.GetLogger())

result, err := eventTrigger.Evaluate(event, &Check{

Matcher: []*avsproto.EventCondition_Matcher{
&avsproto.EventCondition_Matcher{
Type: "topics",
Expand Down
155 changes: 155 additions & 0 deletions core/taskengine/trigger/time.go
Original file line number Diff line number Diff line change
@@ -1 +1,156 @@
package trigger

import (
"context"
"fmt"
"sync"
"time"

avsproto "github.com/AvaProtocol/ap-avs/protobuf"
sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
"github.com/go-co-op/gocron/v2"
)

type TimeTrigger struct {
*CommonTrigger

scheduler gocron.Scheduler
jobs map[string]gocron.Job // map taskID to job for removal

// channel that we will push the trigger information back
triggerCh chan TriggerMetadata[uint64]
}

func NewTimeTrigger(triggerCh chan TriggerMetadata[uint64], logger sdklogging.Logger) *TimeTrigger {
scheduler, _ := gocron.NewScheduler(gocron.WithLocation(time.UTC))

if scheduler == nil {
panic("fail to create scheduler")
}

t := TimeTrigger{
CommonTrigger: &CommonTrigger{
done: make(chan bool),
shutdown: false,
logger: logger,
mu: sync.Mutex{},
},
scheduler: scheduler,
jobs: make(map[string]gocron.Job),
triggerCh: triggerCh,
}

return &t
}

func (t *TimeTrigger) epochToCron(epoch int64) string {
// Convert epoch to time
tm := time.Unix(epoch, 0)
// Create cron expression for specific time
return fmt.Sprintf("%d %d %d %d %d *", tm.Minute(), tm.Hour(), tm.Day(), tm.Month(), tm.Weekday())
}

func (t *TimeTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) error {
t.mu.Lock()
defer t.mu.Unlock()

taskID := check.TaskId

// Function to be executed when trigger fires
triggerFunc := func() {
currentTime := time.Now().Unix()
t.logger.Info("time trigger fired", "task_id", taskID, "time", currentTime)
t.triggerCh <- TriggerMetadata[uint64]{
TaskID: taskID,
Marker: uint64(currentTime),
}
}

var job gocron.Job
var err error

if fixedTime := check.GetTrigger().GetFixedTime(); fixedTime != nil {
// Handle epoch-based scheduling
epochs := fixedTime.GetEpochs()
if len(epochs) == 0 {
return fmt.Errorf("no epochs provided")
}

// Schedule a job for each epoch
for _, epoch := range epochs {
// If epoch is in the past, skip scheduling
if epoch < time.Now().Unix() {
t.logger.Info("skipping past epoch", "task_id", taskID, "epoch", epoch)
continue
}

cronExpr := t.epochToCron(epoch)
job, err = t.scheduler.NewJob(
gocron.CronJob(cronExpr, false),
gocron.NewTask(triggerFunc),
// In FixedTime, we want to run the job only once
gocron.WithLimitedRuns(1),
)
if err != nil {
return fmt.Errorf("failed to schedule epoch job: %w", err)
}
t.jobs[taskID] = job
}
} else if cronTrigger := check.GetTrigger().GetCron(); cronTrigger != nil {
// Handle cron-based scheduling
schedules := cronTrigger.GetSchedule()
if len(schedules) == 0 {
return fmt.Errorf("no cron expressions provided")
}

// Schedule a job for each cron expression
for _, cronExpr := range schedules {
if cronExpr == "" {
continue
}

job, err = t.scheduler.NewJob(
gocron.CronJob(cronExpr, false),
gocron.NewTask(triggerFunc),
)
if err != nil {
return fmt.Errorf("failed to schedule cron job: %w", err)
}
t.jobs[taskID] = job
}
}

return nil
}

func (t *TimeTrigger) Remove(check *avsproto.SyncMessagesResp_TaskMetadata) error {
t.mu.Lock()
defer t.mu.Unlock()

taskID := check.TaskId
if job, exists := t.jobs[taskID]; exists {
t.scheduler.RemoveJob(job.ID())
delete(t.jobs, taskID)
}

return nil
}

func (t *TimeTrigger) Run(ctx context.Context) error {
t.scheduler.Start()

go func() {
for {
select {
case <-ctx.Done():
t.scheduler.Shutdown()
return
case <-t.done:
t.scheduler.Shutdown()
return
}
}
}()

return nil
}
14 changes: 14 additions & 0 deletions core/taskengine/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,9 @@ func (v *VM) preprocessText(text string) string {
jsvm := goja.New()

for key, value := range v.vars {
if v.logger != nil {
v.logger.Debug("evaludate pre-processor bind var", "task_id", v.GetTaskId(), key, value)
}
jsvm.Set(key, value)
}

Expand Down Expand Up @@ -641,6 +644,9 @@ func (v *VM) preprocessText(text string) string {
script := fmt.Sprintf(`(() => { return %s; })()`, expr)

evaluated, err := jsvm.RunString(script)
if v.logger != nil {
v.logger.Debug("evaludate pre-processor script", "task_id", v.GetTaskId(), "script", script, "result", evaluated)
}
if err != nil {
// If there's an error, move past this opening bracket and continue
result = result[:start] + result[end+2:]
Expand Down Expand Up @@ -675,3 +681,11 @@ func (v *VM) CollectInputs() []string {

return inputs
}

func (v *VM) GetTaskId() string {
if v.task != nil && v.task.Task != nil {
return v.task.Id
}

return ""
}
1 change: 1 addition & 0 deletions core/taskengine/vm_runner_contract_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (r *ContractReadProcessor) Execute(stepID string, node *avsproto.ContractRe
return s, err
}
result, err := parsedABI.Unpack(method.Name, output)

if err != nil {
s.Success = false
s.Error = fmt.Errorf("error decode result: %w", err).Error()
Expand Down
1 change: 1 addition & 0 deletions operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ type Operator struct {
scheduler gocron.Scheduler
eventTrigger *triggerengine.EventTrigger
blockTrigger *triggerengine.BlockTrigger
timeTrigger *triggerengine.TimeTrigger
}

func RunWithConfig(configPath string) {
Expand Down
Loading
Loading