Skip to content

reduce rpc query #173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/taskengine/macros/exp.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func SetRpc(rpcURL string) {
if conn, err := ethclient.Dial(rpcURL); err == nil {
rpcConn = conn
} else {
panic(err)
panic(fmt.Errorf("panic connect to rpc url %s, error %w", rpcURL, err))
}
}

Expand Down
38 changes: 26 additions & 12 deletions core/taskengine/trigger/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package trigger

import (
"context"
"fmt"
"sync"

"math/big"
Expand All @@ -28,10 +29,10 @@ type BlockTrigger struct {
triggerCh chan TriggerMetadata[int64]
}

func NewBlockTrigger(o *RpcOption, triggerCh chan TriggerMetadata[int64]) *BlockTrigger {
func NewBlockTrigger(o *RpcOption, triggerCh chan TriggerMetadata[int64], logger sdklogging.Logger) *BlockTrigger {
var err error

logger, err := sdklogging.NewZapLogger(sdklogging.Production)
//logger, err := sdklogging.NewZapLogger(sdklogging.Production)
b := BlockTrigger{
CommonTrigger: &CommonTrigger{
done: make(chan bool),
Expand Down Expand Up @@ -88,26 +89,40 @@ func (b *BlockTrigger) Remove(check *avsproto.SyncMessagesResp_TaskMetadata) err
}

func (b *BlockTrigger) Run(ctx context.Context) error {
//func RegisterBlockListener(ctx context.Context, fn OnblockFunc) error {
headers := make(chan *types.Header)
sub, err := b.wsEthClient.SubscribeNewHead(ctx, headers)
if err != nil {
return err
return fmt.Errorf("failed to subscribe to new headers: %w", err)
}
b.logger.Info("subscribed for new blocks", "rpc", b.rpcOption.WsRpcURL)

go func() {
//defer sub.Unsubscribe()
for {
select {
case <-ctx.Done():
err = nil
return
case <-b.done:
err = nil
return
case err := <-sub.Err():
b.logger.Errorf("getting error when subscribe to websocket rpc. start reconnecting", "errror", err)
b.retryConnectToRpc()
b.wsEthClient.SubscribeNewHead(ctx, headers)
if err != nil {
b.logger.Error("error when subscribing to websocket RPC, retrying",
"rpc", b.rpcOption.WsRpcURL,
"error", err,
"component", "block")
if sub != nil {
sub.Unsubscribe()
}

if b.wsEthClient != nil {
b.wsEthClient.Close()
}

b.retryConnectToRpc()
sub, err = b.wsEthClient.SubscribeNewHead(ctx, headers)
}
case header := <-headers:
b.logger.Debug("detect new block, evaluate checks", "component", "blocktrigger", "block", header.Hash().Hex(), "number", header.Number)
b.logger.Debug("detected new block, evaluating checks", "component", "blocktrigger", "block", header.Hash().Hex(), "number", header.Number)
toRemove := []int{}
for interval, tasks := range b.schedule {
z := new(big.Int)
Expand All @@ -119,8 +134,6 @@ func (b *BlockTrigger) Run(ctx context.Context) error {
}

}
// Remove the task from the queue
// toRemove = append(toRemove, interval)
}
}

Expand All @@ -130,5 +143,6 @@ func (b *BlockTrigger) Run(ctx context.Context) error {
}
}
}()

return err
}
38 changes: 33 additions & 5 deletions core/taskengine/trigger/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ import (
avsproto "github.com/AvaProtocol/ap-avs/protobuf"
)

var (
// To reduce api call we listen to these topics only
// a better idea is to only subscribe to what we need and re-load when new trigger is added
whitelistTopics = [][]common.Hash{
[]common.Hash{
common.HexToHash("0x49628fd1471006c1482da88028e9ce4dbb080b815c9b0344d39e5a8e6ec1419f"), // UserOp
common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"), // erc20 transfer
common.HexToHash("0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925"), // approve
},
}
)

type EventMark struct {
BlockNumber uint64
LogIndex uint
Expand All @@ -41,10 +53,9 @@ type EventTrigger struct {
triggerCh chan TriggerMetadata[EventMark]
}

func NewEventTrigger(o *RpcOption, triggerCh chan TriggerMetadata[EventMark]) *EventTrigger {
func NewEventTrigger(o *RpcOption, triggerCh chan TriggerMetadata[EventMark], logger sdklogging.Logger) *EventTrigger {
var err error

logger, err := sdklogging.NewZapLogger(sdklogging.Production)
b := EventTrigger{
CommonTrigger: &CommonTrigger{
done: make(chan bool),
Expand Down Expand Up @@ -92,21 +103,38 @@ func (t *EventTrigger) RemoveCheck(id string) error {

func (evtTrigger *EventTrigger) Run(ctx context.Context) error {
logs := make(chan types.Log)
query := ethereum.FilterQuery{}
sub, err := evtTrigger.wsEthClient.SubscribeFilterLogs(context.Background(), ethereum.FilterQuery{}, logs)
query := ethereum.FilterQuery{
Topics: whitelistTopics,
}

sub, err := evtTrigger.wsEthClient.SubscribeFilterLogs(context.Background(), ethereum.FilterQuery{Topics: whitelistTopics}, logs)
evtTrigger.logger.Info("subscribing with filter", "topics", whitelistTopics)

if err != nil {
return err
}

go func() {
defer sub.Unsubscribe()
for {
select {
case <-ctx.Done():
err = nil
case <-evtTrigger.done:
err = nil
case err := <-sub.Err():
evtTrigger.logger.Errorf("getting error when subscribe to websocket rpc. start reconnecting", "errror", err)
if err == nil {
continue
}
evtTrigger.logger.Error("error when subscribing to websocket rpc, retrying", "rpc", evtTrigger.rpcOption.WsRpcURL, "error", err)
if sub != nil {
sub.Unsubscribe()
}

if evtTrigger.wsEthClient != nil {
evtTrigger.wsEthClient.Close()
}

evtTrigger.retryConnectToRpc()
sub, err = evtTrigger.wsEthClient.SubscribeFilterLogs(context.Background(), query, logs)
case event := <-logs:
Expand Down
27 changes: 18 additions & 9 deletions core/taskengine/trigger/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ func TestTriggerTopicMatch(t *testing.T) {
eventTrigger := NewEventTrigger(&RpcOption{
RpcURL: testutil.GetTestRPCURL(),
WsRpcURL: testutil.GetTestRPCURL(),
}, make(chan TriggerMetadata[EventMark], 1000))
}, make(chan TriggerMetadata[EventMark], 1000),
testutil.GetLogger())

result, err := eventTrigger.Evaluate(event, &Check{
Matcher: []*avsproto.EventCondition_Matcher{
Expand Down Expand Up @@ -48,7 +49,8 @@ func TestTriggerTopicNotMatch(t *testing.T) {
eventTrigger := NewEventTrigger(&RpcOption{
RpcURL: testutil.GetTestRPCURL(),
WsRpcURL: testutil.GetTestRPCURL(),
}, make(chan TriggerMetadata[EventMark], 1000))
}, make(chan TriggerMetadata[EventMark], 1000),
testutil.GetLogger())

result, err := eventTrigger.Evaluate(event, &Check{
Matcher: []*avsproto.EventCondition_Matcher{
Expand Down Expand Up @@ -78,7 +80,8 @@ func TestTriggerTopicMulti(t *testing.T) {
eventTrigger := NewEventTrigger(&RpcOption{
RpcURL: testutil.GetTestRPCURL(),
WsRpcURL: testutil.GetTestRPCURL(),
}, make(chan TriggerMetadata[EventMark], 1000))
}, make(chan TriggerMetadata[EventMark], 1000),
testutil.GetLogger())

result, err := eventTrigger.Evaluate(event, &Check{
Matcher: []*avsproto.EventCondition_Matcher{
Expand Down Expand Up @@ -131,7 +134,8 @@ func TestTriggerAddress(t *testing.T) {
eventTrigger := NewEventTrigger(&RpcOption{
RpcURL: testutil.GetTestRPCURL(),
WsRpcURL: testutil.GetTestRPCURL(),
}, make(chan TriggerMetadata[EventMark], 1000))
}, make(chan TriggerMetadata[EventMark], 1000),
testutil.GetLogger())

result, err := eventTrigger.Evaluate(event, &Check{
Matcher: []*avsproto.EventCondition_Matcher{
Expand Down Expand Up @@ -170,7 +174,8 @@ func TestTriggerAddressNegativeCase(t *testing.T) {
eventTrigger := NewEventTrigger(&RpcOption{
RpcURL: testutil.GetTestRPCURL(),
WsRpcURL: testutil.GetTestRPCURL(),
}, make(chan TriggerMetadata[EventMark], 1000))
}, make(chan TriggerMetadata[EventMark], 1000),
testutil.GetLogger())

result, err := eventTrigger.Evaluate(event, &Check{
Matcher: []*avsproto.EventCondition_Matcher{
Expand All @@ -194,7 +199,8 @@ func TestTriggerNonTransferEvent(t *testing.T) {
eventTrigger := NewEventTrigger(&RpcOption{
RpcURL: testutil.GetTestRPCURL(),
WsRpcURL: testutil.GetTestRPCURL(),
}, make(chan TriggerMetadata[EventMark], 1000))
}, make(chan TriggerMetadata[EventMark], 1000),
testutil.GetLogger())

result, err := eventTrigger.Evaluate(event, &Check{
Matcher: []*avsproto.EventCondition_Matcher{
Expand Down Expand Up @@ -222,7 +228,8 @@ func TestTriggerExpression(t *testing.T) {
eventTrigger := NewEventTrigger(&RpcOption{
RpcURL: testutil.GetTestRPCURL(),
WsRpcURL: testutil.GetTestRPCURL(),
}, make(chan TriggerMetadata[EventMark], 1000))
}, make(chan TriggerMetadata[EventMark], 1000),
testutil.GetLogger())

taskMeta := &avsproto.SyncMessagesResp_TaskMetadata{
Trigger: &avsproto.TaskTrigger{
Expand Down Expand Up @@ -269,7 +276,8 @@ func TestTriggerWithContractReadBindingInExpression(t *testing.T) {
eventTrigger := NewEventTrigger(&RpcOption{
RpcURL: testutil.GetTestRPCURL(),
WsRpcURL: testutil.GetTestRPCURL(),
}, make(chan TriggerMetadata[EventMark], 1000))
}, make(chan TriggerMetadata[EventMark], 1000),
testutil.GetLogger())

taskMeta := &avsproto.SyncMessagesResp_TaskMetadata{
Trigger: &avsproto.TaskTrigger{
Expand Down Expand Up @@ -388,7 +396,8 @@ func TestTriggerEventExpressionWontCrashOnInvalidInput(t *testing.T) {
eventTrigger := NewEventTrigger(&RpcOption{
RpcURL: testutil.GetTestRPCURL(),
WsRpcURL: testutil.GetTestRPCURL(),
}, make(chan TriggerMetadata[EventMark], 1000))
}, make(chan TriggerMetadata[EventMark], 1000),
testutil.GetLogger())

result, err := eventTrigger.Evaluate(event, &Check{
Program: tt.expression,
Expand Down
4 changes: 2 additions & 2 deletions operator/worker_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ func (o *Operator) runWorkLoop(ctx context.Context) error {
WsRpcURL: o.config.TargetChain.EthWsUrl,
}
blockTriggerCh := make(chan triggerengine.TriggerMetadata[int64], 1000)
o.blockTrigger = triggerengine.NewBlockTrigger(&rpcConfig, blockTriggerCh)
o.blockTrigger = triggerengine.NewBlockTrigger(&rpcConfig, blockTriggerCh, o.logger)

eventTriggerCh := make(chan triggerengine.TriggerMetadata[triggerengine.EventMark], 1000)
o.eventTrigger = triggerengine.NewEventTrigger(&rpcConfig, eventTriggerCh)
o.eventTrigger = triggerengine.NewEventTrigger(&rpcConfig, eventTriggerCh, o.logger)

o.blockTrigger.Run(ctx)
o.eventTrigger.Run(ctx)
Expand Down
2 changes: 1 addition & 1 deletion version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package version

var (
// Version can also be set through tag release at build time
semver = "1.3.0"
semver = "1.4.0"
)

// Get return the version. Note that we're injecting this at build time when we tag release
Expand Down
Loading