Troubleshoot operator on Base Sepolia and redo TaskRegistry for three triggers #338

Merged Jun 20, 2025 (10 commits)

Changes from 8 commits
8 changes: 8 additions & 0 deletions .gitignore
@@ -32,5 +32,13 @@ alias-ecdsa.key.json

keys/

+# BadgerDB database files (runtime data)
+cmd/data/
+**/badger/
+*.vlog
+DISCARD
+KEYREGISTRY
+MANIFEST
+
# Plugin files for semantic release
.semrel/
Binary file removed cmd/data/badger/000001.vlog
Binary file removed cmd/data/badger/000002.vlog
Binary file removed cmd/data/badger/DISCARD
1 change: 0 additions & 1 deletion cmd/data/badger/KEYREGISTRY
This file was deleted.
Binary file removed cmd/data/badger/MANIFEST
14 changes: 12 additions & 2 deletions core/taskengine/engine.go
@@ -628,8 +628,9 @@ func (n *Engine) CreateTask(user *model.User, taskPayload *avsproto.CreateTaskRe
n.tasks[task.Id] = task
n.lock.Unlock()

-// Notify operators about the new task
-n.notifyOperatorsTaskOperation(task.Id, avsproto.MessageOp_MonitorTaskTrigger)
+// Note: MonitorTaskTrigger notifications are handled by StreamCheckToOperator,
+// which sends complete task metadata. The batched notification system is only
+// for CancelTask/DeleteTask operations that don't need complete metadata.

// Log successful task creation with final counts
n.logger.Info("✅ CreateTask completed successfully",
@@ -1062,6 +1063,15 @@ func (n *Engine) StreamCheckToOperator(payload *avsproto.SyncMessagesReq, srv av
// notifyOperatorsTaskOperation queues notifications for batched sending to operators
// This method is non-blocking and batches notifications for efficiency
func (n *Engine) notifyOperatorsTaskOperation(taskID string, operation avsproto.MessageOp) {
+// MonitorTaskTrigger should not use batched notifications as it requires complete task metadata
+if operation == avsproto.MessageOp_MonitorTaskTrigger {
+n.logger.Warn("❌ MonitorTaskTrigger should not be sent via batched notifications",
+"task_id", taskID,
+"operation", operation.String(),
+"solution", "MonitorTaskTrigger is handled by StreamCheckToOperator with complete metadata")
+return
+}
+
n.notificationMutex.Lock()
defer n.notificationMutex.Unlock()

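Note: the routing rule this guard enforces can be summarized in a small, self-contained sketch. The `MessageOp` values below are illustrative stand-ins for the generated avsproto constants (only `MessageOp_MonitorTaskTrigger` appears in this diff; CancelTask/DeleteTask are named only in the comments above), so treat this as a model of the behavior, not code from the PR.

```go
package main

import "fmt"

// MessageOp stands in for the generated avsproto.MessageOp enum; these
// constants are illustrative, not the real protobuf values.
type MessageOp int

const (
	MonitorTaskTrigger MessageOp = iota
	CancelTask
	DeleteTask
)

func (op MessageOp) String() string {
	return [...]string{"MonitorTaskTrigger", "CancelTask", "DeleteTask"}[op]
}

// route models the rule this PR enforces: operations that need complete task
// metadata go over the operator stream; lightweight ID-only operations go
// through the batched notification queue.
func route(op MessageOp) string {
	if op == MonitorTaskTrigger {
		return "StreamCheckToOperator (complete task metadata)"
	}
	return "batched notification queue (task ID only)"
}

func main() {
	for _, op := range []MessageOp{MonitorTaskTrigger, CancelTask, DeleteTask} {
		fmt.Printf("%s -> %s\n", op, route(op))
	}
}
```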
192 changes: 150 additions & 42 deletions core/taskengine/trigger/block.go
@@ -25,7 +25,12 @@ type TriggerMetadata[T any] struct {
type BlockTrigger struct {
*CommonTrigger

-schedule map[int64]map[string]bool
+// New unified registry (preferred)
+registry *TaskRegistry
+
+// Legacy schedule map (for backward compatibility)
+schedule map[int64]map[string]bool
+legacyMode bool

// Smart monitoring state
minInterval int64 // Minimum interval across all tasks
@@ -50,7 +55,9 @@ func NewBlockTrigger(o *RpcOption, triggerCh chan TriggerMetadata[int64], logger
logger: logger,
mu: sync.Mutex{},
},
+registry: NewTaskRegistry(),
schedule: make(map[int64]map[string]bool),
+legacyMode: false, // Start in new mode
minInterval: 0,
subscriptionActive: false,
triggerCh: triggerCh,
@@ -70,17 +77,75 @@ func NewBlockTrigger(o *RpcOption, triggerCh chan TriggerMetadata[int64], logger
return &b
}

+// ensureNewFormat ensures we're using the new TaskRegistry format
+// This provides automatic conversion from legacy schedule map format
+func (b *BlockTrigger) ensureNewFormat() {
+if b.legacyMode {
+b.logger.Info("🔄 Converting from legacy schedule map format to new TaskRegistry format")
+
+// Convert existing data
+b.convertFromScheduleMap()
+
+// Clear old data
+b.schedule = make(map[int64]map[string]bool)
+b.legacyMode = false
+
+b.logger.Info("✅ Successfully converted to new TaskRegistry format",
+"task_count", b.registry.GetBlockTaskCount())
+}
+}
+
+// detectLegacyData checks if we have data in the old format
+func (b *BlockTrigger) detectLegacyData() {
+hasLegacyData := len(b.schedule) > 0
+
+if hasLegacyData && b.registry.GetBlockTaskCount() == 0 {
+b.legacyMode = true
+b.logger.Info("🔍 Detected legacy schedule map data - will convert on next operation")
+}
+}
+
+// convertFromScheduleMap converts old schedule map data to the new TaskRegistry
+// This provides backward compatibility during migration
+func (b *BlockTrigger) convertFromScheduleMap() {
+for interval, tasks := range b.schedule {
+for taskID := range tasks {
+// Create BlockTaskData for the new registry
+blockData := &BlockTaskData{
+Interval: interval,
+}
+
+// We don't have the original TaskMetadata in the old format,
+// so we'll create a minimal one for compatibility
+taskMetadata := &avsproto.SyncMessagesResp_TaskMetadata{
+TaskId: taskID,
+}
+
+// Add to new registry
+b.registry.AddTask(taskID, taskMetadata, nil, blockData, nil)
+
+b.logger.Debug("🔄 Converted legacy block task",
+"task_id", taskID,
+"interval", interval)
+}
+}
+}
+
// calculateMinInterval finds the minimum interval among all registered tasks
func (b *BlockTrigger) calculateMinInterval() int64 {
+// Auto-convert from legacy format if needed
+b.detectLegacyData()
+b.ensureNewFormat()
+
minInterval := int64(0)

-for interval := range b.schedule {
-if len(b.schedule[interval]) > 0 { // Only consider intervals with active tasks
-if minInterval == 0 || interval < minInterval {
-minInterval = interval
-}
-}
-}
+b.registry.RangeBlockTasks(func(taskID string, entry *TaskEntry) bool {
+interval := entry.BlockData.Interval
+if minInterval == 0 || interval < minInterval {
+minInterval = interval
+}
+return true // Continue iteration
+})

return minInterval
}
@@ -96,6 +161,10 @@ func (b *BlockTrigger) shouldCheckAtBlock(blockNumber int64) bool {
}

func (b *BlockTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) error {
+// Auto-convert from legacy format if needed
+b.detectLegacyData()
+b.ensureNewFormat()
+
b.mu.Lock()
defer b.mu.Unlock()

@@ -106,15 +175,27 @@ func (b *BlockTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) e
return fmt.Errorf("invalid block trigger interval %d for task %s: interval must be greater than 0", interval, check.TaskId)
}

-// Add task to schedule
-if _, ok := b.schedule[interval]; !ok {
-b.schedule[interval] = map[string]bool{
-check.TaskId: true,
-}
-b.logger.Info("📦 First task registered for block interval", "interval", interval, "task_id", check.TaskId)
-} else {
-b.schedule[interval][check.TaskId] = true
-b.logger.Debug("📦 Additional task registered for block interval", "interval", interval, "task_id", check.TaskId, "total_tasks_for_interval", len(b.schedule[interval]))
-}
+// Create BlockTaskData for the new registry
+blockData := &BlockTaskData{
+Interval: interval,
+}
+
+// Add to new registry
+b.registry.AddTask(check.TaskId, check, nil, blockData, nil)
+
+// Count tasks with the same interval for logging
+tasksWithSameInterval := 0
+b.registry.RangeBlockTasks(func(taskID string, entry *TaskEntry) bool {
+if entry.BlockData.Interval == interval {
+tasksWithSameInterval++
+}
+return true
+})
+
+if tasksWithSameInterval == 1 {
+b.logger.Info("📦 First task registered for block interval", "interval", interval, "task_id", check.TaskId)
+} else {
+b.logger.Debug("📦 Additional task registered for block interval", "interval", interval, "task_id", check.TaskId, "total_tasks_for_interval", tasksWithSameInterval)
+}

// Recalculate minimum interval
@@ -124,17 +205,22 @@ func (b *BlockTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) e
oldMinInterval := b.minInterval
b.minInterval = newMinInterval

+// Count unique intervals for logging
+uniqueIntervals := make(map[int64]bool)
+b.registry.RangeBlockTasks(func(taskID string, entry *TaskEntry) bool {
+uniqueIntervals[entry.BlockData.Interval] = true
+return true
+})
+
b.logger.Info("🔄 Block monitoring minimum interval updated",
"old_min_interval", oldMinInterval,
"new_min_interval", newMinInterval,
-"total_intervals", len(b.schedule))
+"total_intervals", len(uniqueIntervals))

// Log active intervals for debugging
-activeIntervals := make([]int64, 0)
-for interval, tasks := range b.schedule {
-if len(tasks) > 0 {
-activeIntervals = append(activeIntervals, interval)
-}
-}
+activeIntervals := make([]int64, 0, len(uniqueIntervals))
+for interval := range uniqueIntervals {
+activeIntervals = append(activeIntervals, interval)
+}
b.logger.Debug("📊 Active block intervals", "intervals", activeIntervals, "min_interval", newMinInterval)
}
@@ -143,24 +229,40 @@ func (b *BlockTrigger) AddCheck(check *avsproto.SyncMessagesResp_TaskMetadata) e
}

func (b *BlockTrigger) RemoveCheck(taskID string) error {
+// Auto-convert from legacy format if needed
+b.detectLegacyData()
+b.ensureNewFormat()
+
b.mu.Lock()
defer b.mu.Unlock()

removedFromInterval := int64(0)

-for interval, tasks := range b.schedule {
-if _, exists := tasks[taskID]; exists {
-delete(b.schedule[interval], taskID)
-removedFromInterval = interval
-
-b.logger.Debug("🗑️ Removed block task", "task_id", taskID, "interval", interval, "remaining_tasks_for_interval", len(tasks)-1)
-
-// Clean up empty intervals
-if len(b.schedule[interval]) == 0 {
-delete(b.schedule, interval)
-b.logger.Info("🧹 Cleaned up empty block interval", "interval", interval)
-}
-break
-}
-}
+// Get the task before removing it to know which interval it was from
+if entry, exists := b.registry.GetTask(taskID); exists && entry.BlockData != nil {
+removedFromInterval = entry.BlockData.Interval
+
+// Count remaining tasks with the same interval before removal
+remainingTasksForInterval := 0
+b.registry.RangeBlockTasks(func(tID string, e *TaskEntry) bool {
+if e.BlockData.Interval == removedFromInterval && tID != taskID {
+remainingTasksForInterval++
+}
+return true
+})
+
+// Remove from new registry
+removed := b.registry.RemoveTask(taskID)
+
+if removed {
+b.logger.Debug("🗑️ Removed block task", "task_id", taskID, "interval", removedFromInterval, "remaining_tasks_for_interval", remainingTasksForInterval)
+
+// Check if this was the last task for this interval
+if remainingTasksForInterval == 0 {
+b.logger.Info("🧹 Cleaned up empty block interval", "interval", removedFromInterval)
+}
+} else {
+b.logger.Debug("🤷 Task not found for removal", "task_id", taskID)
+}
+}

@@ -171,11 +273,18 @@ func (b *BlockTrigger) RemoveCheck(taskID string) error {
oldMinInterval := b.minInterval
b.minInterval = newMinInterval

+// Count remaining intervals
+uniqueIntervals := make(map[int64]bool)
+b.registry.RangeBlockTasks(func(taskID string, entry *TaskEntry) bool {
+uniqueIntervals[entry.BlockData.Interval] = true
+return true
+})
+
b.logger.Info("🔄 Block monitoring minimum interval updated after removal",
"removed_from_interval", removedFromInterval,
"old_min_interval", oldMinInterval,
"new_min_interval", newMinInterval,
-"remaining_intervals", len(b.schedule))
+"remaining_intervals", len(uniqueIntervals))

// If no more tasks, we'll stop subscription in the main loop
if newMinInterval == 0 {
@@ -306,15 +415,14 @@ func (b *BlockTrigger) processBlock(header *types.Header) {
"block_hash", header.Hash().Hex(),
"min_interval", minInterval)

-b.mu.RLock()
-scheduleSnapshot := make(map[int64]map[string]bool)
-for interval, tasks := range b.schedule {
-scheduleSnapshot[interval] = make(map[string]bool)
-for taskID := range tasks {
-scheduleSnapshot[interval][taskID] = true
-}
-}
-b.mu.RUnlock()
+// Create a snapshot of tasks grouped by interval for processing
+intervalTasksMap := make(map[int64][]string)
+
+b.registry.RangeBlockTasks(func(taskID string, entry *TaskEntry) bool {
+interval := entry.BlockData.Interval
+intervalTasksMap[interval] = append(intervalTasksMap[interval], taskID)
+return true
+})

triggeredTasks := 0

@@ -323,16 +431,16 @@ func (b *BlockTrigger) processBlock(header *types.Header) {
remainder := new(big.Int)

// Check all intervals to see which ones should trigger at this block
-for interval, tasks := range scheduleSnapshot {
+for interval, tasks := range intervalTasksMap {
if len(tasks) == 0 {
continue
}

// Check if this interval should trigger at this block number
// Using checkpoint system: interval should trigger when blockNumber % interval == 0
intervalBig.SetInt64(interval)
-if remainder.Mod(header.Number, intervalBig).Cmp(zero) == 0 {
-for taskID := range tasks {
+if remainder.Mod(header.Number, intervalBig).Cmp(big.NewInt(0)) == 0 {
+for _, taskID := range tasks {
b.triggerCh <- TriggerMetadata[int64]{
TaskID: taskID,
Marker: blockNumber,
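The TaskRegistry type that block.go now depends on is not included in the hunks above. As a reading aid, here is a minimal sketch of the surface this diff actually calls (AddTask, GetTask, RemoveTask, RangeBlockTasks, GetBlockTaskCount), reconstructed purely from the call sites; the metadata stub, field names, and the two nil AddTask slots (presumably the other two trigger kinds from the PR title) are assumptions, and the real task_registry.go may well differ.

```go
package registry

import "sync"

// TaskMetadata is a local stub standing in for the generated
// avsproto.SyncMessagesResp_TaskMetadata type used in the diff.
type TaskMetadata struct{ TaskId string }

// BlockTaskData carries the block-trigger configuration; only Interval
// is visible in this diff.
type BlockTaskData struct {
	Interval int64
}

// EventTaskData and TimeTaskData are assumed placeholders for the other
// two trigger kinds implied by AddTask's nil arguments and the PR title.
type EventTaskData struct{}
type TimeTaskData struct{}

// TaskEntry is the registry record; BlockData is nil for non-block tasks.
type TaskEntry struct {
	Metadata  *TaskMetadata
	EventData *EventTaskData
	BlockData *BlockTaskData
	TimeData  *TimeTaskData
}

type TaskRegistry struct {
	mu    sync.RWMutex
	tasks map[string]*TaskEntry
}

func NewTaskRegistry() *TaskRegistry {
	return &TaskRegistry{tasks: make(map[string]*TaskEntry)}
}

// AddTask registers (or overwrites) a task with whichever trigger data is set.
func (r *TaskRegistry) AddTask(id string, meta *TaskMetadata, ev *EventTaskData, blk *BlockTaskData, tm *TimeTaskData) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.tasks[id] = &TaskEntry{Metadata: meta, EventData: ev, BlockData: blk, TimeData: tm}
}

func (r *TaskRegistry) GetTask(id string) (*TaskEntry, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	e, ok := r.tasks[id]
	return e, ok
}

// RemoveTask deletes a task and reports whether it existed.
func (r *TaskRegistry) RemoveTask(id string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, ok := r.tasks[id]
	delete(r.tasks, id)
	return ok
}

// RangeBlockTasks iterates block tasks only, stopping early if fn returns false.
func (r *TaskRegistry) RangeBlockTasks(fn func(taskID string, entry *TaskEntry) bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	for id, e := range r.tasks {
		if e.BlockData == nil {
			continue
		}
		if !fn(id, e) {
			return
		}
	}
}

// GetBlockTaskCount counts tasks that carry block trigger data.
func (r *TaskRegistry) GetBlockTaskCount() int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	n := 0
	for _, e := range r.tasks {
		if e.BlockData != nil {
			n++
		}
	}
	return n
}
```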
20 changes: 13 additions & 7 deletions core/taskengine/trigger/block_test.go
@@ -134,15 +134,21 @@ func TestBlockTrigger_AddCheck_ValidInterval(t *testing.T) {
t.Errorf("AddCheck() unexpected error for valid interval: %v", err)
}

-// Verify the task was added to the schedule
-blockTrigger.mu.Lock()
-defer blockTrigger.mu.Unlock()
-
-if _, exists := blockTrigger.schedule[10]; !exists {
-t.Error("Task was not added to schedule for interval 10")
-}
-
-if _, exists := blockTrigger.schedule[10]["test-task-valid"]; !exists {
-t.Error("Task ID was not found in schedule for interval 10")
-}
+// Verify the task was added to the registry
+if blockTrigger.registry.GetBlockTaskCount() != 1 {
+t.Error("Expected 1 block task in registry")
+}
+
+entry, exists := blockTrigger.registry.GetTask("test-task-valid")
+if !exists {
+t.Error("Task was not found in registry")
+}
+
+if entry.BlockData == nil {
+t.Error("Task does not have BlockData")
+}
+
+if entry.BlockData.Interval != 10 {
+t.Errorf("Expected interval 10, got %d", entry.BlockData.Interval)
+}
}
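For readers new to the checkpoint rule used by processBlock above (an interval fires whenever blockNumber % interval == 0), here is a tiny self-contained illustration; the production code does the same comparison with big.Int because header.Number is a *big.Int.

```go
package main

import "fmt"

// firesAt mirrors processBlock's checkpoint rule: an interval triggers at
// every block whose number is an exact multiple of that interval.
func firesAt(blockNumber, interval int64) bool {
	return blockNumber%interval == 0
}

func main() {
	intervals := []int64{10, 25}
	for _, block := range []int64{20, 30, 50} {
		for _, interval := range intervals {
			fmt.Printf("block %d, interval %d -> fires: %t\n", block, interval, firesAt(block, interval))
		}
	}
	// Block 50 fires both intervals; blocks 20 and 30 fire only interval 10.
	// With tasks at intervals 10 and 25, calculateMinInterval returns 10.
}
```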