Fix linting issues in staging branch #218

Closed
40 changes: 20 additions & 20 deletions core/taskengine/engine.go
@@ -305,7 +305,7 @@ func (n *Engine) CreateTask(user *model.User, taskPayload *avsproto.CreateTaskRe

updates := map[string][]byte{}

- updates[string(TaskStorageKey(task.Id, task.Status))], err = task.ToJSON()
+ updates[string(TaskStorageKey(task.Task.Id, task.Task.Status))], err = task.ToJSON()
updates[string(TaskUserKey(task))] = []byte(fmt.Sprintf("%d", avsproto.TaskStatus_Active))

if err = n.db.BatchWrite(updates); err != nil {
@@ -314,7 +314,7 @@ func (n *Engine) CreateTask(user *model.User, taskPayload *avsproto.CreateTaskRe

n.lock.Lock()
defer n.lock.Unlock()
- n.tasks[task.Id] = task
+ n.tasks[task.Task.Id] = task

return task, nil
}
@@ -365,31 +365,31 @@ func (n *Engine) StreamCheckToOperator(payload *avsproto.SyncMessagesReq, srv av
}

for _, task := range n.tasks {
- if _, ok := n.trackSyncedTasks[address].TaskID[task.Id]; ok {
+ if _, ok := n.trackSyncedTasks[address].TaskID[task.Task.Id]; ok {
continue
}

resp := avsproto.SyncMessagesResp{
- Id: task.Id,
+ Id: task.Task.Id,
Op: avsproto.MessageOp_MonitorTaskTrigger,

TaskMetadata: &avsproto.SyncMessagesResp_TaskMetadata{
- TaskId: task.Id,
- Remain: task.MaxExecution,
- ExpiredAt: task.ExpiredAt,
+ TaskId: task.Task.Id,
+ Remain: task.Task.MaxExecution,
+ ExpiredAt: task.Task.ExpiredAt,
Trigger: task.Trigger,
},
}
n.logger.Info("stream check to operator", "task_id", task.Id, "operator", payload.Address, "resp", resp)
n.logger.Info("stream check to operator", "task_id", task.Task.Id, "operator", payload.Address, "resp", resp)

if err := srv.Send(&resp); err != nil {
// return error to cause client to establish re-connect the connection
n.logger.Info("error sending check to operator", "task_id", task.Id, "operator", payload.Address)
n.logger.Info("error sending check to operator", "task_id", task.Task.Id, "operator", payload.Address)
return fmt.Errorf("cannot send data back to grpc channel")
}

n.lock.Lock()
- n.trackSyncedTasks[address].TaskID[task.Id] = true
+ n.trackSyncedTasks[address].TaskID[task.Task.Id] = true
n.lock.Unlock()
}
}
@@ -503,7 +503,7 @@ func (n *Engine) ListTasksByUser(user *model.User, payload *avsproto.ListTasksRe
if err := task.FromStorageData(taskRawByte); err != nil {
continue
}
- task.Id = taskID
+ task.Task.Id = taskID

if t, err := task.ToProtoBuf(); err == nil {
taskResp.Items = append(taskResp.Items, &avsproto.ListTasksResp_Item{
@@ -859,11 +859,11 @@ func (n *Engine) DeleteTaskByUser(user *model.User, taskID string) (bool, error)
return false, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

- if task.Status == avsproto.TaskStatus_Executing {
+ if task.Task.Status == avsproto.TaskStatus_Executing {
return false, fmt.Errorf("Only non executing task can be deleted")
}

- n.db.Delete(TaskStorageKey(task.Id, task.Status))
+ n.db.Delete(TaskStorageKey(task.Task.Id, task.Task.Status))
n.db.Delete(TaskUserKey(task))

return true, nil
@@ -876,23 +876,23 @@ func (n *Engine) CancelTaskByUser(user *model.User, taskID string) (bool, error)
return false, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

- if task.Status != avsproto.TaskStatus_Active {
+ if task.Task.Status != avsproto.TaskStatus_Active {
return false, fmt.Errorf("Only active task can be cancelled")
}

updates := map[string][]byte{}
- oldStatus := task.Status
+ oldStatus := task.Task.Status
task.SetCanceled()
- updates[string(TaskStorageKey(task.Id, oldStatus))], err = task.ToJSON()
- updates[string(TaskUserKey(task))] = []byte(fmt.Sprintf("%d", task.Status))
+ updates[string(TaskStorageKey(task.Task.Id, oldStatus))], err = task.ToJSON()
+ updates[string(TaskUserKey(task))] = []byte(fmt.Sprintf("%d", task.Task.Status))

if err = n.db.BatchWrite(updates); err == nil {
n.db.Move(
- TaskStorageKey(task.Id, oldStatus),
- TaskStorageKey(task.Id, task.Status),
+ TaskStorageKey(task.Task.Id, oldStatus),
+ TaskStorageKey(task.Task.Id, task.Task.Status),
)

- delete(n.tasks, task.Id)
+ delete(n.tasks, task.Task.Id)
} else {
return false, err
}
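Every change in engine.go follows the same pattern: field access moves from the promoted form (task.Id, task.Status) to the explicit selector through the wrapped protobuf message (task.Task.Id, task.Task.Status). The sketch below shows the model.Task shape this assumes; the real definition lives in the model package and is not part of this diff, and the import path is illustrative only:

```go
package model

// Illustrative import path only; the location of the generated protobuf
// package is not shown in this PR.
import avsproto "github.com/AvaProtocol/EigenLayer-AVS/protobuf"

// Task wraps the generated message. With the pointer embedded, task.Id
// (field promotion) and task.Task.Id (explicit selector) name the same
// field; this PR switches every call site to the explicit form, in line
// with the PR's linting goal.
type Task struct {
	*avsproto.Task
}
```
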
30 changes: 15 additions & 15 deletions core/taskengine/executor.go
@@ -64,7 +64,7 @@ func (x *TaskExecutor) Perform(job *apqueue.Job) error {
// ref: AggregateChecksResult
err = json.Unmarshal(job.Data, queueData)
if err != nil {
return fmt.Errorf("error decode job payload when executing task: %s with job id %d", task.Id, job.ID)
return fmt.Errorf("error decode job payload when executing task: %s with job id %d", task.Task.Id, job.ID)
}

_, err = x.RunTask(task, queueData)
@@ -95,15 +95,15 @@ func (x *TaskExecutor) RunTask(task *model.Task, queueData *QueueExecutionData)
}

vm.WithLogger(x.logger).WithDb(x.db)
- initialTaskStatus := task.Status
+ initialTaskStatus := task.Task.Status

if err != nil {
return nil, fmt.Errorf("vm failed to initialize: %w", err)
}

t0 := time.Now()
- task.TotalExecution += 1
- task.LastRanAt = t0.UnixMilli()
+ task.Task.TotalExecution += 1
+ task.Task.LastRanAt = t0.UnixMilli()

var runTaskErr error = nil
if err = vm.Compile(); err != nil {
@@ -116,11 +116,11 @@ func (x *TaskExecutor) RunTask(task *model.Task, queueData *QueueExecutionData)
t1 := time.Now()

// when MaxExecution is 0, it means unlimited run until cancel
- if task.MaxExecution > 0 && task.TotalExecution >= task.MaxExecution {
+ if task.Task.MaxExecution > 0 && task.Task.TotalExecution >= task.Task.MaxExecution {
task.SetCompleted()
}

- if task.ExpiredAt > 0 && t1.UnixMilli() >= task.ExpiredAt {
+ if task.Task.ExpiredAt > 0 && t1.UnixMilli() >= task.Task.ExpiredAt {
task.SetCompleted()
}

@@ -140,14 +140,14 @@ func (x *TaskExecutor) RunTask(task *model.Task, queueData *QueueExecutionData)
}

if runTaskErr != nil {
x.logger.Error("error executing task", "error", err, "runError", runTaskErr, "task_id", task.Id, "triggermark", triggerMetadata)
x.logger.Error("error executing task", "error", err, "runError", runTaskErr, "task_id", task.Task.Id, "triggermark", triggerMetadata)
execution.Error = runTaskErr.Error()
}

// batch update storage for task + execution log
updates := map[string][]byte{}
- updates[string(TaskStorageKey(task.Id, task.Status))], err = task.ToJSON()
- updates[string(TaskUserKey(task))] = []byte(fmt.Sprintf("%d", task.Status))
+ updates[string(TaskStorageKey(task.Task.Id, task.Task.Status))], err = task.ToJSON()
+ updates[string(TaskUserKey(task))] = []byte(fmt.Sprintf("%d", task.Task.Status))

// update execution log
executionByte, err := protojson.Marshal(execution)
@@ -157,19 +157,19 @@ func (x *TaskExecutor) RunTask(task *model.Task, queueData *QueueExecutionData)

if err = x.db.BatchWrite(updates); err != nil {
// TODO Monitor to see how often this happen
x.logger.Errorf("error updating task status. %w", err, "task_id", task.Id)
x.logger.Errorf("error updating task status. %w", err, "task_id", task.Task.Id)
}

// whenever a task change its status, we moved it, therefore we will need to clean up the old storage
- if task.Status != initialTaskStatus {
- if err = x.db.Delete(TaskStorageKey(task.Id, initialTaskStatus)); err != nil {
- x.logger.Errorf("error updating task status. %w", err, "task_id", task.Id)
+ if task.Task.Status != initialTaskStatus {
+ if err = x.db.Delete(TaskStorageKey(task.Task.Id, initialTaskStatus)); err != nil {
+ x.logger.Errorf("error updating task status. %w", err, "task_id", task.Task.Id)
}
}

if runTaskErr == nil {
x.logger.Info("succesfully executing task", "task_id", task.Id, "triggermark", triggerMetadata)
x.logger.Info("succesfully executing task", "task_id", task.Task.Id, "triggermark", triggerMetadata)
return execution, nil
}
return execution, fmt.Errorf("Error executing task %s: %v", task.Id, runTaskErr)
return execution, fmt.Errorf("Error executing task %s: %v", task.Task.Id, runTaskErr)
}
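The cleanup at the end of RunTask (deleting TaskStorageKey(task.Task.Id, initialTaskStatus) once the status changed) implies that the status is encoded in the storage key itself. TaskStorageKey is not shown in this diff; the sketch below illustrates that assumption with an invented key format:

```go
package taskengine

import "fmt"

// taskStorageKeySketch is illustrative only; the real TaskStorageKey is not
// part of this diff and the key format here is invented. Because the status
// is baked into the key, a status change writes the task under a new key,
// and the record under the old key must then be deleted, which is exactly
// what the block above does after BatchWrite.
func taskStorageKeySketch(taskID string, status int32) []byte {
	return []byte(fmt.Sprintf("t:%d:%s", status, taskID))
}
```
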
14 changes: 14 additions & 0 deletions core/taskengine/executor_test.go
@@ -3,9 +3,11 @@ package taskengine
import (
"net/http"
"net/http/httptest"
"os"
"reflect"
"sort"
"testing"
"time"

"github.com/AvaProtocol/EigenLayer-AVS/core/testutil"
"github.com/AvaProtocol/EigenLayer-AVS/model"
@@ -129,6 +131,10 @@ func TestExecutorRunTaskSucess(t *testing.T) {
}

func TestExecutorRunTaskStopAndReturnErrorWhenANodeFailed(t *testing.T) {
if os.Getenv("CONTROLLER_PRIVATE_KEY") == "" {
t.Skip("Skipping test because CONTROLLER_PRIVATE_KEY is not set")
}

SetRpc(testutil.GetTestRPCURL())
SetCache(testutil.GetDefaultCache())
db := testutil.TestMustDB()
@@ -217,6 +223,10 @@ func TestExecutorRunTaskStopAndReturnErrorWhenANodeFailed(t *testing.T) {
}

func TestExecutorRunTaskComputeSuccessFalseWhenANodeFailedToRun(t *testing.T) {
if os.Getenv("CONTROLLER_PRIVATE_KEY") == "" {
t.Skip("Skipping test because CONTROLLER_PRIVATE_KEY is not set")
}

// Set up a test HTTP server that returns a 503 status code
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
@@ -312,6 +322,10 @@ func TestExecutorRunTaskComputeSuccessFalseWhenANodeFailedToRun(t *testing.T) {

// TestExecutorRunTaskReturnAllExecutionData to test the happy path and return all the relevant data a task needed
func TestExecutorRunTaskReturnAllExecutionData(t *testing.T) {
if os.Getenv("CONTROLLER_PRIVATE_KEY") == "" {
t.Skip("Skipping test because CONTROLLER_PRIVATE_KEY is not set")
}

SetRpc(testutil.GetTestRPCURL())
SetCache(testutil.GetDefaultCache())
db := testutil.TestMustDB()
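The same four-line guard now sits at the top of several tests. A hypothetical helper, not part of this PR, that would keep the skip condition in one place:

```go
package taskengine

import (
	"os"
	"testing"
)

// skipWithoutControllerKey is a hypothetical helper (not in this PR): it
// centralizes the environment guard so each integration test that needs
// CONTROLLER_PRIVATE_KEY makes a single call instead of repeating the block.
func skipWithoutControllerKey(t *testing.T) {
	t.Helper()
	if os.Getenv("CONTROLLER_PRIVATE_KEY") == "" {
		t.Skip("Skipping test because CONTROLLER_PRIVATE_KEY is not set")
	}
}
```
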
6 changes: 3 additions & 3 deletions core/taskengine/secret.go
@@ -12,12 +12,12 @@ import (
func LoadSecretForTask(db storage.Storage, task *model.Task) (map[string]string, error) {
secrets := map[string]string{}

if task.Owner == "" {
if task.Task.Owner == "" {
return nil, fmt.Errorf("missing user in task structure")
}

user := &model.User{
- Address: common.HexToAddress(task.Owner),
+ Address: common.HexToAddress(task.Task.Owner),
}

prefixes := []string{
@@ -51,7 +51,7 @@ func LoadSecretForTask(db storage.Storage, task *model.Task) (map[string]string,
continue
}

- if secretWithNameOnly.WorkflowID == task.Id {
+ if secretWithNameOnly.WorkflowID == task.Task.Id {
if value, err := db.GetKey([]byte(k)); err == nil {
secrets[secretWithNameOnly.Name] = string(value)
}
2 changes: 1 addition & 1 deletion core/taskengine/vm.go
@@ -715,7 +715,7 @@ func (v *VM) CollectInputs() []string {

func (v *VM) GetTaskId() string {
if v.task != nil && v.task.Task != nil {
- return v.task.Id
+ return v.task.Task.Id
}

return ""
2 changes: 2 additions & 0 deletions core/taskengine/vm_runner_branch.go
@@ -105,6 +105,7 @@ func (r *BranchProcessor) Execute(stepID string, node *avsproto.BranchNode) (*av
sb.WriteString("error evaluating expression")
s.Log = sb.String()
s.EndAt = time.Now().UnixMilli()
+ s.OutputData = nil
return s, fmt.Errorf("error evaluating the statement: %w", err)
}

@@ -116,6 +117,7 @@ func (r *BranchProcessor) Execute(stepID string, node *avsproto.BranchNode) (*av
sb.WriteString("error evaluating expression")
s.Log = sb.String()
s.EndAt = time.Now().UnixMilli()
+ s.OutputData = nil
return s, fmt.Errorf("error evaluating the statement: %w", err)
}
}