Skip to content

Commit d726a4c

Browse files
Merge pull request #24 from retail-ai-inc/feature/ui
Feature/UI
2 parents 182dacb + ce2eeb4 commit d726a4c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+3809
-2667
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,12 @@ deploy/
4949
# Ignore config file
5050
env.json
5151
config_*.yaml
52-
# config.yaml
52+
config.yaml
5353
config.json
5454
.DS_Store
5555
.goreleaser.yaml
5656
dist/
5757

5858
sync
5959
cloudbuild.yaml
60+
# sync.db

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ WORKDIR /app
3030
COPY --from=builder /app/sync .
3131

3232
# Copy the configuration file
33-
COPY --from=builder /app/configs ./configs
33+
COPY --from=builder /app/sync.db ./sync.db
3434

3535
# Run the application
3636
ENTRYPOINT ["./sync"]

README.md

Lines changed: 9 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ Create standalone databases outside of your production database servers with the
6363
- When **`enable_table_row_count_monitoring`** is enabled, the tool records data changes, including table row counts, in **GCP Logging**.
6464
- These logs are then forwarded via **Log Router** to **GCP BigQuery**.<br>Finally, **Grafana** is used to visualize this data, providing users with insights into the synchronization process.
6565
- This integration is part of a temporary transition phase, and future development will focus on using a more flexible database solution for direct display and synchronization.
66+
- **UI Interface**:
67+
The tool has added a **UI interface**, making it easier to operate and monitor the synchronization process.
6668

6769
## Prerequisites
6870
- For MongoDB sources:
@@ -83,11 +85,14 @@ Create standalone databases outside of your production database servers with the
8385

