Skip to content

Avoid change streams on the storage database #276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { migrations } from '@powersync/service-core';
import * as storage from '../../../storage/storage-index.js';
import { MongoStorageConfig } from '../../../types/types.js';

export const up: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
await db.createCheckpointEventsCollection();

await db.write_checkpoints.createIndex(
{
processed_at_lsn: 1
},
{ name: 'processed_at_lsn' }
);

await db.custom_write_checkpoints.createIndex(
{
op_id: 1
},
{ name: 'op_id' }
);
} finally {
await db.client.close();
}
};

export const down: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;

const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
if (await db.write_checkpoints.indexExists('processed_at_lsn')) {
await db.write_checkpoints.dropIndex('processed_at_lsn');
}
if (await db.custom_write_checkpoints.indexExists('op_id')) {
await db.custom_write_checkpoints.dropIndex('op_id');
}
await db.db.dropCollection('checkpoint_events');
} finally {
await db.client.close();
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ export class MongoBucketStorage
}
}
);
await this.db.notifyCheckpoint();
} else if (next == null && active?.id == sync_rules_group_id) {
// Slot removed for "active" sync rules, while there is no "next" one.
await this.updateSyncRules({
Expand All @@ -141,6 +142,7 @@ export class MongoBucketStorage
}
}
);
await this.db.notifyCheckpoint();
} else if (next != null && active?.id == sync_rules_group_id) {
// Already have next sync rules, but need to stop replicating the active one.

Expand All @@ -155,6 +157,7 @@ export class MongoBucketStorage
}
}
);
await this.db.notifyCheckpoint();
}
}

Expand Down Expand Up @@ -216,6 +219,7 @@ export class MongoBucketStorage
last_keepalive_ts: null
};
await this.db.sync_rules.insertOne(doc);
await this.db.notifyCheckpoint();
rules = new MongoPersistedSyncRulesContent(this.db, doc);
if (options.lock) {
const lock = await rules.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ export class MongoBucketBatch
result = r;
}
}
await batchCreateCustomWriteCheckpoints(this.db, this.write_checkpoint_batch);
this.write_checkpoint_batch = [];
return result;
}

Expand All @@ -139,6 +137,11 @@ export class MongoBucketBatch
await this.withReplicationTransaction(`Flushing ${batch.length} ops`, async (session, opSeq) => {
resumeBatch = await this.replicateBatch(session, batch, opSeq, options);

if (this.write_checkpoint_batch.length > 0) {
await batchCreateCustomWriteCheckpoints(this.db, session, this.write_checkpoint_batch, opSeq.next());
this.write_checkpoint_batch = [];
}

last_op = opSeq.last();
});

Expand Down Expand Up @@ -601,6 +604,7 @@ export class MongoBucketBatch
},
{ session }
);
// We don't notify checkpoint here - we don't make any checkpoint updates directly
});
}

Expand Down Expand Up @@ -648,6 +652,7 @@ export class MongoBucketBatch
},
{ session: this.session }
);
await this.db.notifyCheckpoint();

// Cannot create a checkpoint yet - return false
return false;
Expand All @@ -672,6 +677,23 @@ export class MongoBucketBatch
update.last_checkpoint = this.persisted_op;
}

// Mark relevant write checkpoints as "processed".
// This makes it easier to identify write checkpoints that are "valid" in order.
await this.db.write_checkpoints.updateMany(
{
processed_at_lsn: null,
'lsns.1': { $lte: lsn }
},
{
$set: {
processed_at_lsn: lsn
}
},
{
session: this.session
}
);

await this.db.sync_rules.updateOne(
{
_id: this.group_id
Expand All @@ -681,6 +703,7 @@ export class MongoBucketBatch
},
{ session: this.session }
);
await this.db.notifyCheckpoint();
this.persisted_op = null;
this.last_checkpoint_lsn = lsn;
return true;
Expand Down Expand Up @@ -717,6 +740,7 @@ export class MongoBucketBatch
},
{ session: this.session }
);
await this.db.notifyCheckpoint();
this.last_checkpoint_lsn = lsn;

return true;
Expand Down
Loading
Loading