Skip to content

Commit 0a2a9ff

Browse files
Merge pull request #42 from retail-ai-inc/fix/mongodb-ignore-delete-ops
fix(mongodb): restore ignoreDeleteOps functionality for BSON format
2 parents acc438d + f439cf1 commit 0a2a9ff

File tree

1 file changed

+22
-3
lines changed

1 file changed

+22
-3
lines changed

pkg/syncer/mongodb/mongodb.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,6 +1160,14 @@ func (s *MongoDBSyncer) convertRawBSONToWriteModel(rawData bson.Raw, sourceDB, c
11601160
return mongo.NewReplaceOneModel().SetFilter(bson.M{"_id": docID}).SetReplacement(fullDoc).SetUpsert(true)
11611161
}
11621162
case "delete":
1163+
// Check if delete operations should be ignored for this collection
1164+
advancedSettings := s.findTableAdvancedSettings(collectionName)
1165+
if advancedSettings.IgnoreDeleteOps {
1166+
s.logger.Debugf("[MongoDB] Ignoring delete operation for %s.%s (ignoreDeleteOps=true)",
1167+
sourceDB, collectionName)
1168+
return nil
1169+
}
1170+
11631171
var docID interface{}
11641172
if dk, ok := event["documentKey"].(bson.M); ok {
11651173
docID = dk["_id"]
@@ -1899,15 +1907,18 @@ func (s *MongoDBSyncer) processDeadLetterBatch(ctx context.Context, filePath str
18991907
var stillFailedOps []FailedOperation
19001908

19011909
for _, op := range operationsToRetry {
1902-
writeModel, err := s.deserializeWriteModel(op.WriteModel)
1910+
writeModel, err := s.deserializeWriteModel(op.WriteModel, collectionName)
19031911
if err != nil {
19041912
s.logger.Warnf("[MongoDB] Failed to deserialize WriteModel for operation %s: %v", op.ID, err)
19051913
op.RetryCount++
19061914
stillFailedOps = append(stillFailedOps, op)
19071915
continue
19081916
}
19091917

1910-
retryModels = append(retryModels, writeModel)
1918+
// Skip nil models (e.g., when delete operations are ignored)
1919+
if writeModel != nil {
1920+
retryModels = append(retryModels, writeModel)
1921+
}
19111922
}
19121923

19131924
// Execute retry operations
@@ -1958,7 +1969,7 @@ func (s *MongoDBSyncer) processDeadLetterBatch(ctx context.Context, filePath str
19581969
}
19591970

19601971
// deserializeWriteModel converts JSON back to WriteModel
1961-
func (s *MongoDBSyncer) deserializeWriteModel(data json.RawMessage) (mongo.WriteModel, error) {
1972+
func (s *MongoDBSyncer) deserializeWriteModel(data json.RawMessage, collectionName string) (mongo.WriteModel, error) {
19621973
var modelData map[string]interface{}
19631974
if err := json.Unmarshal(data, &modelData); err != nil {
19641975
return nil, fmt.Errorf("failed to unmarshal model data: %w", err)
@@ -2012,6 +2023,14 @@ func (s *MongoDBSyncer) deserializeWriteModel(data json.RawMessage) (mongo.Write
20122023
return replaceModel, nil
20132024

20142025
case "delete":
2026+
// Check if delete operations should be ignored for this collection
2027+
advancedSettings := s.findTableAdvancedSettings(collectionName)
2028+
if advancedSettings.IgnoreDeleteOps {
2029+
s.logger.Debugf("[MongoDB] Ignoring delete operation during retry for %s (ignoreDeleteOps=true)",
2030+
collectionName)
2031+
return nil, nil
2032+
}
2033+
20152034
filter, ok := modelData["filter"]
20162035
if !ok {
20172036
return nil, fmt.Errorf("missing filter for delete operation")

0 commit comments

Comments
 (0)