-
Couldn't load subscription status.
- Fork 456
Feature/op metadata support #586
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 6 commits
56c4afb
1581dc6
71f36b1
f845c4f
06b290d
5939b98
6ba9af8
26c71ae
3bcbb05
2cb16ee
7954c44
fb28eed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -772,9 +772,11 @@ Backend.prototype._fetchSnapshot = function(collection, id, version, callback) { | |
| var db = this.db; | ||
| var backend = this; | ||
|
|
||
| var options = {metadata: true}; | ||
| var fields = null; | ||
| var shouldGetLatestSnapshot = version === null; | ||
| if (shouldGetLatestSnapshot) { | ||
| return backend.db.getSnapshot(collection, id, null, null, function(error, snapshot) { | ||
| return backend.db.getSnapshot(collection, id, fields, options, function(error, snapshot) { | ||
| if (error) return callback(error); | ||
|
|
||
| callback(null, snapshot); | ||
|
|
@@ -789,7 +791,7 @@ Backend.prototype._fetchSnapshot = function(collection, id, version, callback) { | |
| // - we want to avoid the 'op' middleware, because we later use the 'readSnapshots' middleware in _sanitizeSnapshots | ||
| // - we handle the projection in _sanitizeSnapshots | ||
| var from = milestoneSnapshot ? milestoneSnapshot.v : 0; | ||
| db.getOps(collection, id, from, version, null, function(error, ops) { | ||
| db.getOps(collection, id, from, version, options, function(error, ops) { | ||
|
||
| if (error) return callback(error); | ||
|
|
||
| backend._buildSnapshotFromOps(id, milestoneSnapshot, ops, function(error, snapshot) { | ||
|
|
@@ -843,9 +845,12 @@ Backend.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp | |
| var from = 0; | ||
| var to = null; | ||
|
|
||
| var options = {metadata: true}; | ||
| var fields = null; | ||
|
|
||
| var shouldGetLatestSnapshot = timestamp === null; | ||
| if (shouldGetLatestSnapshot) { | ||
| return backend.db.getSnapshot(collection, id, null, null, function(error, snapshot) { | ||
| return backend.db.getSnapshot(collection, id, fields, options, function(error, snapshot) { | ||
| if (error) return callback(error); | ||
|
|
||
| callback(null, snapshot); | ||
|
|
@@ -861,7 +866,6 @@ Backend.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp | |
| if (error) return callback(error); | ||
| if (snapshot) to = snapshot.v; | ||
|
|
||
| var options = {metadata: true}; | ||
| db.getOps(collection, id, from, to, options, function(error, ops) { | ||
| if (error) return callback(error); | ||
| filterOpsInPlaceBeforeTimestamp(ops, timestamp); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -171,7 +171,7 @@ SubmitRequest.prototype.commit = function(callback) { | |
| var op = request.op; | ||
| op.c = request.collection; | ||
| op.d = request.id; | ||
| op.m = undefined; | ||
| op.m = request._metadataProjection(); | ||
| // Needed for agent to detect if it can ignore sending the op back to | ||
| // the client that submitted it in subscriptions | ||
| if (request.collection !== request.index) op.i = request.index; | ||
|
|
@@ -185,6 +185,39 @@ SubmitRequest.prototype.commit = function(callback) { | |
| }); | ||
| }; | ||
|
|
||
| SubmitRequest.prototype._metadataProjection = function() { | ||
| var request = this; | ||
|
|
||
| // Default behavior | ||
| if (!request.opMetadataProjection) { | ||
| return undefined; | ||
| } | ||
|
|
||
| // Granular projection | ||
| if (typeof request.opMetadataProjection === 'object') { | ||
| return request._granularMetadataProjection(); | ||
| } | ||
|
|
||
| // Full projection | ||
| return request.op.m; | ||
| }; | ||
|
|
||
| SubmitRequest.prototype._granularMetadataProjection = function() { | ||
| var request = this; | ||
| var metadataProjection = {}; | ||
| for (var key in request.opMetadataProjection) { | ||
|
||
| var doProject = request.opMetadataProjection[key]; | ||
| if (! doProject ) { | ||
| continue; | ||
| } | ||
|
|
||
| metadataProjection[key] = request.op.m[key]; | ||
maxfortun marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| return metadataProjection; | ||
| }; | ||
|
|
||
|
|
||
| SubmitRequest.prototype.retry = function(callback) { | ||
| this.retries++; | ||
| if (this.maxRetries != null && this.retries > this.maxRetries) { | ||
|
|
@@ -233,6 +266,8 @@ SubmitRequest.prototype._addSnapshotMeta = function() { | |
| meta.ctime = this.start; | ||
| } else if (this.op.del) { | ||
| this.op.m.data = this.snapshot.data; | ||
| // break circular dependency if snapshot data already contains metadata | ||
| delete this.op.m.data._m; | ||
| } | ||
| meta.mtime = this.start; | ||
| }; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,8 @@ var numberType = require('./number-type'); | |
| types.register(deserializedType.type); | ||
| types.register(deserializedType.type2); | ||
| types.register(numberType.type); | ||
| var util = require('../util'); | ||
| var errorHandler = util.errorHandler; | ||
|
|
||
| module.exports = function() { | ||
| describe('client submit', function() { | ||
|
|
@@ -1210,5 +1212,189 @@ module.exports = function() { | |
| }); | ||
| }); | ||
| }); | ||
|
|
||
| describe('metadata projection', function() { | ||
|
||
| it('passed metadata to connect', function(done) { | ||
|
||
| var metadata = {username: 'user'}; | ||
|
|
||
| this.backend.use('connect', function(request, next) { | ||
| Object.assign(request.agent.custom, request.req); | ||
| next(); | ||
| }); | ||
|
|
||
| var connection = this.backend.connect(undefined, metadata); | ||
| connection.on('connected', function() { | ||
| expect(connection.agent.custom).eql(metadata); | ||
| done(); | ||
| }); | ||
| }); | ||
|
|
||
| it('passed metadata to submit', function(done) { | ||
|
||
| var metadata = {username: 'user'}; | ||
|
|
||
| this.backend.use('connect', function(request, next) { | ||
| Object.assign(request.agent.custom, request.req); | ||
| next(); | ||
| }); | ||
|
|
||
| this.backend.use('submit', function(request) { | ||
| expect(request.agent.custom).eql(metadata); | ||
| done(); | ||
| }); | ||
|
|
||
| var connection = this.backend.connect(undefined, metadata); | ||
| var doc = null; | ||
| connection.on('connected', function() { | ||
| expect(connection.agent.custom).eql(metadata); | ||
| doc = connection.get('dogs', 'fido'); | ||
| doc.create({name: 'fido'}, function() { | ||
| doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {source: 'trainer'}, errorHandler(done)); | ||
| }); | ||
| }); | ||
| }); | ||
|
|
||
| it('received local op without metadata', function(done) { | ||
| var metadata = {username: 'user'}; | ||
|
|
||
| this.backend.use('connect', function(request, next) { | ||
| Object.assign(request.agent.custom, request.req); | ||
| next(); | ||
| }); | ||
|
|
||
| this.backend.use('submit', function(request, next) { | ||
| expect(request.agent.custom).eql(metadata); | ||
| Object.assign(request.op.m, request.agent.custom); | ||
| request.opMetadataProjection = {username: true}; | ||
| next(); | ||
| }); | ||
|
|
||
| var connection = this.backend.connect(undefined, metadata); | ||
| var doc = null; | ||
| connection.on('connected', function() { | ||
| expect(connection.agent.custom).eql(metadata); | ||
| doc = connection.get('dogs', 'fido'); | ||
| doc.create({name: 'fido'}, function() { | ||
| doc.on('op', function(op, source, src, context) { | ||
| if (src) { | ||
| return; | ||
| } | ||
| expect(context.op.m).equal(undefined); | ||
| done(); | ||
| }); | ||
| doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {source: 'trainer'}, errorHandler(function() {})); | ||
| }); | ||
| }); | ||
| }); | ||
|
|
||
| it('concurrent changes', function(done) { | ||
|
||
| this.backend.use('connect', function(request, next) { | ||
| expect(request.req).to.have.property('username'); | ||
| Object.assign(request.agent.custom, request.req); | ||
| next(); | ||
| }); | ||
|
|
||
| this.backend.use('submit', function(request, next) { | ||
| expect(request.agent.custom).to.have.property('username'); | ||
| Object.assign(request.op.m, request.agent.custom); | ||
| request.opMetadataProjection = {username: true}; | ||
| next(); | ||
| }); | ||
|
|
||
| this.backend.use('apply', function(request, next) { | ||
| Object.assign(request.snapshot.m, request.op.m); | ||
|
||
| expect(request.op.m).to.have.property('username'); | ||
| next(); | ||
| }); | ||
|
|
||
| this.backend.use('commit', function(request, next) { | ||
| expect(request.op.m).to.have.property('username'); | ||
| next(); | ||
| }); | ||
|
|
||
| this.backend.use('afterWrite', function(request, next) { | ||
| expect(request.op.m).to.have.property('username'); | ||
| next(); | ||
| }); | ||
|
|
||
| var subscriberCount = 10; | ||
| var subscriberOpCount = 10; | ||
|
|
||
| var metadatas = []; | ||
| for (var i = 0; i < subscriberCount; i++) { | ||
| metadatas[i] = {username: 'user-'+i}; | ||
| } | ||
|
|
||
| var ops = []; | ||
| for (var i = 0; i < subscriberCount; i++) { | ||
| ops[i] = []; | ||
| for (var j = 0; j < subscriberOpCount; j++) { | ||
| ops[i].push({p: ['tricks '+i+' '+j], oi: 1}); | ||
| } | ||
| } | ||
|
|
||
| var docs = []; | ||
|
|
||
| function submitOps() { | ||
| for (var j = 0; j < subscriberOpCount; j++) { | ||
| for (var i = 0; i < subscriberCount; i++) { | ||
| var doc = docs[i]; | ||
| doc.submitOp([ops[i][j]], {source: 'src-'+i}, errorHandler(doneAfter)); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| function validateAndDone() { | ||
| var firstDoc = docs[0]; | ||
| // validate that all documents across connections are in sync | ||
| for (var i = 1; i < subscriberCount; i++) { | ||
| var doc = docs[i]; | ||
| expect(doc.data).eql(firstDoc.data); | ||
| } | ||
| done(); | ||
| }; | ||
|
|
||
| var submitOpsAfter = util.callAfter(subscriberCount - 1, submitOps); | ||
| var doneAfter = util.callAfter((subscriberCount * subscriberCount * subscriberOpCount) - 1, validateAndDone); | ||
|
|
||
| function getDoc(callback) { | ||
| var thisDoc = this; | ||
| thisDoc.fetch(function() { | ||
| if (!thisDoc.data) { | ||
| return thisDoc.create({}, function() { | ||
| thisDoc.subscribe(callback); | ||
| }); | ||
| } | ||
| thisDoc.subscribe(callback); | ||
| }); | ||
| } | ||
|
|
||
| for (var i = 0; i < subscriberCount; i++) { | ||
| var metadata = metadatas[i]; | ||
|
|
||
| var connection = this.backend.connect(undefined, Object.assign({}, metadata)); | ||
| connection.__test_metadata = Object.assign({}, metadata); | ||
| connection.__test_id = i; | ||
|
|
||
| connection.on('connected', function() { | ||
| var thisConnection = this; | ||
|
|
||
| expect(thisConnection.agent.custom).eql(thisConnection.__test_metadata); | ||
|
|
||
| thisConnection.doc = docs[thisConnection.__test_id] = thisConnection.get('dogs', 'fido'); | ||
|
|
||
| thisConnection.doc.on('op', function(op, source, src, context) { | ||
| if (!src || !context) { // If I am the source there is no metadata to check | ||
|
||
| return doneAfter(); | ||
| } | ||
| var id = op[0].p[0].split(' ')[1]; | ||
| expect(context.op.m).eql(metadatas[id]); | ||
| doneAfter(); | ||
| }); | ||
|
|
||
| getDoc.bind(thisConnection.doc)(submitOpsAfter); | ||
| }); | ||
| } | ||
| }); | ||
| }); | ||
| }); | ||
| }; | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should potentially consider snapshot metadata in a separate PR. It's more complicated than op metadata, because there's not currently a mechanism to propagate updated snapshot metadata to remote clients (which only receive ops to transform their local data, and don't receive the full snapshot — this is sort of the point of OT in general).
For example, I'd expect a passing test case for this case: