Skip to content

Apply node.input to the Step of Execution and simulateTask #332

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions core/taskengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/status"
grpcstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/structpb"

avsproto "github.com/AvaProtocol/EigenLayer-AVS/protobuf"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -1553,6 +1554,29 @@ func (n *Engine) SimulateTask(user *model.User, trigger *avsproto.TaskTrigger, n
// Add the trigger variable with the actual trigger name for JavaScript access
vm.AddVar(sanitizeTriggerNameForJS(trigger.GetName()), map[string]any{"data": triggerDataMap})

// Extract and add trigger input data if available
triggerInputData := ExtractTriggerInputData(trigger)
if triggerInputData != nil {
// Get existing trigger variable and add input data
triggerVarName := sanitizeTriggerNameForJS(trigger.GetName())
vm.mu.Lock()
existingTriggerVar := vm.vars[triggerVarName]
if existingMap, ok := existingTriggerVar.(map[string]any); ok {
// Apply dual-access mapping to trigger input data
processedTriggerInput := CreateDualAccessMap(triggerInputData)
existingMap["input"] = processedTriggerInput
vm.vars[triggerVarName] = existingMap
} else {
// Create new trigger variable with both data and input
processedTriggerInput := CreateDualAccessMap(triggerInputData)
vm.vars[triggerVarName] = map[string]any{
"data": triggerDataMap,
"input": processedTriggerInput,
}
}
vm.mu.Unlock()
}

// Step 7: Compile the workflow
if err = vm.Compile(); err != nil {
return nil, fmt.Errorf("failed to compile workflow for simulation: %w", err)
Expand All @@ -1565,6 +1589,16 @@ func (n *Engine) SimulateTask(user *model.User, trigger *avsproto.TaskTrigger, n
triggerInputs = append(triggerInputs, key)
}

// Extract trigger input data using the proper extraction function
extractedTriggerInput := ExtractTriggerInputData(task.Trigger)
var triggerInputProto *structpb.Value
if extractedTriggerInput != nil {
triggerInputProto, err = structpb.NewValue(extractedTriggerInput)
if err != nil {
n.logger.Warn("Failed to convert trigger input data to protobuf", "error", err)
}
}

triggerStep := &avsproto.Execution_Step{
Id: task.Trigger.Id, // Use new 'id' field
Success: true,
Expand All @@ -1575,6 +1609,7 @@ func (n *Engine) SimulateTask(user *model.User, trigger *avsproto.TaskTrigger, n
Inputs: triggerInputs, // Use inputVariables keys as trigger inputs
Type: queueData.TriggerType.String(), // Use trigger type as string
Name: task.Trigger.Name, // Use new 'name' field
Input: triggerInputProto, // Include extracted trigger input data for debugging
}

// Set trigger output data in the step using shared function
Expand Down
34 changes: 34 additions & 0 deletions core/taskengine/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/structpb"

"github.com/AvaProtocol/EigenLayer-AVS/model"
avsproto "github.com/AvaProtocol/EigenLayer-AVS/protobuf"
Expand Down Expand Up @@ -160,6 +161,28 @@ func (x *TaskExecutor) RunTask(task *model.Task, queueData *QueueExecutionData)
vm.WithLogger(x.logger).WithDb(x.db)
initialTaskStatus := task.Status

// Extract and add trigger input data if available
triggerInputData := ExtractTriggerInputData(task.Trigger)
if triggerInputData != nil {
// Get the trigger variable name and add input data
triggerVarName := sanitizeTriggerNameForJS(task.Trigger.GetName())
vm.mu.Lock()
existingTriggerVar := vm.vars[triggerVarName]
if existingMap, ok := existingTriggerVar.(map[string]any); ok {
// Apply dual-access mapping to trigger input data
processedTriggerInput := CreateDualAccessMap(triggerInputData)
existingMap["input"] = processedTriggerInput
vm.vars[triggerVarName] = existingMap
} else {
// Create new trigger variable with input data
processedTriggerInput := CreateDualAccessMap(triggerInputData)
vm.vars[triggerVarName] = map[string]any{
"input": processedTriggerInput,
}
}
vm.mu.Unlock()
}

if err != nil {
return nil, fmt.Errorf("vm failed to initialize: %w", err)
}
Expand Down Expand Up @@ -189,6 +212,16 @@ func (x *TaskExecutor) RunTask(task *model.Task, queueData *QueueExecutionData)
// This ensures regular workflows have complete execution history (trigger + nodes)

// Create trigger step similar to SimulateTask
// Extract trigger input data using the proper extraction function
triggerInputData := ExtractTriggerInputData(task.Trigger)
var triggerInputProto *structpb.Value
if triggerInputData != nil {
triggerInputProto, err = structpb.NewValue(triggerInputData)
if err != nil {
x.logger.Warn("Failed to convert trigger input data to protobuf", "error", err)
}
}

triggerStep := &avsproto.Execution_Step{
Id: task.Trigger.Id,
Success: true,
Expand All @@ -199,6 +232,7 @@ func (x *TaskExecutor) RunTask(task *model.Task, queueData *QueueExecutionData)
Inputs: []string{}, // Empty inputs for trigger steps
Type: queueData.TriggerType.String(),
Name: task.Trigger.Name,
Input: triggerInputProto, // Include extracted trigger input data for debugging
}

// Set trigger output data in the step based on trigger type
Expand Down
10 changes: 8 additions & 2 deletions core/taskengine/run_node_immediately.go
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,12 @@ func (n *Engine) RunTriggerRPC(user *model.User, req *avsproto.RunTriggerReq) (*
triggerConfig[k] = v.AsInterface()
}

// Extract trigger input data from the request
triggerInput := make(map[string]interface{})
for k, v := range req.TriggerInput {
triggerInput[k] = v.AsInterface()
}

// Convert TriggerType enum to string
triggerTypeStr := TriggerTypeToString(req.TriggerType)
if triggerTypeStr == "" {
Expand All @@ -1545,8 +1551,8 @@ func (n *Engine) RunTriggerRPC(user *model.User, req *avsproto.RunTriggerReq) (*
return resp, nil
}

// Execute the trigger immediately (triggers don't accept input variables)
result, err := n.runTriggerImmediately(triggerTypeStr, triggerConfig, nil)
// Execute the trigger immediately with trigger input data
result, err := n.runTriggerImmediately(triggerTypeStr, triggerConfig, triggerInput)
if err != nil {
if n.logger != nil {
// Categorize errors to avoid unnecessary stack traces for expected validation errors
Expand Down
Loading