From 1d7489ad97dd14818a851bd75674a3a85fe7da19 Mon Sep 17 00:00:00 2001 From: Vinh Date: Sun, 23 Feb 2025 04:24:37 -0800 Subject: [PATCH 1/2] reduce rpc query --- core/taskengine/macros/exp.go | 2 +- core/taskengine/trigger/block.go | 38 ++++++++++++++++++++++---------- core/taskengine/trigger/event.go | 38 +++++++++++++++++++++++++++----- operator/worker_loop.go | 4 ++-- version/version.go | 2 +- 5 files changed, 63 insertions(+), 21 deletions(-) diff --git a/core/taskengine/macros/exp.go b/core/taskengine/macros/exp.go index 01051a25..abd62ece 100644 --- a/core/taskengine/macros/exp.go +++ b/core/taskengine/macros/exp.go @@ -27,7 +27,7 @@ func SetRpc(rpcURL string) { if conn, err := ethclient.Dial(rpcURL); err == nil { rpcConn = conn } else { - panic(err) + panic(fmt.Errorf("panic connect to rpc url %s, error %w", rpcURL, err)) } } diff --git a/core/taskengine/trigger/block.go b/core/taskengine/trigger/block.go index 9d3fb5c3..501ab035 100644 --- a/core/taskengine/trigger/block.go +++ b/core/taskengine/trigger/block.go @@ -2,6 +2,7 @@ package trigger import ( "context" + "fmt" "sync" "math/big" @@ -28,10 +29,10 @@ type BlockTrigger struct { triggerCh chan TriggerMetadata[int64] } -func NewBlockTrigger(o *RpcOption, triggerCh chan TriggerMetadata[int64]) *BlockTrigger { +func NewBlockTrigger(o *RpcOption, triggerCh chan TriggerMetadata[int64], logger sdklogging.Logger) *BlockTrigger { var err error - logger, err := sdklogging.NewZapLogger(sdklogging.Production) + //logger, err := sdklogging.NewZapLogger(sdklogging.Production) b := BlockTrigger{ CommonTrigger: &CommonTrigger{ done: make(chan bool), @@ -88,26 +89,40 @@ func (b *BlockTrigger) Remove(check *avsproto.SyncMessagesResp_TaskMetadata) err } func (b *BlockTrigger) Run(ctx context.Context) error { - //func RegisterBlockListener(ctx context.Context, fn OnblockFunc) error { headers := make(chan *types.Header) sub, err := b.wsEthClient.SubscribeNewHead(ctx, headers) if err != nil { - return err + return fmt.Errorf("failed to subscribe to new headers: %w", err) } + b.logger.Info("subscribed for new blocks", "rpc", b.rpcOption.WsRpcURL) go func() { + //defer sub.Unsubscribe() for { select { case <-ctx.Done(): - err = nil + return case <-b.done: - err = nil + return case err := <-sub.Err(): - b.logger.Errorf("getting error when subscribe to websocket rpc. start reconnecting", "errror", err) - b.retryConnectToRpc() - b.wsEthClient.SubscribeNewHead(ctx, headers) + if err != nil { + b.logger.Error("error when subscribing to websocket RPC, retrying", + "rpc", b.rpcOption.WsRpcURL, + "error", err, + "component", "block") + if sub != nil { + sub.Unsubscribe() + } + + if b.wsEthClient != nil { + b.wsEthClient.Close() + } + + b.retryConnectToRpc() + sub, err = b.wsEthClient.SubscribeNewHead(ctx, headers) + } case header := <-headers: - b.logger.Debug("detect new block, evaluate checks", "component", "blocktrigger", "block", header.Hash().Hex(), "number", header.Number) + b.logger.Debug("detected new block, evaluating checks", "component", "blocktrigger", "block", header.Hash().Hex(), "number", header.Number) toRemove := []int{} for interval, tasks := range b.schedule { z := new(big.Int) @@ -119,8 +134,6 @@ func (b *BlockTrigger) Run(ctx context.Context) error { } } - // Remove the task from the queue - // toRemove = append(toRemove, interval) } } @@ -130,5 +143,6 @@ func (b *BlockTrigger) Run(ctx context.Context) error { } } }() + return err } diff --git a/core/taskengine/trigger/event.go b/core/taskengine/trigger/event.go index 321dd709..a75e5f0e 100644 --- a/core/taskengine/trigger/event.go +++ b/core/taskengine/trigger/event.go @@ -19,6 +19,18 @@ import ( avsproto "github.com/AvaProtocol/ap-avs/protobuf" ) +var ( + // To reduce api call we listen to these topics only + // a better idea is to only subscribe to what we need and re-load when new trigger is added + whitelistTopics = [][]common.Hash{ + []common.Hash{ + common.HexToHash("0x49628fd1471006c1482da88028e9ce4dbb080b815c9b0344d39e5a8e6ec1419f"), // UserOp + common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"), // erc20 transfer + common.HexToHash("0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925"), // approve + }, + } +) + type EventMark struct { BlockNumber uint64 LogIndex uint @@ -41,10 +53,9 @@ type EventTrigger struct { triggerCh chan TriggerMetadata[EventMark] } -func NewEventTrigger(o *RpcOption, triggerCh chan TriggerMetadata[EventMark]) *EventTrigger { +func NewEventTrigger(o *RpcOption, triggerCh chan TriggerMetadata[EventMark], logger sdklogging.Logger) *EventTrigger { var err error - logger, err := sdklogging.NewZapLogger(sdklogging.Production) b := EventTrigger{ CommonTrigger: &CommonTrigger{ done: make(chan bool), @@ -92,13 +103,19 @@ func (t *EventTrigger) RemoveCheck(id string) error { func (evtTrigger *EventTrigger) Run(ctx context.Context) error { logs := make(chan types.Log) - query := ethereum.FilterQuery{} - sub, err := evtTrigger.wsEthClient.SubscribeFilterLogs(context.Background(), ethereum.FilterQuery{}, logs) + query := ethereum.FilterQuery{ + Topics: whitelistTopics, + } + + sub, err := evtTrigger.wsEthClient.SubscribeFilterLogs(context.Background(), ethereum.FilterQuery{Topics: whitelistTopics}, logs) + evtTrigger.logger.Info("subscribing with filter", "topics", whitelistTopics) + if err != nil { return err } go func() { + defer sub.Unsubscribe() for { select { case <-ctx.Done(): @@ -106,7 +123,18 @@ func (evtTrigger *EventTrigger) Run(ctx context.Context) error { case <-evtTrigger.done: err = nil case err := <-sub.Err(): - evtTrigger.logger.Errorf("getting error when subscribe to websocket rpc. start reconnecting", "errror", err) + if err == nil { + continue + } + evtTrigger.logger.Error("error when subscribing to websocket rpc, retrying", "rpc", evtTrigger.rpcOption.WsRpcURL, "error", err) + if sub != nil { + sub.Unsubscribe() + } + + if evtTrigger.wsEthClient != nil { + evtTrigger.wsEthClient.Close() + } + evtTrigger.retryConnectToRpc() sub, err = evtTrigger.wsEthClient.SubscribeFilterLogs(context.Background(), query, logs) case event := <-logs: diff --git a/operator/worker_loop.go b/operator/worker_loop.go index 9076899e..5b41bd97 100644 --- a/operator/worker_loop.go +++ b/operator/worker_loop.go @@ -53,10 +53,10 @@ func (o *Operator) runWorkLoop(ctx context.Context) error { WsRpcURL: o.config.TargetChain.EthWsUrl, } blockTriggerCh := make(chan triggerengine.TriggerMetadata[int64], 1000) - o.blockTrigger = triggerengine.NewBlockTrigger(&rpcConfig, blockTriggerCh) + o.blockTrigger = triggerengine.NewBlockTrigger(&rpcConfig, blockTriggerCh, o.logger) eventTriggerCh := make(chan triggerengine.TriggerMetadata[triggerengine.EventMark], 1000) - o.eventTrigger = triggerengine.NewEventTrigger(&rpcConfig, eventTriggerCh) + o.eventTrigger = triggerengine.NewEventTrigger(&rpcConfig, eventTriggerCh, o.logger) o.blockTrigger.Run(ctx) o.eventTrigger.Run(ctx) diff --git a/version/version.go b/version/version.go index bd3eb183..22aeaac9 100644 --- a/version/version.go +++ b/version/version.go @@ -2,7 +2,7 @@ package version var ( // Version can also be set through tag release at build time - semver = "1.3.0" + semver = "1.4.0" ) // Get return the version. Note that we're injecting this at build time when we tag release From bd46fa11dea2105d1c2d00f749824d3365bdae18 Mon Sep 17 00:00:00 2001 From: Vinh Date: Sun, 23 Feb 2025 04:39:22 -0800 Subject: [PATCH 2/2] fix --- core/taskengine/trigger/event_test.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/core/taskengine/trigger/event_test.go b/core/taskengine/trigger/event_test.go index 5ec916c9..64137241 100644 --- a/core/taskengine/trigger/event_test.go +++ b/core/taskengine/trigger/event_test.go @@ -18,7 +18,8 @@ func TestTriggerTopicMatch(t *testing.T) { eventTrigger := NewEventTrigger(&RpcOption{ RpcURL: testutil.GetTestRPCURL(), WsRpcURL: testutil.GetTestRPCURL(), - }, make(chan TriggerMetadata[EventMark], 1000)) + }, make(chan TriggerMetadata[EventMark], 1000), + testutil.GetLogger()) result, err := eventTrigger.Evaluate(event, &Check{ Matcher: []*avsproto.EventCondition_Matcher{ @@ -48,7 +49,8 @@ func TestTriggerTopicNotMatch(t *testing.T) { eventTrigger := NewEventTrigger(&RpcOption{ RpcURL: testutil.GetTestRPCURL(), WsRpcURL: testutil.GetTestRPCURL(), - }, make(chan TriggerMetadata[EventMark], 1000)) + }, make(chan TriggerMetadata[EventMark], 1000), + testutil.GetLogger()) result, err := eventTrigger.Evaluate(event, &Check{ Matcher: []*avsproto.EventCondition_Matcher{ @@ -78,7 +80,8 @@ func TestTriggerTopicMulti(t *testing.T) { eventTrigger := NewEventTrigger(&RpcOption{ RpcURL: testutil.GetTestRPCURL(), WsRpcURL: testutil.GetTestRPCURL(), - }, make(chan TriggerMetadata[EventMark], 1000)) + }, make(chan TriggerMetadata[EventMark], 1000), + testutil.GetLogger()) result, err := eventTrigger.Evaluate(event, &Check{ Matcher: []*avsproto.EventCondition_Matcher{ @@ -131,7 +134,8 @@ func TestTriggerAddress(t *testing.T) { eventTrigger := NewEventTrigger(&RpcOption{ RpcURL: testutil.GetTestRPCURL(), WsRpcURL: testutil.GetTestRPCURL(), - }, make(chan TriggerMetadata[EventMark], 1000)) + }, make(chan TriggerMetadata[EventMark], 1000), + testutil.GetLogger()) result, err := eventTrigger.Evaluate(event, &Check{ Matcher: []*avsproto.EventCondition_Matcher{ @@ -170,7 +174,8 @@ func TestTriggerAddressNegativeCase(t *testing.T) { eventTrigger := NewEventTrigger(&RpcOption{ RpcURL: testutil.GetTestRPCURL(), WsRpcURL: testutil.GetTestRPCURL(), - }, make(chan TriggerMetadata[EventMark], 1000)) + }, make(chan TriggerMetadata[EventMark], 1000), + testutil.GetLogger()) result, err := eventTrigger.Evaluate(event, &Check{ Matcher: []*avsproto.EventCondition_Matcher{ @@ -194,7 +199,8 @@ func TestTriggerNonTransferEvent(t *testing.T) { eventTrigger := NewEventTrigger(&RpcOption{ RpcURL: testutil.GetTestRPCURL(), WsRpcURL: testutil.GetTestRPCURL(), - }, make(chan TriggerMetadata[EventMark], 1000)) + }, make(chan TriggerMetadata[EventMark], 1000), + testutil.GetLogger()) result, err := eventTrigger.Evaluate(event, &Check{ Matcher: []*avsproto.EventCondition_Matcher{ @@ -222,7 +228,8 @@ func TestTriggerExpression(t *testing.T) { eventTrigger := NewEventTrigger(&RpcOption{ RpcURL: testutil.GetTestRPCURL(), WsRpcURL: testutil.GetTestRPCURL(), - }, make(chan TriggerMetadata[EventMark], 1000)) + }, make(chan TriggerMetadata[EventMark], 1000), + testutil.GetLogger()) taskMeta := &avsproto.SyncMessagesResp_TaskMetadata{ Trigger: &avsproto.TaskTrigger{ @@ -269,7 +276,8 @@ func TestTriggerWithContractReadBindingInExpression(t *testing.T) { eventTrigger := NewEventTrigger(&RpcOption{ RpcURL: testutil.GetTestRPCURL(), WsRpcURL: testutil.GetTestRPCURL(), - }, make(chan TriggerMetadata[EventMark], 1000)) + }, make(chan TriggerMetadata[EventMark], 1000), + testutil.GetLogger()) taskMeta := &avsproto.SyncMessagesResp_TaskMetadata{ Trigger: &avsproto.TaskTrigger{ @@ -388,7 +396,8 @@ func TestTriggerEventExpressionWontCrashOnInvalidInput(t *testing.T) { eventTrigger := NewEventTrigger(&RpcOption{ RpcURL: testutil.GetTestRPCURL(), WsRpcURL: testutil.GetTestRPCURL(), - }, make(chan TriggerMetadata[EventMark], 1000)) + }, make(chan TriggerMetadata[EventMark], 1000), + testutil.GetLogger()) result, err := eventTrigger.Evaluate(event, &Check{ Program: tt.expression,