Skip to content

Update trigger with topics #99

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/taskengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,9 @@ func (n *Engine) StreamCheckToOperator(payload *avsproto.SyncMessagesReq, srv av
}

if !n.CanStreamCheck(address) {
// This isn't a consensus approval. It's a feature flag we control server side whether to stream data to the operator or not.
// TODO: Remove this flag when we measure performance impact on all operator
n.logger.Info("operator has not been approved to process task", address)
continue
}

Expand Down Expand Up @@ -863,5 +866,5 @@ func (n *Engine) NewSeqID() (string, error) {

func (n *Engine) CanStreamCheck(address string) bool {
// Only enable for our own operator first, once it's stable we will roll out to all
return address == "0x997e5d40a32c44a3d93e59fc55c4fd20b7d2d49d" || address == "0xc6b87cc9e85b07365b6abefff061f237f7cf7dc3"
return strings.EqualFold(address, "0x997e5d40a32c44a3d93e59fc55c4fd20b7d2d49d") || strings.EqualFold(address, "0xc6b87cc9e85b07365b6abefff061f237f7cf7dc3")
}
2 changes: 2 additions & 0 deletions core/taskengine/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ const (

InvalidCursor = "cursor is not valid"
InvalidPaginationParam = "item per page is not valid"

InvalidEntrypoint = "cannot detect task entrypoint from trigger and edge data"
)
2 changes: 1 addition & 1 deletion core/taskengine/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (x *TaskExecutor) RunTask(task *model.Task, queueData *QueueExecutionData)
}
triggerMetadata := queueData.TriggerMetadata

vm, err := NewVMWithData(task.Id, triggerMetadata, task.Nodes, task.Edges)
vm, err := NewVMWithData(task.Id, task.Trigger, triggerMetadata, task.Nodes, task.Edges)
if err != nil {
return nil, err
}
Expand Down
119 changes: 83 additions & 36 deletions core/taskengine/trigger/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package trigger

