Skip to content

Commit 241886f

Browse files
Merge pull request #34 from retail-ai-inc/fix/error-logging-and-changestream
Fix/error logging and changestream
2 parents f7174b2 + d6e0f69 commit 241886f

File tree

8 files changed

+599
-480
lines changed

8 files changed

+599
-480
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
[![Coverage Status](https://codecov.io/gh/retail-ai-inc/sync/graph/badge.svg)](https://codecov.io/gh/retail-ai-inc/sync)
55
[![GoDoc](https://godoc.org/github.com/retail-ai-inc/sync?status.svg)](https://godoc.org/github.com/retail-ai-inc/sync)
66
[![License](https://img.shields.io/badge/license-MIT-blue)](LICENSE)
7+
[![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/retail-ai-inc/sync)
78

89
Synchronize Production NOSQL and SQL data to Standalone instances for Data scientists or other purposes. A **Go-based** tool to synchronize MongoDB or SQL data from a **MongoDB replica set** or **sharded cluster** or production SQL instance to a **standalone instance**, supports initial and incremental synchronization, including **indexes** with change stream monitoring.
910

READMEUI.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@ Sync Data Platform provides an intuitive interface for managing and monitoring d
1313

1414
## Main Features
1515

16+
### Dashboard
17+
<img width="800" alt="image" src="https://github.com/user-attachments/assets/9a874651-0832-48ba-8cd3-1137e6da5afe" />
18+
1619
### Task Management
17-
<img width="800" alt="image" src="https://github.com/user-attachments/assets/08c9d723-8dca-4dd8-9ca9-2459c2072858" />
20+
<img width="800" alt="image" src="https://github.com/user-attachments/assets/597c5987-3eea-4a0a-ad4d-39c7d4594868" />
1821

1922

2023
- View task status and details
@@ -23,7 +26,7 @@ Sync Data Platform provides an intuitive interface for managing and monitoring d
2326
- Operate tasks (start/stop/monitor)
2427

2528
### Monitoring Dashboard
26-
<img width="800" alt="image" src="https://github.com/user-attachments/assets/2a1bd9f1-0d46-4961-868f-80dab83b3705" />
29+
<img width="800" alt="image" src="https://github.com/user-attachments/assets/e3670689-5662-4509-937e-6758a6536cce" />
2730
<img width="800" alt="image" src="https://github.com/user-attachments/assets/a38eed54-96ac-4bda-9191-29bc25cdef06" />
2831

2932
- **Key Metrics**:

pkg/api/monitor_handler.go

Lines changed: 60 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package api
22

33
import (
44
"database/sql"
5-
"encoding/json"
65
"errors"
6+
"fmt"
77
"net/http"
88
"strings"
99
"time"
@@ -371,157 +371,100 @@ func ChangeStreamsStatusHandler(w http.ResponseWriter, r *http.Request) {
371371
}
372372
defer db.Close()
373373

374-
// Query recent 2 hours of Comprehensive ChangeStream Status logs
375-
// Get the latest record for each sync_task_id
374+
// Query changestream_statistics table directly
376375
query := `
377-
SELECT t1.sync_task_id, t1.message, t1.log_time
378-
FROM sync_log t1
379-
WHERE t1.message LIKE '%[MongoDB] Comprehensive ChangeStream Status%'
380-
AND t1.log_time >= datetime('now', '-2 hours')
381-
AND t1.log_time = (
382-
SELECT MAX(t2.log_time)
383-
FROM sync_log t2
384-
WHERE t2.sync_task_id = t1.sync_task_id
385-
AND t2.message LIKE '%[MongoDB] Comprehensive ChangeStream Status%'
386-
AND t2.log_time >= datetime('now', '-2 hours')
387-
)
388-
ORDER BY t1.sync_task_id
376+
SELECT
377+
task_id,
378+
collection_name,
379+
received,
380+
executed,
381+
pending,
382+
errors,
383+
inserted,
384+
updated,
385+
deleted,
386+
last_updated,
387+
created_at
388+
FROM changestream_statistics
389+
ORDER BY task_id, collection_name
389390
`
390391

391392
rows, err := db.Query(query)
392393
if err != nil {
393-
errorJSON(w, "query sync_log fail", err)
394+
errorJSON(w, "query changestream_statistics fail", err)
394395
return
395396
}
396397
defer rows.Close()
397398

398-
// Track processed task IDs to avoid duplicates
399-
processedTasks := make(map[string]bool)
400-
401399
// Aggregated data
402400
totalReceived := 0
403401
totalExecuted := 0
404402
totalPending := 0
403+
totalErrors := 0
405404
totalActiveStreams := 0
406405
allChangeStreams := make([]map[string]interface{}, 0)
407406
lastUpdated := ""
407+
taskIDs := make(map[int]bool)
408408

409409
for rows.Next() {
410-
var taskID, message, logTime string
411-
if err := rows.Scan(&taskID, &message, &logTime); err != nil {
412-
continue
413-
}
414-
415-
// Skip if we've already processed this task (take only the latest)
416-
if processedTasks[taskID] {
417-
continue
418-
}
419-
processedTasks[taskID] = true
420-
421-
// Extract JSON from message
422-
jsonStart := strings.Index(message, "{")
423-
if jsonStart == -1 {
424-
continue
425-
}
426-
427-
// Find the end of JSON by counting braces
428-
jsonStr := ""
429-
braceCount := 0
430-
inString := false
431-
escaped := false
432-
433-
for i := jsonStart; i < len(message); i++ {
434-
char := message[i]
435-
436-
if escaped {
437-
escaped = false
438-
continue
439-
}
440-
441-
if char == '\\' {
442-
escaped = true
443-
continue
444-
}
410+
var taskID int
411+
var collectionName string
412+
var received, executed, pending, errors, inserted, updated, deleted int
413+
var lastUpdatedTime, createdAt string
445414

446-
if char == '"' {
447-
inString = !inString
448-
continue
449-
}
450-
451-
if !inString {
452-
if char == '{' {
453-
braceCount++
454-
} else if char == '}' {
455-
braceCount--
456-
if braceCount == 0 {
457-
jsonStr = message[jsonStart : i+1]
458-
break
459-
}
460-
}
461-
}
462-
}
463-
464-
if jsonStr == "" {
415+
if err := rows.Scan(&taskID, &collectionName, &received, &executed, &pending, &errors, &inserted, &updated, &deleted, &lastUpdatedTime, &createdAt); err != nil {
465416
continue
466417
}
467418

468-
// Parse JSON
469-
var statusData map[string]interface{}
470-
if err := json.Unmarshal([]byte(jsonStr), &statusData); err != nil {
419+
// Skip Task ID=0 to filter out legacy data
420+
if taskID == 0 {
471421
continue
472422
}
473423

474-
// Extract summary data
475-
if summary, ok := statusData["summary"].(map[string]interface{}); ok {
476-
if received, ok := summary["total_received"].(float64); ok {
477-
totalReceived += int(received)
478-
}
479-
if executed, ok := summary["total_executed"].(float64); ok {
480-
totalExecuted += int(executed)
481-
}
482-
if pending, ok := summary["total_pending"].(float64); ok {
483-
totalPending += int(pending)
484-
}
424+
// Track unique task IDs
425+
taskIDs[taskID] = true
426+
427+
// Aggregate summary data
428+
totalReceived += received
429+
totalExecuted += executed
430+
totalPending += pending
431+
totalErrors += errors
432+
totalActiveStreams++
433+
434+
// Create changestream detail (maintain original API format)
435+
csDetail := map[string]interface{}{
436+
"task_id": fmt.Sprintf("%d", taskID), // Convert to string format
437+
"name": collectionName, // Use "name" instead of "collection_name"
438+
"received": received,
439+
"executed": executed,
440+
"pending": pending,
441+
"errors": errors,
442+
"operations": map[string]interface{}{ // Nest operations object
443+
"inserted": inserted,
444+
"updated": updated,
445+
"deleted": deleted,
446+
},
485447
}
486448

487-
// Extract changestreams data
488-
if changestreams, ok := statusData["changestreams"].(map[string]interface{}); ok {
489-
if activeCount, ok := changestreams["active_count"].(float64); ok {
490-
totalActiveStreams += int(activeCount)
491-
}
492-
493-
if details, ok := changestreams["details"].([]interface{}); ok {
494-
for _, detail := range details {
495-
if detailMap, ok := detail.(map[string]interface{}); ok {
496-
// Add task_id to each changestream for identification
497-
csDetail := make(map[string]interface{})
498-
for k, v := range detailMap {
499-
csDetail[k] = v
500-
}
501-
csDetail["task_id"] = taskID
502-
allChangeStreams = append(allChangeStreams, csDetail)
503-
}
504-
}
505-
}
506-
}
449+
allChangeStreams = append(allChangeStreams, csDetail)
507450

508-
// Update last updated time
509-
if lastUpdated == "" || logTime > lastUpdated {
510-
lastUpdated = logTime
451+
// Update last updated time (take the latest)
452+
if lastUpdated == "" || lastUpdatedTime > lastUpdated {
453+
lastUpdated = lastUpdatedTime
511454
}
512455
}
513456

514457
if err := rows.Err(); err != nil {
515-
errorJSON(w, "sync_log iteration error", err)
458+
errorJSON(w, "changestream_statistics iteration error", err)
516459
return
517460
}
518461

519-
// Calculate processing rate (simplified calculation)
520-
processingRate := "0/sec"
462+
// Calculate processing rate based on total received/executed over time
463+
processingRate := "N/A"
521464
if totalActiveStreams > 0 && totalExecuted > 0 {
522-
// This is a simplified rate calculation
523-
// In a real scenario, you might want to calculate based on time windows
524-
processingRate = "N/A" // We need time-based calculation for accurate rate
465+
// Simple calculation: assume data represents recent activity
466+
// For more accurate rate, we would need time-based windows
467+
// processingRate = fmt.Sprintf("~%d/min", totalExecuted)
525468
}
526469

527470
// Prepare response
@@ -537,7 +480,7 @@ ORDER BY t1.sync_task_id
537480
},
538481
"changestreams": allChangeStreams,
539482
"last_updated": lastUpdated,
540-
"tasks_count": len(processedTasks),
483+
"tasks_count": len(taskIDs),
541484
},
542485
}
543486

0 commit comments

Comments
 (0)