diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 9468b547..505614ba 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -107,7 +107,9 @@ func NewAggregator(c *config.Config) (*Aggregator, error) { avsWriter, err := chainio.BuildAvsWriterFromConfig(c) if err != nil { c.Logger.Errorf("Cannot create avsWriter", "err", err) - return nil, err + // TODO: Upgrade EigenSDK to use the new Slash Manager + // EigenLayer has update the contract and we cannot fetch the slasher anymore, we should upgrade the EigenSDK, right now we don't use it so it's ok to ignore this error + //return nil, err } go func() { @@ -122,8 +124,9 @@ func NewAggregator(c *config.Config) (*Aggregator, error) { clients, err := clients.BuildAll(chainioConfig, c.EcdsaPrivateKey, c.Logger) if err != nil { c.Logger.Errorf("Cannot create sdk clients", "err", err) - panic(err) - //return nil, err + // TODO: Upgrade EigenSDK to use the new Slash Manager + // EigenLayer has update the contract and we cannot fetch the slasher anymore, we should upgrade the EigenSDK, right now we don't use it so it's ok to ignore this error + //panic(err) } c.Logger.Info("create avsrrader and client", "avsReader", avsReader, "clients", clients) }() diff --git a/core/taskengine/engine.go b/core/taskengine/engine.go index 646ef461..8ec840ee 100644 --- a/core/taskengine/engine.go +++ b/core/taskengine/engine.go @@ -340,6 +340,10 @@ func (n *Engine) StreamCheckToOperator(payload *avsproto.SyncMessagesReq, srv av continue } + if !n.CanStreamCheck(address) { + continue + } + for _, task := range n.tasks { if _, ok := n.trackSyncedTasks[address].TaskID[task.Id]; ok { continue @@ -563,7 +567,6 @@ func (n *Engine) TriggerTask(user *model.User, payload *avsproto.UserTriggerTask if payload.IsBlocking { // Run the task inline, by pass the queue system executor := NewExecutor(n.db, n.logger) - fmt.Println("metadata", payload.TriggerMetadata) execution, err := executor.RunTask(task, payload.TriggerMetadata) if err == nil { return &avsproto.UserTriggerTaskResp{ @@ -798,3 +801,8 @@ func (n *Engine) NewSeqID() (string, error) { } return strconv.FormatInt(int64(num), 10), nil } + +func (n *Engine) CanStreamCheck(address string) bool { + // Only enable for our own operator first, once it's stable we will roll out to all + return address == "0x997e5d40a32c44a3d93e59fc55c4fd20b7d2d49d" || address == "0xc6b87cc9e85b07365b6abefff061f237f7cf7dc3" +} diff --git a/core/taskengine/executor.go b/core/taskengine/executor.go index b482751a..9fd69d14 100644 --- a/core/taskengine/executor.go +++ b/core/taskengine/executor.go @@ -73,7 +73,11 @@ func (x *TaskExecutor) Perform(job *apqueue.Job) error { func (x *TaskExecutor) RunTask(task *model.Task, triggerMetadata *avsproto.TriggerMetadata) (*avsproto.Execution, error) { vm, err := NewVMWithData(task.Id, triggerMetadata, task.Nodes, task.Edges) + if err != nil { + return nil, err + } + vm.WithLogger(x.logger) initialTaskStatus := task.Status if err != nil { diff --git a/core/taskengine/macros/exp.go b/core/taskengine/macros/exp.go index 55e32ea2..aacfbf28 100644 --- a/core/taskengine/macros/exp.go +++ b/core/taskengine/macros/exp.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common" ethmath "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/ethclient" + "github.com/go-resty/resty/v2" "github.com/expr-lang/expr" "github.com/expr-lang/expr/vm" @@ -19,6 +20,9 @@ var ( rpcConn *ethclient.Client ) +type Builtin struct { +} + func SetRpc(rpcURL string) { if conn, err := ethclient.Dial(rpcURL); err == nil { rpcConn = conn @@ -125,8 +129,39 @@ func ToBigInt(val string) *big.Int { return b } +func (bi *Builtin) ToBigInt(val string) *big.Int { + return ToBigInt(val) +} + +func (bi *Builtin) ChainlinkLatestRoundData(tokenPair string) *big.Int { + return chainlinkLatestRoundData(tokenPair) +} +func (bi *Builtin) ChainlinkLatestAnswer(tokenPair string) *big.Int { + return chainlinkLatestAnswer(tokenPair) +} + +func (bi *Builtin) BigCmp(a *big.Int, b *big.Int) (r int) { + return BigCmp(a, b) +} + +func (bi *Builtin) BigGt(a *big.Int, b *big.Int) bool { + return BigGt(a, b) +} + +func (bi *Builtin) BigLt(a *big.Int, b *big.Int) bool { + return BigLt(a, b) +} + +func (bi *Builtin) ParseUnit(val string, decimal uint) *big.Int { + return ParseUnit(val, decimal) +} + var ( exprEnv = map[string]any{ + // bind and simular JS fetch api + "fetch": Fetch, + + // macro to do IO from JS "readContractData": readContractData, "priceChainlink": chainlinkLatestAnswer, @@ -141,6 +176,67 @@ var ( } ) +// FetchResponse mimics the JS fetch Response object +type FetchResponse struct { + Status int + StatusText string + Body string + Headers map[string][]string +} + +// FetchOptions allows specifying method, headers, and body +type FetchOptions struct { + Method string + Headers map[string]string + Body interface{} +} + +// Fetch mimics the JS fetch function using Resty +func Fetch(url string) *FetchResponse { + options := FetchOptions{} + + client := resty.New() + // Create request + request := client.R() + + // Set headers + if options.Headers != nil { + request.SetHeaders(options.Headers) + } + + // Set body + if options.Body != nil { + request.SetBody(options.Body) + } + + // Send request based on method + var resp *resty.Response + var err error + switch options.Method { + case "POST": + resp, err = request.Post(url) + case "PUT": + resp, err = request.Put(url) + case "DELETE": + resp, err = request.Delete(url) + default: + resp, err = request.Get(url) // Default to GET + } + + // Handle errors + if err != nil { + return nil + } + + // Build FetchResponse + return &FetchResponse{ + Status: resp.StatusCode(), + StatusText: resp.Status(), + Body: string(resp.Body()), + Headers: resp.Header(), + } +} + func GetEnvs(extra map[string]any) map[string]interface{} { envs := map[string]any{} diff --git a/core/taskengine/trigger/event.go b/core/taskengine/trigger/event.go index 5c46266d..3a116b3e 100644 --- a/core/taskengine/trigger/event.go +++ b/core/taskengine/trigger/event.go @@ -7,8 +7,7 @@ import ( "github.com/AvaProtocol/ap-avs/core/taskengine/macros" sdklogging "github.com/Layr-Labs/eigensdk-go/logging" - "github.com/expr-lang/expr" - "github.com/expr-lang/expr/vm" + "github.com/dop251/goja" "github.com/ginkgoch/godash/v2" "github.com/ethereum/go-ethereum" @@ -26,7 +25,7 @@ type EventMark struct { } type Check struct { - Program *vm.Program + Program string TaskMetadata *avsproto.SyncMessagesResp_TaskMetadata } @@ -71,24 +70,7 @@ func NewEventTrigger(o *RpcOption, triggerCh chan TriggerMetadata[EventMark]) *E // TODO: track remainExecution and expriedAt before merge func (t *EventTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) error { - // Dummy value to get type - envs := macros.GetEnvs(map[string]interface{}{ - "trigger1": map[string]interface{}{ - "data": map[string]interface{}{ - "address": "dummy", - "topics": godash.Map([]common.Hash{}, func(topic common.Hash) string { - return "0x" - }), - "data": "0x", - "tx_hash": "dummy", - }, - }, - }) - program, err := expr.Compile(check.GetTrigger().GetEvent().GetExpression(), expr.Env(envs), expr.AsBool()) - if err != nil { - return err - } - + program := check.GetTrigger().GetEvent().GetExpression() t.checks.Store(check.TaskId, &Check{ Program: program, TaskMetadata: check, @@ -167,7 +149,8 @@ func (evt *EventTrigger) Run(ctx context.Context) error { return err } -func (evt *EventTrigger) Evaluate(event *types.Log, program *vm.Program) (bool, error) { +func (evt *EventTrigger) Evaluate(event *types.Log, program string) (bool, error) { + jsvm := goja.New() envs := macros.GetEnvs(map[string]interface{}{ "trigger1": map[string]interface{}{ "data": map[string]interface{}{ @@ -180,12 +163,15 @@ func (evt *EventTrigger) Evaluate(event *types.Log, program *vm.Program) (bool, }, }, }) + for k, v := range envs { + jsvm.Set(k, v) + } - result, err := expr.Run(program, envs) + result, err := jsvm.RunString(program) if err != nil { return false, err } - return result.(bool), err + return result.Export().(bool), err } diff --git a/core/taskengine/trigger/event_test.go b/core/taskengine/trigger/event_test.go index a9571541..14a7682c 100644 --- a/core/taskengine/trigger/event_test.go +++ b/core/taskengine/trigger/event_test.go @@ -5,10 +5,9 @@ import ( "github.com/AvaProtocol/ap-avs/core/taskengine/macros" "github.com/AvaProtocol/ap-avs/core/testutil" - "github.com/expr-lang/expr" ) -func TestChainlinkLatestAnswer(t *testing.T) { +func TestTriggerExpression(t *testing.T) { event, err := testutil.GetEventForTx("0x8f7c1f698f03d6d32c996b679ea1ebad45bbcdd9aa95d250dda74763cc0f508d", 82) if err != nil { @@ -20,37 +19,57 @@ func TestChainlinkLatestAnswer(t *testing.T) { WsRpcURL: testutil.GetTestRPCURL(), }, make(chan TriggerMetadata[EventMark], 1000)) - envs := macros.GetEnvs(map[string]interface{}{ - "trigger1": map[string]interface{}{ - "data": map[string]interface{}{ - "topics": []string{ - "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", - "0xabcdef", - "0xc114fb059434563dc65ac8d57e7976e3eac534f4", - }, - }, - }, - }) - - program, err := expr.Compile(` - trigger1.data.topics[0] == "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" && trigger1.data.topics[2] == "0xc114fb059434563dc65ac8d57e7976e3eac534f4" - `, expr.Env(envs), expr.AsBool()) - - if err != nil { - panic(err) - } + program := `trigger1.data.topics[0] == "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" && trigger1.data.topics[2] == "0xc114fb059434563dc65ac8d57e7976e3eac534f4"` result, err := eventTrigger.Evaluate(event, program) if !result { t.Errorf("expect expression to be match, but got false: error: %v", err) } - program, err = expr.Compile(` - (trigger1.data.topics[0] == "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" && trigger1.data.topics[2] == "abc") - `) + program = `trigger1.data.topics[0] == "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" && trigger1.data.topics[2] == "abc"` + + result, err = eventTrigger.Evaluate(event, program) + if result { + t.Errorf("expect expression to be not match, but got match: error: %v", err) + } + event, err = testutil.GetEventForTx("0x8f7c1f698f03d6d32c996b679ea1ebad45bbcdd9aa95d250dda74763cc0f508d", 81) + program = `trigger1.data.address == "0x5FF137D4b0FDCD49DcA30c7CF57E578a026d2789" && trigger1.data.topics[0] == "0xbb47ee3e183a558b1a2ff0874b079f3fc5478b7454eacf2bfc5af2ff5878f972"` result, err = eventTrigger.Evaluate(event, program) if result { t.Errorf("expect expression to be not match, but got match: error: %v", err) } } + +func TestTriggerWithContractReadBindingInExpression(t *testing.T) { + // This event is transfering usdc + event, err := testutil.GetEventForTx("0x4bb728dfbe58d7c641c02a214cac6156a0d6a0fe648cb27a7de229a3160e91b1", 145) + + macros.SetRpc(testutil.GetTestRPCURL()) + eventTrigger := NewEventTrigger(&RpcOption{ + RpcURL: testutil.GetTestRPCURL(), + WsRpcURL: testutil.GetTestRPCURL(), + }, make(chan TriggerMetadata[EventMark], 1000)) + + // USDC pair from chainlink, usually USDC price is ~99cent but never approach $1 + // for an unknow reason the decimal is 8 instead of 6 + program := `trigger1.data.topics[0] == "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" && bigGt(chainlinkPrice("0xA2F78ab2355fe2f984D808B5CeE7FD0A93D5270E"), toBigInt("1000000000"))` + + result, err := eventTrigger.Evaluate(event, program) + if err != nil { + t.Errorf("expected no error when evaluate program but got error: %s", err) + } + if result { + t.Errorf("expect expression to be false, but got true: error: %v", err) + } + + program = `trigger1.data.topics[0] == "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" && bigGt(chainlinkPrice("0xA2F78ab2355fe2f984D808B5CeE7FD0A93D5270E"), toBigInt("95000000"))` + + result, err = eventTrigger.Evaluate(event, program) + if err != nil { + t.Errorf("expected no error when evaluate program but got error: %s", err) + } + if !result { + t.Errorf("expect expression to be false, but got true: error: %v", err) + } +} diff --git a/core/taskengine/vm.go b/core/taskengine/vm.go index 07068e91..ae59c448 100644 --- a/core/taskengine/vm.go +++ b/core/taskengine/vm.go @@ -3,10 +3,12 @@ package taskengine import ( "context" "fmt" + "regexp" "strings" "sync" "time" + sdklogging "github.com/Layr-Labs/eigensdk-go/logging" "github.com/dop251/goja" "github.com/ginkgoch/godash/v2" @@ -36,6 +38,19 @@ type Step struct { Next []string } +type CommonProcessor struct { + vm *VM +} + +func (c *CommonProcessor) SetVar(name string, data any) { + c.vm.AddVar(name, data) +} + +// Set the variable for step output so it can be refer and use in subsequent steps +func (c *CommonProcessor) SetOutputVarForStep(stepID string, data any) { + c.vm.AddVar(c.vm.GetNodeNameAsVar(stepID), data) +} + // The VM is the core component that load the node information and execute them, yield finaly result type VM struct { // Input raw task data @@ -55,16 +70,18 @@ type VM struct { plans map[string]*Step entrypoint string instructionCount int64 + + logger sdklogging.Logger } -func NewVM() (*VM, error) { +func NewVM() *VM { v := &VM{ Status: VMStateInitialize, mu: &sync.Mutex{}, instructionCount: 0, } - return v, nil + return v } func (v *VM) Reset() { @@ -75,6 +92,29 @@ func (v *VM) Reset() { v.instructionCount = 0 } +func (v *VM) WithLogger(logger sdklogging.Logger) *VM { + v.logger = logger + + return v +} + +func (v *VM) GetNodeNameAsVar(nodeID string) string { + // Replace invalid characters with _ + re := regexp.MustCompile(`[^a-zA-Z0-9_$]`) + name := v.TaskNodes[nodeID].Name + if name == "" { + name = nodeID + } + standardized := re.ReplaceAllString(v.TaskNodes[nodeID].Name, "_") + + // Ensure the first character is valid + if len(standardized) == 0 || !regexp.MustCompile(`^[a-zA-Z_$]`).MatchString(standardized[:1]) { + standardized = "_" + standardized + } + + return standardized +} + func NewVMWithData(taskID string, triggerMetadata *avsproto.TriggerMetadata, nodes []*avsproto.TaskNode, edges []*avsproto.TaskEdge) (*VM, error) { v := &VM{ Status: VMStateInitialize, @@ -92,59 +132,67 @@ func NewVMWithData(taskID string, triggerMetadata *avsproto.TriggerMetadata, nod v.vars = macros.GetEnvs(map[string]any{}) // popular trigger data for trigger variable - if triggerMetadata != nil && triggerMetadata.LogIndex > 0 && triggerMetadata.TxHash != "" { - // if it contains event, we need to fetch and pop - receipt, err := rpcConn.TransactionReceipt(context.Background(), common.HexToHash(triggerMetadata.TxHash)) - if err != nil { - return nil, err - } + if triggerMetadata != nil { + if triggerMetadata.LogIndex > 0 && triggerMetadata.TxHash != "" { + // if it contains event, we need to fetch and pop + receipt, err := rpcConn.TransactionReceipt(context.Background(), common.HexToHash(triggerMetadata.TxHash)) + if err != nil { + return nil, err + } - var event *types.Log - //event := receipt.Logs[triggerMetadata.LogIndex] + var event *types.Log + //event := receipt.Logs[triggerMetadata.LogIndex] - for _, l := range receipt.Logs { - if uint64(l.Index) == triggerMetadata.LogIndex { - event = l + for _, l := range receipt.Logs { + if uint64(l.Index) == triggerMetadata.LogIndex { + event = l + } } - } - if event == nil { - return nil, fmt.Errorf("tx %s doesn't content event %d", triggerMetadata.TxHash, triggerMetadata.LogIndex) - } + if event == nil { + return nil, fmt.Errorf("tx %s doesn't content event %d", triggerMetadata.TxHash, triggerMetadata.LogIndex) + } - tokenMetadata, err := GetMetadataForTransfer(event) - ef, err := erc20.NewErc20(event.Address, nil) + tokenMetadata, err := GetMetadataForTransfer(event) + ef, err := erc20.NewErc20(event.Address, nil) - blockHeader, err := GetBlock(event.BlockNumber) - if err != nil { - return nil, fmt.Errorf("RPC error getting block header. Retry: %w", err) - } + blockHeader, err := GetBlock(event.BlockNumber) + if err != nil { + return nil, fmt.Errorf("RPC error getting block header. Retry: %w", err) + } - parseTransfer, err := ef.ParseTransfer(*event) - formattedValue := ToDecimal(parseTransfer.Value, int(tokenMetadata.Decimals)).String() - - v.vars["trigger1"] = map[string]interface{}{ - "data": map[string]interface{}{ - "topics": godash.Map(event.Topics, func(topic common.Hash) string { - return "0x" + strings.ToLower(strings.TrimLeft(topic.String(), "0x0")) - }), - "data": "0x" + common.Bytes2Hex(event.Data), - - "token_name": tokenMetadata.Name, - "token_symbol": tokenMetadata.Symbol, - "token_decimals": tokenMetadata.Decimals, - "transaction_hash": event.TxHash, - "address": strings.ToLower(event.Address.Hex()), - "block_number": event.BlockNumber, - "block_timestamp": blockHeader.Time, - "from_address": parseTransfer.From.String(), - "to_address": parseTransfer.To.String(), - "value": parseTransfer.Value.String(), - "value_formatted": formattedValue, - "transaction_index": event.TxIndex, - }, + parseTransfer, err := ef.ParseTransfer(*event) + formattedValue := ToDecimal(parseTransfer.Value, int(tokenMetadata.Decimals)).String() + + // TODO: Implement a decoder to help standarize common event + v.vars["trigger1"] = map[string]interface{}{ + "data": map[string]interface{}{ + "topics": godash.Map(event.Topics, func(topic common.Hash) string { + return "0x" + strings.ToLower(strings.TrimLeft(topic.String(), "0x0")) + }), + "data": "0x" + common.Bytes2Hex(event.Data), + + "token_name": tokenMetadata.Name, + "token_symbol": tokenMetadata.Symbol, + "token_decimals": tokenMetadata.Decimals, + "transaction_hash": event.TxHash, + "address": strings.ToLower(event.Address.Hex()), + "block_number": event.BlockNumber, + "block_timestamp": blockHeader.Time, + "from_address": parseTransfer.From.String(), + "to_address": parseTransfer.To.String(), + "value": parseTransfer.Value.String(), + "value_formatted": formattedValue, + "transaction_index": event.TxIndex, + }, + } } + if triggerMetadata.Epoch > 0 { + v.vars["trigger1"] = map[string]any{ + "epoch": triggerMetadata.Epoch, + } + } } return v, nil @@ -243,32 +291,7 @@ func (v *VM) executeNode(node *avsproto.TaskNode) (*avsproto.Execution_Step, err } if nodeValue := node.GetRestApi(); nodeValue != nil { - // TODO: refactor into function - p := NewRestProrcessor() - - // only evaluate string when there is string interpolation - if nodeValue.Body != "" && strings.Contains(nodeValue.Body, "$") { - nodeValue2 := &avsproto.RestAPINode{ - Url: macros.RenderString(nodeValue.Url, macroEnvs), - Headers: nodeValue.Headers, - Method: nodeValue.Method, - Body: strings.Clone(nodeValue.Body), - } - vm := goja.New() - // TODO: dynamically set var instead of hardcode the name - // client would need to send this over - vm.Set("trigger1", v.vars["trigger1"]) - - renderBody, err := vm.RunString(nodeValue.Body) - if err == nil { - nodeValue2.Body = renderBody.Export().(string) - } - executionLog, err = p.Execute(node.Id, nodeValue2) - } else { - executionLog, err = p.Execute(node.Id, nodeValue) - } - - v.ExecutionLogs = append(v.ExecutionLogs, executionLog) + executionLog, err = v.runRestApi(node.Id, nodeValue) } else if nodeValue := node.GetBranch(); nodeValue != nil { outcomeID := "" executionLog, outcomeID, err = v.runBranch(node.Id, nodeValue) @@ -289,11 +312,79 @@ func (v *VM) executeNode(node *avsproto.TaskNode) (*avsproto.Execution_Step, err } } } + } else if nodeValue := node.GetGraphqlQuery(); nodeValue != nil { + executionLog, err = v.runGraphQL(node.Id, nodeValue) + } else if nodeValue := node.GetCustomCode(); nodeValue != nil { + executionLog, err = v.runCustomCode(node.Id, nodeValue) + } + + return executionLog, err +} + +func (v *VM) runRestApi(stepID string, nodeValue *avsproto.RestAPINode) (*avsproto.Execution_Step, error) { + p := NewRestProrcessor(v) + + var err error + executionLog := &avsproto.Execution_Step{ + NodeId: stepID, + } + + // only evaluate string when there is string interpolation + if nodeValue.Body != "" && (strings.Contains(nodeValue.Body, "$") || strings.Contains(nodeValue.Body, "`")) { + nodeValue2 := &avsproto.RestAPINode{ + Url: macros.RenderString(nodeValue.Url, macroEnvs), + Headers: nodeValue.Headers, + Method: nodeValue.Method, + Body: strings.Clone(nodeValue.Body), + } + jsvm := goja.New() + + for key, value := range v.vars { + jsvm.Set(key, map[string]any{ + "data": value, + }) + } + + renderBody, err := jsvm.RunString(nodeValue.Body) + if err == nil { + nodeValue2.Body = renderBody.Export().(string) + } else { + v.logger.Error("error render string with goja", "error", err) + } + executionLog, err = p.Execute(stepID, nodeValue2) + } else { + executionLog, err = p.Execute(stepID, nodeValue) } + v.ExecutionLogs = append(v.ExecutionLogs, executionLog) return executionLog, err } +func (v *VM) runGraphQL(stepID string, node *avsproto.GraphQLQueryNode) (*avsproto.Execution_Step, error) { + g, err := NewGraphqlQueryProcessor(v, node.Url) + if err != nil { + return nil, err + } + executionLog, _, err := g.Execute(stepID, node) + if err != nil { + v.logger.Error("error execute graphql node", "task_id", v.TaskID, "step", stepID, "url", node.Url, "error", err) + } + v.ExecutionLogs = append(v.ExecutionLogs, executionLog) + + return executionLog, nil +} + +func (v *VM) runCustomCode(stepID string, node *avsproto.CustomCodeNode) (*avsproto.Execution_Step, error) { + r := NewJSProcessor(v) + executionLog, err := r.Execute(stepID, node) + if err != nil { + v.logger.Error("error execute JavaScript code", "task_id", v.TaskID, "step", stepID, "error", err) + } + v.ExecutionLogs = append(v.ExecutionLogs, executionLog) + + return executionLog, nil +} + func (v *VM) runBranch(stepID string, node *avsproto.BranchNode) (*avsproto.Execution_Step, string, error) { t0 := time.Now() s := &avsproto.Execution_Step{ diff --git a/core/taskengine/vm_runner_contract_read.go b/core/taskengine/vm_runner_contract_read.go index 9b2f6916..0b2e5bce 100644 --- a/core/taskengine/vm_runner_contract_read.go +++ b/core/taskengine/vm_runner_contract_read.go @@ -16,12 +16,16 @@ import ( ) type ContractReadProcessor struct { + *CommonProcessor client *ethclient.Client } -func NewContractReadProcessor(client *ethclient.Client) *ContractReadProcessor { +func NewContractReadProcessor(vm *VM, client *ethclient.Client) *ContractReadProcessor { return &ContractReadProcessor{ client: client, + CommonProcessor: &CommonProcessor{ + vm: vm, + }, } } @@ -81,6 +85,7 @@ func (r *ContractReadProcessor) Execute(stepID string, node *avsproto.ContractRe s.Log = log.String() outputData, err := json.Marshal(result) s.OutputData = string(outputData) + r.SetOutputVarForStep(stepID, outputData) if err != nil { s.Success = false s.Error = err.Error() diff --git a/core/taskengine/vm_runner_contract_read_test.go b/core/taskengine/vm_runner_contract_read_test.go index 1a989cad..d57ce634 100644 --- a/core/taskengine/vm_runner_contract_read_test.go +++ b/core/taskengine/vm_runner_contract_read_test.go @@ -10,15 +10,35 @@ import ( ) func TestContractReadSimpleReturn(t *testing.T) { - n := NewContractReadProcessor(testutil.GetRpcClient()) - node := &avsproto.ContractReadNode{ ContractAddress: "0x1c7d4b196cb0c7b01d743fbc6116a902379c7238", CallData: "0x70a08231000000000000000000000000ce289bb9fb0a9591317981223cbe33d5dc42268d", ContractAbi: `[{"inputs":[{"internalType":"address","name":"account","type":"address"}],"name":"balanceOf","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"}]`, Method: "balanceOf", } - step, err := n.Execute("check balance", node) + nodes := []*avsproto.TaskNode{ + &avsproto.TaskNode{ + Id: "123", + Name: "contractQuery", + TaskType: &avsproto.TaskNode_ContractRead{ + ContractRead: node, + }, + }, + } + + edges := []*avsproto.TaskEdge{ + &avsproto.TaskEdge{ + Id: "e1", + Source: "__TRIGGER__", + Target: "123", + }, + } + + vm, err := NewVMWithData("123", nil, nodes, edges) + + n := NewContractReadProcessor(vm, testutil.GetRpcClient()) + + step, err := n.Execute("123", node) if err != nil { t.Errorf("expected contract read node run succesfull but got error: %v", err) @@ -41,15 +61,34 @@ func TestContractReadSimpleReturn(t *testing.T) { } func TestContractReadComplexReturn(t *testing.T) { - n := NewContractReadProcessor(testutil.GetRpcClient()) - node := &avsproto.ContractReadNode{ ContractAddress: "0xc59E3633BAAC79493d908e63626716e204A45EdF", CallData: "0x9a6fc8f500000000000000000000000000000000000000000000000100000000000052e7", ContractAbi: `[{"inputs":[{"internalType":"uint80","name":"_roundId","type":"uint80"}],"name":"getRoundData","outputs":[{"internalType":"uint80","name":"roundId","type":"uint80"},{"internalType":"int256","name":"answer","type":"int256"},{"internalType":"uint256","name":"startedAt","type":"uint256"},{"internalType":"uint256","name":"updatedAt","type":"uint256"},{"internalType":"uint80","name":"answeredInRound","type":"uint80"}],"stateMutability":"view","type":"function"}]`, Method: "getRoundData", } - step, err := n.Execute("latest round data", node) + + nodes := []*avsproto.TaskNode{ + &avsproto.TaskNode{ + Id: "123abc", + Name: "contractQuery", + TaskType: &avsproto.TaskNode_ContractRead{ + ContractRead: node, + }, + }, + } + + edges := []*avsproto.TaskEdge{ + &avsproto.TaskEdge{ + Id: "e1", + Source: "__TRIGGER__", + Target: "123abc", + }, + } + + vm, err := NewVMWithData("123abc", nil, nodes, edges) + n := NewContractReadProcessor(vm, testutil.GetRpcClient()) + step, err := n.Execute("123abc", node) if err != nil { t.Errorf("expected contract read node run succesfull but got error: %v", err) diff --git a/core/taskengine/vm_runner_customcode.go b/core/taskengine/vm_runner_customcode.go new file mode 100644 index 00000000..6f4ffa18 --- /dev/null +++ b/core/taskengine/vm_runner_customcode.go @@ -0,0 +1,89 @@ +package taskengine + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/dop251/goja" + + "github.com/AvaProtocol/ap-avs/core/taskengine/macros" + avsproto "github.com/AvaProtocol/ap-avs/protobuf" +) + +type JSProcessor struct { + *CommonProcessor + jsvm *goja.Runtime +} + +func NewJSProcessor(vm *VM) *JSProcessor { + r := JSProcessor{ + CommonProcessor: &CommonProcessor{ + vm: vm, + }, + jsvm: goja.New(), + } + + // These are built-in func + for key, value := range macros.GetEnvs(nil) { + r.jsvm.Set(key, value) + } + /// Binding the data from previous step into jsvm + for key, value := range vm.vars { + r.jsvm.Set(key, map[string]any{ + "data": value, + }) + } + + return &r +} + +func (r *JSProcessor) Execute(stepID string, node *avsproto.CustomCodeNode) (*avsproto.Execution_Step, error) { + t0 := time.Now().Unix() + + s := &avsproto.Execution_Step{ + NodeId: stepID, + Log: "", + OutputData: "", + Success: true, + Error: "", + StartAt: t0, + } + + var err error + defer func() { + s.EndAt = time.Now().Unix() + s.Success = err == nil + if err != nil { + s.Error = err.Error() + } + }() + + var log strings.Builder + + log.WriteString(fmt.Sprintf("Start execute user-input JS code at %s", time.Now())) + //result, err := r.jsvm.RunString("(function() {" + node.Source + "})()") + result, err := r.jsvm.RunString(node.Source) + log.WriteString(fmt.Sprintf("Complete Execute user-input JS code at %s", time.Now())) + if err != nil { + s.Success = false + s.Error = err.Error() + log.WriteString("\nerror running JavaScript code:") + log.WriteString(err.Error()) + } + s.Log = log.String() + + if result != nil { + resultValue := result.Export() + // TODO: capsize + if outputData, serilizeError := json.Marshal(resultValue); serilizeError == nil { + s.OutputData = string(outputData) + } else { + log.WriteString("cannot serilize output data to log") + } + r.SetOutputVarForStep(stepID, resultValue) + } + + return s, err +} diff --git a/core/taskengine/vm_runner_customcode_test.go b/core/taskengine/vm_runner_customcode_test.go new file mode 100644 index 00000000..a6b3e737 --- /dev/null +++ b/core/taskengine/vm_runner_customcode_test.go @@ -0,0 +1,125 @@ +package taskengine + +import ( + "strings" + "testing" + + avsproto "github.com/AvaProtocol/ap-avs/protobuf" +) + +func TestRunJavaScript(t *testing.T) { + node := &avsproto.CustomCodeNode{ + Source: "3>2", + } + nodes := []*avsproto.TaskNode{ + &avsproto.TaskNode{ + Id: "123abc", + Name: "customJs", + TaskType: &avsproto.TaskNode_CustomCode{ + CustomCode: node, + }, + }, + } + + edges := []*avsproto.TaskEdge{ + &avsproto.TaskEdge{ + Id: "e1", + Source: "__TRIGGER__", + Target: "123abc", + }, + } + + vm, err := NewVMWithData("123abc", nil, nodes, edges) + n := NewJSProcessor(vm) + + step, err := n.Execute("123abc", node) + + if err != nil { + t.Errorf("expected JavaScript node run succesfull but got error: %v", err) + } + + if !step.Success { + t.Errorf("expected JavaScript node run succesfully but failed") + } + + if !strings.Contains(step.Log, "Start execute user-input JS code at") { + t.Errorf("expected log contains trace data but found no") + } + + if step.Error != "" { + t.Errorf("expected log contains request trace data but found no") + } + + if step.OutputData != "true" { + t.Errorf("wrong result, expect true got %s", step.OutputData) + } + +} + +func TestRunJavaScriptComplex(t *testing.T) { + node := &avsproto.CustomCodeNode{ + Source: "const a=[1,2,3]; a.filter((i) => i >= 2);", + } + nodes := []*avsproto.TaskNode{ + &avsproto.TaskNode{ + Id: "123abc", + Name: "customJs", + TaskType: &avsproto.TaskNode_CustomCode{ + CustomCode: node, + }, + }, + } + + edges := []*avsproto.TaskEdge{ + &avsproto.TaskEdge{ + Id: "e1", + Source: "__TRIGGER__", + Target: "123abc", + }, + } + + vm, _ := NewVMWithData("123abc", nil, nodes, edges) + n := NewJSProcessor(vm) + + step, _ := n.Execute("123abc", node) + + if step.OutputData != "[2,3]" { + t.Errorf("wrong JS code evaluation result, expect [2,3] got %s", step.OutputData) + } +} + +// Temp disable until we figured out the event loop +// func TestRunJavaScriptHTTP(t *testing.T) { +// node := &avsproto.CustomCodeNode{ +// Source: ` +// toBigInt("1234442") +// `, +// } +// nodes := []*avsproto.TaskNode{ +// &avsproto.TaskNode{ +// Id: "123abc", +// Name: "customJs", +// TaskType: &avsproto.TaskNode_CustomCode{ +// CustomCode: node, +// }, +// }, +// } +// +// edges := []*avsproto.TaskEdge{ +// &avsproto.TaskEdge{ +// Id: "e1", +// Source: "__TRIGGER__", +// Target: "123abc", +// }, +// } +// +// vm, _ := NewVMWithData("123abc", nil, nodes, edges) +// n := NewJSProcessor(vm) +// +// step, err := n.Execute("123abc", node) +// fmt.Println("error", err, step.OutputData) +// +// if step.OutputData != "[2,3]" { +// t.Errorf("wrong JS code evaluation result, expect [2,3] got %s", step.OutputData) +// } +// } diff --git a/core/taskengine/vm_runner_graphql_query.go b/core/taskengine/vm_runner_graphql_query.go index e4a8b5dd..6e073abf 100644 --- a/core/taskengine/vm_runner_graphql_query.go +++ b/core/taskengine/vm_runner_graphql_query.go @@ -13,15 +13,16 @@ import ( ) type GraphqlQueryProcessor struct { + *CommonProcessor + client *graphql.Client sb *strings.Builder url *url.URL } -func NewGraphqlQueryProcessor(endpoint string) (*GraphqlQueryProcessor, error) { +func NewGraphqlQueryProcessor(vm *VM, endpoint string) (*GraphqlQueryProcessor, error) { sb := &strings.Builder{} log := func(s string) { - fmt.Println("LOGLOG", s) sb.WriteString(s) } @@ -39,10 +40,12 @@ func NewGraphqlQueryProcessor(endpoint string) (*GraphqlQueryProcessor, error) { client: client, sb: sb, url: u, + + CommonProcessor: &CommonProcessor{vm}, }, nil } -func (r *GraphqlQueryProcessor) Execute(stepID string, node *avsproto.GraphQLQueryNode) (*avsproto.Execution_Step, error) { +func (r *GraphqlQueryProcessor) Execute(stepID string, node *avsproto.GraphQLQueryNode) (*avsproto.Execution_Step, any, error) { ctx := context.Background() t0 := time.Now().Unix() step := &avsproto.Execution_Step{ @@ -68,11 +71,12 @@ func (r *GraphqlQueryProcessor) Execute(stepID string, node *avsproto.GraphQLQue query := graphql.NewRequest(node.Query) err = r.client.Run(ctx, query, &resp) if err != nil { - return step, err + return step, nil, err } step.Log = r.sb.String() data, err := json.Marshal(resp) step.OutputData = string(data) - return step, err + r.SetOutputVarForStep(stepID, resp) + return step, resp, err } diff --git a/core/taskengine/vm_runner_graphql_query_test.go b/core/taskengine/vm_runner_graphql_query_test.go index 57d979a4..cb87d9f4 100644 --- a/core/taskengine/vm_runner_graphql_query_test.go +++ b/core/taskengine/vm_runner_graphql_query_test.go @@ -22,9 +22,28 @@ func TestGraphlQlNodeSimpleQuery(t *testing.T) { }`, } - n, _ := NewGraphqlQueryProcessor(node.Url) + nodes := []*avsproto.TaskNode{ + &avsproto.TaskNode{ + Id: "123abc", + Name: "graphqlQuery", + TaskType: &avsproto.TaskNode_GraphqlQuery{ + GraphqlQuery: node, + }, + }, + } + + edges := []*avsproto.TaskEdge{ + &avsproto.TaskEdge{ + Id: "e1", + Source: "__TRIGGER__", + Target: "123abc", + }, + } + + vm, err := NewVMWithData("123abc", nil, nodes, edges) + n, _ := NewGraphqlQueryProcessor(vm, node.Url) - step, err := n.Execute("lido approval", node) + step, _, err := n.Execute("123abc", node) if err != nil { t.Errorf("expected rest node run succesfull but got error: %v", err) diff --git a/core/taskengine/vm_runner_rest.go b/core/taskengine/vm_runner_rest.go index 47c098af..5b873703 100644 --- a/core/taskengine/vm_runner_rest.go +++ b/core/taskengine/vm_runner_rest.go @@ -1,6 +1,7 @@ package taskengine import ( + "encoding/json" "fmt" "net/url" "strings" @@ -12,10 +13,11 @@ import ( ) type RestProcessor struct { + *CommonProcessor client *resty.Client } -func NewRestProrcessor() *RestProcessor { +func NewRestProrcessor(vm *VM) *RestProcessor { client := resty.New() // Unique settings at Client level @@ -28,6 +30,9 @@ func NewRestProrcessor() *RestProcessor { r := RestProcessor{ client: client, + CommonProcessor: &CommonProcessor{ + vm: vm, + }, } return &r @@ -83,6 +88,18 @@ func (r *RestProcessor) Execute(stepID string, node *avsproto.RestAPINode) (*avs s.Log = log.String() s.OutputData = string(resp.Body()) + // Attempt to detect json + if s.OutputData[0] == '{' || s.OutputData[0] == '[' { + var parseData map[string]any + if err := json.Unmarshal([]byte(s.OutputData), &parseData); err == nil { + r.SetOutputVarForStep(stepID, parseData) + } else { + r.SetOutputVarForStep(stepID, s.OutputData) + } + } else { + r.SetOutputVarForStep(stepID, s.OutputData) + } + if err != nil { s.Success = false s.Error = err.Error() diff --git a/core/taskengine/vm_runner_rest_test.go b/core/taskengine/vm_runner_rest_test.go index af4ff971..1b3f938e 100644 --- a/core/taskengine/vm_runner_rest_test.go +++ b/core/taskengine/vm_runner_rest_test.go @@ -8,8 +8,6 @@ import ( ) func TestRestRequest(t *testing.T) { - n := NewRestProrcessor() - node := &avsproto.RestAPINode{ Url: "https://httpbin.org/post", Headers: map[string]string{ @@ -18,7 +16,28 @@ func TestRestRequest(t *testing.T) { Body: "chat_id=123&disable_notification=true&text=%2AThis+is+a+test+format%2A", Method: "POST", } - step, err := n.Execute("foo123", node) + + nodes := []*avsproto.TaskNode{ + &avsproto.TaskNode{ + Id: "123abc", + Name: "restApi", + TaskType: &avsproto.TaskNode_RestApi{ + RestApi: node, + }, + }, + } + + edges := []*avsproto.TaskEdge{ + &avsproto.TaskEdge{ + Id: "e1", + Source: "__TRIGGER__", + Target: "123abc", + }, + } + + vm, err := NewVMWithData("123abc", nil, nodes, edges) + n := NewRestProrcessor(vm) + step, err := n.Execute("123abc", node) if err != nil { t.Errorf("expected rest node run succesfull but got error: %v", err) diff --git a/core/taskengine/vm_test.go b/core/taskengine/vm_test.go index 66aace63..cdaaf9e6 100644 --- a/core/taskengine/vm_test.go +++ b/core/taskengine/vm_test.go @@ -90,6 +90,11 @@ func TestRunSimpleTasks(t *testing.T) { if !strings.Contains(vm.ExecutionLogs[0].Log, "Execute") { t.Errorf("error generating log for executing. expect a log line displaying the request attempt, got nothing") } + + data := vm.vars["httpnode"].(map[string]any) + if data["data"].(string) != "a=123" { + t.Errorf("step result isn't store properly, expect 123 got %s", data["data"]) + } } func TestRunSequentialTasks(t *testing.T) { diff --git a/examples/example.js b/examples/example.js index 606f9950..1edda884 100644 --- a/examples/example.js +++ b/examples/example.js @@ -301,6 +301,9 @@ const main = async (cmd) => { case "schedule-monitor": scheduleMonitor(owner, token, process.argv[3]); break; + case "schedule-aave": + scheduleAaveMonitor(owner, token); + break; case "schedule": case "schedule-cron": case "schedule-event": @@ -384,7 +387,6 @@ const main = async (cmd) => { case "delete": await deleteTask(owner, token, process.argv[3]); break; - case "wallet": await getWallets(owner, token); break; @@ -413,6 +415,7 @@ const main = async (cmd) => { schedule-cron : to schedule a task that run on cron schedule-event : to schedule a task that run on occurenct of an event schedule-generic: to schedule a task with an arbitrary contract query + schedule-aave: monitor and report aavee liquidity rate every block monitor-address : to monitor erc20 in/out for an address trigger : manually trigger a task. Example: trigger abcdef '{"block_number":1234}' for blog trigger @@ -674,6 +677,116 @@ async function scheduleMonitor(owner, token, target) { return result; } +// setup a task to monitor in/out transfer for a wallet and send notification +async function scheduleAaveMonitor(owner, token) { + const wallets = await getWallets(owner, token); + const smartWalletAddress = wallets[0].address; + + const metadata = new grpc.Metadata(); + metadata.add("authkey", token); + + let trigger = { + name: "trigger1", + block: { + interval: 1, + }, + }; + + const getReserveId = UlidMonotonic.generate().toCanonical(); + const sendSummaryId = UlidMonotonic.generate().toCanonical(); + const getIpId = UlidMonotonic.generate().toCanonical(); + + const result = await asyncRPC( + client, + "CreateTask", + { + smart_wallet_address: smartWalletAddress, + nodes: [ + { + id: getReserveId, + name: 'getReserveUSDC', + graphql_query: { + url: 'https://gateway.thegraph.com/api/10186dcf11921c7d1bc140721c69da38/subgraphs/id/Cd2gEDVeqnjBn1hSeqFMitw8Q1iiyV9FYUZkLNRcL87g', + query: ` + { + reserves(where: {underlyingAsset: "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"}) { + id + underlyingAsset + name + decimals + liquidityRate + aToken { + id + } + sToken { + id + } + } + } + ` + } + }, + { + id: getIpId, + name: 'getIpAddress', + rest_api: { + url: 'https://ipinfo.io/json', + } + }, + + { + id: sendSummaryId, + name: 'notification', + rest_api: { + url: "https://api.telegram.org/bot{{notify_bot_token}}/sendMessage?", + //url: `https://webhook.site/ca416047-5ba0-4485-8f98-76790b63add7`, + method: "POST", + body: `JSON.stringify({ + chat_id:-4609037622, + text: \`Node IP is: \${getIpAddress.data.ip}.\nCurrent USDC liquidity rate in RAY unit is \${getReserveUSDC.data.reserves[0].liquidityRate} \` + })`, + headers: { + "content-type": "application/json" + } + } + }, + ], + + edges: [ + { + id: UlidMonotonic.generate().toCanonical(), + // __TRIGGER__ is a special node. It doesn't appear directly in the task data, but it should be draw on the UI to show what is the entrypoint + source: "__TRIGGER__", + target: getIpId, + }, + { + id: UlidMonotonic.generate().toCanonical(), + // __TRIGGER__ is a special node. It doesn't appear directly in the task data, but it should be draw on the UI to show what is the entrypoint + source: getIpId, + target: getReserveId, + }, + { + id: UlidMonotonic.generate().toCanonical(), + // __TRIGGER__ is a special node. It doesn't appear directly in the task data, but it should be draw on the UI to show what is the entrypoint + source: getReserveId, + target: sendSummaryId, + }, + ], + + trigger, + start_at: Math.floor(Date.now() / 1000) + 30, + expired_at: Math.floor(Date.now() / 1000 + 3600 * 24 * 30), + memo: `Montoring USDC aavee on ethereum`, + }, + metadata + ); + + console.log("create task", result); + + return result; +} + + (async () => { try { main(process.argv[2]); diff --git a/examples/static_codegen/avs_pb.js b/examples/static_codegen/avs_pb.js index a24668a7..f8f8ac66 100644 --- a/examples/static_codegen/avs_pb.js +++ b/examples/static_codegen/avs_pb.js @@ -7172,7 +7172,7 @@ proto.aggregator.ListWalletReq.prototype.toObject = function(opt_includeInstance */ proto.aggregator.ListWalletReq.toObject = function(includeInstance, msg) { var f, obj = { - factory: jspb.Message.getFieldWithDefault(msg, 1, ""), + factoryAddress: jspb.Message.getFieldWithDefault(msg, 1, ""), salt: jspb.Message.getFieldWithDefault(msg, 2, "") }; @@ -7212,7 +7212,7 @@ proto.aggregator.ListWalletReq.deserializeBinaryFromReader = function(msg, reade switch (field) { case 1: var value = /** @type {string} */ (reader.readString()); - msg.setFactory(value); + msg.setFactoryAddress(value); break; case 2: var value = /** @type {string} */ (reader.readString()); @@ -7247,7 +7247,7 @@ proto.aggregator.ListWalletReq.prototype.serializeBinary = function() { */ proto.aggregator.ListWalletReq.serializeBinaryToWriter = function(message, writer) { var f = undefined; - f = message.getFactory(); + f = message.getFactoryAddress(); if (f.length > 0) { writer.writeString( 1, @@ -7265,10 +7265,10 @@ proto.aggregator.ListWalletReq.serializeBinaryToWriter = function(message, write /** - * optional string factory = 1; + * optional string factory_address = 1; * @return {string} */ -proto.aggregator.ListWalletReq.prototype.getFactory = function() { +proto.aggregator.ListWalletReq.prototype.getFactoryAddress = function() { return /** @type {string} */ (jspb.Message.getFieldWithDefault(this, 1, "")); }; @@ -7277,7 +7277,7 @@ proto.aggregator.ListWalletReq.prototype.getFactory = function() { * @param {string} value * @return {!proto.aggregator.ListWalletReq} returns this */ -proto.aggregator.ListWalletReq.prototype.setFactory = function(value) { +proto.aggregator.ListWalletReq.prototype.setFactoryAddress = function(value) { return jspb.Message.setProto3StringField(this, 1, value); }; diff --git a/operator/operator.go b/operator/operator.go index aa8721e7..eaefba39 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -283,7 +283,7 @@ func NewOperatorFromConfig(c OperatorConfig) (*Operator, error) { } sdkClients, err := clients.BuildAll(chainioConfig, operatorEcdsaPrivateKey, logger) if err != nil { - panic(err) + //panic(err) } skWallet, err := wallet.NewPrivateKeyWallet(ethRpcClient, signerV2, signerAddress, logger) if err != nil { @@ -297,7 +297,9 @@ func NewOperatorFromConfig(c OperatorConfig) (*Operator, error) { ) if err != nil { logger.Error("Cannot create AvsWriter", "err", err) - return nil, err + // TODO: Upgrade EigenSDK to use the new Slash Manager + // EigenLayer has update the contract and we cannot fetch the slasher anymore, we should upgrade the EigenSDK, right now we don't use it so it's ok to ignore this error + //return nil, err } avsReader, err := chainio.BuildAvsReader( @@ -306,7 +308,9 @@ func NewOperatorFromConfig(c OperatorConfig) (*Operator, error) { ethRpcClient, logger) if err != nil { logger.Error("Cannot create AvsReader", "err", err) - return nil, err + // TODO: Upgrade EigenSDK to use the new Slash Manager + // EigenLayer has update the contract and we cannot fetch the slasher anymore, we should upgrade the EigenSDK, right now we don't use it so it's ok to ignore this error + //return nil, err } // avsSubscriber, err := chainio.BuildAvsSubscriber(common.HexToAddress(c.AVSRegistryCoordinatorAddress), // common.HexToAddress(c.OperatorStateRetrieverAddress), ethWsClient, logger,