
Commit dc1d67a

Merge pull request #20 from retail-ai-inc/feature/redis-support
feat: add support for Redis integration
2 parents: 845a5fe + 07024de

File tree: 16 files changed (+1057, -281 lines)

.github/workflows/codecov.yml

Lines changed: 3 additions & 2 deletions

@@ -72,7 +72,8 @@ jobs:
             echo "Waiting for MongoDB..."
             sleep 1
           done
-
+
+          sleep 5
           echo "All services are up!"

       - name: Run tests
@@ -87,4 +88,4 @@
       - name: Tear down Docker Compose
         if: always()
         run: |
-          docker-compose -f docker/docker-compose.yml down
+          docker-compose -f docker/docker-compose.yml down

README.md

Lines changed: 29 additions & 6 deletions

@@ -3,7 +3,7 @@
 Synchronize Production NOSQL and SQL data to Standalone instances for Data scientists or other purposes. A **Go-based** tool to synchronize MongoDB or SQL data from a **MongoDB replica set** or **sharded cluster** or production SQL instance to a **standalone instance**, supports initial and incremental synchronization with change stream monitoring.

 > [!NOTE]
-> Sync is now supporting MongoDB, MySQl, PostgreSQL and MariaDB. Next `Sync` will support Redis and Elasticsearch.
+> Sync is now supporting MongoDB, MySQL, PostgreSQL, MariaDB, and Redis. Next, `Sync` will support Elasticsearch.

 ## What is the problem
 Let’s assume you have a mid to big-size SaaS platform or service with multiple tech teams and stakeholders. Different teams have different requirements for analyzing the production data independently. However, the tech team doesn’t want to allow all these stakeholders direct access to the production databases due to security and stability issues.
@@ -16,7 +16,8 @@ Create standalone databases outside of your production database servers with the
 - MongoDB (Sharded clusters, Replica sets)
 - MySQL
 - MariaDB
-- PostgreSQL (PostgreSQL version 10+、Enable logical replication)
+- PostgreSQL (PostgreSQL version 10+ with logical replication enabled)
+- Redis (Standalone, Sentinel; does not support cluster mode)

 ## High Level Design Diagram

@@ -31,7 +32,7 @@ Create standalone databases outside of your production database servers with the

 ![image](https://github.com/user-attachments/assets/65b23a4c-56db-4833-89a1-0f802af878bd)

-### Grafana Integration(temporary transition phase)
+### Grafana Integration (temporary transition phase)
 ![image](https://github.com/user-attachments/assets/cdc8e57b-8aa4-4386-8aa8-de5028698fd0)


@@ -41,14 +42,17 @@ Create standalone databases outside of your production database servers with the
   - MongoDB: Bulk synchronization of data from the MongoDB cluster or MongoDB replica set to the standalone MongoDB instance.
   - MySQL/MariaDB: Initial synchronization using batch inserts (default batch size: 100 rows) from the source to the target if the target table is empty.
   - PostgreSQL: Initial synchronization using batch inserts (default batch size: 100 rows) from the source to the target using logical replication slots and the pgoutput plugin.
-- **Change Stream & Binlog Monitoring**:
+  - Redis: Supports full data synchronization for standalone Redis and Sentinel setups using Redis Streams and Keyspace Notifications.
+- **Change Stream & Incremental Updates**:
   - MongoDB: Watches for real-time changes (insert, update, replace, delete) in the cluster's collections and reflects them in the standalone instance.
   - MySQL/MariaDB: Uses binlog replication events to capture and apply incremental changes to the target.
   - PostgreSQL: Uses WAL (Write-Ahead Log) with the pgoutput plugin to capture and apply incremental changes to the target.
+  - Redis: Uses Redis Streams and Keyspace Notifications to capture and sync incremental changes in real-time.
 - **Batch Processing & Concurrency**:
   Handles synchronization in batches for optimized performance and supports parallel synchronization for multiple collections/tables.
 - **Restart Resilience**:
-  Stores MongoDB resume tokens, MySQL binlog positions, and PostgreSQL replication positions in configurable state files, allowing the tool to resume synchronization from the last known position after a restart.
+  Stores MongoDB resume tokens, MySQL binlog positions, PostgreSQL replication positions, and Redis stream offsets in configurable state files, allowing the tool to resume synchronization from the last known position after a restart.
+  - **Note for Redis**: Redis does not support resuming from the last state after a sync interruption. If `Sync` is interrupted or crashes, it will restart the synchronization process by executing the initial sync method to retrieve all keys and sync them to the target database. This is due to limitations in Redis Streams and Keyspace Notifications, which do not provide a built-in mechanism to persist and resume stream offsets across restarts. As a result, the tool cannot accurately determine the last synced state and must perform a full resync to ensure data consistency.
 - **Grafana Integration**:
   - For data visualization, this tool integrates with **Grafana** using data from **GCP Logging** and **GCP BigQuery**.
   - When **`enable_table_row_count_monitoring`** is enabled, the tool records data changes, including table row counts, in **GCP Logging**.
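
The Redis bullets above hinge on Keyspace Notifications being switched on at the source. As a rough illustration only (this code is not part of the commit; the `github.com/redis/go-redis/v9` client, the example address, and the `KEA` event class are assumptions), enabling and consuming key-event notifications could look like this:

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()

	// Connect to the source instance (example address/DB, not from the diff).
	src := redis.NewClient(&redis.Options{Addr: "localhost:6379", DB: 0})

	// Enable keyspace notifications for all key events ("KEA").
	// In production this is usually set in redis.conf instead.
	if err := src.ConfigSet(ctx, "notify-keyspace-events", "KEA").Err(); err != nil {
		panic(err)
	}

	// Key-event notifications for DB 0 arrive on __keyevent@0__:<command>.
	sub := src.PSubscribe(ctx, "__keyevent@0__:*")
	defer sub.Close()

	for msg := range sub.Channel() {
		// msg.Channel names the event (e.g. "__keyevent@0__:set");
		// msg.Payload is the affected key.
		fmt.Printf("event=%s key=%s\n", msg.Channel, msg.Payload)
	}
}
```

A real syncer would read the referenced key from the source and write it to the target instead of printing it.
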
@@ -65,6 +69,10 @@ Create standalone databases outside of your production database servers with the
 - For PostgreSQL sources:
   - A PostgreSQL instance with logical replication enabled and a replication slot created.
   - A target PostgreSQL instance with write permissions.
+- For Redis sources:
+  - Redis standalone or Sentinel setup with Redis version >= 5.0.
+  - Redis Streams and Keyspace Notifications enabled.
+  - A target Redis instance with write permissions.

 ## Quick start

@@ -169,21 +177,36 @@ sync_configs:
       tables:
         - source_table: "users"
           target_table: "users"
+
+  - type: "redis"
+    enable: true
+    source_connection: "redis://localhost:6379/0"
+    target_connection: "redis://localhost:6379/1"
+    redis_position_path: "/tmp/state/redis_position"
+    mappings:
+      - source_database: "db0"
+        target_database: "db1"
+        tables:
+          - source_table: "source_stream" # Redis Stream Name
+            target_table: ""
 ```

 ## Real-Time Synchronization

 - MongoDB: Uses Change Streams from replica sets or sharded clusters for incremental updates.
 - MySQL/MariaDB: Uses binlog replication to apply incremental changes to the target.
 - PostgreSQL: Uses WAL (Write-Ahead Log) with the pgoutput plugin to apply incremental changes to the target.
+- Redis: Uses Redis Streams and Keyspace Notifications to sync changes in real-time.
+  - **Note for Redis**: If `Sync` is interrupted, Redis will restart the synchronization process with an initial sync of all keys to the target. This ensures data consistency but may increase synchronization time after interruptions.

 On the restart, the tool resumes from the stored state (resume token for MongoDB, binlog position for MySQL/MariaDB, replication slot for PostgreSQL).

 ## Availability

 - MongoDB: MongoDB Change Streams require a replica set or sharded cluster. See [Convert Standalone to Replica Set](https://www.mongodb.com/docs/manual/tutorial/convert-standalone-to-replica-set/).
 - MySQL/MariaDB: MySQL/MariaDB binlog-based incremental sync requires ROW or MIXED binlog format for proper event capturing.
-- PostgreSQL incremental sync requires logical replication enabled with a replication slot.
+- PostgreSQL: PostgreSQL incremental sync requires logical replication enabled with a replication slot.
+- Redis: Redis sync supports standalone and Sentinel setups but does not support Redis Cluster mode. Redis does not support resuming from the last synced state after a crash or interruption.

 ## Contributing
cmd/sync/main.go

Lines changed: 9 additions & 199 deletions

@@ -2,11 +2,8 @@ package main

 import (
     "context"
-    "database/sql"
-    "fmt"
     "os"
     "os/signal"
-    "strings"
     "sync"
     "syscall"
     "time"
@@ -16,9 +13,7 @@ import (
     "github.com/retail-ai-inc/sync/pkg/config"
     "github.com/retail-ai-inc/sync/pkg/logger"
     "github.com/retail-ai-inc/sync/pkg/syncer"
-    "github.com/sirupsen/logrus"
-    "go.mongodb.org/mongo-driver/mongo"
-    "go.mongodb.org/mongo-driver/mongo/options"
+    "github.com/retail-ai-inc/sync/pkg/utils"
 )

 // Interval for row count monitoring every minute
@@ -74,6 +69,12 @@ func main() {
             syncer := syncer.NewPostgreSQLSyncer(syncCfg, log)
             syncer.Start(ctx)
         }(syncCfg)
+    case "redis":
+        go func(syncCfg config.SyncConfig) {
+            defer wg.Done()
+            syncer := syncer.NewRedisSyncer(syncCfg, log)
+            syncer.Start(ctx)
+        }(syncCfg)
     default:
         log.Errorf("Unknown sync type: %s", syncCfg.Type)
         wg.Done()
@@ -82,24 +83,7 @@ func main() {

     // Start monitoring goroutine: output row counts every minute for each mapped table (source/target)
     if cfg.EnableTableRowCountMonitoring {
-        go func() {
-            ticker := time.NewTicker(monitorInterval)
-            defer ticker.Stop()
-            for {
-                select {
-                case <-ctx.Done():
-                    return
-                case <-ticker.C:
-                    // For each sync config, gather row counts and log
-                    for _, sc := range cfg.SyncConfigs {
-                        if !sc.Enable {
-                            continue
-                        }
-                        countAndLogTables(ctx, sc)
-                    }
-                }
-            }
-        }()
+        utils.StartRowCountMonitoring(ctx, cfg, log, monitorInterval)
     }

     // Wait for all sync to complete
@@ -109,178 +93,4 @@ func main() {
     // Wait for program to end
     <-ctx.Done()
     logger.Log.Info("Program has exited")
-}
-
-// countAndLogTables logs row counts for a single database connection
-func countAndLogTables(ctx context.Context, sc config.SyncConfig) {
-    switch strings.ToLower(sc.Type) {
-    case "mysql", "mariadb":
-        countAndLogMySQLOrMariaDB(ctx, sc)
-    case "postgresql":
-        countAndLogPostgreSQL(ctx, sc)
-    case "mongodb":
-        countAndLogMongoDB(ctx, sc)
-    default:
-        // Unsupported or not needed
-    }
-}
-
-// countAndLogMySQLOrMariaDB obtains row counts for MySQL / MariaDB tables
-func countAndLogMySQLOrMariaDB(ctx context.Context, sc config.SyncConfig) {
-    db, err := sql.Open("mysql", sc.SourceConnection)
-    if err != nil {
-        logger.Log.WithError(err).WithField("db_type", sc.Type).
-            Error("[Monitor] Fail connect to source")
-        return
-    }
-    defer db.Close()
-
-    db2, err := sql.Open("mysql", sc.TargetConnection)
-    if err != nil {
-        logger.Log.WithError(err).WithField("db_type", sc.Type).
-            Error("[Monitor] Fail connect to target")
-        return
-    }
-    defer db2.Close()
-
-    dbType := strings.ToUpper(sc.Type) // "MYSQL" or "MARIADB"
-    for _, mapping := range sc.Mappings {
-        srcDBName := mapping.SourceDatabase
-        tgtDBName := mapping.TargetDatabase
-        for _, tblMap := range mapping.Tables {
-            srcName := tblMap.SourceTable
-            tgtName := tblMap.TargetTable
-
-            // Source table
-            srcCount := getRowCount(db, fmt.Sprintf("%s.%s", srcDBName, srcName))
-            // Target table
-            tgtCount := getRowCount(db2, fmt.Sprintf("%s.%s", tgtDBName, tgtName))
-
-            logger.Log.WithFields(logrus.Fields{
-                "db_type": dbType,
-                "src_db": srcDBName,
-                "src_table": srcName,
-                "src_row_count": srcCount,
-                "tgt_db": tgtDBName,
-                "tgt_table": tgtName,
-                "tgt_row_count": tgtCount,
-                "monitor_action": "row_count_minutely",
-            }).Info("row_count_minutely")
-        }
-    }
-}
-
-// countAndLogPostgreSQL obtains row counts for PostgreSQL tables
-func countAndLogPostgreSQL(ctx context.Context, sc config.SyncConfig) {
-    db, err := sql.Open("postgres", sc.SourceConnection)
-    if err != nil {
-        logger.Log.WithError(err).WithField("db_type", "POSTGRESQL").
-            Error("[Monitor] Fail connect to source")
-        return
-    }
-    defer db.Close()
-
-    db2, err := sql.Open("postgres", sc.TargetConnection)
-    if err != nil {
-        logger.Log.WithError(err).WithField("db_type", "POSTGRESQL").
-            Error("[Monitor] Fail connect to target")
-        return
-    }
-    defer db2.Close()
-
-    for _, mapping := range sc.Mappings {
-        srcDBName := mapping.SourceDatabase
-        tgtDBName := mapping.TargetDatabase
-        srcSchema := mapping.SourceSchema
-        if srcSchema == "" {
-            srcSchema = "public"
-        }
-        tgtSchema := mapping.TargetSchema
-        if tgtSchema == "" {
-            tgtSchema = "public"
-        }
-
-        for _, tblMap := range mapping.Tables {
-            srcName := tblMap.SourceTable
-            tgtName := tblMap.TargetTable
-
-            fullSrc := fmt.Sprintf("%s.%s", srcSchema, srcName)
-            fullTgt := fmt.Sprintf("%s.%s", tgtSchema, tgtName)
-
-            srcCount := getRowCount(db, fullSrc)
-            tgtCount := getRowCount(db2, fullTgt)
-
-            logger.Log.WithFields(logrus.Fields{
-                "db_type": "POSTGRESQL",
-                "src_schema": srcSchema,
-                "src_table": srcName,
-                "src_db": srcDBName,
-                "src_row_count": srcCount,
-                "tgt_schema": tgtSchema,
-                "tgt_table": tgtName,
-                "tgt_db": tgtDBName,
-                "tgt_row_count": tgtCount,
-                "monitor_action": "row_count_minutely",
-            }).Info("row_count_minutely")
-        }
-    }
-}
-
-// countAndLogMongoDB obtains document counts for MongoDB collections
-func countAndLogMongoDB(ctx context.Context, sc config.SyncConfig) {
-    srcClient, err := mongo.Connect(ctx, options.Client().ApplyURI(sc.SourceConnection))
-    if err != nil {
-        logger.Log.WithError(err).WithField("db_type", "MONGODB").
-            Error("[Monitor] Fail connect to source")
-        return
-    }
-    defer func() {
-        _ = srcClient.Disconnect(ctx)
-    }()
-
-    tgtClient, err := mongo.Connect(ctx, options.Client().ApplyURI(sc.TargetConnection))
-    if err != nil {
-        logger.Log.WithError(err).WithField("db_type", "MONGODB").
-            Error("[Monitor] Fail connect to target")
-        return
-    }
-    defer func() {
-        _ = tgtClient.Disconnect(ctx)
-    }()
-
-    for _, mapping := range sc.Mappings {
-        srcDBName := mapping.SourceDatabase
-        tgtDBName := mapping.TargetDatabase
-        for _, tblMap := range mapping.Tables {
-            srcName := tblMap.SourceTable
-            tgtName := tblMap.TargetTable
-
-            srcColl := srcClient.Database(srcDBName).Collection(srcName)
-            tgtColl := tgtClient.Database(tgtDBName).Collection(tgtName)
-
-            srcCount, _ := srcColl.EstimatedDocumentCount(ctx)
-            tgtCount, _ := tgtColl.EstimatedDocumentCount(ctx)
-
-            logger.Log.WithFields(logrus.Fields{
-                "db_type": "MONGODB",
-                "src_db": srcDBName,
-                "src_coll": srcName,
-                "src_row_count": srcCount,
-                "tgt_db": tgtDBName,
-                "tgt_coll": tgtName,
-                "tgt_row_count": tgtCount,
-                "monitor_action": "row_count_minutely",
-            }).Info("row_count_minutely")
-        }
-    }
-}
-
-// getRowCount is a general SQL count function
-func getRowCount(db *sql.DB, table string) int64 {
-    query := fmt.Sprintf("SELECT COUNT(*) FROM %s", table)
-    var cnt int64
-    if err := db.QueryRow(query).Scan(&cnt); err != nil {
-        return -1
-    }
-    return cnt
-}
+}
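
The inline monitoring goroutine deleted above is now hidden behind `utils.StartRowCountMonitoring(ctx, cfg, log, monitorInterval)`. The real helper in `pkg/utils` is not shown in this commit; the following is only an approximate reconstruction of the scheduling part, inferred from the deleted loop, with assumed parameter types and a placeholder where the per-database counting would go:

```go
// Assumed package and signature, inferred from the call site in main.go;
// the actual pkg/utils implementation may differ.
package utils

import (
	"context"
	"time"

	"github.com/retail-ai-inc/sync/pkg/config"
	"github.com/sirupsen/logrus"
)

// StartRowCountMonitoring launches a background goroutine that ticks at
// `interval`, skips disabled sync configs, and records row counts for the
// rest, mirroring the loop removed from cmd/sync/main.go.
func StartRowCountMonitoring(ctx context.Context, cfg *config.Config, log *logrus.Logger, interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				for _, sc := range cfg.SyncConfigs {
					if !sc.Enable {
						continue
					}
					// Placeholder: the deleted countAndLog* functions performed
					// the per-type (MySQL/MariaDB, PostgreSQL, MongoDB) counting here.
					log.WithField("sync_type", sc.Type).Info("row_count_minutely tick")
				}
			}
		}
	}()
}
```
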

configs/config.yaml

Lines changed: 13 additions & 1 deletion

@@ -53,4 +53,16 @@ sync_configs:
       target_schema: "public"
       tables:
         - source_table: "users"
-          target_table: "users"
+          target_table: "users"
+
+  - type: "redis"
+    enable: true
+    source_connection: "redis://localhost:6379/0"
+    target_connection: "redis://localhost:6379/1"
+    redis_position_path: "/tmp/state/redis_position"
+    mappings:
+      - source_database: "db0"
+        target_database: "db1"
+        tables:
+          - source_table: "source_stream" # Redis Stream Name
+            target_table: ""
