diff --git a/.gitignore b/.gitignore index 62578291..2f056650 100644 --- a/.gitignore +++ b/.gitignore @@ -32,5 +32,13 @@ alias-ecdsa.key.json keys/ +# BadgerDB database files (runtime data) +cmd/data/ +**/badger/ +*.vlog +DISCARD +KEYREGISTRY +MANIFEST + # Plugin files for semantic release .semrel/ \ No newline at end of file diff --git a/cmd/data/badger/000001.vlog b/cmd/data/badger/000001.vlog deleted file mode 100644 index 91b0b36e..00000000 Binary files a/cmd/data/badger/000001.vlog and /dev/null differ diff --git a/cmd/data/badger/000002.vlog b/cmd/data/badger/000002.vlog deleted file mode 100644 index c9ae3e20..00000000 Binary files a/cmd/data/badger/000002.vlog and /dev/null differ diff --git a/cmd/data/badger/DISCARD b/cmd/data/badger/DISCARD deleted file mode 100644 index 9e0f96a2..00000000 Binary files a/cmd/data/badger/DISCARD and /dev/null differ diff --git a/cmd/data/badger/KEYREGISTRY b/cmd/data/badger/KEYREGISTRY deleted file mode 100644 index 13312d58..00000000 --- a/cmd/data/badger/KEYREGISTRY +++ /dev/null @@ -1 +0,0 @@ -ฆ ฎnqฏูmŸช๔ิHello Badger \ No newline at end of file diff --git a/cmd/data/badger/MANIFEST b/cmd/data/badger/MANIFEST deleted file mode 100644 index c12ad2b6..00000000 Binary files a/cmd/data/badger/MANIFEST and /dev/null differ diff --git a/core/taskengine/engine.go b/core/taskengine/engine.go index 2ee238ff..1fa8a133 100644 --- a/core/taskengine/engine.go +++ b/core/taskengine/engine.go @@ -628,8 +628,9 @@ func (n *Engine) CreateTask(user *model.User, taskPayload *avsproto.CreateTaskRe n.tasks[task.Id] = task n.lock.Unlock() - // Notify operators about the new task - n.notifyOperatorsTaskOperation(task.Id, avsproto.MessageOp_MonitorTaskTrigger) + // Note: MonitorTaskTrigger notifications are handled by StreamCheckToOperator + // which sends complete task metadata. The batched notification system is only + // for CancelTask/DeleteTask operations that don't need complete metadata. // Log successful task creation with final counts n.logger.Info("โœ… CreateTask completed successfully", @@ -1062,6 +1063,15 @@ func (n *Engine) StreamCheckToOperator(payload *avsproto.SyncMessagesReq, srv av // notifyOperatorsTaskOperation queues notifications for batched sending to operators // This method is non-blocking and batches notifications for efficiency func (n *Engine) notifyOperatorsTaskOperation(taskID string, operation avsproto.MessageOp) { + // MonitorTaskTrigger should not use batched notifications as it requires complete task metadata + if operation == avsproto.MessageOp_MonitorTaskTrigger { + n.logger.Warn("โŒ MonitorTaskTrigger should not be sent via batched notifications", + "task_id", taskID, + "operation", operation.String(), + "solution", "MonitorTaskTrigger is handled by StreamCheckToOperator with complete metadata") + return + } + n.notificationMutex.Lock() defer n.notificationMutex.Unlock() diff --git a/core/taskengine/trigger/block.go b/core/taskengine/trigger/block.go index 5bcaf79e..a4b42215 100644 --- a/core/taskengine/trigger/block.go +++ b/core/taskengine/trigger/block.go @@ -25,7 +25,12 @@ type TriggerMetadata[T any] struct { type BlockTrigger struct { *CommonTrigger - schedule map[int64]map[string]bool + // New unified registry (preferred) + registry *TaskRegistry + + // Legacy schedule map (for backward compatibility) + schedule map[int64]map[string]bool + legacyMode bool // Smart monitoring state minInterval int64 // Minimum interval across all tasks @@ -50,7 +55,9 @@ func NewBlockTrigger(o *RpcOption, triggerCh chan TriggerMetadata[int64], logger logger: logger, mu: sync.Mutex{}, }, + registry: NewTaskRegistry(), schedule: make(map[int64]map[string]bool), + legacyMode: false, // Start in new mode minInterval: 0, subscriptionActive: false, triggerCh: triggerCh, @@ -70,17 +77,81 @@ func NewBlockTrigger(o *RpcOption, triggerCh chan TriggerMetadata[int64], logger return &b } +// ensureNewFormat ensures we're using the new TaskRegistry format +// This provides automatic conversion from legacy schedule map format +func (b *BlockTrigger) ensureNewFormat() { + if b.legacyMode { + b.logger.Info("๐Ÿ”„ Converting from legacy schedule map format to new TaskRegistry format") + + // Convert existing data + b.convertFromScheduleMap() + + // Clear old data + b.schedule = make(map[int64]map[string]bool) + b.legacyMode = false + + b.logger.Info("โœ… Successfully converted to new TaskRegistry format", + "task_count", b.registry.GetBlockTaskCount()) + } +} + +// ensureLegacyConversion consolidates legacy data detection and conversion +// This helper function eliminates code duplication across methods +func (b *BlockTrigger) ensureLegacyConversion() { + b.detectLegacyData() + b.ensureNewFormat() +} + +// detectLegacyData checks if we have data in the old format +func (b *BlockTrigger) detectLegacyData() { + hasLegacyData := len(b.schedule) > 0 + + if hasLegacyData && b.registry.GetBlockTaskCount() == 0 { + b.legacyMode = true + b.logger.Info("๐Ÿ” Detected legacy schedule map data - will convert on next operation") + } +} + +// convertFromScheduleMap converts old schedule map data to the new TaskRegistry +// This provides backward compatibility during migration +func (b *BlockTrigger) convertFromScheduleMap() { + for interval, tasks := range b.schedule { + for taskID := range tasks { + // Create BlockTaskData for the new registry + blockData := &BlockTaskData{ + Interval: interval, + } + + // We don't have the original TaskMetadata in the old format, + // so we'll create a minimal one for compatibility + taskMetadata := &avsproto.SyncMessagesResp_TaskMetadata{ + TaskId: taskID, + } + + // Add to new registry + b.registry.AddTask(taskID, taskMetadata, nil, blockData, nil) + + b.logger.Debug("๐Ÿ”„ Converted legacy block task", + "task_id", taskID, + "interval", interval) + } + } +} + // calculateMinInterval finds the minimum interval among all registered tasks func (b *BlockTrigger) calculateMinInterval() int64 { + // Auto-convert from legacy format if needed + b.ensureLegacyConversion() + minInterval := int64(0) - for interval := range b.schedule { - if len(b.schedule[interval]) > 0 { // Only consider intervals with active tasks - if minInterval == 0 || interval < minInterval { - minInterval = interval - } + b.registry.RangeBlockTasks(func(taskID string, entry *TaskEntry) bool { + interval := entry.BlockData.Interval + if minInterval == 0 || interval < minInterval { + minInterval = interval } - } + return true // Continue iteration + }) return minInterval } @@ -96,6 +167,9 @@ func (b *BlockTrigger) shouldCheckAtBlock(blockNumber int64) bool { } func (b *BlockTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) error { + // Auto-convert from legacy format if needed + b.ensureLegacyConversion() + b.mu.Lock() defer b.mu.Unlock() @@ -106,15 +180,27 @@ func (b *BlockTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) e return fmt.Errorf("invalid block trigger interval %d for task %s: interval must be greater than 0", interval, check.TaskId) } - // Add task to schedule - if _, ok := b.schedule[interval]; !ok { - b.schedule[interval] = map[string]bool{ - check.TaskId: true, + // Create BlockTaskData for the new registry + blockData := &BlockTaskData{ + Interval: interval, + } + + // Add to new registry + b.registry.AddTask(check.TaskId, check, nil, blockData, nil) + + // Count tasks with the same interval for logging + tasksWithSameInterval := 0 + b.registry.RangeBlockTasks(func(taskID string, entry *TaskEntry) bool { + if entry.BlockData.Interval == interval { + tasksWithSameInterval++ } + return true + }) + + if tasksWithSameInterval == 1 { b.logger.Info("๐Ÿ“ฆ First task registered for block interval", "interval", interval, "task_id", check.TaskId) } else { - b.schedule[interval][check.TaskId] = true - b.logger.Debug("๐Ÿ“ฆ Additional task registered for block interval", "interval", interval, "task_id", check.TaskId, "total_tasks_for_interval", len(b.schedule[interval])) + b.logger.Debug("๐Ÿ“ฆ Additional task registered for block interval", "interval", interval, "task_id", check.TaskId, "total_tasks_for_interval", tasksWithSameInterval) } // Recalculate minimum interval @@ -124,17 +210,22 @@ func (b *BlockTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) e oldMinInterval := b.minInterval b.minInterval = newMinInterval + // Count unique intervals for logging + uniqueIntervals := make(map[int64]bool) + b.registry.RangeBlockTasks(func(taskID string, entry *TaskEntry) bool { + uniqueIntervals[entry.BlockData.Interval] = true + return true + }) + b.logger.Info("๐Ÿ”„ Block monitoring minimum interval updated", "old_min_interval", oldMinInterval, "new_min_interval", newMinInterval, - "total_intervals", len(b.schedule)) + "total_intervals", len(uniqueIntervals)) // Log active intervals for debugging - activeIntervals := make([]int64, 0) - for interval, tasks := range b.schedule { - if len(tasks) > 0 { - activeIntervals = append(activeIntervals, interval) - } + activeIntervals := make([]int64, 0, len(uniqueIntervals)) + for interval := range uniqueIntervals { + activeIntervals = append(activeIntervals, interval) } b.logger.Debug("๐Ÿ“Š Active block intervals", "intervals", activeIntervals, "min_interval", newMinInterval) } @@ -143,24 +234,39 @@ func (b *BlockTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) e } func (b *BlockTrigger) RemoveCheck(taskID string) error { + // Auto-convert from legacy format if needed + b.ensureLegacyConversion() + b.mu.Lock() defer b.mu.Unlock() removedFromInterval := int64(0) - for interval, tasks := range b.schedule { - if _, exists := tasks[taskID]; exists { - delete(b.schedule[interval], taskID) - removedFromInterval = interval + // Get the task before removing it to know which interval it was from + if entry, exists := b.registry.GetTask(taskID); exists && entry.BlockData != nil { + removedFromInterval = entry.BlockData.Interval - b.logger.Debug("๐Ÿ—‘๏ธ Removed block task", "task_id", taskID, "interval", interval, "remaining_tasks_for_interval", len(tasks)-1) + // Count remaining tasks with the same interval before removal + remainingTasksForInterval := 0 + b.registry.RangeBlockTasks(func(tID string, e *TaskEntry) bool { + if e.BlockData.Interval == removedFromInterval && tID != taskID { + remainingTasksForInterval++ + } + return true + }) + + // Remove from new registry + removed := b.registry.RemoveTask(taskID) + + if removed { + b.logger.Debug("๐Ÿ—‘๏ธ Removed block task", "task_id", taskID, "interval", removedFromInterval, "remaining_tasks_for_interval", remainingTasksForInterval) - // Clean up empty intervals - if len(b.schedule[interval]) == 0 { - delete(b.schedule, interval) - b.logger.Info("๐Ÿงน Cleaned up empty block interval", "interval", interval) + // Check if this was the last task for this interval + if remainingTasksForInterval == 0 { + b.logger.Info("๐Ÿงน Cleaned up empty block interval", "interval", removedFromInterval) } - break + } else { + b.logger.Debug("๐Ÿคท Task not found for removal", "task_id", taskID) } } @@ -171,11 +277,18 @@ func (b *BlockTrigger) RemoveCheck(taskID string) error { oldMinInterval := b.minInterval b.minInterval = newMinInterval + // Count remaining intervals + uniqueIntervals := make(map[int64]bool) + b.registry.RangeBlockTasks(func(taskID string, entry *TaskEntry) bool { + uniqueIntervals[entry.BlockData.Interval] = true + return true + }) + b.logger.Info("๐Ÿ”„ Block monitoring minimum interval updated after removal", "removed_from_interval", removedFromInterval, "old_min_interval", oldMinInterval, "new_min_interval", newMinInterval, - "remaining_intervals", len(b.schedule)) + "remaining_intervals", len(uniqueIntervals)) // If no more tasks, we'll stop subscription in the main loop if newMinInterval == 0 { @@ -306,15 +419,14 @@ func (b *BlockTrigger) processBlock(header *types.Header) { "block_hash", header.Hash().Hex(), "min_interval", minInterval) - b.mu.RLock() - scheduleSnapshot := make(map[int64]map[string]bool) - for interval, tasks := range b.schedule { - scheduleSnapshot[interval] = make(map[string]bool) - for taskID := range tasks { - scheduleSnapshot[interval][taskID] = true - } - } - b.mu.RUnlock() + // Create a snapshot of tasks grouped by interval for processing + intervalTasksMap := make(map[int64][]string) + + b.registry.RangeBlockTasks(func(taskID string, entry *TaskEntry) bool { + interval := entry.BlockData.Interval + intervalTasksMap[interval] = append(intervalTasksMap[interval], taskID) + return true + }) triggeredTasks := 0 @@ -323,7 +435,7 @@ func (b *BlockTrigger) processBlock(header *types.Header) { remainder := new(big.Int) // Check all intervals to see which ones should trigger at this block - for interval, tasks := range scheduleSnapshot { + for interval, tasks := range intervalTasksMap { if len(tasks) == 0 { continue } @@ -331,8 +443,8 @@ func (b *BlockTrigger) processBlock(header *types.Header) { // Check if this interval should trigger at this block number // Using checkpoint system: interval should trigger when blockNumber % interval == 0 intervalBig.SetInt64(interval) - if remainder.Mod(header.Number, intervalBig).Cmp(zero) == 0 { - for taskID := range tasks { + if remainder.Mod(header.Number, intervalBig).Cmp(big.NewInt(0)) == 0 { + for _, taskID := range tasks { b.triggerCh <- TriggerMetadata[int64]{ TaskID: taskID, Marker: blockNumber, diff --git a/core/taskengine/trigger/block_test.go b/core/taskengine/trigger/block_test.go index 482b0846..d58d1b8f 100644 --- a/core/taskengine/trigger/block_test.go +++ b/core/taskengine/trigger/block_test.go @@ -134,15 +134,21 @@ func TestBlockTrigger_AddCheck_ValidInterval(t *testing.T) { t.Errorf("AddCheck() unexpected error for valid interval: %v", err) } - // Verify the task was added to the schedule - blockTrigger.mu.Lock() - defer blockTrigger.mu.Unlock() + // Verify the task was added to the registry + if blockTrigger.registry.GetBlockTaskCount() != 1 { + t.Error("Expected 1 block task in registry") + } + + entry, exists := blockTrigger.registry.GetTask("test-task-valid") + if !exists { + t.Error("Task was not found in registry") + } - if _, exists := blockTrigger.schedule[10]; !exists { - t.Error("Task was not added to schedule for interval 10") + if entry.BlockData == nil { + t.Error("Task does not have BlockData") } - if _, exists := blockTrigger.schedule[10]["test-task-valid"]; !exists { - t.Error("Task ID was not found in schedule for interval 10") + if entry.BlockData.Interval != 10 { + t.Errorf("Expected interval 10, got %d", entry.BlockData.Interval) } } diff --git a/core/taskengine/trigger/event.go b/core/taskengine/trigger/event.go index f812ab39..3b680cfc 100644 --- a/core/taskengine/trigger/event.go +++ b/core/taskengine/trigger/event.go @@ -50,7 +50,12 @@ type QueryInfo struct { type EventTrigger struct { *CommonTrigger - checks sync.Map + // New unified registry (preferred) + registry *TaskRegistry + + // Legacy sync.Map (for backward compatibility) + checks sync.Map + legacyMode bool // channel that we will push the trigger information back triggerCh chan TriggerMetadata[EventMark] @@ -97,8 +102,10 @@ func NewEventTrigger(o *RpcOption, triggerCh chan TriggerMetadata[EventMark], lo logger: logger, }, + registry: NewTaskRegistry(), triggerCh: triggerCh, checks: sync.Map{}, + legacyMode: false, // Start in new mode subscriptions: make([]SubscriptionInfo, 0), updateSubsCh: make(chan struct{}, 1), eventCounts: make(map[string]map[uint64]uint32), @@ -125,7 +132,49 @@ func (t *EventTrigger) SetOverloadAlertCallback(callback func(alert *avsproto.Ev t.onOverloadAlert = callback } +// ensureNewFormat ensures we're using the new TaskRegistry format +// This provides automatic conversion from legacy sync.Map format +func (t *EventTrigger) ensureNewFormat() { + if t.legacyMode { + t.logger.Info("๐Ÿ”„ Converting from legacy sync.Map format to new TaskRegistry format") + + // Convert existing data + t.registry.ConvertFromSyncMap(&t.checks) + + // Clear old data + t.checks = sync.Map{} + t.legacyMode = false + + t.logger.Info("โœ… Successfully converted to new TaskRegistry format", + "task_count", t.registry.GetEventTaskCount()) + } +} + +// ensureLegacyConversion consolidates legacy data detection and conversion +// This helper function eliminates code duplication across methods +func (t *EventTrigger) ensureLegacyConversion() { + t.detectLegacyData() + t.ensureNewFormat() +} + +// detectLegacyData checks if we have data in the old format +func (t *EventTrigger) detectLegacyData() { + hasLegacyData := false + t.checks.Range(func(key, value interface{}) bool { + hasLegacyData = true + return false // Stop after finding first item + }) + + if hasLegacyData && t.registry.GetEventTaskCount() == 0 { + t.legacyMode = true + t.logger.Info("๐Ÿ” Detected legacy sync.Map data - will convert on next operation") + } +} + func (t *EventTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) error { + // Auto-convert from legacy format if needed + t.ensureLegacyConversion() + sTrigger := check.GetTrigger() if sTrigger == nil { return fmt.Errorf("trigger not found from sync message") @@ -143,10 +192,10 @@ func (t *EventTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) e return fmt.Errorf("no queries found in event trigger config for task %s", taskID) } - c := &Check{ - TaskMetadata: check, - Queries: queries, - ParsedABIs: make(map[int]*abi.ABI), + // Create EventTaskData for the new registry + eventData := &EventTaskData{ + Queries: queries, + ParsedABIs: make(map[int]*abi.ABI), } // Pre-parse ABIs for queries that have conditions to avoid repeated parsing @@ -161,7 +210,7 @@ func (t *EventTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) e "query_index", i, "error", err) } else { - c.ParsedABIs[i] = &parsedABI + eventData.ParsedABIs[i] = &parsedABI t.logger.Debug("โœ… Pre-parsed ABI for conditional filtering", "task_id", taskID, "query_index", i, @@ -176,7 +225,8 @@ func (t *EventTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) e } } - t.checks.Store(taskID, c) + // Add to new registry + t.registry.AddTask(taskID, check, eventData, nil, nil) // Initialize event counts for this task t.eventCountsMutex.Lock() @@ -208,17 +258,15 @@ func (t *EventTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) e } func (t *EventTrigger) RemoveCheck(id string) error { - // Clean up cached ABIs before removing the check - if checkValue, exists := t.checks.Load(id); exists { - if check, ok := checkValue.(*Check); ok { - // Clear cached ABIs to free memory - for queryIndex := range check.ParsedABIs { - delete(check.ParsedABIs, queryIndex) - } - } - } + // Auto-convert from legacy format if needed + t.ensureLegacyConversion() - t.checks.Delete(id) + // Remove from new registry (handles cleanup automatically) + removed := t.registry.RemoveTask(id) + + if !removed { + t.logger.Debug("๐Ÿคท Task not found for removal", "task_id", id) + } // Clean up event counts for this task t.eventCountsMutex.Lock() @@ -508,10 +556,9 @@ func (t *EventTrigger) checkEventSafety(log types.Log) bool { queryIndex = subInfo.queryIndex // Get safety limits for this task - if check, exists := t.checks.Load(matchingTaskID); exists { - checkObj := check.(Check) - if queryIndex < len(checkObj.Queries) { - maxEventsPerBlock = checkObj.Queries[queryIndex].GetMaxEventsPerBlock() + if entry, exists := t.registry.GetTask(matchingTaskID); exists && entry.EventData != nil { + if queryIndex < len(entry.EventData.Queries) { + maxEventsPerBlock = entry.EventData.Queries[queryIndex].GetMaxEventsPerBlock() } } break @@ -639,11 +686,8 @@ func (t *EventTrigger) processLogInternal(log types.Log) error { var triggeredTasks []string // Check all registered tasks to see which ones match this log - t.checks.Range(func(key any, value any) bool { - taskID := key.(string) - check := value.(Check) - - if t.logMatchesTask(log, &check) { + t.registry.RangeEventTasks(func(taskID string, entry *TaskEntry) bool { + if t.logMatchesTaskEntry(log, entry) { triggeredTasks = append(triggeredTasks, taskID) // Send trigger notification @@ -682,9 +726,30 @@ func (t *EventTrigger) processLogInternal(log types.Log) error { } // logMatchesTask checks if a log matches any of the queries for a specific task +// logMatchesTaskEntry checks if a log matches a task entry (new format) +func (t *EventTrigger) logMatchesTaskEntry(log types.Log, entry *TaskEntry) bool { + if entry.EventData == nil { + return false + } + + for queryIndex, query := range entry.EventData.Queries { + if t.logMatchesEventQuery(log, query, entry.EventData, queryIndex) { + return true + } + } + return false +} + +// logMatchesTask checks if a log matches a task check (legacy format - kept for compatibility) func (t *EventTrigger) logMatchesTask(log types.Log, check *Check) bool { + // Convert to EventTaskData for compatibility + eventData := &EventTaskData{ + Queries: check.Queries, + ParsedABIs: check.ParsedABIs, + } + for i, query := range check.Queries { - if t.logMatchesEventQuery(log, query, check, i) { + if t.logMatchesEventQuery(log, query, eventData, i) { return true } } @@ -692,7 +757,8 @@ func (t *EventTrigger) logMatchesTask(log types.Log, check *Check) bool { } // logMatchesEventQuery checks if a log matches a specific EventTrigger_Query -func (t *EventTrigger) logMatchesEventQuery(log types.Log, query *avsproto.EventTrigger_Query, check *Check, queryIndex int) bool { +// logMatchesEventQuery checks if a log matches a specific event query (works with both old and new formats) +func (t *EventTrigger) logMatchesEventQuery(log types.Log, query *avsproto.EventTrigger_Query, eventData *EventTaskData, queryIndex int) bool { // Check addresses addresses := query.GetAddresses() if len(addresses) > 0 { @@ -737,13 +803,40 @@ func (t *EventTrigger) logMatchesEventQuery(log types.Log, query *avsproto.Event // NEW: Evaluate conditional filtering if conditions are provided conditions := query.GetConditions() if len(conditions) > 0 { - return t.evaluateEventConditions(log, query, conditions, check, queryIndex) + return t.evaluateEventConditionsWithEventData(log, query, conditions, eventData, queryIndex) } return true } -// evaluateEventConditions checks if a log matches the provided ABI-based conditions +// evaluateEventConditionsWithEventData checks if a log matches the provided ABI-based conditions (new format) +func (t *EventTrigger) evaluateEventConditionsWithEventData(log types.Log, query *avsproto.EventTrigger_Query, conditions []*avsproto.EventCondition, eventData *EventTaskData, queryIndex int) bool { + // Use cached ABI if available, otherwise parse it (fallback for backward compatibility) + var contractABI *abi.ABI + if cachedABI, exists := eventData.ParsedABIs[queryIndex]; exists && cachedABI != nil { + contractABI = cachedABI + t.logger.Debug("๐Ÿš€ Using cached ABI for conditional filtering", "query_index", queryIndex) + } else { + // Fallback: parse ABI on-demand (this should rarely happen with the new caching) + abiString := query.GetContractAbi() + if abiString == "" { + t.logger.Warn("๐Ÿšซ Conditional filtering requires contract ABI but none provided") + return false + } + + parsedABI, err := abi.JSON(strings.NewReader(abiString)) + if err != nil { + t.logger.Error("โŒ Failed to parse contract ABI for conditional filtering", "error", err) + return false + } + contractABI = &parsedABI + t.logger.Debug("โš ๏ธ Parsed ABI on-demand (consider pre-parsing for better performance)", "query_index", queryIndex) + } + + return t.evaluateEventConditionsCommon(log, query, conditions, contractABI, queryIndex) +} + +// evaluateEventConditions checks if a log matches the provided ABI-based conditions (legacy format) func (t *EventTrigger) evaluateEventConditions(log types.Log, query *avsproto.EventTrigger_Query, conditions []*avsproto.EventCondition, check *Check, queryIndex int) bool { // Use cached ABI if available, otherwise parse it (fallback for backward compatibility) var contractABI *abi.ABI @@ -767,6 +860,11 @@ func (t *EventTrigger) evaluateEventConditions(log types.Log, query *avsproto.Ev t.logger.Debug("โš ๏ธ Parsed ABI on-demand (consider pre-parsing for better performance)", "query_index", queryIndex) } + return t.evaluateEventConditionsCommon(log, query, conditions, contractABI, queryIndex) +} + +// evaluateEventConditionsCommon contains the shared logic for both legacy and new formats +func (t *EventTrigger) evaluateEventConditionsCommon(log types.Log, query *avsproto.EventTrigger_Query, conditions []*avsproto.EventCondition, contractABI *abi.ABI, queryIndex int) bool { // Find the matching event in ABI using the first topic (event signature) if len(log.Topics) == 0 { t.logger.Debug("๐Ÿšซ Log has no topics, cannot match event signature") @@ -1109,15 +1207,16 @@ func getMapKeys(m map[string]interface{}) []string { func (t *EventTrigger) buildFilterQueries() []QueryInfo { var allQueries []QueryInfo - t.checks.Range(func(key any, value any) bool { - taskID := key.(string) - check := value.(*Check) + // Auto-convert from legacy format if needed + t.ensureLegacyConversion() + + t.registry.RangeEventTasks(func(taskID string, entry *TaskEntry) bool { // Group queries by their filter criteria to identify duplicates/overlaps queryGroups := make(map[string][]int) // queryKey -> []queryIndex // Convert each EventTrigger_Query to ethereum.FilterQuery and group by criteria - for i, query := range check.Queries { + for i, query := range entry.EventData.Queries { ethQuery := t.convertToFilterQuery(query) // Create a unique key for this query's filter criteria @@ -1129,14 +1228,14 @@ func (t *EventTrigger) buildFilterQueries() []QueryInfo { for _, queryIndices := range queryGroups { // Use the first query as the base (they should all be identical) baseQueryIndex := queryIndices[0] - baseQuery := check.Queries[baseQueryIndex] + baseQuery := entry.EventData.Queries[baseQueryIndex] ethQuery := t.convertToFilterQuery(baseQuery) // Find the maximum maxEventsPerBlock across all identical queries maxEventsPerBlock := baseQuery.GetMaxEventsPerBlock() for _, idx := range queryIndices[1:] { - if check.Queries[idx].GetMaxEventsPerBlock() > maxEventsPerBlock { - maxEventsPerBlock = check.Queries[idx].GetMaxEventsPerBlock() + if entry.EventData.Queries[idx].GetMaxEventsPerBlock() > maxEventsPerBlock { + maxEventsPerBlock = entry.EventData.Queries[idx].GetMaxEventsPerBlock() } } diff --git a/core/taskengine/trigger/event_conditional_test.go b/core/taskengine/trigger/event_conditional_test.go index 72cceeef..ed4ff0a3 100644 --- a/core/taskengine/trigger/event_conditional_test.go +++ b/core/taskengine/trigger/event_conditional_test.go @@ -135,8 +135,14 @@ func TestEventTriggerConditionalFiltering(t *testing.T) { ParsedABIs: make(map[int]*abi.ABI), } + // Convert Check to EventTaskData for the new API + eventData := &EventTaskData{ + Queries: check.Queries, + ParsedABIs: check.ParsedABIs, + } + // Test the conditional filtering - result := eventTrigger.logMatchesEventQuery(mockLog, query, check, 0) + result := eventTrigger.logMatchesEventQuery(mockLog, query, eventData, 0) if tc.shouldTrigger { assert.True(t, result, "Expected condition to match for: %s", tc.description) @@ -191,13 +197,19 @@ func TestEventTriggerMultipleConditions(t *testing.T) { ParsedABIs: make(map[int]*abi.ABI), } - result := eventTrigger.logMatchesEventQuery(mockLog, query, check, 0) + // Convert Check to EventTaskData for the new API + eventData := &EventTaskData{ + Queries: check.Queries, + ParsedABIs: check.ParsedABIs, + } + + result := eventTrigger.logMatchesEventQuery(mockLog, query, eventData, 0) assert.True(t, result, "Multiple conditions should pass (price > $2000 AND round > 10000)") t.Logf("โœ… Multiple conditions test passed") // Test case where one condition fails query.Conditions[1].Value = "20000" // Round > 20000 (should fail) - result = eventTrigger.logMatchesEventQuery(mockLog, query, check, 0) + result = eventTrigger.logMatchesEventQuery(mockLog, query, eventData, 0) assert.False(t, result, "Should fail when one condition doesn't match") t.Logf("๐Ÿšซ Multiple conditions correctly rejected when one fails") } @@ -230,7 +242,13 @@ func TestEventTriggerWithoutConditions(t *testing.T) { ParsedABIs: make(map[int]*abi.ABI), } - result := eventTrigger.logMatchesEventQuery(mockLog, query, check, 0) + // Convert Check to EventTaskData for the new API + eventData := &EventTaskData{ + Queries: check.Queries, + ParsedABIs: check.ParsedABIs, + } + + result := eventTrigger.logMatchesEventQuery(mockLog, query, eventData, 0) assert.True(t, result, "Should match when no conditions are specified (backward compatibility)") t.Logf("โœ… Backward compatibility test passed") } @@ -271,7 +289,13 @@ func TestEventTriggerInvalidABI(t *testing.T) { ParsedABIs: make(map[int]*abi.ABI), } - result := eventTrigger.logMatchesEventQuery(mockLog, query, check, 0) + // Convert Check to EventTaskData for the new API + eventData := &EventTaskData{ + Queries: check.Queries, + ParsedABIs: check.ParsedABIs, + } + + result := eventTrigger.logMatchesEventQuery(mockLog, query, eventData, 0) assert.False(t, result, "Should fail gracefully with invalid ABI") t.Logf("โœ… Invalid ABI handled gracefully") } @@ -447,8 +471,14 @@ func TestSignedIntegerConditions(t *testing.T) { ParsedABIs: make(map[int]*abi.ABI), } + // Convert Check to EventTaskData for the new API + eventData := &EventTaskData{ + Queries: check.Queries, + ParsedABIs: check.ParsedABIs, + } + // Test the condition evaluation - result := eventTrigger.logMatchesEventQuery(mockLog, query, check, 0) + result := eventTrigger.logMatchesEventQuery(mockLog, query, eventData, 0) if tc.shouldMatch { assert.True(t, result, "Expected condition to match for: %s", tc.description) diff --git a/core/taskengine/trigger/event_test.go b/core/taskengine/trigger/event_test.go index f3c6a451..5dc6719d 100644 --- a/core/taskengine/trigger/event_test.go +++ b/core/taskengine/trigger/event_test.go @@ -35,7 +35,9 @@ func (m *MockLogger) With(tags ...any) sdklogging.Logger { return m } func TestBuildFilterQueriesOptimization(t *testing.T) { // Create a mock event trigger with proper initialization trigger := &EventTrigger{ - checks: sync.Map{}, + registry: NewTaskRegistry(), + checks: sync.Map{}, + legacyMode: false, CommonTrigger: &CommonTrigger{ done: make(chan bool), shutdown: false, @@ -121,7 +123,9 @@ func TestBuildFilterQueriesOptimization(t *testing.T) { func TestBuildFilterQueriesDistinguishFromTo(t *testing.T) { // Create a mock event trigger with proper initialization trigger := &EventTrigger{ - checks: sync.Map{}, + registry: NewTaskRegistry(), + checks: sync.Map{}, + legacyMode: false, CommonTrigger: &CommonTrigger{ done: make(chan bool), shutdown: false, diff --git a/core/taskengine/trigger/registry.go b/core/taskengine/trigger/registry.go new file mode 100644 index 00000000..5cd89b73 --- /dev/null +++ b/core/taskengine/trigger/registry.go @@ -0,0 +1,251 @@ +package trigger + +import ( + "sync" + "time" + + avsproto "github.com/AvaProtocol/EigenLayer-AVS/protobuf" + "github.com/ethereum/go-ethereum/accounts/abi" + gocron "github.com/go-co-op/gocron/v2" +) + +// TaskRegistry provides a unified, type-safe way to manage tasks across all trigger types +type TaskRegistry struct { + mu sync.RWMutex + tasks map[string]*TaskEntry +} + +// TaskEntry represents a single task with trigger-specific data +type TaskEntry struct { + TaskID string + TaskMetadata *avsproto.SyncMessagesResp_TaskMetadata + + // Trigger-specific data (only one will be populated based on trigger type) + EventData *EventTaskData + BlockData *BlockTaskData + TimeData *TimeTaskData + + CreatedAt time.Time + UpdatedAt time.Time +} + +// EventTaskData contains event trigger specific information +type EventTaskData struct { + Queries []*avsproto.EventTrigger_Query + ParsedABIs map[int]*abi.ABI // queryIndex -> parsed ABI +} + +// BlockTaskData contains block trigger specific information +type BlockTaskData struct { + Interval int64 +} + +// TimeTaskData contains time trigger specific data +type TimeTaskData struct { + Job gocron.Job // The scheduled job + Schedules []string // Original cron schedules + Epochs []int64 // Original epochs for fixed time triggers +} + +// NewTaskRegistry creates a new unified task registry +func NewTaskRegistry() *TaskRegistry { + return &TaskRegistry{ + tasks: make(map[string]*TaskEntry), + } +} + +// AddTask adds a new task to the registry +func (r *TaskRegistry) AddTask(taskID string, taskMetadata *avsproto.SyncMessagesResp_TaskMetadata, eventData *EventTaskData, blockData *BlockTaskData, timeData *TimeTaskData) { + r.mu.Lock() + defer r.mu.Unlock() + + now := time.Now() + entry := &TaskEntry{ + TaskID: taskID, + TaskMetadata: taskMetadata, + EventData: eventData, + BlockData: blockData, + TimeData: timeData, + CreatedAt: now, + UpdatedAt: now, + } + + r.tasks[taskID] = entry +} + +// GetTask retrieves a task by ID +func (r *TaskRegistry) GetTask(taskID string) (*TaskEntry, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + + entry, exists := r.tasks[taskID] + return entry, exists +} + +// RemoveTask removes a task from the registry +func (r *TaskRegistry) RemoveTask(taskID string) bool { + r.mu.Lock() + defer r.mu.Unlock() + + if entry, exists := r.tasks[taskID]; exists { + // Clean up EventData if present + if entry.EventData != nil { + // Clear cached ABIs to free memory + for queryIndex := range entry.EventData.ParsedABIs { + delete(entry.EventData.ParsedABIs, queryIndex) + } + } + + delete(r.tasks, taskID) + return true + } + return false +} + +// ListTasks returns all task IDs +func (r *TaskRegistry) ListTasks() []string { + r.mu.RLock() + defer r.mu.RUnlock() + + taskIDs := make([]string, 0, len(r.tasks)) + for taskID := range r.tasks { + taskIDs = append(taskIDs, taskID) + } + return taskIDs +} + +// RangeEventTasks iterates over all tasks with EventData +func (r *TaskRegistry) RangeEventTasks(fn func(taskID string, entry *TaskEntry) bool) { + r.mu.RLock() + defer r.mu.RUnlock() + + for taskID, entry := range r.tasks { + if entry.EventData != nil { + if !fn(taskID, entry) { + break + } + } + } +} + +// GetEventTaskCount returns the number of tasks with EventData +func (r *TaskRegistry) GetEventTaskCount() int { + r.mu.RLock() + defer r.mu.RUnlock() + + count := 0 + for _, entry := range r.tasks { + if entry.EventData != nil { + count++ + } + } + return count +} + +// GetBlockTaskCount returns the number of tasks with BlockData +func (r *TaskRegistry) GetBlockTaskCount() int { + r.mu.RLock() + defer r.mu.RUnlock() + + count := 0 + for _, entry := range r.tasks { + if entry.BlockData != nil { + count++ + } + } + return count +} + +// RangeBlockTasks iterates over all tasks with BlockData +func (r *TaskRegistry) RangeBlockTasks(fn func(taskID string, entry *TaskEntry) bool) { + r.mu.RLock() + defer r.mu.RUnlock() + + for taskID, entry := range r.tasks { + if entry.BlockData != nil { + if !fn(taskID, entry) { + break + } + } + } +} + +// GetTimeTaskCount returns the number of tasks with TimeData +func (r *TaskRegistry) GetTimeTaskCount() int { + r.mu.RLock() + defer r.mu.RUnlock() + + count := 0 + for _, entry := range r.tasks { + if entry.TimeData != nil { + count++ + } + } + return count +} + +// RangeTimeTasks iterates over all tasks with TimeData +func (r *TaskRegistry) RangeTimeTasks(fn func(taskID string, entry *TaskEntry) bool) { + r.mu.RLock() + defer r.mu.RUnlock() + + for taskID, entry := range r.tasks { + if entry.TimeData != nil { + if !fn(taskID, entry) { + break + } + } + } +} + +// Legacy conversion functions for backward compatibility + +// ConvertFromSyncMap converts old sync.Map data to the new TaskRegistry +// This provides backward compatibility during migration +func (r *TaskRegistry) ConvertFromSyncMap(oldChecks *sync.Map) { + r.mu.Lock() + defer r.mu.Unlock() + + oldChecks.Range(func(key, value interface{}) bool { + taskID, ok := key.(string) + if !ok { + return true // Skip invalid keys + } + + // Handle both *Check and Check types for maximum compatibility + var check *Check + switch v := value.(type) { + case *Check: + check = v + case Check: + check = &v + default: + return true // Skip invalid values + } + + // Convert to new format + eventData := &EventTaskData{ + Queries: check.Queries, + ParsedABIs: check.ParsedABIs, + } + + now := time.Now() + entry := &TaskEntry{ + TaskID: taskID, + TaskMetadata: check.TaskMetadata, + EventData: eventData, + CreatedAt: now, + UpdatedAt: now, + } + + r.tasks[taskID] = entry + return true + }) +} + +// Check if we need to convert from old format +func (r *TaskRegistry) NeedsConversion() bool { + r.mu.RLock() + defer r.mu.RUnlock() + return len(r.tasks) == 0 +} diff --git a/core/taskengine/trigger/registry_test.go b/core/taskengine/trigger/registry_test.go new file mode 100644 index 00000000..15681295 --- /dev/null +++ b/core/taskengine/trigger/registry_test.go @@ -0,0 +1,226 @@ +package trigger + +import ( + "fmt" + "sync" + "testing" + + avsproto "github.com/AvaProtocol/EigenLayer-AVS/protobuf" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTaskRegistry_ConvertFromSyncMap(t *testing.T) { + registry := NewTaskRegistry() + + // Create a legacy sync.Map with test data + oldChecks := &sync.Map{} + + // Create test task metadata + taskMetadata := &avsproto.SyncMessagesResp_TaskMetadata{ + TaskId: "test-task-1", + } + + // Create test queries + queries := []*avsproto.EventTrigger_Query{ + { + Addresses: []string{"0x1234567890123456789012345678901234567890"}, + Topics: []*avsproto.EventTrigger_Topics{ + {Values: []string{"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"}}, + }, + }, + } + + // Create legacy Check struct + legacyCheck := &Check{ + TaskMetadata: taskMetadata, + Queries: queries, + ParsedABIs: make(map[int]*abi.ABI), + } + + // Store in old format + oldChecks.Store("test-task-1", legacyCheck) + + // Test conversion + registry.ConvertFromSyncMap(oldChecks) + + // Verify conversion worked + entry, exists := registry.GetTask("test-task-1") + require.True(t, exists, "Task should exist after conversion") + require.NotNil(t, entry.EventData, "EventData should be populated") + + assert.Equal(t, "test-task-1", entry.TaskID) + assert.Equal(t, taskMetadata, entry.TaskMetadata) + assert.Equal(t, queries, entry.EventData.Queries) + assert.NotNil(t, entry.EventData.ParsedABIs) + assert.NotZero(t, entry.CreatedAt) + assert.NotZero(t, entry.UpdatedAt) +} + +func TestTaskRegistry_ConvertFromSyncMap_HandlesBothPointerAndValue(t *testing.T) { + registry := NewTaskRegistry() + oldChecks := &sync.Map{} + + // Test with pointer type (*Check) + taskMetadata1 := &avsproto.SyncMessagesResp_TaskMetadata{TaskId: "task-1"} + check1 := &Check{ + TaskMetadata: taskMetadata1, + Queries: []*avsproto.EventTrigger_Query{}, + ParsedABIs: make(map[int]*abi.ABI), + } + oldChecks.Store("task-1", check1) // Store pointer + + // Test with value type (Check) + taskMetadata2 := &avsproto.SyncMessagesResp_TaskMetadata{TaskId: "task-2"} + check2 := Check{ + TaskMetadata: taskMetadata2, + Queries: []*avsproto.EventTrigger_Query{}, + ParsedABIs: make(map[int]*abi.ABI), + } + oldChecks.Store("task-2", check2) // Store value + + // Convert + registry.ConvertFromSyncMap(oldChecks) + + // Both should be converted successfully + entry1, exists1 := registry.GetTask("task-1") + assert.True(t, exists1) + assert.Equal(t, "task-1", entry1.TaskID) + + entry2, exists2 := registry.GetTask("task-2") + assert.True(t, exists2) + assert.Equal(t, "task-2", entry2.TaskID) +} + +func TestTaskRegistry_ConvertFromSyncMap_SkipsInvalidData(t *testing.T) { + registry := NewTaskRegistry() + oldChecks := &sync.Map{} + + // Add invalid key type + oldChecks.Store(123, &Check{}) + + // Add invalid value type + oldChecks.Store("invalid-value", "not-a-check") + + // Add valid data + validCheck := &Check{ + TaskMetadata: &avsproto.SyncMessagesResp_TaskMetadata{TaskId: "valid-task"}, + Queries: []*avsproto.EventTrigger_Query{}, + ParsedABIs: make(map[int]*abi.ABI), + } + oldChecks.Store("valid-task", validCheck) + + // Convert + registry.ConvertFromSyncMap(oldChecks) + + // Only valid data should be converted + assert.Equal(t, 1, registry.GetEventTaskCount()) + + entry, exists := registry.GetTask("valid-task") + assert.True(t, exists) + assert.Equal(t, "valid-task", entry.TaskID) +} + +func TestTaskRegistry_BasicOperations(t *testing.T) { + registry := NewTaskRegistry() + + // Test empty registry + assert.Equal(t, 0, registry.GetEventTaskCount()) + assert.Equal(t, []string{}, registry.ListTasks()) + + // Add a task + taskMetadata := &avsproto.SyncMessagesResp_TaskMetadata{TaskId: "test-task"} + eventData := &EventTaskData{ + Queries: []*avsproto.EventTrigger_Query{}, + ParsedABIs: make(map[int]*abi.ABI), + } + + registry.AddTask("test-task", taskMetadata, eventData, nil, nil) + + // Verify task was added + assert.Equal(t, 1, registry.GetEventTaskCount()) + assert.Equal(t, []string{"test-task"}, registry.ListTasks()) + + entry, exists := registry.GetTask("test-task") + assert.True(t, exists) + assert.Equal(t, "test-task", entry.TaskID) + assert.Equal(t, taskMetadata, entry.TaskMetadata) + assert.Equal(t, eventData, entry.EventData) + + // Remove task + removed := registry.RemoveTask("test-task") + assert.True(t, removed) + assert.Equal(t, 0, registry.GetEventTaskCount()) + assert.Equal(t, []string{}, registry.ListTasks()) + + // Try to remove non-existent task + removed = registry.RemoveTask("non-existent") + assert.False(t, removed) +} + +func TestTaskRegistry_RangeEventTasks(t *testing.T) { + registry := NewTaskRegistry() + + // Add mixed task types + eventData := &EventTaskData{ + Queries: []*avsproto.EventTrigger_Query{}, + ParsedABIs: make(map[int]*abi.ABI), + } + blockData := &BlockTaskData{Interval: 10} + + registry.AddTask("event-task", &avsproto.SyncMessagesResp_TaskMetadata{TaskId: "event-task"}, eventData, nil, nil) + registry.AddTask("block-task", &avsproto.SyncMessagesResp_TaskMetadata{TaskId: "block-task"}, nil, blockData, nil) + registry.AddTask("event-task-2", &avsproto.SyncMessagesResp_TaskMetadata{TaskId: "event-task-2"}, eventData, nil, nil) + + // Range should only return event tasks + var eventTasks []string + registry.RangeEventTasks(func(taskID string, entry *TaskEntry) bool { + eventTasks = append(eventTasks, taskID) + return true + }) + + assert.Equal(t, 2, len(eventTasks)) + assert.Contains(t, eventTasks, "event-task") + assert.Contains(t, eventTasks, "event-task-2") + assert.NotContains(t, eventTasks, "block-task") +} + +func TestTaskRegistry_ThreadSafety(t *testing.T) { + registry := NewTaskRegistry() + + // Test concurrent access + done := make(chan bool) + + // Writer goroutine + go func() { + for i := 0; i < 100; i++ { + taskID := fmt.Sprintf("task-%d", i) + eventData := &EventTaskData{ + Queries: []*avsproto.EventTrigger_Query{}, + ParsedABIs: make(map[int]*abi.ABI), + } + registry.AddTask(taskID, &avsproto.SyncMessagesResp_TaskMetadata{TaskId: taskID}, eventData, nil, nil) + } + done <- true + }() + + // Reader goroutine + go func() { + for i := 0; i < 100; i++ { + registry.GetEventTaskCount() + registry.ListTasks() + registry.RangeEventTasks(func(taskID string, entry *TaskEntry) bool { + return true + }) + } + done <- true + }() + + // Wait for both goroutines + <-done + <-done + + // Verify final state + assert.Equal(t, 100, registry.GetEventTaskCount()) +} diff --git a/core/taskengine/trigger/time.go b/core/taskengine/trigger/time.go index d673a796..532b23ee 100644 --- a/core/taskengine/trigger/time.go +++ b/core/taskengine/trigger/time.go @@ -14,8 +14,13 @@ import ( type TimeTrigger struct { *CommonTrigger - scheduler gocron.Scheduler - jobs map[string]gocron.Job // map taskID to job for removal + // New unified registry (preferred) + registry *TaskRegistry + + // Legacy job management (for backward compatibility) + scheduler gocron.Scheduler + jobs map[string]gocron.Job // map taskID to job for removal + legacyMode bool // channel that we will push the trigger information back triggerCh chan TriggerMetadata[uint64] @@ -35,25 +40,92 @@ func NewTimeTrigger(triggerCh chan TriggerMetadata[uint64], logger sdklogging.Lo logger: logger, mu: sync.Mutex{}, }, - scheduler: scheduler, - jobs: make(map[string]gocron.Job), - triggerCh: triggerCh, + registry: NewTaskRegistry(), + scheduler: scheduler, + jobs: make(map[string]gocron.Job), + legacyMode: false, // Start in new mode + triggerCh: triggerCh, } return &t } +// ensureNewFormat ensures we're using the new TaskRegistry format +// This provides automatic conversion from legacy jobs map format +func (t *TimeTrigger) ensureNewFormat() { + if t.legacyMode { + t.logger.Info("๐Ÿ”„ Converting from legacy jobs map format to new TaskRegistry format") + + // Convert existing data + t.convertFromJobsMap() + + // Clear old data + t.jobs = make(map[string]gocron.Job) + t.legacyMode = false + + t.logger.Info("โœ… Successfully converted to new TaskRegistry format", + "task_count", t.registry.GetTimeTaskCount()) + } +} + +// ensureLegacyConversion consolidates legacy data detection and conversion +// This helper function eliminates code duplication across methods +func (t *TimeTrigger) ensureLegacyConversion() { + t.detectLegacyData() + t.ensureNewFormat() +} + +// detectLegacyData checks if we have data in the old format +func (t *TimeTrigger) detectLegacyData() { + hasLegacyData := len(t.jobs) > 0 + + if hasLegacyData && t.registry.GetTimeTaskCount() == 0 { + t.legacyMode = true + t.logger.Info("๐Ÿ” Detected legacy jobs map data - will convert on next operation") + } +} + +// convertFromJobsMap converts old jobs map data to the new TaskRegistry +// This provides backward compatibility during migration +func (t *TimeTrigger) convertFromJobsMap() { + for taskID, job := range t.jobs { + // Create TimeTaskData for the new registry + timeData := &TimeTaskData{ + Job: job, + Schedules: []string{}, // We can't recover original schedules from job + Epochs: []int64{}, // We can't recover original epochs from job + } + + // We don't have the original TaskMetadata in the old format, + // so we'll create a minimal one for compatibility + taskMetadata := &avsproto.SyncMessagesResp_TaskMetadata{ + TaskId: taskID, + } + + // Add to new registry + t.registry.AddTask(taskID, taskMetadata, nil, nil, timeData) + + t.logger.Debug("๐Ÿ”„ Converted legacy time task", + "task_id", taskID) + } +} + func (t *TimeTrigger) epochToCron(epoch int64) string { - // Convert epoch to time - tm := time.Unix(epoch/1000, 0) - // Create cron expression for specific time - return fmt.Sprintf("%d %d %d %d %d *", tm.Minute(), tm.Hour(), tm.Day(), tm.Month(), tm.Weekday()) + // Convert epoch to time in UTC + tm := time.Unix(epoch/1000, 0).UTC() + // Create cron expression for specific time (minute hour day month *) + // Standard cron format: minute hour day month dayofweek + // But gocron expects: minute hour day month * + return fmt.Sprintf("%d %d %d %d *", tm.Minute(), tm.Hour(), tm.Day(), int(tm.Month())) } func (t *TimeTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) error { t.mu.Lock() defer t.mu.Unlock() + // Ensure we're using the new format + t.ensureLegacyConversion() + taskID := check.TaskId // Function to be executed when trigger fires @@ -68,10 +140,12 @@ func (t *TimeTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) er var job gocron.Job var err error + var schedules []string + var epochs []int64 if fixedTime := check.GetTrigger().GetFixedTime(); fixedTime != nil { // Handle epoch-based scheduling - epochs := fixedTime.GetConfig().GetEpochs() + epochs = fixedTime.GetConfig().GetEpochs() if len(epochs) == 0 { return fmt.Errorf("no epochs provided") } @@ -94,11 +168,10 @@ func (t *TimeTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) er if err != nil { return fmt.Errorf("failed to schedule epoch job: %w", err) } - t.jobs[taskID] = job } } else if cronTrigger := check.GetTrigger().GetCron(); cronTrigger != nil { // Handle cron-based scheduling - schedules := cronTrigger.GetConfig().GetSchedules() + schedules = cronTrigger.GetConfig().GetSchedules() if len(schedules) == 0 { return fmt.Errorf("no cron expressions provided") } @@ -116,10 +189,23 @@ func (t *TimeTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) er if err != nil { return fmt.Errorf("failed to schedule cron job: %w", err) } - t.jobs[taskID] = job } } + // Store in new registry format + timeData := &TimeTaskData{ + Job: job, + Schedules: schedules, + Epochs: epochs, + } + + t.registry.AddTask(taskID, check, nil, nil, timeData) + + t.logger.Debug("โœ… Added time task to registry", + "task_id", taskID, + "schedules_count", len(schedules), + "epochs_count", len(epochs)) + return nil } @@ -137,13 +223,28 @@ func (t *TimeTrigger) RemoveCheck(taskID string) error { t.mu.Lock() defer t.mu.Unlock() - if job, exists := t.jobs[taskID]; exists { - if err := t.scheduler.RemoveJob(job.ID()); err != nil { + // Ensure we're using the new format + t.ensureLegacyConversion() + + // Get task from registry + task, exists := t.registry.GetTask(taskID) + if !exists || task == nil { + t.logger.Debug("task not found in registry", "task_id", taskID) + return nil + } + + // Remove job from scheduler if it exists + if task.TimeData != nil && task.TimeData.Job != nil { + if err := t.scheduler.RemoveJob(task.TimeData.Job.ID()); err != nil { t.logger.Error("failed to remove job", "task_id", taskID, "error", err) } - delete(t.jobs, taskID) } + // Remove from registry + t.registry.RemoveTask(taskID) + + t.logger.Debug("โœ… Removed time task from registry", "task_id", taskID) + return nil } diff --git a/core/taskengine/trigger/time_test.go b/core/taskengine/trigger/time_test.go new file mode 100644 index 00000000..54a47cbf --- /dev/null +++ b/core/taskengine/trigger/time_test.go @@ -0,0 +1,303 @@ +package trigger + +import ( + "fmt" + "testing" + "time" + + "github.com/AvaProtocol/EigenLayer-AVS/core/testutil" + avsproto "github.com/AvaProtocol/EigenLayer-AVS/protobuf" + gocron "github.com/go-co-op/gocron/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTimeTrigger_NewFormat(t *testing.T) { + triggerCh := make(chan TriggerMetadata[uint64], 10) + logger := testutil.GetLogger() + + timeTrigger := NewTimeTrigger(triggerCh, logger) + + // Should start in new format + assert.False(t, timeTrigger.legacyMode) + assert.NotNil(t, timeTrigger.registry) + assert.Equal(t, 0, timeTrigger.registry.GetTimeTaskCount()) +} + +func TestTimeTrigger_AddCheck_CronTrigger(t *testing.T) { + triggerCh := make(chan TriggerMetadata[uint64], 10) + logger := testutil.GetLogger() + + timeTrigger := NewTimeTrigger(triggerCh, logger) + + // Create a cron trigger task + cronConfig := &avsproto.CronTrigger_Config{ + Schedules: []string{"0 0 * * *"}, // Daily at midnight + } + cronTrigger := &avsproto.CronTrigger{ + Config: cronConfig, + } + trigger := &avsproto.TaskTrigger{ + TriggerType: &avsproto.TaskTrigger_Cron{ + Cron: cronTrigger, + }, + } + + taskMetadata := &avsproto.SyncMessagesResp_TaskMetadata{ + TaskId: "test-cron-task", + Trigger: trigger, + } + + // Add the task + err := timeTrigger.AddCheck(taskMetadata) + require.NoError(t, err) + + // Verify task was added to registry + assert.Equal(t, 1, timeTrigger.registry.GetTimeTaskCount()) + + task, exists := timeTrigger.registry.GetTask("test-cron-task") + require.True(t, exists) + require.NotNil(t, task.TimeData) + assert.Equal(t, []string{"0 0 * * *"}, task.TimeData.Schedules) + assert.Empty(t, task.TimeData.Epochs) + assert.NotNil(t, task.TimeData.Job) +} + +func TestTimeTrigger_AddCheck_FixedTimeTrigger(t *testing.T) { + triggerCh := make(chan TriggerMetadata[uint64], 10) + logger := testutil.GetLogger() + + timeTrigger := NewTimeTrigger(triggerCh, logger) + + // Create a fixed time trigger task with future epoch + futureEpoch := time.Now().Add(24 * time.Hour).UnixMilli() + fixedTimeConfig := &avsproto.FixedTimeTrigger_Config{ + Epochs: []int64{futureEpoch}, + } + fixedTimeTrigger := &avsproto.FixedTimeTrigger{ + Config: fixedTimeConfig, + } + trigger := &avsproto.TaskTrigger{ + TriggerType: &avsproto.TaskTrigger_FixedTime{ + FixedTime: fixedTimeTrigger, + }, + } + + taskMetadata := &avsproto.SyncMessagesResp_TaskMetadata{ + TaskId: "test-fixed-time-task", + Trigger: trigger, + } + + // Add the task + err := timeTrigger.AddCheck(taskMetadata) + require.NoError(t, err) + + // Verify task was added to registry + assert.Equal(t, 1, timeTrigger.registry.GetTimeTaskCount()) + + task, exists := timeTrigger.registry.GetTask("test-fixed-time-task") + require.True(t, exists) + require.NotNil(t, task.TimeData) + assert.Empty(t, task.TimeData.Schedules) + assert.Equal(t, []int64{futureEpoch}, task.TimeData.Epochs) + assert.NotNil(t, task.TimeData.Job) +} + +func TestTimeTrigger_AddCheck_PastEpochSkipped(t *testing.T) { + triggerCh := make(chan TriggerMetadata[uint64], 10) + logger := testutil.GetLogger() + + timeTrigger := NewTimeTrigger(triggerCh, logger) + + // Create a fixed time trigger task with past epoch + pastEpoch := time.Now().Add(-24 * time.Hour).UnixMilli() + fixedTimeConfig := &avsproto.FixedTimeTrigger_Config{ + Epochs: []int64{pastEpoch}, + } + fixedTimeTrigger := &avsproto.FixedTimeTrigger{ + Config: fixedTimeConfig, + } + trigger := &avsproto.TaskTrigger{ + TriggerType: &avsproto.TaskTrigger_FixedTime{ + FixedTime: fixedTimeTrigger, + }, + } + + taskMetadata := &avsproto.SyncMessagesResp_TaskMetadata{ + TaskId: "test-past-epoch-task", + Trigger: trigger, + } + + // Add the task + err := timeTrigger.AddCheck(taskMetadata) + require.NoError(t, err) + + // Verify task was added to registry even though epoch was skipped + assert.Equal(t, 1, timeTrigger.registry.GetTimeTaskCount()) + + task, exists := timeTrigger.registry.GetTask("test-past-epoch-task") + require.True(t, exists) + require.NotNil(t, task.TimeData) + assert.Equal(t, []int64{pastEpoch}, task.TimeData.Epochs) +} + +func TestTimeTrigger_RemoveCheck(t *testing.T) { + triggerCh := make(chan TriggerMetadata[uint64], 10) + logger := testutil.GetLogger() + + timeTrigger := NewTimeTrigger(triggerCh, logger) + + // Add a task first + cronConfig := &avsproto.CronTrigger_Config{ + Schedules: []string{"0 0 * * *"}, + } + cronTrigger := &avsproto.CronTrigger{ + Config: cronConfig, + } + trigger := &avsproto.TaskTrigger{ + TriggerType: &avsproto.TaskTrigger_Cron{ + Cron: cronTrigger, + }, + } + + taskMetadata := &avsproto.SyncMessagesResp_TaskMetadata{ + TaskId: "test-remove-task", + Trigger: trigger, + } + + err := timeTrigger.AddCheck(taskMetadata) + require.NoError(t, err) + assert.Equal(t, 1, timeTrigger.registry.GetTimeTaskCount()) + + // Remove the task + err = timeTrigger.RemoveCheck("test-remove-task") + require.NoError(t, err) + + // Verify task was removed + assert.Equal(t, 0, timeTrigger.registry.GetTimeTaskCount()) + + _, exists := timeTrigger.registry.GetTask("test-remove-task") + assert.False(t, exists) +} + +func TestTimeTrigger_RemoveCheck_NonExistentTask(t *testing.T) { + triggerCh := make(chan TriggerMetadata[uint64], 10) + logger := testutil.GetLogger() + + timeTrigger := NewTimeTrigger(triggerCh, logger) + + // Try to remove non-existent task + err := timeTrigger.RemoveCheck("non-existent-task") + require.NoError(t, err) // Should not error + + assert.Equal(t, 0, timeTrigger.registry.GetTimeTaskCount()) +} + +func TestTimeTrigger_LegacyMigration(t *testing.T) { + triggerCh := make(chan TriggerMetadata[uint64], 10) + logger := testutil.GetLogger() + + timeTrigger := NewTimeTrigger(triggerCh, logger) + + // Simulate legacy data by creating a job manually + scheduler, _ := gocron.NewScheduler(gocron.WithLocation(time.UTC)) + job, err := scheduler.NewJob( + gocron.CronJob("0 0 * * *", false), + gocron.NewTask(func() {}), + ) + require.NoError(t, err) + + // Manually populate legacy data + timeTrigger.jobs["legacy-task"] = job + timeTrigger.legacyMode = false // Will be detected automatically + + // Add a new task - this should trigger migration + cronConfig := &avsproto.CronTrigger_Config{ + Schedules: []string{"0 1 * * *"}, + } + cronTrigger := &avsproto.CronTrigger{ + Config: cronConfig, + } + trigger := &avsproto.TaskTrigger{ + TriggerType: &avsproto.TaskTrigger_Cron{ + Cron: cronTrigger, + }, + } + + taskMetadata := &avsproto.SyncMessagesResp_TaskMetadata{ + TaskId: "new-task", + Trigger: trigger, + } + + err = timeTrigger.AddCheck(taskMetadata) + require.NoError(t, err) + + // Should have migrated legacy data + new task + assert.Equal(t, 2, timeTrigger.registry.GetTimeTaskCount()) + assert.False(t, timeTrigger.legacyMode) + assert.Empty(t, timeTrigger.jobs) // Legacy data should be cleared + + // Verify both tasks exist + _, exists := timeTrigger.registry.GetTask("legacy-task") + assert.True(t, exists) + + _, exists = timeTrigger.registry.GetTask("new-task") + assert.True(t, exists) +} + +func TestTimeTrigger_RangeTimeTasks(t *testing.T) { + triggerCh := make(chan TriggerMetadata[uint64], 10) + logger := testutil.GetLogger() + + timeTrigger := NewTimeTrigger(triggerCh, logger) + + // Add multiple tasks + for i := 0; i < 3; i++ { + cronConfig := &avsproto.CronTrigger_Config{ + Schedules: []string{"0 0 * * *"}, + } + cronTrig := &avsproto.CronTrigger{ + Config: cronConfig, + } + trigger := &avsproto.TaskTrigger{ + TriggerType: &avsproto.TaskTrigger_Cron{ + Cron: cronTrig, + }, + } + + taskMetadata := &avsproto.SyncMessagesResp_TaskMetadata{ + TaskId: fmt.Sprintf("task-%d", i), + Trigger: trigger, + } + + err := timeTrigger.AddCheck(taskMetadata) + require.NoError(t, err) + } + + // Test range iteration + var taskIDs []string + timeTrigger.registry.RangeTimeTasks(func(taskID string, entry *TaskEntry) bool { + taskIDs = append(taskIDs, taskID) + return true + }) + + assert.Equal(t, 3, len(taskIDs)) + assert.Contains(t, taskIDs, "task-0") + assert.Contains(t, taskIDs, "task-1") + assert.Contains(t, taskIDs, "task-2") +} + +func TestTimeTrigger_EpochToCron(t *testing.T) { + triggerCh := make(chan TriggerMetadata[uint64], 10) + logger := testutil.GetLogger() + + timeTrigger := NewTimeTrigger(triggerCh, logger) + + // Test epoch conversion - use UTC time + epoch := time.Date(2024, 1, 15, 14, 30, 0, 0, time.UTC).UnixMilli() + cronExpr := timeTrigger.epochToCron(epoch) + + // Should create a cron expression for the specific time (5-field format) + expected := "30 14 15 1 *" // minute hour day month * + assert.Equal(t, expected, cronExpr) +} diff --git a/integration_test/operator_reconnection_test.go b/integration_test/operator_reconnection_test.go index 2130efda..8912d776 100644 --- a/integration_test/operator_reconnection_test.go +++ b/integration_test/operator_reconnection_test.go @@ -97,8 +97,10 @@ func TestOperatorReconnectionFlow(t *testing.T) { require.NoError(t, err) // Step 1: Create a task that requires event monitoring + userAddress := common.HexToAddress("0x1234567890123456789012345678901234567890") user := &model.User{ - Address: common.HexToAddress("0x1234567890123456789012345678901234567890"), + Address: userAddress, + SmartAccountAddress: &userAddress, // Initialize to prevent nil pointer dereference } taskReq := &avsproto.CreateTaskReq{ @@ -292,8 +294,10 @@ func TestOperatorReconnectionRaceCondition(t *testing.T) { defer engine.Stop() // Create a simple task + userAddress := common.HexToAddress("0x1234567890123456789012345678901234567890") user := &model.User{ - Address: common.HexToAddress("0x1234567890123456789012345678901234567890"), + Address: userAddress, + SmartAccountAddress: &userAddress, // Initialize to prevent nil pointer dereference } taskReq := &avsproto.CreateTaskReq{ diff --git a/operator/worker_loop.go b/operator/worker_loop.go index e0b5f626..0a9a6e15 100644 --- a/operator/worker_loop.go +++ b/operator/worker_loop.go @@ -510,7 +510,6 @@ func (o *Operator) StreamMessages() { } } time.Sleep(time.Duration(retryIntervalSecond) * time.Second) - o.retryConnect() continue } @@ -580,46 +579,99 @@ func (o *Operator) StreamMessages() { } // Additional nil check for Trigger field - if resp.TaskMetadata.Trigger == nil { + triggerObj := resp.TaskMetadata.GetTrigger() + if triggerObj == nil { o.logger.Warn("โŒ Received MonitorTaskTrigger message with nil Trigger", "task_id", resp.Id, "solution", "This may indicate a protocol mismatch or aggregator issue") continue } - if trigger := resp.TaskMetadata.GetTrigger().GetEvent(); trigger != nil { + if trigger := triggerObj.GetEvent(); trigger != nil { o.logger.Info("๐Ÿ“ฅ Monitoring event trigger", "task_id", resp.Id) - if err := o.eventTrigger.AddCheck(resp.TaskMetadata); err != nil { - o.logger.Info("โŒ Failed to add event trigger to monitoring", "error", err, "task_id", resp.Id, "solution", "Task may not be monitored for events") - } - } else if trigger := resp.TaskMetadata.Trigger.GetBlock(); trigger != nil { - o.logger.Info("๐Ÿ“ฆ Monitoring block trigger", "task_id", resp.Id, "interval", trigger.Config.Interval) - if err := o.blockTrigger.AddCheck(resp.TaskMetadata); err != nil { - o.logger.Info("โŒ Failed to add block trigger to monitoring", "error", err, "task_id", resp.Id, "solution", "Task may not be monitored for blocks") - } - } else if trigger := resp.TaskMetadata.Trigger.GetCron(); trigger != nil { - scheduleStr := strings.Join(trigger.Config.Schedules, ", ") - o.logger.Info("โฐ Monitoring cron trigger", "task_id", resp.Id, "schedule", scheduleStr) - if err := o.timeTrigger.AddCheck(resp.TaskMetadata); err != nil { - o.logger.Info("โŒ Failed to add cron trigger to monitoring", "error", err, "task_id", resp.Id, "solution", "Task may not be monitored for scheduled execution") - } - } else if trigger := resp.TaskMetadata.Trigger.GetFixedTime(); trigger != nil { - epochCount := len(trigger.Config.Epochs) - var epochInfo string - if epochCount == 1 { - epochInfo = fmt.Sprintf("epoch %d", trigger.Config.Epochs[0]) - } else { - epochInfo = fmt.Sprintf("%d epochs", epochCount) + + // Safely call AddCheck with panic recovery + func() { + defer func() { + if r := recover(); r != nil { + o.logger.Error("๐Ÿšจ CRITICAL: eventTrigger.AddCheck() caused segmentation fault", + "task_id", resp.Id, + "panic", r, + "solution", "resp.TaskMetadata is corrupted - cannot add event trigger") + } + }() + if err := o.eventTrigger.AddCheck(resp.TaskMetadata); err != nil { + o.logger.Info("โŒ Failed to add event trigger to monitoring", "error", err, "task_id", resp.Id, "solution", "Task may not be monitored for events") + } + }() + } else if trigger := triggerObj.GetBlock(); trigger != nil { + o.logger.Info("๐Ÿ“ฆ Monitoring block trigger", "task_id", resp.Id, "interval", trigger.Config.GetInterval()) + + // Safely call AddCheck with panic recovery + func() { + defer func() { + if r := recover(); r != nil { + o.logger.Error("๐Ÿšจ CRITICAL: blockTrigger.AddCheck() caused segmentation fault", + "task_id", resp.Id, + "panic", r, + "solution", "resp.TaskMetadata is corrupted - cannot add block trigger") + } + }() + if err := o.blockTrigger.AddCheck(resp.TaskMetadata); err != nil { + o.logger.Info("โŒ Failed to add block trigger to monitoring", "error", err, "task_id", resp.Id, "solution", "Task may not be monitored for blocks") + } + }() + } else if trigger := triggerObj.GetCron(); trigger != nil { + scheduleInfo := "unknown" + if trigger.Config != nil && trigger.Config.Schedules != nil { + scheduleInfo = strings.Join(trigger.Config.Schedules, ", ") } - o.logger.Info("๐Ÿ“… Monitoring fixed time trigger", "task_id", resp.Id, "epochs", epochInfo) - if err := o.timeTrigger.AddCheck(resp.TaskMetadata); err != nil { - o.logger.Info("โŒ Failed to add fixed time trigger to monitoring", "error", err, "task_id", resp.Id, "solution", "Task may not be monitored for scheduled execution") + o.logger.Info("โฐ Monitoring cron trigger", "task_id", resp.Id, "schedule", scheduleInfo) + + // Safely call AddCheck with panic recovery + func() { + defer func() { + if r := recover(); r != nil { + o.logger.Error("๐Ÿšจ CRITICAL: timeTrigger.AddCheck() caused segmentation fault", + "task_id", resp.Id, + "panic", r, + "solution", "resp.TaskMetadata is corrupted - cannot add cron trigger") + } + }() + if err := o.timeTrigger.AddCheck(resp.TaskMetadata); err != nil { + o.logger.Info("โŒ Failed to add cron trigger to monitoring", "error", err, "task_id", resp.Id, "solution", "Task may not be monitored for scheduled execution") + } + }() + } else if trigger := triggerObj.GetFixedTime(); trigger != nil { + epochInfo := "unknown" + if trigger.Config != nil && trigger.Config.Epochs != nil { + epochCount := len(trigger.Config.Epochs) + if epochCount == 1 { + epochInfo = fmt.Sprintf("epoch: %d", trigger.Config.Epochs[0]) + } else { + epochInfo = fmt.Sprintf("%d epochs", epochCount) + } } + o.logger.Info("๐Ÿ“… Monitoring fixed time trigger", "task_id", resp.Id, "epoch_info", epochInfo) + + // Safely call AddCheck with panic recovery + func() { + defer func() { + if r := recover(); r != nil { + o.logger.Error("๐Ÿšจ CRITICAL: timeTrigger.AddCheck() caused segmentation fault", + "task_id", resp.Id, + "panic", r, + "solution", "resp.TaskMetadata is corrupted - cannot add fixed time trigger") + } + }() + if err := o.timeTrigger.AddCheck(resp.TaskMetadata); err != nil { + o.logger.Info("โŒ Failed to add fixed time trigger to monitoring", "error", err, "task_id", resp.Id, "solution", "Task may not be monitored for fixed time execution") + } + }() } else { - o.logger.Warn("โŒ Received MonitorTaskTrigger message with unsupported or missing trigger", + o.logger.Warn("โ“ Unsupported or unrecognized trigger type", "task_id", resp.Id, - "trigger_type", resp.TaskMetadata.Trigger.GetType(), - "solution", "Check if trigger type is supported by this operator version") + "solution", "Task may not be monitored") } } }