[Postgres + MongoDB] Resumable replication #163

Merged
merged 34 commits on Jun 13, 2025
Changes from 31 commits
Commits
fc1d92c
Refactor pgwire types.
rkistner May 26, 2025
76e4e2d
Add SnapshotQuery support for postgres again.
rkistner May 26, 2025
951c951
Add test again.
rkistner May 26, 2025
95b2095
Merge remote-tracking branch 'origin/main' into resumable-replication-2
rkistner May 29, 2025
0d248fb
WIP: Record replication progress for Postgres.
rkistner May 29, 2025
2aad7e7
Avoid promoteBuffers.
rkistner May 29, 2025
c1fcf74
MongoDB: Skip tables already replicated.
rkistner Jun 2, 2025
3372eec
Refactor MongoDB snapshot queries.
rkistner Jun 2, 2025
66188ab
Chunk MongoDB queries by _id.
rkistner Jun 2, 2025
3928604
Fix query typo.
rkistner Jun 2, 2025
dc02713
Get a resume token instead of clusterTime for initial snapshot.
rkistner Jun 2, 2025
a369964
Replication logging improvements.
rkistner Jun 2, 2025
0460455
Define a log prefix on child loggers.
rkistner Jun 2, 2025
959ad11
Make sure the change stream is always closed.
rkistner Jun 2, 2025
eaac127
tryNext(), not hasNext().
rkistner Jun 2, 2025
e084aef
Separate storage for snapshot LSN; fix snapshot resumeToken.
rkistner Jun 3, 2025
b2bde45
Improve test.
rkistner Jun 3, 2025
1117a13
Merge remote-tracking branch 'origin/main' into resumable-replication-2
rkistner Jun 3, 2025
fc8fe2e
Merge remote-tracking branch 'origin/main' into resumable-replication-2
rkistner Jun 3, 2025
b7efb76
Implement re-replication for postgres storage.
rkistner Jun 3, 2025
321c1c2
Some test cleanup.
rkistner Jun 3, 2025
a4958a8
Keepalive after table snapshot to fix schema tests.
rkistner Jun 3, 2025
8a1ef29
Test cleanup.
rkistner Jun 3, 2025
5dd028b
Add some mongodb replication tests.
rkistner Jun 3, 2025
a05ceea
Improve postgres replication abort logic and progress reporting.
rkistner Jun 4, 2025
35d7c31
Split out tests.
rkistner Jun 4, 2025
db64a37
Add tests for resuming mongodb snapshots.
rkistner Jun 4, 2025
8b5d03b
Clear mongo storage if we're resuming replication without an LSN.
rkistner Jun 4, 2025
ab2fd45
Correctly persist snapshot progress in postgres storage.
rkistner Jun 4, 2025
bb223d5
Fix down migrations on first run.
rkistner Jun 4, 2025
3ce8e0e
Fix migration tests.
rkistner Jun 4, 2025
5070e76
Address copilot comments.
rkistner Jun 4, 2025
ff396c2
Add changeset.
rkistner Jun 4, 2025
ff83c55
Add comments addressing review feedback.
rkistner Jun 12, 2025
18 changes: 16 additions & 2 deletions libs/lib-services/src/logger/Logger.ts
@@ -1,8 +1,22 @@
import winston from 'winston';

const prefixFormat = winston.format((info) => {
if (info.prefix) {
info.message = `${info.prefix}${info.message}`;
}
return {
...info,
prefix: undefined
};
});

export namespace LogFormat {
export const development = winston.format.combine(winston.format.colorize({ level: true }), winston.format.simple());
export const production = winston.format.combine(winston.format.timestamp(), winston.format.json());
export const development = winston.format.combine(
prefixFormat(),
winston.format.colorize({ level: true }),
winston.format.simple()
);
export const production = winston.format.combine(prefixFormat(), winston.format.timestamp(), winston.format.json());
}

export const logger = winston.createLogger();
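
For context, a minimal usage sketch of the new prefix support (not part of the diff; the import path and transport wiring are assumptions):

```ts
import winston from 'winston';
// Assumed import path; LogFormat and logger live in libs/lib-services in this repo.
import { LogFormat, logger } from '@powersync/lib-services-framework';

// Hypothetical wiring for a local experiment; the service configures transports elsewhere.
logger.configure({
  format: LogFormat.development,
  transports: [new winston.transports.Console()]
});

// winston child loggers merge the given metadata into every log entry, so the new
// prefixFormat() finds info.prefix and prepends it to the message before output.
const replicationLogger = logger.child({ prefix: '[powersync_1] ' });
replicationLogger.info('Replicating ops');
// -> info: [powersync_1] Replicating ops
```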
5 changes: 4 additions & 1 deletion libs/lib-services/src/migrations/AbstractMigrationAgent.ts
@@ -70,7 +70,7 @@ export abstract class AbstractMigrationAgent<Generics extends MigrationAgentGene
try {
const state = await this.store.load();

logger.info('Running migrations');
logger.info(`Running migrations ${direction}`);
const logStream = this.execute({
direction,
migrations,
@@ -142,6 +142,9 @@ export abstract class AbstractMigrationAgent<Generics extends MigrationAgentGene
) {
index += 1;
}
} else if (params.direction == defs.Direction.Down) {
// Down migration with no state - exclude all migrations
index = migrations.length;
}

migrations = migrations.slice(index);
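
As a rough sketch of why the new branch is needed (simplified names, not the real agent API): with no recorded migration state nothing was ever applied, so a down run must select zero migrations.

```ts
// Simplified illustration only - not the actual AbstractMigrationAgent API.
function migrationsToRun<T>(all: T[], direction: 'up' | 'down', index: number, hasState: boolean): T[] {
  if (!hasState && direction === 'down') {
    // Down migration with no recorded state: nothing was ever applied, so nothing to revert.
    index = all.length;
  }
  return all.slice(index); // slice(all.length) yields [], making the first-run down migration a no-op
}
```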
@@ -209,6 +209,7 @@ export class MongoBucketStorage
no_checkpoint_before: null,
keepalive_op: null,
snapshot_done: false,
snapshot_lsn: undefined,
state: storage.SyncRuleState.PROCESSING,
slot_name: slot_name,
last_checkpoint_ts: null,
@@ -7,11 +7,19 @@ import {
container,
ErrorCode,
errors,
logger,
Logger,
logger as defaultLogger,
ReplicationAssertionError,
ServiceError
} from '@powersync/lib-services-framework';
import { deserializeBson, InternalOpId, SaveOperationTag, storage, utils } from '@powersync/service-core';
import {
BucketStorageMarkRecordUnavailable,
deserializeBson,
InternalOpId,
SaveOperationTag,
storage,
utils
} from '@powersync/service-core';
import * as timers from 'node:timers/promises';
import { PowerSyncMongo } from './db.js';
import { CurrentBucket, CurrentDataDocument, SourceKey, SyncRuleDocument } from './models.js';
@@ -46,12 +54,18 @@ export interface MongoBucketBatchOptions {
* Set to true for initial replication.
*/
skipExistingRows: boolean;

markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;

logger?: Logger;
}

export class MongoBucketBatch
extends BaseObserver<storage.BucketBatchStorageListener>
implements storage.BucketStorageBatch
{
private logger: Logger;

private readonly client: mongo.MongoClient;
public readonly db: PowerSyncMongo;
public readonly session: mongo.ClientSession;
@@ -65,6 +79,7 @@ export class MongoBucketBatch

private batch: OperationBatch | null = null;
private write_checkpoint_batch: storage.CustomWriteCheckpointOptions[] = [];
private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;

/**
* Last LSN received associated with a checkpoint.
@@ -86,6 +101,7 @@

constructor(options: MongoBucketBatchOptions) {
super();
this.logger = options.logger ?? defaultLogger;
this.client = options.db.client;
this.db = options.db;
this.group_id = options.groupId;
@@ -96,6 +112,7 @@
this.sync_rules = options.syncRules;
this.storeCurrentData = options.storeCurrentData;
this.skipExistingRows = options.skipExistingRows;
this.markRecordUnavailable = options.markRecordUnavailable;
this.batch = new OperationBatch();

this.persisted_op = options.keepaliveOp ?? null;
@@ -232,7 +249,9 @@
current_data_lookup.set(cacheKey(doc._id.t, doc._id.k), doc);
}

let persistedBatch: PersistedBatch | null = new PersistedBatch(this.group_id, transactionSize);
let persistedBatch: PersistedBatch | null = new PersistedBatch(this.group_id, transactionSize, {
logger: this.logger
});

for (let op of b) {
if (resumeBatch) {
@@ -311,11 +330,18 @@
// Not an error if we re-apply a transaction
existing_buckets = [];
existing_lookups = [];
// Log to help with debugging if there was a consistency issue
if (this.storeCurrentData) {
logger.warn(
`Cannot find previous record for update on ${record.sourceTable.qualifiedName}: ${beforeId} / ${record.before?.id}`
);
if (this.markRecordUnavailable != null) {
// This will trigger a "resnapshot" of the record.
// This is not relevant if storeCurrentData is false, since we'll get the full row
// directly in the replication stream.
this.markRecordUnavailable(record);
} else {
// Log to help with debugging if there was a consistency issue
this.logger.warn(
`Cannot find previous record for update on ${record.sourceTable.qualifiedName}: ${beforeId} / ${record.before?.id}`
);
}
}
} else {
existing_buckets = result.buckets;
@@ -332,8 +358,8 @@
existing_buckets = [];
existing_lookups = [];
// Log to help with debugging if there was a consistency issue
if (this.storeCurrentData) {
logger.warn(
if (this.storeCurrentData && this.markRecordUnavailable == null) {
this.logger.warn(
`Cannot find previous record for delete on ${record.sourceTable.qualifiedName}: ${beforeId} / ${record.before?.id}`
);
}
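
For context on the markRecordUnavailable hook used above: when storeCurrentData is true and the previous version of a row cannot be found, the batch now hands the record back to the replicator instead of only logging a warning. A hedged sketch of what that callback could look like on the replication side (the queue and how it is drained are assumptions, not part of this diff):

```ts
import { BucketStorageMarkRecordUnavailable, storage } from '@powersync/service-core';

// Rows whose previous state is missing and must be re-read from the source database.
const resnapshotQueue: { table: storage.SourceTable; id: unknown }[] = [];

const markRecordUnavailable: BucketStorageMarkRecordUnavailable = (record) => {
  // Without the previous row state the update cannot be applied incrementally, so
  // queue the row for a targeted "resnapshot" once the current batch has been flushed.
  resnapshotQueue.push({ table: record.sourceTable, id: record.before?.id ?? record.after?.id });
};
```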
@@ -430,7 +456,7 @@
}
}
);
logger.error(
this.logger.error(
`Failed to evaluate data query on ${record.sourceTable.qualifiedName}.${record.after?.id}: ${error.error}`
);
}
@@ -470,7 +496,7 @@
}
}
);
logger.error(
this.logger.error(
`Failed to evaluate parameter query on ${record.sourceTable.qualifiedName}.${after.id}: ${error.error}`
);
}
@@ -524,7 +550,7 @@
if (e instanceof mongo.MongoError && e.hasErrorLabel('TransientTransactionError')) {
// Likely write conflict caused by concurrent write stream replicating
} else {
logger.warn('Transaction error', e as Error);
this.logger.warn('Transaction error', e as Error);
}
await timers.setTimeout(Math.random() * 50);
throw e;
@@ -549,7 +575,7 @@
await this.withTransaction(async () => {
flushTry += 1;
if (flushTry % 10 == 0) {
logger.info(`${this.slot_name} ${description} - try ${flushTry}`);
this.logger.info(`${description} - try ${flushTry}`);
}
if (flushTry > 20 && Date.now() > lastTry) {
throw new ServiceError(ErrorCode.PSYNC_S1402, 'Max transaction tries exceeded');
@@ -619,13 +645,13 @@
if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) {
// When re-applying transactions, don't create a new checkpoint until
// we are past the last transaction.
logger.info(`Re-applied transaction ${lsn} - skipping checkpoint`);
this.logger.info(`Re-applied transaction ${lsn} - skipping checkpoint`);
// Cannot create a checkpoint yet - return false
return false;
}
if (lsn < this.no_checkpoint_before_lsn) {
if (Date.now() - this.lastWaitingLogThottled > 5_000) {
logger.info(
this.logger.info(
`Waiting until ${this.no_checkpoint_before_lsn} before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}`
);
this.lastWaitingLogThottled = Date.now();
@@ -677,7 +703,8 @@
_id: this.group_id
},
{
$set: update
$set: update,
$unset: { snapshot_lsn: 1 }
},
{ session: this.session }
);
@@ -699,7 +726,7 @@
if (this.persisted_op != null) {
// The commit may have been skipped due to "no_checkpoint_before_lsn".
// Apply it now if relevant
logger.info(`Commit due to keepalive at ${lsn} / ${this.persisted_op}`);
this.logger.info(`Commit due to keepalive at ${lsn} / ${this.persisted_op}`);
return await this.commit(lsn);
}

@@ -713,7 +740,8 @@
snapshot_done: true,
last_fatal_error: null,
last_keepalive_ts: new Date()
}
},
$unset: { snapshot_lsn: 1 }
},
{ session: this.session }
);
@@ -722,6 +750,22 @@
return true;
}

async setSnapshotLsn(lsn: string): Promise<void> {
const update: Partial<SyncRuleDocument> = {
snapshot_lsn: lsn
};

await this.db.sync_rules.updateOne(
{
_id: this.group_id
},
{
$set: update
},
{ session: this.session }
);
}
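
setSnapshotLsn persists the stream position captured before the initial snapshot (for MongoDB this is a resume token rather than a Postgres LSN, per the commits above); the $unset of snapshot_lsn in commit/keepalive clears it once streaming replication takes over. A hedged sketch of how a replicator might drive it; getCurrentPosition and snapshotTable are assumed helpers:

```ts
import { storage } from '@powersync/service-core';
import { MongoBucketBatch } from './MongoBucketBatch.js';

// Assumed helpers for illustration - not part of this PR.
declare function getCurrentPosition(): Promise<string>;
declare function snapshotTable(batch: MongoBucketBatch, table: storage.SourceTable): Promise<void>;

async function runInitialSnapshot(batch: MongoBucketBatch, tables: storage.SourceTable[]) {
  // Capture the stream position before reading any data, so that changes made while
  // the snapshot runs are replayed afterwards instead of being lost.
  const lsn = await getCurrentPosition();
  await batch.setSnapshotLsn(lsn);

  for (const table of tables) {
    await snapshotTable(batch, table); // may itself resume via updateTableProgress below
  }

  // No checkpoint may be created before this position; streaming continues from here.
  await batch.markSnapshotDone(tables, lsn);
}
```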

async save(record: storage.SaveOptions): Promise<storage.FlushedResult | null> {
const { after, before, sourceTable, tag } = record;
for (const event of this.getTableEvents(sourceTable)) {
@@ -746,7 +790,7 @@
return null;
}

logger.debug(`Saving ${record.tag}:${record.before?.id}/${record.after?.id}`);
this.logger.debug(`Saving ${record.tag}:${record.before?.id}/${record.after?.id}`);

this.batch ??= new OperationBatch();
this.batch.push(new RecordOperation(record));
@@ -817,7 +861,7 @@
session: session
});
const batch = await cursor.toArray();
const persistedBatch = new PersistedBatch(this.group_id, 0);
const persistedBatch = new PersistedBatch(this.group_id, 0, { logger: this.logger });

for (let value of batch) {
persistedBatch.saveBucketData({
@@ -847,6 +891,37 @@
return last_op!;
}

async updateTableProgress(
table: storage.SourceTable,
progress: Partial<storage.TableSnapshotStatus>
): Promise<storage.SourceTable> {
const copy = table.clone();
const snapshotStatus = {
totalEstimatedCount: progress.totalEstimatedCount ?? copy.snapshotStatus?.totalEstimatedCount ?? 0,
replicatedCount: progress.replicatedCount ?? copy.snapshotStatus?.replicatedCount ?? 0,
lastKey: progress.lastKey ?? copy.snapshotStatus?.lastKey ?? null
};
copy.snapshotStatus = snapshotStatus;

await this.withTransaction(async () => {
await this.db.source_tables.updateOne(
{ _id: table.id },
{
$set: {
snapshot_status: {
last_key: snapshotStatus.lastKey == null ? null : new bson.Binary(snapshotStatus.lastKey),
total_estimated_count: snapshotStatus.totalEstimatedCount,
replicated_count: snapshotStatus.replicatedCount
}
}
},
{ session: this.session }
);
});

return copy;
}
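
updateTableProgress is what makes the _id-chunked snapshot queries resumable: the last replicated key and row counters are persisted per table, so a restarted replicator continues from lastKey instead of re-reading the whole collection. A hedged sketch of the consuming loop; the chunk reader and the binary lastKey type are assumptions, only updateTableProgress and the snapshotStatus fields come from this PR:

```ts
import { storage } from '@powersync/service-core';
import { MongoBucketBatch } from './MongoBucketBatch.js';

// Assumed chunk reader for illustration, wrapping e.g. find().sort({ _id: 1 }).limit(n).
declare function readChunkAfter(
  table: storage.SourceTable,
  lastKey: Uint8Array | null
): Promise<{ rows: storage.SaveOptions[]; lastKey: Uint8Array }>;

async function snapshotTableResumable(batch: MongoBucketBatch, table: storage.SourceTable) {
  let replicated = table.snapshotStatus?.replicatedCount ?? 0;
  let lastKey = table.snapshotStatus?.lastKey ?? null;

  while (true) {
    // Read the next chunk of documents ordered by _id, starting after the last persisted key.
    const chunk = await readChunkAfter(table, lastKey);
    if (chunk.rows.length === 0) break;

    for (const row of chunk.rows) {
      await batch.save(row);
    }

    replicated += chunk.rows.length;
    lastKey = chunk.lastKey;
    // Persist progress in the same storage, so a restarted replicator resumes from lastKey.
    table = await batch.updateTableProgress(table, { replicatedCount: replicated, lastKey });
  }
}
```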

async markSnapshotDone(tables: storage.SourceTable[], no_checkpoint_before_lsn: string) {
const session = this.session;
const ids = tables.map((table) => table.id);
@@ -857,6 +932,9 @@
{
$set: {
snapshot_done: true
},
$unset: {
snapshot_status: 1
}
},
{ session }
@@ -880,17 +958,8 @@
}
});
return tables.map((table) => {
const copy = new storage.SourceTable(
table.id,
table.connectionTag,
table.objectId,
table.schema,
table.table,
table.replicaIdColumns,
table.snapshotComplete
);
copy.syncData = table.syncData;
copy.syncParameters = table.syncParameters;
const copy = table.clone();
copy.snapshotComplete = true;
return copy;
});
}