import (
"context"
"fmt"
"strings"
"sync"

"github.com/AvaProtocol/ap-avs/core/taskengine/macros"
sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
"github.com/dop251/goja"
"github.com/ginkgoch/godash/v2"
"github.com/samber/lo"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -25,8 +26,10 @@ type EventMark struct {
}

type Check struct {
Program string
TaskMetadata *avsproto.SyncMessagesResp_TaskMetadata

Program string
Matcher []*avsproto.EventCondition_Matcher
}

type EventTrigger struct {
Expand Down Expand Up @@ -70,9 +73,11 @@ func NewEventTrigger(o *RpcOption, triggerCh chan TriggerMetadata[EventMark]) *E

// TODO: track remainExecution and expriedAt before merge
func (t *EventTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) error {
program := check.GetTrigger().GetEvent().GetExpression()
evt := check.GetTrigger().GetEvent()

t.checks.Store(check.TaskId, &Check{
Program: program,
Program: evt.GetExpression(),
Matcher: evt.GetMatcher(),
TaskMetadata: check,
})

Expand All @@ -85,10 +90,10 @@ func (t *EventTrigger) RemoveCheck(id string) error {
return nil
}

func (evt *EventTrigger) Run(ctx context.Context) error {
func (evtTrigger *EventTrigger) Run(ctx context.Context) error {
logs := make(chan types.Log)
query := ethereum.FilterQuery{}
sub, err := evt.wsEthClient.SubscribeFilterLogs(context.Background(), ethereum.FilterQuery{}, logs)
sub, err := evtTrigger.wsEthClient.SubscribeFilterLogs(context.Background(), ethereum.FilterQuery{}, logs)
if err != nil {
return err
}
Expand All @@ -98,26 +103,26 @@ func (evt *EventTrigger) Run(ctx context.Context) error {
select {
case <-ctx.Done():
err = nil
case <-evt.done:
case <-evtTrigger.done:
err = nil
case err := <-sub.Err():
evt.logger.Errorf("getting error when subscribe to websocket rpc. start reconnecting", "errror", err)
evt.retryConnectToRpc()
sub, err = evt.wsEthClient.SubscribeFilterLogs(context.Background(), query, logs)
evtTrigger.logger.Errorf("getting error when subscribe to websocket rpc. start reconnecting", "errror", err)
evtTrigger.retryConnectToRpc()
sub, err = evtTrigger.wsEthClient.SubscribeFilterLogs(context.Background(), query, logs)
case event := <-logs:
evt.logger.Debug("detect new event, evaluate checks", "event", event.Topics[0], "contract", event.Address)
evtTrigger.logger.Debug("detect new event, evaluate checks", "event", event.Topics[0], "contract", event.Address)
// TODO: implement hint to avoid scan all checks
toRemove := []string{}

evt.checks.Range(func(key any, value any) bool {
if evt.shutdown {
evtTrigger.checks.Range(func(key any, value any) bool {
if evtTrigger.shutdown {
return false
}

check := value.(*Check)
if hit, err := evt.Evaluate(&event, check.Program); err == nil && hit {
evt.logger.Info("check hit, notify aggregator", "task_id", key)
evt.triggerCh <- TriggerMetadata[EventMark]{
if hit, err := evtTrigger.Evaluate(&event, check); err == nil && hit {
evtTrigger.logger.Info("check hit, notify aggregator", "task_id", key)
evtTrigger.triggerCh <- TriggerMetadata[EventMark]{
TaskID: key.(string),
Marker: EventMark{
BlockNumber: event.BlockNumber,
Expand All @@ -134,12 +139,13 @@ func (evt *EventTrigger) Run(ctx context.Context) error {
// }
}

// We do want to continue other check no matter what outcome of previous one
return true
})

if len(toRemove) > 0 {
for _, v := range toRemove {
evt.checks.Delete(v)
evtTrigger.checks.Delete(v)
}
}
}
Expand All @@ -149,29 +155,70 @@ func (evt *EventTrigger) Run(ctx context.Context) error {
return err
}

func (evt *EventTrigger) Evaluate(event *types.Log, program string) (bool, error) {
jsvm := goja.New()
envs := macros.GetEnvs(map[string]interface{}{
"trigger1": map[string]interface{}{
"data": map[string]interface{}{
"address": strings.ToLower(event.Address.Hex()),
"topics": godash.Map(event.Topics, func(topic common.Hash) string {
return "0x" + strings.ToLower(strings.TrimLeft(topic.String(), "0x0"))
}),
"data": "0x" + common.Bytes2Hex(event.Data),
"tx_hash": event.TxHash,
func (evt *EventTrigger) Evaluate(event *types.Log, check *Check) (bool, error) {
if check.Program != "" {
// This is the advance trigger with js evaluation based on trigger data
jsvm := goja.New()
envs := macros.GetEnvs(map[string]interface{}{
"trigger1": map[string]interface{}{
"data": map[string]interface{}{
"address": strings.ToLower(event.Address.Hex()),
"topics": lo.Map[common.Hash, string](event.Topics, func(topic common.Hash, _ int) string {
return "0x" + strings.ToLower(strings.TrimLeft(topic.String(), "0x0"))
}),
"data": "0x" + common.Bytes2Hex(event.Data),
"tx_hash": event.TxHash,
},
},
},
})
for k, v := range envs {
jsvm.Set(k, v)
})
for k, v := range envs {
jsvm.Set(k, v)
}

result, err := jsvm.RunString(check.Program)

if err != nil {
return false, err
}

return result.Export().(bool), err

}

result, err := jsvm.RunString(program)
var err error = nil
if len(check.Matcher) > 0 {
// This is the simpler trigger. It's essentially an anyof
return lo.SomeBy(check.Matcher, func(x *avsproto.EventCondition_Matcher) bool {
if len(x.Value) == 0 {
err = fmt.Errorf("matcher value is empty")
return false
}

if err != nil {
return false, err
switch x.Type {
case "topics":
// Matching based on topic of transaction
topics := lo.Map[common.Hash, string](event.Topics, func(topic common.Hash, _ int) string {
return "0x" + strings.ToLower(strings.TrimLeft(topic.String(), "0x0"))
})
match := true
for i, v := range x.Value {
match = match && (v == "" || strings.EqualFold(topics[i], v))
if !match {
return false
}
}
return match
case "address":
// Matching base on token contract that emit the event
return strings.EqualFold(event.Address.String(), x.Value[0])
}

// Unsupport type
err = fmt.Errorf("unsupport matcher type: %s", x.Type)
return false
}), err
}

return result.Export().(bool), err
err = fmt.Errorf("invalid check data: both matcher or trigger is missing")
return false, err
}
Loading
Loading