Skip to content

report block and processed event to telemetry #174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ jobs:
with:
build-args: |
RELEASE_TAG=${{ inputs.tag || github.sha || github.head_ref || github.ref_name }}
COMMIT_SHA=${{ github.sha }}
platforms: linux/amd64,linux/arm64
context: .
file: dockerfiles/operator.Dockerfile
Expand Down
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ dev-live:
## dev-build: build a dev version for local development
dev-build:
mkdir out || true
go build -o ./out/ap
go build \
-o ./out/ap \
-ldflags "-X github.com/AvaProtocol/ap-avs/version.revision=$(shell git rev-parse HEAD)"

## dev-agg: run aggregator locally with dev build
dev-agg:
Expand Down
11 changes: 10 additions & 1 deletion aggregator/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"net/http"

"github.com/AvaProtocol/ap-avs/version"
"github.com/labstack/echo/v4"
)

Expand Down Expand Up @@ -45,7 +46,15 @@ func (agg *Aggregator) startHttpServer(ctx context.Context) {
return err
}

data := agg.operatorPool.GetAll()
data := struct {
Version string
Revision string
Nodes []*OperatorNode
}{
Version: version.Get(),
Revision: version.Commit(),
Nodes: agg.operatorPool.GetAll(),
}
var buf bytes.Buffer
if err := tpl.Execute(&buf, data); err != nil {
agg.logger.Errorf("error rendering telemetry %v", err)
Expand Down
4 changes: 4 additions & 0 deletions aggregator/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type OperatorNode struct {
LastPingEpoch int64 `json:"last_ping"`
Version string `json:"version"`
MetricsPort int32 `json:"metrics_port"`
BlockNumer int64 `json:"block_number"`
EventCount int64 `json:"event_count"`
}

func (o *OperatorNode) LastSeen() string {
Expand Down Expand Up @@ -66,6 +68,8 @@ func (o *OperatorPool) Checkin(payload *avsproto.Checkin) error {
MetricsPort: payload.MetricsPort,
RemoteIP: payload.RemoteIP,
Version: payload.Version,
BlockNumer: payload.BlockNumber,
EventCount: payload.EventCount,
}

data, err := json.Marshal(status)
Expand Down
14 changes: 11 additions & 3 deletions aggregator/resources/index.gohtml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<div class="bg-black">
<div class="mx-auto max-w-7xl py-24 sm:px-6 sm:py-32 lg:px-8">
<ul role="list" class="divide-y divide-gray-800">
{{ range . }}
{{ range .Nodes }}
<li class="flex justify-between gap-x-6 py-5">
<div class="flex min-w-0 gap-x-4">
{{/* TODO: Parse metadata to get image url <img class="h-12 w-12 flex-none rounded-full bg-gray-50"
Expand Down Expand Up @@ -46,12 +46,20 @@

<div class="hidden shrink-0 sm:flex sm:flex-col sm:items-end">
<p class="text-sm leading-6 text-white">Active</p>
<p class="mt-1 text-xs leading-5 text-gray-400">Last seen <time
datetime="2023-01-23T13:23Z">{{ .LastSeen }}</time></p>
<p class="mt-1 text-xs leading-5 text-gray-400">Last seen <time datetime="2023-01-23T13:23Z">{{ .LastSeen }}</time></p>
<p class="mt-1 text-xs leading-5 text-gray-400">Block {{ .BlockNumer }}</p>
<p class="mt-1 text-xs leading-5 text-gray-400">Event {{ .EventCount }}</p>
</div>
{{ end }}
</li>
</ul>
</div>
<!-- Center text for version and revision -->
<div class="text-center mb-8">
<p class="text-white text-sm">
Aggregator v{{.Version}}
<a href="https://github.com/AvaProtocol/EigenLayer-AVS/commit/{{.Revision}}" class="underline" target="_blank">+{{.Revision}}</a>
</p>
</div>
</div>
</body>
5 changes: 4 additions & 1 deletion core/taskengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,15 +575,16 @@ func (n *Engine) TriggerTask(user *model.User, payload *avsproto.UserTriggerTask

task, err := n.GetTaskByID(payload.TaskId)
if err != nil {
n.logger.Error("task not found", "user", user.Address, "task_id", payload.TaskId)
return nil, err
}

if !task.IsRunable() {
return nil, grpcstatus.Errorf(codes.FailedPrecondition, TaskIsNotRunable)
}

if !task.OwnedBy(user.Address) {
// only the owner of a task can trigger it
n.logger.Error("task not own by user", "owner", user.Address, "task_id", payload.TaskId)
return nil, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

Expand All @@ -592,9 +593,11 @@ func (n *Engine) TriggerTask(user *model.User, payload *avsproto.UserTriggerTask
ExecutionID: ulid.Make().String(),
}

fmt.Println("task", task)
if payload.IsBlocking {
// Run the task inline, by pass the queue system
executor := NewExecutor(n.smartWalletConfig, n.db, n.logger)
fmt.Println("queue task Data", queueTaskData)
execution, err := executor.RunTask(task, &queueTaskData)
if err == nil {
return &avsproto.UserTriggerTaskResp{
Expand Down
2 changes: 2 additions & 0 deletions core/taskengine/trigger/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (b *BlockTrigger) Run(ctx context.Context) error {
}
case header := <-headers:
b.logger.Debug("detected new block, evaluating checks", "component", "blocktrigger", "block", header.Hash().Hex(), "number", header.Number)
b.progress = header.Number.Int64()

toRemove := []int{}
for interval, tasks := range b.schedule {
z := new(big.Int)
Expand Down
7 changes: 7 additions & 0 deletions core/taskengine/trigger/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type CommonTrigger struct {
done chan bool
shutdown bool
mu sync.Mutex

// a counter to track progress of the trigger. the counter will increase everytime a processing happen
progress int64
}

func (b *CommonTrigger) retryConnectToRpc() error {
Expand All @@ -53,3 +56,7 @@ func (b *CommonTrigger) Shutdown() {
b.shutdown = true
b.done <- true
}

func (b *CommonTrigger) GetProgress() int64 {
return b.progress
}
1 change: 1 addition & 0 deletions core/taskengine/trigger/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (evtTrigger *EventTrigger) Run(ctx context.Context) error {
evtTrigger.logger.Debug("detect new event, evaluate checks", "event", event.Topics[0], "contract", event.Address)
// TODO: implement hint to avoid scan all checks
toRemove := []string{}
evtTrigger.progress += 1

evtTrigger.checks.Range(func(key any, value any) bool {
if evtTrigger.shutdown {
Expand Down
2 changes: 2 additions & 0 deletions dockerfiles/operator.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
FROM golang:1.23 AS builder
ARG RELEASE_TAG
ARG COMMIT_SHA

WORKDIR /app

Expand All @@ -11,6 +12,7 @@ COPY . ./

RUN CGO_ENABLED=0 GOOS=linux go build \
-ldflags "-X github.com/AvaProtocol/ap-avs/version.semver=$RELEASE_TAG" \
-ldflags "-X github.com/AvaProtocol/ap-avs/version.revision=$COMMIT_SHA" \
-o /ava


Expand Down
4 changes: 2 additions & 2 deletions operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ import (
avsproto "github.com/AvaProtocol/ap-avs/protobuf"
"github.com/AvaProtocol/ap-avs/version"

"github.com/AvaProtocol/ap-avs/core/config"
triggerengine "github.com/AvaProtocol/ap-avs/core/taskengine/trigger"
"github.com/AvaProtocol/ap-avs/core/config"
"github.com/AvaProtocol/ap-avs/pkg/ipfetcher"
"github.com/AvaProtocol/ap-avs/pkg/timekeeper"
)
Expand Down Expand Up @@ -196,7 +196,7 @@ func NewOperatorFromConfig(c OperatorConfig) (*Operator, error) {
var ethRpcClient *eth.InstrumentedClient
var ethWsClient *eth.InstrumentedClient

rpcCallsCollector := rpccalls.NewCollector(AVS_NAME, reg)
rpcCallsCollector := rpccalls.NewCollector(AVS_NAME, reg)
if c.EnableMetrics {
ethRpcClient, err = eth.NewInstrumentedClient(c.EthRpcUrl, rpcCallsCollector)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions operator/worker_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ func (o *Operator) PingServer() {
Version: version.Get(),
RemoteIP: o.GetPublicIP(),
MetricsPort: o.config.GetPublicMetricPort(),
BlockNumber: o.blockTrigger.GetProgress(),
EventCount: o.eventTrigger.GetProgress(),
})

if err != nil {
Expand Down
Loading
Loading