8486
This is a demo for macOS on amd64. For other operating systems and architectures, you need to replace the download link with the appropriate binary URL for your system.
8587
```
86-
curl -L -o sync.tar.gz https://github.com/retail-ai-inc/sync/releases/download/v2.0.1/sync_2.0.1_darwin_amd64.tar.gz
88+
curl -L -o sync.tar.gz https://github.com/retail-ai-inc/sync/releases/download/v3.0.1/sync_3.0.1_darwin_amd64.tar.gz
8789
tar -xzf sync.tar.gz
88-
# Edit configs/config.yaml to replace the placeholders with your instance details.
8990
./sync
9091
```
92+
After running `./sync`, the application will output a browser address:
93+
- URL: [http://localhost:8080](http://localhost:8080)
94+
- Username: `admin`
95+
- Password: `admin`
9196

9297
## Installation(For development)
9398

@@ -100,100 +105,11 @@ cd sync
100105
go mod tidy
101106
102107
# 3. Build the binary
103-
# Edit configs/config.yaml to replace the placeholders with your instance details.
104108
go build -o sync cmd/sync/main.go
105109
106110
# 4. Build the Docker image
107111
docker build -t sync .
108-
docker run -v $(pwd)/configs/config.yaml:/app/configs/config.yaml sync
109-
```
110-
111-
### Configuration File: config.yaml
112-
113-
The `config.yaml` defines multiple sync tasks. Each sync task specifies:
114-
- **Multi Mappings Config**: Sync supports multiple mappings per source, allowing you to replicate data from multiple databases or schemas to one or more target databases/schemas simultaneously.
115-
- **Type of Source** (`mongodb`, `mysql`, `mariadb`, `postgresql`).
116-
- **Source and Target Connection Strings**.
117-
- **Database/Table or Collection Mappings**.
118-
- **enable_table_row_count_monitoring**:
119-
Enables tracking of row counts for tables, providing insights into data growth and consistency.
120-
- **State File Paths for Resume Tokens or Binlog Positions**:
121-
Supports **resume functionality**, allowing the sync to continue from the last state after interruptions, ensuring data consistency.
122-
- **MongoDB**: `mongodb_resume_token_path` specifies the file path where the MongoDB resume token is stored.
123-
- **MySQL/MariaDB**: `mysql_position_path` specifies the file path where the MySQL/MariaDB binlog position is stored.
124-
- **PostgreSQL**: `pg_replication_slot` and `pg_plugin` specify the replication slot and plugin used for capturing WAL changes.
125-
126-
#### Example `config.yaml`
127-
128-
```yaml
129-
enable_table_row_count_monitoring: true
130-
log_level: "info" # Optional values: "debug", "info", "warn", "error", "fatal", "panic"
131-
132-
sync_configs:
133-
- type: "mongodb"
134-
enable: true
135-
source_connection: "mongodb://localhost:27017"
136-
target_connection: "mongodb://localhost:27017"
137-
mongodb_resume_token_path: "/tmp/state/mongodb_resume_token"
138-
mappings:
139-
- source_database: "source_db"
140-
target_database: "target_db"
141-
tables:
142-
- source_table: "users"
143-
target_table: "users"
144-
145-
- type: "mysql"
146-
enable: true
147-
source_connection: "root:root@tcp(localhost:3306)/source_db"
148-
target_connection: "root:root@tcp(localhost:3306)/target_db"
149-
mysql_position_path: "/tmp/state/mysql_position"
150-
mappings:
151-
- source_database: "source_db"
152-
target_database: "target_db"
153-
tables:
154-
- source_table: "users"
155-
target_table: "users"
156-
157-
- type: "mariadb"
158-
enable: true
159-
source_connection: "root:root@tcp(localhost:3307)/source_db"
160-
target_connection: "root:root@tcp(localhost:3307)/target_db"
161-
mysql_position_path: "/tmp/state/mariadb_position"
162-
mappings:
163-
- source_database: "source_db"
164-
target_database: "target_db"
165-
tables:
166-
- source_table: "users"
167-
target_table: "users"
168-
169-
- type: "postgresql"
170-
enable: true
171-
source_connection: "postgres://root:root@localhost:5432/source_db?sslmode=disable"
172-
target_connection: "postgres://root:root@localhost:5432/target_db?sslmode=disable"
173-
pg_position_path: "/tmp/state/pg_position"
174-
pg_replication_slot: "sync_slot"
175-
pg_plugin: "pgoutput"
176-
pg_publication_names: "mypub"
177-
mappings:
178-
- source_database: "source_db"
179-
source_schema: "public"
180-
target_database: "target_db"
181-
target_schema: "public"
182-
tables:
183-
- source_table: "users"
184-
target_table: "users"
185-
186-
- type: "redis"
187-
enable: true
188-
source_connection: "redis://localhost:6379/0"
189-
target_connection: "redis://localhost:6379/1"
190-
redis_position_path: "/tmp/state/redis_position"
191-
mappings:
192-
- source_database: "db0"
193-
target_database: "db1"
194-
tables:
195-
- source_table: "source_stream" # Redis Stream Name
196-
target_table: ""
112+
docker run -v sync
197113
```
198114

199115
## Real-Time Synchronization
@@ -220,4 +136,4 @@ Note: All interactions here should conform to the [Code of Conduct](https://gith
220136

221137
## Give a Star! ⭐
222138

223-
If you like or are using this project, please give it a **star**. Thanks!
139+
If you like or are using this project, please give it a **star**. Thanks!

cmd/sync/main.go

Lines changed: 125 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,96 +1,177 @@
11
package main
22

33
import (
4+
"bytes"
45
"context"
6+
"encoding/json"
7+
"net/http"
58
"os"
69
"os/signal"
710
"sync"
811
"syscall"
912
"time"
1013

11-
_ "github.com/go-sql-driver/mysql" // MySQL / MariaDB driver
12-
_ "github.com/lib/pq" // PostgreSQL driver
14+
"github.com/go-chi/chi/v5"
15+
"github.com/retail-ai-inc/sync/pkg/api"
1316
"github.com/retail-ai-inc/sync/pkg/config"
1417
"github.com/retail-ai-inc/sync/pkg/logger"
1518
"github.com/retail-ai-inc/sync/pkg/syncer"
1619
"github.com/retail-ai-inc/sync/pkg/utils"
20+
"github.com/sirupsen/logrus"
1721
)
1822

19-
// Interval for row count monitoring every minute
2023
const monitorInterval = time.Second * 60
2124

2225
func main() {
23-
// Initialize context
26+
cfg := config.NewConfig()
27+
log := logger.InitLogger(cfg.LogLevel)
28+
29+
if _, err := os.Stat("ui/dist"); os.IsNotExist(err) {
30+
log.Info("ui/dist directory does not exist, extracting ui/dist.zip...")
31+
if err := utils.UnzipDistFile("ui/dist.zip", "ui/"); err != nil {
32+
log.Errorf("Error unzipping dist.zip: %v", err)
33+
return
34+
}
35+
log.Info("ui/dist directory extracted successfully.")
36+
}
37+
2438
ctx, cancel := context.WithCancel(context.Background())
2539
defer cancel()
26-
27-
// Capture system interrupt signal
28-
c := make(chan os.Signal, 1)
29-
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
40+
sigs := make(chan os.Signal, 1)
41+
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
3042
go func() {
31-
<-c
32-
logger.Log.Info("Received interrupt signal, exiting...")
43+
<-sigs
3344
cancel()
3445
}()
3546

36-
// Load configuration
37-
cfg := config.NewConfig()
38-
log := logger.InitLogger(cfg.LogLevel)
47+
if cfg.EnableTableRowCountMonitoring {
48+
utils.StartRowCountMonitoring(ctx, cfg, log, monitorInterval)
49+
}
50+
51+
router := chi.NewRouter()
52+
router.Mount("/api", api.NewRouter())
53+
router.Handle("/*", http.StripPrefix("/", http.FileServer(http.Dir("ui/dist"))))
54+
55+
server := &http.Server{
56+
Addr: ":8080",
57+
Handler: router,
58+
}
59+
go func() {
60+
log.Info("UI is running at http://localhost:8080")
61+
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
62+
log.Errorf("HTTP server error: %v", err)
63+
cancel()
64+
}
65+
}()
66+
67+
go runSyncTasks(ctx, log, cfg)
3968

40-
// Start backend synchronization
69+
<-ctx.Done()
70+
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
71+
defer shutdownCancel()
72+
if err := server.Shutdown(shutdownCtx); err != nil {
73+
log.Errorf("HTTP server Shutdown error: %v", err)
74+
} else {
75+
log.Info("HTTP server gracefully stopped")
76+
}
77+
78+
time.Sleep(2 * time.Second)
79+
log.Info("Program exited")
80+
}
81+
82+
func runSyncTasks(parentCtx context.Context, log *logrus.Logger, cfg *config.Config) {
83+
configReloadInterval := 10 * time.Second
84+
currentConfig := cfg
4185
var wg sync.WaitGroup
86+
syncCtx, syncCancel := context.WithCancel(parentCtx)
87+
startSyncTasks(syncCtx, currentConfig, &wg, log)
88+
ticker := time.NewTicker(configReloadInterval)
89+
defer ticker.Stop()
90+
91+
// Monitor for config changes and restart row count monitoring if needed
92+
var rowCountMonitorCancel context.CancelFunc
93+
var rowCountMonitorCtx context.Context // Declare the context variable here
94+
95+
for {
96+
select {
97+
case <-parentCtx.Done():
98+
syncCancel()
99+
wg.Wait()
100+
if rowCountMonitorCancel != nil {
101+
rowCountMonitorCancel()
102+
}
103+
return
104+
case <-ticker.C:
105+
newConfig := config.NewConfig()
106+
if !configsEqual(currentConfig, newConfig) {
107+
log.Info("Config change detected, restarting sync tasks and row count monitoring...")
108+
syncCancel()
109+
wg.Wait()
110+
111+
// Stop the row count monitoring if it was started
112+
if rowCountMonitorCancel != nil {
113+
rowCountMonitorCancel()
114+
}
115+
116+
currentConfig = newConfig
117+
syncCtx, syncCancel = context.WithCancel(parentCtx)
118+
wg = sync.WaitGroup{}
119+
startSyncTasks(syncCtx, currentConfig, &wg, log)
120+
121+
// Restart row count monitoring if needed
122+
if currentConfig.EnableTableRowCountMonitoring {
123+
// Cancel the previous row count monitoring context (if any)
124+
if rowCountMonitorCancel != nil {
125+
rowCountMonitorCancel()
126+
}
127+
// Start a new row count monitoring with a fresh context
128+
rowCountMonitorCtx, rowCountMonitorCancel = context.WithCancel(parentCtx)
129+
utils.StartRowCountMonitoring(rowCountMonitorCtx, currentConfig, log, monitorInterval)
130+
}
131+
}
132+
}
133+
}
134+
}
135+
136+
func startSyncTasks(ctx context.Context, cfg *config.Config, wg *sync.WaitGroup, log *logrus.Logger) {
42137
for _, syncCfg := range cfg.SyncConfigs {
43138
if !syncCfg.Enable {
44139
continue
45140
}
46141
wg.Add(1)
47142
switch syncCfg.Type {
48143
case "mongodb":
49-
go func(syncCfg config.SyncConfig) {
144+
go func(sc config.SyncConfig) {
50145
defer wg.Done()
51-
syncer := syncer.NewMongoDBSyncer(syncCfg, log)
52-
syncer.Start(ctx)
146+
syncer.NewMongoDBSyncer(sc, log).Start(ctx)
53147
}(syncCfg)
54-
case "mysql":
55-
go func(syncCfg config.SyncConfig) {
148+
case "mysql", "mariadb":
149+
go func(sc config.SyncConfig) {
56150
defer wg.Done()
57-
syncer := syncer.NewMySQLSyncer(syncCfg, log)
58-
syncer.Start(ctx)
59-
}(syncCfg)
60-
case "mariadb":
61-
go func(syncCfg config.SyncConfig) {
62-
defer wg.Done()
63-
syncer := syncer.NewMariaDBSyncer(syncCfg, log)
64-
syncer.Start(ctx)
151+
syncer.NewMySQLSyncer(sc, log).Start(ctx)
65152
}(syncCfg)
66153
case "postgresql":
67-
go func(syncCfg config.SyncConfig) {
154+
go func(sc config.SyncConfig) {
68155
defer wg.Done()
69-
syncer := syncer.NewPostgreSQLSyncer(syncCfg, log)
70-
syncer.Start(ctx)
156+
syncer.NewPostgreSQLSyncer(sc, log).Start(ctx)
71157
}(syncCfg)
72158
case "redis":
73-
go func(syncCfg config.SyncConfig) {
159+
go func(sc config.SyncConfig) {
74160
defer wg.Done()
75-
syncer := syncer.NewRedisSyncer(syncCfg, log)
76-
syncer.Start(ctx)
161+
syncer.NewRedisSyncer(sc, log).Start(ctx)
77162
}(syncCfg)
78163
default:
79164
log.Errorf("Unknown sync type: %s", syncCfg.Type)
80165
wg.Done()
81166
}
82167
}
168+
}
83169

84-
// Start monitoring goroutine: output row counts every minute for each mapped table (source/target)
85-
if cfg.EnableTableRowCountMonitoring {
86-
utils.StartRowCountMonitoring(ctx, cfg, log, monitorInterval)
170+
func configsEqual(c1, c2 *config.Config) bool {
171+
b1, err1 := json.Marshal(c1.SyncConfigs)
172+
b2, err2 := json.Marshal(c2.SyncConfigs)
173+
if err1 != nil || err2 != nil {
174+
return false
87175
}
88-
89-
// Wait for all sync to complete
90-
wg.Wait()
91-
logger.Log.Info("All synchronization tasks have completed.")
92-
93-
// Wait for program to end
94-
<-ctx.Done()
95-
logger.Log.Info("Program has exited")
176+
return bytes.Equal(b1, b2)
96177
}

0 commit comments

Comments
 (0)