diff --git a/core/taskengine/engine.go b/core/taskengine/engine.go index 329fcb5a..08b43a44 100644 --- a/core/taskengine/engine.go +++ b/core/taskengine/engine.go @@ -200,6 +200,7 @@ func New(db storage.Storage, config *config.Config, queue *apqueue.Queue, logger // Initialize TokenEnrichmentService // Always try to initialize, even without RPC, so we can serve whitelist data + logger.Debug("initializing TokenEnrichmentService", "has_rpc", rpcConn != nil) tokenService, err := NewTokenEnrichmentService(rpcConn, logger) if err != nil { logger.Warn("Failed to initialize TokenEnrichmentService", "error", err) @@ -252,16 +253,23 @@ func (n *Engine) MustStart() error { if e != nil { panic(e) } + + loadedCount := 0 for _, item := range kvs { task := &model.Task{ Task: &avsproto.Task{}, } err := protojson.Unmarshal(item.Value, task) if err == nil { - n.tasks[string(item.Key)] = task + n.tasks[task.Id] = task + loadedCount++ + } else { + n.logger.Warn("Failed to unmarshal task during startup", "storage_key", string(item.Key), "error", err) } } + n.logger.Info("🚀 Engine started successfully", "active_tasks_loaded", loadedCount) + // Start the batch notification processor go n.processBatchedNotifications() @@ -941,12 +949,37 @@ func (n *Engine) AggregateChecksResultWithState(address string, payload *avsprot // Get task information to determine execution state task, exists := n.tasks[payload.TaskId] if !exists { - return &ExecutionState{ - RemainingExecutions: 0, - TaskStillActive: false, - Status: "not_found", - Message: "Task not found", - }, fmt.Errorf("task %s not found", payload.TaskId) + // Task not found in memory - this could indicate a synchronization issue + // Try to load the task from database as a fallback + n.logger.Warn("Task not found in memory, attempting database lookup", + "task_id", payload.TaskId, + "operator", address, + "memory_task_count", len(n.tasks)) + + dbTask, dbErr := n.GetTaskByID(payload.TaskId) + if dbErr != nil { + n.logger.Error("Task not found in database either", + "task_id", payload.TaskId, + "operator", address, + "db_error", dbErr) + return &ExecutionState{ + RemainingExecutions: 0, + TaskStillActive: false, + Status: "not_found", + Message: "Task not found", + }, fmt.Errorf("task %s not found", payload.TaskId) + } + + // Task found in database but not in memory - add it to memory and continue + n.lock.Lock() + n.tasks[dbTask.Id] = dbTask + n.lock.Unlock() + task = dbTask + + n.logger.Info("Task recovered from database and added to memory", + "task_id", payload.TaskId, + "operator", address, + "task_status", task.Status) } // Check if task is still runnable @@ -988,27 +1021,48 @@ func (n *Engine) AggregateChecksResultWithState(address string, payload *avsprot } // Enrich EventTrigger output if TokenEnrichmentService is available and it's a Transfer event - if payload.TriggerType == avsproto.TriggerType_TRIGGER_TYPE_EVENT && n.tokenEnrichmentService != nil { - if eventOutput := triggerData.Output.(*avsproto.EventTrigger_Output); eventOutput != nil { - if evmLog := eventOutput.GetEvmLog(); evmLog != nil { - n.logger.Debug("enriching EventTrigger output from operator", - "task_id", payload.TaskId, - "tx_hash", evmLog.TransactionHash, - "block_number", evmLog.BlockNumber) - - // Fetch full event data from the blockchain using the minimal data from operator - if enrichedEventOutput, err := n.enrichEventTriggerFromOperatorData(evmLog); err == nil { - // Replace the minimal event output with the enriched one - triggerData.Output = enrichedEventOutput - 
n.logger.Debug("successfully enriched EventTrigger output", + if payload.TriggerType == avsproto.TriggerType_TRIGGER_TYPE_EVENT { + n.logger.Debug("processing event trigger", + "task_id", payload.TaskId, + "has_token_service", n.tokenEnrichmentService != nil) + + if n.tokenEnrichmentService != nil { + if eventOutput := triggerData.Output.(*avsproto.EventTrigger_Output); eventOutput != nil { + if evmLog := eventOutput.GetEvmLog(); evmLog != nil { + n.logger.Debug("enriching EventTrigger output from operator", "task_id", payload.TaskId, - "has_transfer_log", enrichedEventOutput.GetTransferLog() != nil) + "tx_hash", evmLog.TransactionHash, + "block_number", evmLog.BlockNumber, + "log_index", evmLog.Index, + "address", evmLog.Address, + "topics_count", len(evmLog.Topics), + "data_length", len(evmLog.Data)) + + // Fetch full event data from the blockchain using the minimal data from operator + if enrichedEventOutput, err := n.enrichEventTriggerFromOperatorData(evmLog); err == nil { + // Replace the minimal event output with the enriched one + triggerData.Output = enrichedEventOutput + n.logger.Debug("successfully enriched EventTrigger output", + "task_id", payload.TaskId, + "has_transfer_log", enrichedEventOutput.GetTransferLog() != nil, + "has_evm_log", enrichedEventOutput.GetEvmLog() != nil) + } else { + n.logger.Warn("failed to enrich EventTrigger output, using minimal data", + "task_id", payload.TaskId, + "error", err) + } } else { - n.logger.Warn("failed to enrich EventTrigger output, using minimal data", + n.logger.Debug("EventTrigger output has no EvmLog data", "task_id", payload.TaskId, - "error", err) + "has_transfer_log", eventOutput.GetTransferLog() != nil) } + } else { + n.logger.Debug("EventTrigger output is nil", + "task_id", payload.TaskId) } + } else { + n.logger.Debug("TokenEnrichmentService not available for event enrichment", + "task_id", payload.TaskId) } } @@ -1018,6 +1072,26 @@ func (n *Engine) AggregateChecksResultWithState(address string, payload *avsprot ExecutionID: ulid.Make().String(), } + // For event triggers, if we have enriched data, convert it to a map format that survives JSON serialization + if triggerData.Type == avsproto.TriggerType_TRIGGER_TYPE_EVENT { + if eventOutput, ok := triggerData.Output.(*avsproto.EventTrigger_Output); ok { + // Convert the enriched protobuf data to a map that will survive JSON serialization + enrichedDataMap := buildTriggerDataMapFromProtobuf(triggerData.Type, eventOutput, n.logger) + + // Store the enriched data as a map instead of protobuf structure + // This ensures the enriched data survives JSON serialization/deserialization + queueTaskData.TriggerOutput = map[string]interface{}{ + "enriched_data": enrichedDataMap, + "trigger_type": triggerData.Type.String(), + } + + n.logger.Debug("stored enriched event trigger data for queue execution", + "task_id", payload.TaskId, + "has_token_symbol", enrichedDataMap["tokenSymbol"] != nil, + "has_value_formatted", enrichedDataMap["valueFormatted"] != nil) + } + } + data, err := json.Marshal(queueTaskData) if err != nil { n.logger.Error("error serialize trigger to json", err) @@ -1443,8 +1517,33 @@ func (n *Engine) SimulateTask(user *model.User, trigger *avsproto.TaskTrigger, n vm.WithLogger(n.logger).WithDb(n.db) // Add input variables to VM for template processing + // Apply dual-access mapping to enable both camelCase and snake_case field access for key, value := range inputVariables { - vm.AddVar(key, value) + // Apply dual-access mapping if the value is a map with a "data" field + if 
valueMap, ok := value.(map[string]interface{}); ok { + if dataField, hasData := valueMap["data"]; hasData { + if dataMap, isDataMap := dataField.(map[string]interface{}); isDataMap { + // Apply dual-access mapping to the data field + dualAccessData := CreateDualAccessMap(dataMap) + // Create a new map with the dual-access data + processedValue := make(map[string]interface{}) + for k, v := range valueMap { + if k == "data" { + processedValue[k] = dualAccessData + } else { + processedValue[k] = v + } + } + vm.AddVar(key, processedValue) + } else { + vm.AddVar(key, value) + } + } else { + vm.AddVar(key, value) + } + } else { + vm.AddVar(key, value) + } } // Step 6: Add trigger data as "trigger" variable for convenient access in JavaScript @@ -1486,7 +1585,7 @@ func (n *Engine) SimulateTask(user *model.User, trigger *avsproto.TaskTrigger, n // Step 8.5: Update VM trigger variable with actual execution results // This ensures subsequent nodes can access the trigger's actual output via eventTrigger.data - actualTriggerDataMap := buildTriggerDataMapFromProtobuf(queueData.TriggerType, triggerOutputProto) + actualTriggerDataMap := buildTriggerDataMapFromProtobuf(queueData.TriggerType, triggerOutputProto, n.logger) vm.AddVar(sanitizeTriggerNameForJS(trigger.GetName()), map[string]any{"data": actualTriggerDataMap}) // Step 9: Run the workflow nodes @@ -2406,21 +2505,35 @@ func getStringMapKeys(m map[string]interface{}) []string { // enrichEventTriggerFromOperatorData fetches full event data from blockchain and enriches it with token metadata func (n *Engine) enrichEventTriggerFromOperatorData(minimalEvmLog *avsproto.Evm_Log) (*avsproto.EventTrigger_Output, error) { if minimalEvmLog.TransactionHash == "" { + n.logger.Debug("enrichment failed: transaction hash is empty") return nil, fmt.Errorf("transaction hash is required for enrichment") } // Get RPC client (using the global rpcConn variable) if rpcConn == nil { + n.logger.Debug("enrichment failed: RPC client not available") return nil, fmt.Errorf("RPC client not available") } + n.logger.Debug("starting event enrichment", + "tx_hash", minimalEvmLog.TransactionHash, + "log_index", minimalEvmLog.Index, + "block_number", minimalEvmLog.BlockNumber) + // Fetch transaction receipt to get the full event logs ctx := context.Background() receipt, err := rpcConn.TransactionReceipt(ctx, common.HexToHash(minimalEvmLog.TransactionHash)) if err != nil { + n.logger.Debug("enrichment failed: could not fetch transaction receipt", + "tx_hash", minimalEvmLog.TransactionHash, + "error", err) return nil, fmt.Errorf("failed to fetch transaction receipt: %w", err) } + n.logger.Debug("fetched transaction receipt", + "tx_hash", minimalEvmLog.TransactionHash, + "logs_count", len(receipt.Logs)) + // Find the specific log that matches the operator's data var targetLog *types.Log for _, log := range receipt.Logs { @@ -2431,10 +2544,27 @@ func (n *Engine) enrichEventTriggerFromOperatorData(minimalEvmLog *avsproto.Evm_ } if targetLog == nil { + n.logger.Debug("enrichment failed: log not found in transaction", + "tx_hash", minimalEvmLog.TransactionHash, + "expected_log_index", minimalEvmLog.Index, + "available_log_indices", func() []uint32 { + indices := make([]uint32, len(receipt.Logs)) + for i, log := range receipt.Logs { + indices[i] = uint32(log.Index) + } + return indices + }()) return nil, fmt.Errorf("log with index %d not found in transaction %s", minimalEvmLog.Index, minimalEvmLog.TransactionHash) } + n.logger.Debug("found target log", + "tx_hash", 
minimalEvmLog.TransactionHash, + "log_index", targetLog.Index, + "address", targetLog.Address.Hex(), + "topics_count", len(targetLog.Topics), + "data_length", len(targetLog.Data)) + // Create enriched EVM log with full data enrichedEvmLog := &avsproto.Evm_Log{ Address: targetLog.Address.Hex(), @@ -2459,12 +2589,26 @@ func (n *Engine) enrichEventTriggerFromOperatorData(minimalEvmLog *avsproto.Evm_ isTransferEvent := len(targetLog.Topics) > 0 && targetLog.Topics[0].Hex() == "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" + n.logger.Debug("checking if transfer event", + "is_transfer", isTransferEvent, + "topics_count", len(targetLog.Topics), + "first_topic", func() string { + if len(targetLog.Topics) > 0 { + return targetLog.Topics[0].Hex() + } + return "none" + }()) + if isTransferEvent && len(targetLog.Topics) >= 3 { + n.logger.Debug("processing as transfer event") + // Get block timestamp for transfer_log header, err := rpcConn.HeaderByNumber(ctx, big.NewInt(int64(targetLog.BlockNumber))) var blockTimestamp uint64 if err == nil { blockTimestamp = header.Time * 1000 // Convert to milliseconds + } else { + n.logger.Debug("could not fetch block header for timestamp", "error", err) } // Extract from and to addresses from topics @@ -2488,10 +2632,22 @@ func (n *Engine) enrichEventTriggerFromOperatorData(minimalEvmLog *avsproto.Evm_ LogIndex: uint32(targetLog.Index), } + n.logger.Debug("created transfer log", + "from", fromAddr, + "to", toAddr, + "value", value, + "token_address", targetLog.Address.Hex()) + // Enrich with token metadata if err := n.tokenEnrichmentService.EnrichTransferLog(enrichedEvmLog, transferLog); err != nil { n.logger.Warn("failed to enrich transfer log with token metadata", "error", err) // Continue without enrichment - partial data is better than no data + } else { + n.logger.Debug("successfully enriched transfer log", + "token_name", transferLog.TokenName, + "token_symbol", transferLog.TokenSymbol, + "token_decimals", transferLog.TokenDecimals, + "value_formatted", transferLog.ValueFormatted) } // Use the oneof TransferLog field @@ -2499,12 +2655,17 @@ func (n *Engine) enrichEventTriggerFromOperatorData(minimalEvmLog *avsproto.Evm_ TransferLog: transferLog, } } else { + n.logger.Debug("processing as regular EVM log event") // Regular event (not a transfer) - use the oneof EvmLog field enrichedOutput.OutputType = &avsproto.EventTrigger_Output_EvmLog{ EvmLog: enrichedEvmLog, } } + n.logger.Debug("enrichment completed successfully", + "has_transfer_log", enrichedOutput.GetTransferLog() != nil, + "has_evm_log", enrichedOutput.GetEvmLog() != nil) + return enrichedOutput, nil } @@ -2936,9 +3097,17 @@ func buildTriggerDataMap(triggerType avsproto.TriggerType, triggerOutput map[str triggerDataMap["gasUsed"] = gasUsed } case avsproto.TriggerType_TRIGGER_TYPE_EVENT: - // Copy all event trigger data - for k, v := range triggerOutput { - triggerDataMap[k] = v + // Handle event trigger data with special processing for transfer_log + if transferLogData, hasTransferLog := triggerOutput["transfer_log"].(map[string]interface{}); hasTransferLog { + // Flatten transfer_log data to top level for JavaScript access + for k, v := range transferLogData { + triggerDataMap[k] = v + } + } else { + // For non-transfer events, copy all event trigger data + for k, v := range triggerOutput { + triggerDataMap[k] = v + } } default: // For unknown trigger types, copy all output data @@ -3035,13 +3204,14 @@ func buildExecutionStepOutputData(triggerType avsproto.TriggerType, 
triggerOutpu // Parameters: // - triggerType: the type of trigger being processed // - triggerOutputProto: the protobuf trigger output structure (e.g., *avsproto.BlockTrigger_Output) +// - logger: optional logger for debugging (can be nil) // // Returns: // - map[string]interface{}: JavaScript-accessible trigger data map // // This function differs from buildTriggerDataMap in that it works with structured protobuf data // rather than raw trigger output maps, making it suitable for VM initialization. -func buildTriggerDataMapFromProtobuf(triggerType avsproto.TriggerType, triggerOutputProto interface{}) map[string]interface{} { +func buildTriggerDataMapFromProtobuf(triggerType avsproto.TriggerType, triggerOutputProto interface{}, logger sdklogging.Logger) map[string]interface{} { triggerDataMap := make(map[string]interface{}) if triggerOutputProto == nil { @@ -3082,30 +3252,51 @@ func buildTriggerDataMapFromProtobuf(triggerType avsproto.TriggerType, triggerOu // Check if we have transfer log data in the event output if transferLogData := eventOutput.GetTransferLog(); transferLogData != nil { // Use transfer log data to populate rich trigger data matching runTrigger format - triggerDataMap["token_name"] = transferLogData.TokenName - triggerDataMap["token_symbol"] = transferLogData.TokenSymbol - triggerDataMap["token_decimals"] = transferLogData.TokenDecimals - triggerDataMap["transaction_hash"] = transferLogData.TransactionHash + // Use camelCase field names for JavaScript compatibility + triggerDataMap["tokenName"] = transferLogData.TokenName + triggerDataMap["tokenSymbol"] = transferLogData.TokenSymbol + triggerDataMap["tokenDecimals"] = transferLogData.TokenDecimals + triggerDataMap["transactionHash"] = transferLogData.TransactionHash triggerDataMap["address"] = transferLogData.Address - triggerDataMap["block_number"] = transferLogData.BlockNumber - triggerDataMap["block_timestamp"] = transferLogData.BlockTimestamp - triggerDataMap["from_address"] = transferLogData.FromAddress - triggerDataMap["to_address"] = transferLogData.ToAddress + triggerDataMap["blockNumber"] = transferLogData.BlockNumber + triggerDataMap["blockTimestamp"] = transferLogData.BlockTimestamp + triggerDataMap["fromAddress"] = transferLogData.FromAddress + triggerDataMap["toAddress"] = transferLogData.ToAddress triggerDataMap["value"] = transferLogData.Value - triggerDataMap["value_formatted"] = transferLogData.ValueFormatted - triggerDataMap["transaction_index"] = transferLogData.TransactionIndex - triggerDataMap["log_index"] = transferLogData.LogIndex - } else if evmLog := eventOutput.GetEvmLog(); evmLog != nil { - // Fall back to basic EVM log data matching runTrigger format - triggerDataMap["block_number"] = evmLog.BlockNumber - triggerDataMap["log_index"] = evmLog.Index - triggerDataMap["tx_hash"] = evmLog.TransactionHash - triggerDataMap["address"] = evmLog.Address - triggerDataMap["topics"] = evmLog.Topics - triggerDataMap["data"] = evmLog.Data - triggerDataMap["block_hash"] = evmLog.BlockHash - triggerDataMap["transaction_index"] = evmLog.TransactionIndex - triggerDataMap["removed"] = evmLog.Removed + triggerDataMap["valueFormatted"] = transferLogData.ValueFormatted + triggerDataMap["transactionIndex"] = transferLogData.TransactionIndex + triggerDataMap["logIndex"] = transferLogData.LogIndex + } else if evmLogData := eventOutput.GetEvmLog(); evmLogData != nil { + // Use EVM log data for regular events + triggerDataMap["address"] = evmLogData.Address + triggerDataMap["topics"] = evmLogData.Topics + 
triggerDataMap["data"] = evmLogData.Data + triggerDataMap["blockNumber"] = evmLogData.BlockNumber + triggerDataMap["transactionHash"] = evmLogData.TransactionHash + triggerDataMap["transactionIndex"] = evmLogData.TransactionIndex + triggerDataMap["blockHash"] = evmLogData.BlockHash + triggerDataMap["logIndex"] = evmLogData.Index + triggerDataMap["removed"] = evmLogData.Removed + } + } else if enrichedDataMap, ok := triggerOutputProto.(map[string]interface{}); ok { + // Handle the new enriched data format that survives JSON serialization + if enrichedData, hasEnrichedData := enrichedDataMap["enriched_data"].(map[string]interface{}); hasEnrichedData { + // Copy all enriched data to the trigger data map + for k, v := range enrichedData { + triggerDataMap[k] = v + } + if logger != nil { + logger.Debug("loaded enriched event trigger data from queue", + "has_token_symbol", triggerDataMap["tokenSymbol"] != nil, + "has_value_formatted", triggerDataMap["valueFormatted"] != nil) + } + } else { + // Fallback: copy all data from the map + for k, v := range enrichedDataMap { + if k != "trigger_type" { // Skip metadata + triggerDataMap[k] = v + } + } } } default: diff --git a/core/taskengine/engine_trigger_output_test.go b/core/taskengine/engine_trigger_output_test.go index 7d0a332a..030ed399 100644 --- a/core/taskengine/engine_trigger_output_test.go +++ b/core/taskengine/engine_trigger_output_test.go @@ -239,26 +239,26 @@ func TestBuildTriggerDataMapFromProtobufEventTriggerComprehensive(t *testing.T) description: "Should map all TransferLog fields including the critical log_index field", verifyFunc: func(t *testing.T, result map[string]interface{}) { expected := map[string]interface{}{ - "token_name": "Test Token", - "token_symbol": "TEST", - "token_decimals": uint32(18), - "transaction_hash": "0x1234567890abcdef", - "address": "0xabcdef1234567890", - "block_number": uint64(12345678), - "block_timestamp": uint64(1672531200), - "from_address": "0x1111111111111111", - "to_address": "0x2222222222222222", - "value": "1000000000000000000", - "value_formatted": "1.0", - "transaction_index": uint32(5), - "log_index": uint32(3), - "type": "TRIGGER_TYPE_EVENT", + "tokenName": "Test Token", + "tokenSymbol": "TEST", + "tokenDecimals": uint32(18), + "transactionHash": "0x1234567890abcdef", + "address": "0xabcdef1234567890", + "blockNumber": uint64(12345678), + "blockTimestamp": uint64(1672531200), + "fromAddress": "0x1111111111111111", + "toAddress": "0x2222222222222222", + "value": "1000000000000000000", + "valueFormatted": "1.0", + "transactionIndex": uint32(5), + "logIndex": uint32(3), + "type": "TRIGGER_TYPE_EVENT", } require.Equal(t, expected, result, "All TransferLog fields should be properly mapped") - require.Contains(t, result, "log_index", "log_index field should be present") - require.Equal(t, uint32(3), result["log_index"], "log_index should have correct value") + require.Contains(t, result, "logIndex", "logIndex field should be present") + require.Equal(t, uint32(3), result["logIndex"], "logIndex should have correct value") }, }, { @@ -281,29 +281,29 @@ func TestBuildTriggerDataMapFromProtobufEventTriggerComprehensive(t *testing.T) description: "Should map all EvmLog fields including log_index", verifyFunc: func(t *testing.T, result map[string]interface{}) { expected := map[string]interface{}{ - "block_number": uint64(12345678), - "log_index": uint32(3), - "tx_hash": "0x1234567890abcdef", - "address": "0xabcdef1234567890", - "topics": []string{"0xtopic1", "0xtopic2", "0xtopic3"}, - "data": 
"0xdeadbeef", - "block_hash": "0xblockhash123456", - "transaction_index": uint32(5), - "removed": false, - "type": "TRIGGER_TYPE_EVENT", + "blockNumber": uint64(12345678), + "logIndex": uint32(3), + "transactionHash": "0x1234567890abcdef", + "address": "0xabcdef1234567890", + "topics": []string{"0xtopic1", "0xtopic2", "0xtopic3"}, + "data": "0xdeadbeef", + "blockHash": "0xblockhash123456", + "transactionIndex": uint32(5), + "removed": false, + "type": "TRIGGER_TYPE_EVENT", } require.Equal(t, expected, result, "All EvmLog fields should be properly mapped") - require.Contains(t, result, "log_index", "log_index field should be present") - require.Equal(t, uint32(3), result["log_index"], "log_index should have correct value") + require.Contains(t, result, "logIndex", "logIndex field should be present") + require.Equal(t, uint32(3), result["logIndex"], "logIndex should have correct value") }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - result := buildTriggerDataMapFromProtobuf(avsproto.TriggerType_TRIGGER_TYPE_EVENT, test.input) + result := buildTriggerDataMapFromProtobuf(avsproto.TriggerType_TRIGGER_TYPE_EVENT, test.input, nil) require.NotNil(t, result, "buildTriggerDataMapFromProtobuf should never return nil") @@ -325,7 +325,7 @@ func TestBuildTriggerDataMapFromProtobufFieldCompleteness(t *testing.T) { } for _, triggerType := range triggerTypes { - result := buildTriggerDataMapFromProtobuf(triggerType, nil) + result := buildTriggerDataMapFromProtobuf(triggerType, nil, nil) require.Contains(t, result, "type", "All trigger types should include type field") require.Equal(t, triggerType.String(), result["type"], "Type field should match trigger type") } diff --git a/core/taskengine/executor_test.go b/core/taskengine/executor_test.go index 11c14404..849fd022 100644 --- a/core/taskengine/executor_test.go +++ b/core/taskengine/executor_test.go @@ -39,7 +39,7 @@ func TestExecutorRunTaskSucess(t *testing.T) { Id: "a1", Type: "if", // The test data is of this transaction https://sepolia.etherscan.io/tx/0x53beb2163994510e0984b436ebc828dc57e480ee671cfbe7ed52776c2a4830c8 which is 3.45 token - Expression: "Number(triggertest.data.value_formatted) >= 3", + Expression: "Number(triggertest.data.valueFormatted) >= 3", }, }, }, @@ -390,7 +390,7 @@ func TestExecutorRunTaskReturnAllExecutionData(t *testing.T) { { Id: "condition1", Type: "if", - Expression: "Number(triggertest.data.value_formatted) >= 3", + Expression: "Number(triggertest.data.valueFormatted) >= 3", }, }, }, diff --git a/core/taskengine/run_node_immediately.go b/core/taskengine/run_node_immediately.go index f4b586ae..2e449213 100644 --- a/core/taskengine/run_node_immediately.go +++ b/core/taskengine/run_node_immediately.go @@ -720,8 +720,38 @@ func (n *Engine) runProcessingNodeWithInputs(nodeType string, nodeConfig map[str vm.WithLogger(n.logger).WithDb(n.db) // Add input variables to VM for template processing and node access + // Apply dual-access mapping to enable both camelCase and snake_case field access + processedInputVariables := make(map[string]interface{}) for key, value := range inputVariables { - vm.AddVar(key, value) + // Apply dual-access mapping if the value is a map with a "data" field + if valueMap, ok := value.(map[string]interface{}); ok { + if dataField, hasData := valueMap["data"]; hasData { + if dataMap, isDataMap := dataField.(map[string]interface{}); isDataMap { + // Apply dual-access mapping to the data field + dualAccessData := CreateDualAccessMap(dataMap) + // Create a new map with the 
dual-access data + processedValue := make(map[string]interface{}) + for k, v := range valueMap { + if k == "data" { + processedValue[k] = dualAccessData + } else { + processedValue[k] = v + } + } + vm.AddVar(key, processedValue) + processedInputVariables[key] = processedValue + } else { + vm.AddVar(key, value) + processedInputVariables[key] = value + } + } else { + vm.AddVar(key, value) + processedInputVariables[key] = value + } + } else { + vm.AddVar(key, value) + processedInputVariables[key] = value + } } // Create node from type and config @@ -730,8 +760,8 @@ func (n *Engine) runProcessingNodeWithInputs(nodeType string, nodeConfig map[str return nil, fmt.Errorf("failed to create node: %w", err) } - // Execute the node - executionStep, err := vm.RunNodeWithInputs(node, inputVariables) + // Execute the node with processed input variables + executionStep, err := vm.RunNodeWithInputs(node, processedInputVariables) if err != nil { return nil, fmt.Errorf("node execution failed: %w", err) } @@ -1071,8 +1101,7 @@ func (n *Engine) RunNodeImmediatelyRPC(user *model.User, req *avsproto.RunNodeWi // Log successful execution if n.logger != nil { - n.logger.Info("RunNodeImmediatelyRPC: Executed successfully", "nodeTypeStr", nodeTypeStr, "originalNodeType", req.NodeType, "configKeys", getStringMapKeys(nodeConfig), "inputKeys", getStringMapKeys(inputVariables)) - + n.logger.Info("RunNodeImmediatelyRPC: Executed successfully", "nodeTypeStr", nodeTypeStr, "originalNodeType", req.NodeType) } // Convert result to the appropriate protobuf output type @@ -1572,7 +1601,7 @@ func (n *Engine) RunTriggerRPC(user *model.User, req *avsproto.RunTriggerReq) (* // Log successful execution if n.logger != nil { - n.logger.Info("RunTriggerRPC: Executed successfully", "triggerTypeStr", triggerTypeStr, "originalTriggerType", req.TriggerType, "configKeys", getStringMapKeys(triggerConfig)) + n.logger.Info("RunTriggerRPC: Executed successfully", "triggerTypeStr", triggerTypeStr, "originalTriggerType", req.TriggerType) } // Convert result to the appropriate protobuf output type diff --git a/core/taskengine/smart_variable_resolution_test.go b/core/taskengine/smart_variable_resolution_test.go index 6156410e..4dff09c2 100644 --- a/core/taskengine/smart_variable_resolution_test.go +++ b/core/taskengine/smart_variable_resolution_test.go @@ -288,6 +288,222 @@ func TestSmartVariableResolution(t *testing.T) { }) } +// TestDualAccessVariableSupport tests that both camelCase and snake_case field access +// work for direct JavaScript variable access (not just template variables). +// +// This test verifies that the createDualAccessMap function properly enables: +// 1. Direct JS destructuring: const {tokenSymbol, token_symbol} = eventTrigger.data +// 2. Direct JS property access: eventTrigger.data.tokenSymbol AND eventTrigger.data.token_symbol +// 3. Template variables: {{eventTrigger.data.tokenSymbol}} AND {{eventTrigger.data.token_symbol}} +// +// This solves the original issue where deployed tasks returned NaN/undefined because +// JavaScript code expected camelCase but data had snake_case (or vice versa). 
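+//
+// A minimal sketch of the dual-access behavior under test, assuming the
+// hypothetical input below (values are illustrative only):
+//
+//	data := CreateDualAccessMap(map[string]interface{}{"tokenSymbol": "USDC"})
+//	// data["tokenSymbol"] == "USDC" and data["token_symbol"] == "USDC", so in JS
+//	// both `const {tokenSymbol} = eventTrigger.data` and
+//	// `const {token_symbol} = eventTrigger.data` resolve to "USDC".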
+func TestDualAccessVariableSupport(t *testing.T) {
+	engine := createTestEngineForSmartResolution(t)
+
+	t.Run("DirectJavaScriptVariableAccess", func(t *testing.T) {
+		// Test that both camelCase and snake_case work in direct JavaScript code
+		// This simulates the user's original issue with custom code destructuring
+
+		config := map[string]interface{}{
+			"lang": "JavaScript",
+			"source": `
+				// Test direct property access (both naming conventions should work)
+				const tokenSymbolCamel = eventTrigger.data.tokenSymbol;
+				const tokenSymbolSnake = eventTrigger.data.token_symbol;
+				const valueFormattedCamel = eventTrigger.data.valueFormatted;
+				const valueFormattedSnake = eventTrigger.data.value_formatted;
+
+				// Test destructuring (both naming conventions should work)
+				const {tokenSymbol, token_symbol, valueFormatted, value_formatted} = eventTrigger.data;
+
+				// Return results to verify both work
+				return {
+					tokenSymbolCamel,
+					tokenSymbolSnake,
+					valueFormattedCamel,
+					valueFormattedSnake,
+					destructuredTokenSymbol: tokenSymbol,
+					destructuredTokenSymbolSnake: token_symbol,
+					destructuredValueFormatted: valueFormatted,
+					destructuredValueFormattedSnake: value_formatted
+				};
+			`,
+		}
+
+		// Simulate trigger data with camelCase field names (as it comes from buildTriggerDataMapFromProtobuf)
+		inputVariables := map[string]interface{}{
+			"eventTrigger": map[string]interface{}{
+				"data": map[string]interface{}{
+					"tokenSymbol":    "USDC",
+					"valueFormatted": "3.45",
+					"blockNumber":    12345678,
+					"fromAddress":    "0x1111111111111111",
+					"toAddress":      "0x2222222222222222",
+				},
+			},
+		}
+
+		result, err := engine.RunNodeImmediately(NodeTypeCustomCode, config, inputVariables)
+
+		assert.NoError(t, err)
+		assert.NotNil(t, result)
+
+		// Verify that both camelCase and snake_case access work
+		assert.Equal(t, "USDC", result["tokenSymbolCamel"])
+		assert.Equal(t, "USDC", result["tokenSymbolSnake"])
+		assert.Equal(t, "3.45", result["valueFormattedCamel"])
+		assert.Equal(t, "3.45", result["valueFormattedSnake"])
+
+		// Verify destructuring works for both naming conventions
+		assert.Equal(t, "USDC", result["destructuredTokenSymbol"])
+		assert.Equal(t, "USDC", result["destructuredTokenSymbolSnake"])
+		assert.Equal(t, "3.45", result["destructuredValueFormatted"])
+		assert.Equal(t, "3.45", result["destructuredValueFormattedSnake"])
+	})
+
+	t.Run("NodeOutputDualAccess", func(t *testing.T) {
+		// Test that node outputs also support dual access
+		// This ensures the SetOutputVarForStep dual-access mapping works
+
+		// First, create a REST API node that returns data with camelCase fields
+		mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			w.Header().Set("Content-Type", "application/json")
+			w.WriteHeader(200)
+			// Return camelCase field names
+			fmt.Fprintf(w, `{
+				"responseData": "test_response",
+				"statusCode": 200,
+				"apiKey": "secret_key_123"
+			}`)
+		}))
+		defer mockServer.Close()
+
+		restConfig := map[string]interface{}{
+			"url":    mockServer.URL,
+			"method": "GET",
+		}
+
+		// Execute REST API node to get output data
+		restResult, err := engine.RunNodeImmediately(NodeTypeRestAPI, restConfig, map[string]interface{}{})
+		assert.NoError(t, err)
+		assert.NotNil(t, restResult)
+
+		// Now test a custom code node that accesses the REST API output using both naming conventions
+		customCodeConfig := map[string]interface{}{
+			"lang": "JavaScript",
+			"source": `
+				// Test accessing REST API output data using both camelCase and snake_case
+				const responseDataCamel = apiNode.data.responseData;
+				const responseDataSnake = apiNode.data.response_data;
+				const statusCodeCamel = apiNode.data.statusCode;
+				const statusCodeSnake = apiNode.data.status_code;
+				const apiKeyCamel = apiNode.data.apiKey;
+				const apiKeySnake = apiNode.data.api_key;
+
+				return {
+					responseDataCamel,
+					responseDataSnake,
+					statusCodeCamel,
+					statusCodeSnake,
+					apiKeyCamel,
+					apiKeySnake
+				};
+			`,
+		}
+
+		// Simulate the REST API node output being available to the custom code node
+		inputVariables := map[string]interface{}{
+			"apiNode": map[string]interface{}{
+				"data": restResult, // This should have dual-access mapping applied
+			},
+		}
+
+		customResult, err := engine.RunNodeImmediately(NodeTypeCustomCode, customCodeConfig, inputVariables)
+
+		assert.NoError(t, err)
+		assert.NotNil(t, customResult)
+
+		// Verify that both camelCase and snake_case access work for node outputs
+		// Note: The actual values depend on the REST API response structure
+		// We're mainly testing that both naming conventions resolve to the same values
+		if customResult["responseDataCamel"] != nil {
+			assert.Equal(t, customResult["responseDataCamel"], customResult["responseDataSnake"])
+		}
+		if customResult["statusCodeCamel"] != nil {
+			assert.Equal(t, customResult["statusCodeCamel"], customResult["statusCodeSnake"])
+		}
+		if customResult["apiKeyCamel"] != nil {
+			assert.Equal(t, customResult["apiKeyCamel"], customResult["apiKeySnake"])
+		}
+	})
+}
+
+// TestDualAccessDebug is a debug test to understand what's happening with dual-access
+func TestDualAccessDebug(t *testing.T) {
+	engine := createTestEngineForSmartResolution(t)
+
+	t.Run("DebugVariableAccess", func(t *testing.T) {
+		config := map[string]interface{}{
+			"lang": "JavaScript",
+			"source": `
+				// Check if eventTrigger exists and return debug info
+				return {
+					debug: "test",
+					eventTriggerExists: typeof eventTrigger !== 'undefined',
+					eventTriggerData: typeof eventTrigger !== 'undefined' ? eventTrigger.data : null,
+					eventTriggerType: typeof eventTrigger,
+					eventTriggerKeys: typeof eventTrigger !== 'undefined' ? Object.keys(eventTrigger) : [],
+					eventTriggerDataKeys: typeof eventTrigger !== 'undefined' && eventTrigger.data ? Object.keys(eventTrigger.data) : [],
+					globalKeys: Object.keys(this),
+					// Try to access specific fields
+					tokenSymbolDirect: typeof eventTrigger !== 'undefined' && eventTrigger.data ? eventTrigger.data.tokenSymbol : "not_found",
+					tokenSymbolSnake: typeof eventTrigger !== 'undefined' && eventTrigger.data ? eventTrigger.data.token_symbol : "not_found"
+				};
+			`,
+		}
+
+		inputVariables := map[string]interface{}{
+			"eventTrigger": map[string]interface{}{
+				"data": map[string]interface{}{
+					"tokenSymbol":    "USDC",
+					"valueFormatted": "3.45",
+				},
+			},
+		}
+
+		result, err := engine.RunNodeImmediately(NodeTypeCustomCode, config, inputVariables)
+
+		assert.NoError(t, err)
+		assert.NotNil(t, result)
+
+		// Print the result for debugging
+		t.Logf("Debug result: %+v", result)
+	})
+}
+
+// TestCreateDualAccessMap tests the CreateDualAccessMap function directly
+func TestCreateDualAccessMap(t *testing.T) {
+	// Test with camelCase input
+	input := map[string]interface{}{
+		"tokenSymbol":    "USDC",
+		"valueFormatted": "3.45",
+		"blockNumber":    12345,
+	}
+
+	result := CreateDualAccessMap(input)
+
+	// Should have both camelCase and snake_case versions
+	assert.Equal(t, "USDC", result["tokenSymbol"])
+	assert.Equal(t, "USDC", result["token_symbol"])
+	assert.Equal(t, "3.45", result["valueFormatted"])
+	assert.Equal(t, "3.45", result["value_formatted"])
+	assert.Equal(t, 12345, result["blockNumber"])
+	assert.Equal(t, 12345, result["block_number"])
+
+	t.Logf("CreateDualAccessMap result: %+v", result)
+}
+
 // createTestEngineForSmartResolution creates an engine for testing
 func createTestEngineForSmartResolution(t *testing.T) *Engine {
 	logger := testutil.GetLogger()
diff --git a/core/taskengine/trigger/event.go b/core/taskengine/trigger/event.go
index 548f79e7..c173e3ba 100644
--- a/core/taskengine/trigger/event.go
+++ b/core/taskengine/trigger/event.go
@@ -768,7 +768,7 @@ func (t *EventTrigger) buildFilterQueries() []QueryInfo {
 	for taskID, count := range taskQueryCounts {
 		if count > 1 {
-			t.logger.Warn("⚠️ Task has multiple unique queries - may receive duplicate events",
+			t.logger.Info("⚠️ Task has multiple unique queries - may receive duplicate events",
 				"task_id", taskID,
 				"unique_query_count", count,
 				"recommendation", "Consider combining queries to reduce duplicates")
diff --git a/core/taskengine/trigger_data_flattening_test.go b/core/taskengine/trigger_data_flattening_test.go
new file mode 100644
index 00000000..fd61d28a
--- /dev/null
+++ b/core/taskengine/trigger_data_flattening_test.go
@@ -0,0 +1,417 @@
+package taskengine
+
+import (
+	"testing"
+
+	"github.com/AvaProtocol/EigenLayer-AVS/core/testutil"
+	avsproto "github.com/AvaProtocol/EigenLayer-AVS/protobuf"
+	"github.com/AvaProtocol/EigenLayer-AVS/storage"
+	"github.com/stretchr/testify/assert"
+)
+
+// TestBuildTriggerDataMapEventTriggerFlattening tests the specific fix for flattening transfer_log data
+// This test verifies that the buildTriggerDataMap function correctly flattens nested transfer_log data
+// to the top level, which resolves the NaN and undefined values issue in simulateTask.
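+//
+// A minimal sketch of the flattening, assuming a hypothetical trigger output
+// (values are illustrative only):
+//
+//	out := buildTriggerDataMap(avsproto.TriggerType_TRIGGER_TYPE_EVENT, map[string]interface{}{
+//		"transfer_log": map[string]interface{}{"tokenSymbol": "USDC", "valueFormatted": "5"},
+//	})
+//	// out["tokenSymbol"] == "USDC", out["valueFormatted"] == "5", and the nested
+//	// "transfer_log" key is no longer present, so JS destructuring sees the fields directly.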
+func TestBuildTriggerDataMapEventTriggerFlattening(t *testing.T) {
+	// Test data with nested transfer_log structure (as it comes from runEventTriggerImmediately)
+	triggerOutput := map[string]interface{}{
+		"found":         true,
+		"queriesCount":  2,
+		"totalSearched": 5000,
+		"totalEvents":   1,
+		"transfer_log": map[string]interface{}{
+			"tokenName":        "USDC",
+			"tokenSymbol":      "USDC",
+			"tokenDecimals":    uint32(6),
+			"transactionHash":  "0x1b0b9bee55e3a824dedd1dcfaad1790e19e0a68d6717e385a960092077f8b6a1",
+			"address":          "0x1c7D4B196Cb0C7B01d743Fbc6116a902379C7238",
+			"blockNumber":      uint64(8560047),
+			"blockTimestamp":   uint64(1750061412000),
+			"fromAddress":      "0xc60e71bd0f2e6d8832Fea1a2d56091C48493C788",
+			"toAddress":        "0xfE66125343Aabda4A330DA667431eC1Acb7BbDA9",
+			"value":            "0x00000000000000000000000000000000000000000000000000000000004c4b40",
+			"valueFormatted":   "5",
+			"transactionIndex": uint32(63),
+			"logIndex":         uint32(83),
+		},
+	}
+
+	// Test buildTriggerDataMap with event trigger
+	result := buildTriggerDataMap(avsproto.TriggerType_TRIGGER_TYPE_EVENT, triggerOutput)
+
+	// Verify that transfer_log data is flattened to top level
+	assert.Equal(t, "USDC", result["tokenName"], "tokenName should be at top level")
+	assert.Equal(t, "USDC", result["tokenSymbol"], "tokenSymbol should be at top level")
+	assert.Equal(t, uint32(6), result["tokenDecimals"], "tokenDecimals should be at top level")
+	assert.Equal(t, "0x1b0b9bee55e3a824dedd1dcfaad1790e19e0a68d6717e385a960092077f8b6a1", result["transactionHash"], "transactionHash should be at top level")
+	assert.Equal(t, "0x1c7D4B196Cb0C7B01d743Fbc6116a902379C7238", result["address"], "address should be at top level")
+	assert.Equal(t, uint64(8560047), result["blockNumber"], "blockNumber should be at top level")
+	assert.Equal(t, uint64(1750061412000), result["blockTimestamp"], "blockTimestamp should be at top level")
+	assert.Equal(t, "0xc60e71bd0f2e6d8832Fea1a2d56091C48493C788", result["fromAddress"], "fromAddress should be at top level")
+	assert.Equal(t, "0xfE66125343Aabda4A330DA667431eC1Acb7BbDA9", result["toAddress"], "toAddress should be at top level")
+	assert.Equal(t, "0x00000000000000000000000000000000000000000000000000000000004c4b40", result["value"], "value should be at top level")
+	assert.Equal(t, "5", result["valueFormatted"], "valueFormatted should be at top level")
+	assert.Equal(t, uint32(63), result["transactionIndex"], "transactionIndex should be at top level")
+	assert.Equal(t, uint32(83), result["logIndex"], "logIndex should be at top level")
+
+	// Verify that the nested transfer_log object is NOT present at top level
+	assert.NotContains(t, result, "transfer_log", "transfer_log should not be present as nested object")
+
+	// Test with non-transfer event (should copy all data as-is)
+	nonTransferOutput := map[string]interface{}{
+		"found":        true,
+		"someField":    "someValue",
+		"anotherField": 123,
+	}
+
+	nonTransferResult := buildTriggerDataMap(avsproto.TriggerType_TRIGGER_TYPE_EVENT, nonTransferOutput)
+	assert.Equal(t, true, nonTransferResult["found"])
+	assert.Equal(t, "someValue", nonTransferResult["someField"])
+	assert.Equal(t, 123, nonTransferResult["anotherField"])
+}
+
+// TestBuildTriggerDataMapFromProtobufConsistency tests that both buildTriggerDataMap and
+// buildTriggerDataMapFromProtobuf produce consistent field names for JavaScript access.
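+//
+// Sketch of the consistency property, assuming fixtures like the ones constructed
+// below (eventOutputProto and rawTriggerOutput describe the same transfer event):
+//
+//	fromProto := buildTriggerDataMapFromProtobuf(avsproto.TriggerType_TRIGGER_TYPE_EVENT, eventOutputProto, nil)
+//	fromRaw := buildTriggerDataMap(avsproto.TriggerType_TRIGGER_TYPE_EVENT, rawTriggerOutput)
+//	// For every shared field, fromProto[field] == fromRaw[field]; both expose
+//	// "tokenSymbol" (camelCase) rather than "token_symbol" at the top level.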
+func TestBuildTriggerDataMapFromProtobufConsistency(t *testing.T) { + // Create protobuf transfer log data + transferLogProto := &avsproto.EventTrigger_TransferLogOutput{ + TokenName: "USDC", + TokenSymbol: "USDC", + TokenDecimals: 6, + TransactionHash: "0x1b0b9bee55e3a824dedd1dcfaad1790e19e0a68d6717e385a960092077f8b6a1", + Address: "0x1c7D4B196Cb0C7B01d743Fbc6116a902379C7238", + BlockNumber: 8560047, + BlockTimestamp: 1750061412000, + FromAddress: "0xc60e71bd0f2e6d8832Fea1a2d56091C48493C788", + ToAddress: "0xfE66125343Aabda4A330DA667431eC1Acb7BbDA9", + Value: "0x00000000000000000000000000000000000000000000000000000000004c4b40", + ValueFormatted: "5", + TransactionIndex: 63, + LogIndex: 83, + } + + eventOutputProto := &avsproto.EventTrigger_Output{ + OutputType: &avsproto.EventTrigger_Output_TransferLog{ + TransferLog: transferLogProto, + }, + } + + // Test buildTriggerDataMapFromProtobuf + protobufResult := buildTriggerDataMapFromProtobuf(avsproto.TriggerType_TRIGGER_TYPE_EVENT, eventOutputProto, nil) + + // Create raw trigger output data (as it would come from runEventTriggerImmediately) + rawTriggerOutput := map[string]interface{}{ + "found": true, + "queriesCount": 2, + "totalSearched": 5000, + "totalEvents": 1, + "transfer_log": map[string]interface{}{ + "tokenName": "USDC", + "tokenSymbol": "USDC", + "tokenDecimals": uint32(6), + "transactionHash": "0x1b0b9bee55e3a824dedd1dcfaad1790e19e0a68d6717e385a960092077f8b6a1", + "address": "0x1c7D4B196Cb0C7B01d743Fbc6116a902379C7238", + "blockNumber": uint64(8560047), + "blockTimestamp": uint64(1750061412000), + "fromAddress": "0xc60e71bd0f2e6d8832Fea1a2d56091C48493C788", + "toAddress": "0xfE66125343Aabda4A330DA667431eC1Acb7BbDA9", + "value": "0x00000000000000000000000000000000000000000000000000000000004c4b40", + "valueFormatted": "5", + "transactionIndex": uint32(63), + "logIndex": uint32(83), + }, + } + + // Test buildTriggerDataMap + rawResult := buildTriggerDataMap(avsproto.TriggerType_TRIGGER_TYPE_EVENT, rawTriggerOutput) + + // Verify that both functions produce the same field names for JavaScript access + expectedFields := []string{ + "tokenName", "tokenSymbol", "tokenDecimals", "transactionHash", + "address", "blockNumber", "blockTimestamp", "fromAddress", + "toAddress", "value", "valueFormatted", "transactionIndex", "logIndex", + } + + for _, field := range expectedFields { + // Both results should have the same field names + assert.Contains(t, protobufResult, field, "buildTriggerDataMapFromProtobuf should have field: %s", field) + assert.Contains(t, rawResult, field, "buildTriggerDataMap should have field: %s", field) + + // Both results should have the same values for these fields + assert.Equal(t, protobufResult[field], rawResult[field], "Field %s should have same value in both results", field) + } + + // Verify that neither result has the nested transfer_log structure + assert.NotContains(t, protobufResult, "transfer_log", "buildTriggerDataMapFromProtobuf should not have nested transfer_log") + assert.NotContains(t, rawResult, "transfer_log", "buildTriggerDataMap should not have nested transfer_log") +} + +// TestJavaScriptFieldAccessPattern tests that the field names work correctly for JavaScript destructuring +func TestJavaScriptFieldAccessPattern(t *testing.T) { + // This test simulates the JavaScript destructuring pattern used in the client code: + // const { tokenSymbol, valueFormatted, fromAddress, toAddress, blockTimestamp } = eventTrigger.data; + + triggerOutput := map[string]interface{}{ + "found": true, + 
"transfer_log": map[string]interface{}{ + "tokenSymbol": "USDC", + "valueFormatted": "5", + "fromAddress": "0xc60e71bd0f2e6d8832Fea1a2d56091C48493C788", + "toAddress": "0xfE66125343Aabda4A330DA667431eC1Acb7BbDA9", + "blockTimestamp": uint64(1750061412000), + }, + } + + result := buildTriggerDataMap(avsproto.TriggerType_TRIGGER_TYPE_EVENT, triggerOutput) + + // Simulate JavaScript destructuring - these fields should all be available at top level + tokenSymbol, hasTokenSymbol := result["tokenSymbol"] + valueFormatted, hasValueFormatted := result["valueFormatted"] + fromAddress, hasFromAddress := result["fromAddress"] + toAddress, hasToAddress := result["toAddress"] + blockTimestamp, hasBlockTimestamp := result["blockTimestamp"] + + // All fields should be present + assert.True(t, hasTokenSymbol, "tokenSymbol should be available for JavaScript destructuring") + assert.True(t, hasValueFormatted, "valueFormatted should be available for JavaScript destructuring") + assert.True(t, hasFromAddress, "fromAddress should be available for JavaScript destructuring") + assert.True(t, hasToAddress, "toAddress should be available for JavaScript destructuring") + assert.True(t, hasBlockTimestamp, "blockTimestamp should be available for JavaScript destructuring") + + // All fields should have the correct values (not NaN or undefined) + assert.Equal(t, "USDC", tokenSymbol, "tokenSymbol should not be undefined") + assert.Equal(t, "5", valueFormatted, "valueFormatted should not be undefined") + assert.Equal(t, "0xc60e71bd0f2e6d8832Fea1a2d56091C48493C788", fromAddress, "fromAddress should not be undefined") + assert.Equal(t, "0xfE66125343Aabda4A330DA667431eC1Acb7BbDA9", toAddress, "toAddress should not be undefined") + assert.Equal(t, uint64(1750061412000), blockTimestamp, "blockTimestamp should not be NaN") + + t.Logf("✅ All JavaScript destructuring fields are available:") + t.Logf(" tokenSymbol: %v", tokenSymbol) + t.Logf(" valueFormatted: %v", valueFormatted) + t.Logf(" fromAddress: %v", fromAddress) + t.Logf(" toAddress: %v", toAddress) + t.Logf(" blockTimestamp: %v", blockTimestamp) +} + +// TestFallbackVariableResolutionConsistency tests that the fallback variable resolution +// (where subsequent nodes can use tokenName to match token_name from previous nodes) +// works consistently across all three execution paths: +// 1. run_node_immediately (RunNodeImmediatelyRPC) +// 2. simulateTask (SimulateTask) +// 3. 
runTask (actual task execution) +func TestFallbackVariableResolutionConsistency(t *testing.T) { + db := testutil.TestMustDB() + defer storage.Destroy(db.(*storage.BadgerStorage)) + + config := testutil.GetAggregatorConfig() + engine := New(db, config, nil, testutil.GetLogger()) + + // Test data with snake_case field names (as would come from protobuf/gRPC) + testData := map[string]interface{}{ + "token_name": "USDC", + "token_symbol": "USDC", + "token_decimals": 6, + "block_number": 12345, + "tx_hash": "0x123abc", + } + + // Create input variables that include a node with snake_case data + inputVariables := map[string]interface{}{ + "eventTrigger": map[string]interface{}{ + "data": testData, + }, + } + + // Test custom code that tries to access both camelCase and snake_case versions + customCodeConfig := map[string]interface{}{ + "source": ` + // Test accessing snake_case (original) + const tokenNameSnake = eventTrigger.data.token_name; + const tokenSymbolSnake = eventTrigger.data.token_symbol; + const tokenDecimalsSnake = eventTrigger.data.token_decimals; + + // Test accessing camelCase (fallback) + const tokenNameCamel = eventTrigger.data.tokenName; + const tokenSymbolCamel = eventTrigger.data.tokenSymbol; + const tokenDecimalsCamel = eventTrigger.data.tokenDecimals; + + return { + snake_case_access: { + token_name: tokenNameSnake, + token_symbol: tokenSymbolSnake, + token_decimals: tokenDecimalsSnake + }, + camel_case_access: { + tokenName: tokenNameCamel, + tokenSymbol: tokenSymbolCamel, + tokenDecimals: tokenDecimalsCamel + }, + both_should_work: tokenNameSnake === tokenNameCamel && tokenSymbolSnake === tokenSymbolCamel + }; + `, + } + + t.Run("RunNodeImmediately", func(t *testing.T) { + result, err := engine.RunNodeImmediately("customCode", customCodeConfig, inputVariables) + assert.NoError(t, err) + assert.NotNil(t, result) + + // Verify both access patterns work + assert.Equal(t, "USDC", result["snake_case_access"].(map[string]interface{})["token_name"]) + assert.Equal(t, "USDC", result["camel_case_access"].(map[string]interface{})["tokenName"]) + assert.True(t, result["both_should_work"].(bool), "Both snake_case and camelCase access should return the same values") + }) + + t.Run("SimulateTask", func(t *testing.T) { + // Create a simple task with manual trigger and custom code node + trigger := &avsproto.TaskTrigger{ + Id: "trigger1", + Name: "manualTrigger", + Type: avsproto.TriggerType_TRIGGER_TYPE_MANUAL, + TriggerType: &avsproto.TaskTrigger_Manual{Manual: true}, + } + + nodes := []*avsproto.TaskNode{ + { + Id: "node1", + Name: "testCustomCode", + TaskType: &avsproto.TaskNode_CustomCode{ + CustomCode: &avsproto.CustomCodeNode{ + Config: &avsproto.CustomCodeNode_Config{ + Source: customCodeConfig["source"].(string), + }, + }, + }, + }, + } + + edges := []*avsproto.TaskEdge{ + { + Id: "edge1", + Source: "trigger1", + Target: "node1", + }, + } + + user := testutil.TestUser1() + execution, err := engine.SimulateTask(user, trigger, nodes, edges, inputVariables) + assert.NoError(t, err) + assert.NotNil(t, execution) + assert.True(t, execution.Success, "Simulation should succeed") + + // Find the custom code step + var customCodeStep *avsproto.Execution_Step + for _, step := range execution.Steps { + if step.Type == avsproto.NodeType_NODE_TYPE_CUSTOM_CODE.String() { + customCodeStep = step + break + } + } + assert.NotNil(t, customCodeStep, "Should find custom code execution step") + assert.True(t, customCodeStep.Success, "Custom code step should succeed") + + // Extract the result from 
the custom code output + customCodeOutput := customCodeStep.GetCustomCode() + assert.NotNil(t, customCodeOutput) + result := customCodeOutput.Data.AsInterface().(map[string]interface{}) + + // Verify both access patterns work + assert.Equal(t, "USDC", result["snake_case_access"].(map[string]interface{})["token_name"]) + assert.Equal(t, "USDC", result["camel_case_access"].(map[string]interface{})["tokenName"]) + assert.True(t, result["both_should_work"].(bool), "Both snake_case and camelCase access should return the same values") + }) + + // Note: Testing actual runTask would require setting up a full task in storage and queue system, + // which is more complex. The key insight is that runTask uses the same VM and preprocessing + // infrastructure as SimulateTask, so if SimulateTask works, runTask should work too. + // The main difference was in the branch node preprocessing, which we've now fixed. +} + +// TestBranchNodeFallbackResolution specifically tests that branch nodes can use +// the fallback variable resolution in their condition expressions +func TestBranchNodeFallbackResolution(t *testing.T) { + db := testutil.TestMustDB() + defer storage.Destroy(db.(*storage.BadgerStorage)) + + config := testutil.GetAggregatorConfig() + engine := New(db, config, nil, testutil.GetLogger()) + + // Test data with snake_case field names + testData := map[string]interface{}{ + "token_name": "USDC", + "token_amount": 1000, + } + + inputVariables := map[string]interface{}{ + "eventTrigger": map[string]interface{}{ + "data": testData, + }, + } + + // Test branch node that uses camelCase in condition (should fallback to snake_case) + branchConfig := map[string]interface{}{ + "conditions": []map[string]interface{}{ + { + "id": "condition1", + "type": "if", + "expression": "eventTrigger.data.tokenName === 'USDC' && eventTrigger.data.tokenAmount > 500", + }, + }, + } + + t.Run("BranchNodeFallbackResolution", func(t *testing.T) { + result, err := engine.RunNodeImmediately("branch", branchConfig, inputVariables) + assert.NoError(t, err) + assert.NotNil(t, result) + if result != nil && result["success"] != nil { + assert.True(t, result["success"].(bool), "Branch condition should evaluate to true using fallback resolution") + } + }) +} + +// TestContractReadFallbackResolution tests that contract read nodes can use +// fallback variable resolution in their configuration +func TestContractReadFallbackResolution(t *testing.T) { + db := testutil.TestMustDB() + defer storage.Destroy(db.(*storage.BadgerStorage)) + + config := testutil.GetAggregatorConfig() + engine := New(db, config, nil, testutil.GetLogger()) + + // Test data with snake_case field names + testData := map[string]interface{}{ + "contract_address": "0x1234567890123456789012345678901234567890", + } + + inputVariables := map[string]interface{}{ + "eventTrigger": map[string]interface{}{ + "data": testData, + }, + } + + // Test contract read that uses camelCase template (should fallback to snake_case) + contractReadConfig := map[string]interface{}{ + "contractAddress": "{{eventTrigger.data.contractAddress}}", // camelCase template + "contractAbi": `[{"inputs":[],"name":"decimals","outputs":[{"internalType":"uint8","name":"","type":"uint8"}],"stateMutability":"view","type":"function"}]`, + "methodCalls": []interface{}{ + map[string]interface{}{ + "methodName": "decimals", + "callData": "0x313ce567", + }, + }, + } + + t.Run("ContractReadFallbackResolution", func(t *testing.T) { + // This test will fail with RPC connection error, but we can check that the 
preprocessing worked + // by examining the error message - it should contain the resolved address, not the template + result, err := engine.RunNodeImmediately("contractRead", contractReadConfig, inputVariables) + + // We expect an error due to RPC connection, but the address should be resolved + assert.Error(t, err) + assert.Contains(t, err.Error(), "0x1234567890123456789012345678901234567890", + "Error should contain the resolved contract address, indicating template preprocessing worked") + + // The result might be nil due to the RPC error, which is expected + _ = result + }) +} diff --git a/core/taskengine/vm.go b/core/taskengine/vm.go index bf829d44..f8270771 100644 --- a/core/taskengine/vm.go +++ b/core/taskengine/vm.go @@ -59,8 +59,17 @@ func (c *CommonProcessor) SetOutputVarForStep(stepID string, data any) { if c.vm.vars == nil { c.vm.vars = make(map[string]any) } + + // Apply dual-access mapping to node output data if it's a map + // This enables both camelCase and snake_case field access for node outputs + // Example: apiNode.data.responseData AND apiNode.data.response_data both work + var processedData any = data + if dataMap, ok := data.(map[string]interface{}); ok { + processedData = CreateDualAccessMap(dataMap) + } + c.vm.vars[nodeNameVar] = map[string]any{ - "data": data, + "data": processedData, } } @@ -312,12 +321,17 @@ func NewVMWithDataAndTransferLog(task *model.Task, triggerData *TriggerData, sma } // Use shared function to build trigger data map from the TransferLog protobuf - triggerDataMap = buildTriggerDataMapFromProtobuf(avsproto.TriggerType_TRIGGER_TYPE_EVENT, v.parsedTriggerData.Event) + triggerDataMap = buildTriggerDataMapFromProtobuf(avsproto.TriggerType_TRIGGER_TYPE_EVENT, v.parsedTriggerData.Event, v.logger) } else { // Use shared function to build trigger data map from protobuf trigger outputs - triggerDataMap = buildTriggerDataMapFromProtobuf(triggerData.Type, triggerData.Output) + triggerDataMap = buildTriggerDataMapFromProtobuf(triggerData.Type, triggerData.Output, v.logger) } - v.AddVar(triggerNameStd, map[string]any{"data": triggerDataMap}) + + // Create dual-access map to support both camelCase and snake_case field access + // This enables both template fallback ({{trigger.data.token_symbol}}) and direct JS access + // (const {tokenSymbol} = eventTrigger.data AND const {token_symbol} = eventTrigger.data) + dualAccessTriggerData := CreateDualAccessMap(triggerDataMap) + v.AddVar(triggerNameStd, map[string]any{"data": dualAccessTriggerData}) } } else if task != nil { // Fallback if triggerData is nil but task is not triggerNameStd, err := v.GetTriggerNameAsVar() @@ -1856,9 +1870,26 @@ func CreateNodeFromType(nodeType string, config map[string]interface{}, nodeID s branchConfig := &avsproto.BranchNode_Config{} // Handle conditions from client - conditionsData, ok := config["conditions"].([]interface{}) - if !ok || len(conditionsData) == 0 { - return nil, fmt.Errorf("branch node requires conditions configuration") + conditionsValue, exists := config["conditions"] + if !exists { + return nil, fmt.Errorf("branch node requires conditions configuration - no conditions field found") + } + + conditionsData, ok := conditionsValue.([]interface{}) + if !ok { + // Try to convert from []map[string]interface{} to []interface{} + if conditionsSlice, ok := conditionsValue.([]map[string]interface{}); ok { + conditionsData = make([]interface{}, len(conditionsSlice)) + for i, condition := range conditionsSlice { + conditionsData[i] = condition + } + } else { + return nil, 
fmt.Errorf("branch node requires conditions configuration - invalid type: %T", conditionsValue) + } + } + + if len(conditionsData) == 0 { + return nil, fmt.Errorf("branch node requires conditions configuration - empty conditions array") } conditions := make([]*avsproto.BranchNode_Condition, len(conditionsData)) @@ -2126,3 +2157,59 @@ func (v *VM) AnalyzeExecutionResult() (bool, string, int) { return false, errorMessage, failedCount } + +// CreateDualAccessMap creates a map with both camelCase and snake_case field names +// pointing to the same values. This enables JavaScript code to access fields using +// either naming convention, providing fallback support for direct variable access. +// +// Example: +// +// input: {"tokenSymbol": "USDC", "blockNumber": 123} +// output: {"tokenSymbol": "USDC", "token_symbol": "USDC", "blockNumber": 123, "block_number": 123} +// +// This solves the issue where: +// - Templates use fallback: {{trigger.data.token_symbol}} -> tries token_symbol, then tokenSymbol +// - Direct JS access needs both: const {tokenSymbol} = data AND const {token_symbol} = data +func CreateDualAccessMap(data map[string]interface{}) map[string]interface{} { + if data == nil { + return nil + } + + result := make(map[string]interface{}) + + // First, copy all original fields + for key, value := range data { + result[key] = value + } + + // Then, add the alternate naming convention for each field + for key, value := range data { + // Check if the key contains underscores (snake_case) + if strings.Contains(key, "_") { + // Convert snake_case to camelCase and add it + camelKey := convertToCamelCase(key) + if camelKey != key && result[camelKey] == nil { + result[camelKey] = value + } + } else { + // Check if the key contains uppercase letters (camelCase) + hasCamelCase := false + for _, r := range key { + if unicode.IsUpper(r) { + hasCamelCase = true + break + } + } + + if hasCamelCase { + // Convert camelCase to snake_case and add it + snakeKey := convertToSnakeCase(key) + if snakeKey != key && result[snakeKey] == nil { + result[snakeKey] = value + } + } + } + } + + return result +} diff --git a/core/taskengine/vm_runner_branch.go b/core/taskengine/vm_runner_branch.go index a670e50a..e14f39cd 100644 --- a/core/taskengine/vm_runner_branch.go +++ b/core/taskengine/vm_runner_branch.go @@ -179,12 +179,7 @@ func (r *BranchProcessor) Execute(stepID string, node *avsproto.BranchNode) (*av } // Preprocess the expression using the VM's current variable context - processedExpression := condition.Expression - if strings.Contains(processedExpression, "{{") { - processedExpression = r.vm.preprocessText(processedExpression) - } else { - processedExpression = r.vm.preprocessTextWithVariableMapping(condition.Expression) - } + processedExpression := r.vm.preprocessTextWithVariableMapping(condition.Expression) log.WriteString("Processed expression for '" + condition.Id + "': " + processedExpression + "\n") // Check if expression became empty after preprocessing (indicates variable resolution failure) diff --git a/core/taskengine/vm_runner_contract_read.go b/core/taskengine/vm_runner_contract_read.go index 791cdf0b..78724c7d 100644 --- a/core/taskengine/vm_runner_contract_read.go +++ b/core/taskengine/vm_runner_contract_read.go @@ -90,7 +90,11 @@ func (r *ContractReadProcessor) buildStructuredData(method *abi.Method, result [ // executeMethodCall executes a single method call and returns the result func (r *ContractReadProcessor) executeMethodCall(ctx context.Context, contractAbi *abi.ABI, contractAddress 
diff --git a/core/taskengine/vm_runner_branch.go b/core/taskengine/vm_runner_branch.go
index a670e50a..e14f39cd 100644
--- a/core/taskengine/vm_runner_branch.go
+++ b/core/taskengine/vm_runner_branch.go
@@ -179,12 +179,7 @@ func (r *BranchProcessor) Execute(stepID string, node *avsproto.BranchNode) (*av
 	}
 
 	// Preprocess the expression using the VM's current variable context
-	processedExpression := condition.Expression
-	if strings.Contains(processedExpression, "{{") {
-		processedExpression = r.vm.preprocessText(processedExpression)
-	} else {
-		processedExpression = r.vm.preprocessTextWithVariableMapping(condition.Expression)
-	}
+	processedExpression := r.vm.preprocessTextWithVariableMapping(condition.Expression)
 	log.WriteString("Processed expression for '" + condition.Id + "': " + processedExpression + "\n")
 
 	// Check if expression became empty after preprocessing (indicates variable resolution failure)
diff --git a/core/taskengine/vm_runner_contract_read.go b/core/taskengine/vm_runner_contract_read.go
index 791cdf0b..78724c7d 100644
--- a/core/taskengine/vm_runner_contract_read.go
+++ b/core/taskengine/vm_runner_contract_read.go
@@ -90,7 +90,11 @@ func (r *ContractReadProcessor) buildStructuredData(method *abi.Method, result [
 
 // executeMethodCall executes a single method call and returns the result
 func (r *ContractReadProcessor) executeMethodCall(ctx context.Context, contractAbi *abi.ABI, contractAddress common.Address, methodCall *avsproto.ContractReadNode_MethodCall) *avsproto.ContractReadNode_MethodResult {
-	calldata := common.FromHex(methodCall.CallData)
+	// Preprocess template variables in method call data
+	preprocessedCallData := r.vm.preprocessTextWithVariableMapping(methodCall.CallData)
+	methodName := r.vm.preprocessTextWithVariableMapping(methodCall.MethodName)
+
+	calldata := common.FromHex(preprocessedCallData)
 	msg := ethereum.CallMsg{
 		To:   &contractAddress,
 		Data: calldata,
@@ -102,7 +106,7 @@ func (r *ContractReadProcessor) executeMethodCall(ctx context.Context, contractA
 		return &avsproto.ContractReadNode_MethodResult{
 			Success:    false,
 			Error:      fmt.Sprintf("contract call failed: %v", err),
-			MethodName: methodCall.MethodName,
+			MethodName: methodName,
 			Data:       []*avsproto.ContractReadNode_MethodResult_StructuredField{},
 		}
 	}
@@ -123,7 +127,7 @@ func (r *ContractReadProcessor) executeMethodCall(ctx context.Context, contractA
 			"calldata", fmt.Sprintf("0x%x", calldata),
 			"output_length", len(output),
 			"output_hex", fmt.Sprintf("0x%x", output),
-			"method_name", methodCall.MethodName,
+			"method_name", methodName,
 		)
 	}
 
@@ -133,17 +137,17 @@ func (r *ContractReadProcessor) executeMethodCall(ctx context.Context, contractA
 		return &avsproto.ContractReadNode_MethodResult{
 			Success:    false,
 			Error:      fmt.Sprintf("failed to detect method from ABI: %v", err),
-			MethodName: methodCall.MethodName,
+			MethodName: methodName,
 			Data:       []*avsproto.ContractReadNode_MethodResult_StructuredField{},
 		}
 	}
 
 	// Validate that the provided methodName matches the actual method detected from callData
-	if method.Name != methodCall.MethodName {
+	if method.Name != methodName {
 		return &avsproto.ContractReadNode_MethodResult{
 			Success:    false,
-			Error:      fmt.Sprintf("method name mismatch: callData corresponds to '%s' but methodName is '%s'. Please verify the function selector matches the intended method", method.Name, methodCall.MethodName),
-			MethodName: methodCall.MethodName,
+			Error:      fmt.Sprintf("method name mismatch: callData corresponds to '%s' but methodName is '%s'. Please verify the function selector matches the intended method", method.Name, methodName),
+			MethodName: methodName,
 			Data:       []*avsproto.ContractReadNode_MethodResult_StructuredField{},
 		}
 	}
@@ -250,14 +254,18 @@ func (r *ContractReadProcessor) Execute(stepID string, node *avsproto.ContractRe
 		return s, err
 	}
 
+	// Preprocess template variables in configuration
+	contractAddress := r.vm.preprocessTextWithVariableMapping(config.ContractAddress)
+	contractAbi := r.vm.preprocessTextWithVariableMapping(config.ContractAbi)
+
 	// Parse the ABI
-	parsedABI, err := abi.JSON(strings.NewReader(config.ContractAbi))
+	parsedABI, err := abi.JSON(strings.NewReader(contractAbi))
 	if err != nil {
 		err = fmt.Errorf("failed to parse ABI: %w", err)
 		return s, err
 	}
 
-	contractAddr := common.HexToAddress(config.ContractAddress)
+	contractAddr := common.HexToAddress(contractAddress)
 	var results []*avsproto.ContractReadNode_MethodResult
 
 	// Execute each method call serially
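With this change, both CallData and MethodName can carry template expressions that are resolved against the VM's variables before the call is encoded. A hypothetical method call showing the flow; the trigger path and owner value are invented for illustration:

	// balanceOf(address) has selector 0x70a08231; the owner portion is templated.
	// preprocessTextWithVariableMapping resolves {{trigger.data.owner}} first, so
	// only the fully substituted hex string reaches common.FromHex. The template is
	// assumed to resolve to a 40-character hex address without the 0x prefix.
	methodCall := &avsproto.ContractReadNode_MethodCall{
		MethodName: "balanceOf",
		CallData:   "0x70a08231000000000000000000000000{{trigger.data.owner}}",
	}
	result := r.executeMethodCall(ctx, &parsedABI, contractAddr, methodCall)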
diff --git a/core/taskengine/vm_runner_eth_transfer.go b/core/taskengine/vm_runner_eth_transfer.go
index 09a0f580..40eb5096 100644
--- a/core/taskengine/vm_runner_eth_transfer.go
+++ b/core/taskengine/vm_runner_eth_transfer.go
@@ -56,8 +56,9 @@ func (p *ETHTransferProcessor) Execute(stepID string, node *avsproto.ETHTransfer
 		return executionLog, err
 	}
 
-	destination := config.GetDestination()
-	amountStr := config.GetAmount()
+	// Preprocess template variables in configuration
+	destination := p.vm.preprocessTextWithVariableMapping(config.GetDestination())
+	amountStr := p.vm.preprocessTextWithVariableMapping(config.GetAmount())
 
 	if destination == "" {
 		err := fmt.Errorf("destination address is required for ETH transfer")
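The ETH transfer node now resolves templates the same way, so destination and amount can be driven by trigger data. A sketch of how a templated amount would flow through; the big.Int parsing is an assumption about downstream validation rather than code from this diff, and the template path is invented:

	// The template must resolve to a decimal wei string before numeric parsing.
	amountStr := p.vm.preprocessTextWithVariableMapping("{{trigger.data.amount_wei}}")
	amount, ok := new(big.Int).SetString(amountStr, 10) // e.g. "1000000000000000000" = 1 ETH
	if !ok {
		return executionLog, fmt.Errorf("invalid ETH transfer amount: %q", amountStr)
	}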
diff --git a/core/taskengine/vm_secrets_test.go b/core/taskengine/vm_secrets_test.go
index 203689b0..f7eeaef1 100644
--- a/core/taskengine/vm_secrets_test.go
+++ b/core/taskengine/vm_secrets_test.go
@@ -54,7 +54,11 @@ func TestSecretAccessPath(t *testing.T) {
 	// Test 3: Verify configVars exists in apContext
 	configVars, exists := apContextMap[ConfigVarsPath]
 	if !exists {
-		t.Errorf("configVars not found in apContext, available keys: %v", getMapKeys(apContextMap))
+		var keys []string
+		for k := range apContextMap {
+			keys = append(keys, k)
+		}
+		t.Errorf("configVars not found in apContext, available keys: %v", keys)
 		return
 	}
 
@@ -62,7 +66,11 @@ func TestSecretAccessPath(t *testing.T) {
 	for key, expectedValue := range testSecrets {
 		actualValue, exists := configVars[key]
 		if !exists {
-			t.Errorf("Secret key '%s' not found in configVars, available keys: %v", key, getMapKeys(configVars))
+			var keys []string
+			for k := range configVars {
+				keys = append(keys, k)
+			}
+			t.Errorf("Secret key '%s' not found in configVars, available keys: %v", key, keys)
 			continue
 		}
 		if actualValue != expectedValue {
@@ -225,20 +233,19 @@ func TestCollectInputsIncludesSecrets(t *testing.T) {
 
 	// Verify that apContext.configVars is included
 	if _, exists := inputs[APContextConfigVarsPath]; !exists {
-		t.Errorf("apContext.configVars not found in CollectInputs output, available keys: %v", getMapKeys(inputs))
+		var keys []string
+		for k := range inputs {
+			keys = append(keys, k)
+		}
+		t.Errorf("apContext.configVars not found in CollectInputs output, available keys: %v", keys)
 	}
 
 	// Verify that test_var.data is included
 	if _, exists := inputs["test_var.data"]; !exists {
-		t.Errorf("test_var.data not found in CollectInputs output, available keys: %v", getMapKeys(inputs))
-	}
-}
-
-// Helper function to get map keys for debugging
-func getMapKeys[K comparable, V any](m map[K]V) []K {
-	keys := make([]K, 0, len(m))
-	for k := range m {
-		keys = append(keys, k)
+		var keys []string
+		for k := range inputs {
+			keys = append(keys, k)
+		}
+		t.Errorf("test_var.data not found in CollectInputs output, available keys: %v", keys)
 	}
-	return keys
 }
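The inline key-collection loop now appears four times in this test file in place of the deleted generic helper. If the duplication becomes a concern, a plain non-generic helper would restore the old ergonomics, assuming the maps involved share a value type; the helper below is a sketch for illustration, not part of this PR:

	// mapKeysForError collects the keys of a string-keyed map for error messages.
	func mapKeysForError(m map[string]interface{}) []string {
		keys := make([]string, 0, len(m))
		for k := range m {
			keys = append(keys, k)
		}
		return keys
	}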