Skip to content
Open
1 change: 1 addition & 0 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ Agent.prototype._sendOp = function(collection, id, op) {
if ('op' in op) message.op = op.op;
if (op.create) message.create = op.create;
if (op.del) message.del = true;
if (op.m) message.m = op.m;

this.send(message);
};
Expand Down
12 changes: 8 additions & 4 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -772,9 +772,11 @@ Backend.prototype._fetchSnapshot = function(collection, id, version, callback) {
var db = this.db;
var backend = this;

var options = {metadata: true};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should potentially consider snapshot metadata in a separate PR. It's more complicated than op metadata, because there's not currently a mechanism to propagate updated snapshot metadata to remote clients (which only receive ops to transform their local data, and don't receive the full snapshot — this is sort of the point of OT in general).

For example, I'd expect a passing test case for this case:

it('updates remote metadata', function(done) {
  backend.use('commit', (context, next) => {
    context.snapshot.m.updated = Date.now();
    next();
  });

  var connection1 = backend.connect();
  var connection2 = backend.connect();

  var doc1 = connection1.get(...);
  var doc2 = connection2.get(...);

  async.series([
    doc1.subscribe.bind(doc1),
    doc1.submitOp.bind(doc1, [{p: ['foo'], oi: 'bar'}]),
    doc2.subscribe.bind(doc2),
    function(next) {
      expect(doc1.metadata.updated).to.equal(doc2.metadata.updated);
      next();
    },
    function(next) {
      doc2.once('op', function() {
        expect(doc1.data).to.eql(doc2.data);
        expect(doc1.metadata.updated).to.equal(doc2.metadata.updated);
        next();
      });
      doc1.submitOp([{p: ['foo'], od: 'bar', oi: 'baz'}], errorHandler(next));
    },
  ], done);
});

var fields = null;
var shouldGetLatestSnapshot = version === null;
if (shouldGetLatestSnapshot) {
return backend.db.getSnapshot(collection, id, null, null, function(error, snapshot) {
return backend.db.getSnapshot(collection, id, fields, options, function(error, snapshot) {
if (error) return callback(error);

callback(null, snapshot);
Expand All @@ -789,7 +791,7 @@ Backend.prototype._fetchSnapshot = function(collection, id, version, callback) {
// - we want to avoid the 'op' middleware, because we later use the 'readSnapshots' middleware in _sanitizeSnapshots
// - we handle the projection in _sanitizeSnapshots
var from = milestoneSnapshot ? milestoneSnapshot.v : 0;
db.getOps(collection, id, from, version, null, function(error, ops) {
db.getOps(collection, id, from, version, options, function(error, ops) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we've put this in the right place? Shouldn't we be fetching op metadata in _getSanitizedOps() (and maybe its bulk counterpart)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that if a snapshot has some saved metadata and we do not pass fields and options to getSnapshot , the metadata will not be retrieved at all. Same with _getSanitizedOps, if we do not pass the options to getOps , the database layer will not retrieve the metadata and there will be nothing to sanitize. I have a very shallow understanding of the intent here, and maybe I am missing something fundamental. I could definitely use your help to understand this better. Thank you!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should basically not touch snapshots at all in this PR. As I mentioned in my other comment, snapshot metadata is a very different beast, and I think we should consider it separately to ops.

I think this belongs in _getSanitizedOps() with the call to backend.db.getOps().

if (error) return callback(error);

backend._buildSnapshotFromOps(id, milestoneSnapshot, ops, function(error, snapshot) {
Expand Down Expand Up @@ -843,9 +845,12 @@ Backend.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp
var from = 0;
var to = null;

var options = {metadata: true};
var fields = null;

var shouldGetLatestSnapshot = timestamp === null;
if (shouldGetLatestSnapshot) {
return backend.db.getSnapshot(collection, id, null, null, function(error, snapshot) {
return backend.db.getSnapshot(collection, id, fields, options, function(error, snapshot) {
if (error) return callback(error);

callback(null, snapshot);
Expand All @@ -861,7 +866,6 @@ Backend.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp
if (error) return callback(error);
if (snapshot) to = snapshot.v;

var options = {metadata: true};
db.getOps(collection, id, from, to, options, function(error, ops) {
if (error) return callback(error);
filterOpsInPlaceBeforeTimestamp(ops, timestamp);
Expand Down
10 changes: 5 additions & 5 deletions lib/client/doc.js
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ Doc.prototype._otApply = function(op, source) {

// NB: If we need to add another argument to this event, we should consider
// the fact that the 'op' event has op.src as its 3rd argument
this.emit('before op batch', op.op, source);
this.emit('before op batch', op.op, source, op.src, {op: op});

// Iteratively apply multi-component remote operations and rollback ops
// (source === false) for the default JSON0 OT type. It could use
Expand Down Expand Up @@ -637,24 +637,24 @@ Doc.prototype._otApply = function(op, source) {
this._setData(this.type.apply(this.data, componentOp.op));
this.emit('op', componentOp.op, source, op.src);
}
this.emit('op batch', op.op, source);
this.emit('op batch', op.op, source, op.src, {op: op});
// Pop whatever was submitted since we started applying this op
this._popApplyStack(stackLength);
return;
}

// The 'before op' event enables clients to pull any necessary data out of
// the snapshot before it gets changed
this.emit('before op', op.op, source, op.src);
this.emit('before op', op.op, source, op.src, {op: op});
// Apply the operation to the local data, mutating it in place
this._setData(this.type.apply(this.data, op.op));
// Emit an 'op' event once the local data includes the changes from the
// op. For locally submitted ops, this will be synchronously with
// submission and before the server or other clients have received the op.
// For ops from other clients, this will be after the op has been
// committed to the database and published
this.emit('op', op.op, source, op.src);
this.emit('op batch', op.op, source);
this.emit('op', op.op, source, op.src, {op: op});
this.emit('op batch', op.op, source, op.src, {op: op});
return;
}

Expand Down
37 changes: 36 additions & 1 deletion lib/submit-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ SubmitRequest.prototype.commit = function(callback) {
var op = request.op;
op.c = request.collection;
op.d = request.id;
op.m = undefined;
op.m = request._metadataProjection();
// Needed for agent to detect if it can ignore sending the op back to
// the client that submitted it in subscriptions
if (request.collection !== request.index) op.i = request.index;
Expand All @@ -185,6 +185,39 @@ SubmitRequest.prototype.commit = function(callback) {
});
};

SubmitRequest.prototype._metadataProjection = function() {
var request = this;

// Default behavior
if (!request.opMetadataProjection) {
return undefined;
}

// Granular projection
if (typeof request.opMetadataProjection === 'object') {
return request._granularMetadataProjection();
}

// Full projection
return request.op.m;
};

SubmitRequest.prototype._granularMetadataProjection = function() {
var request = this;
var metadataProjection = {};
for (var key in request.opMetadataProjection) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should at least add a code comment here that we only support a single level of projection.

Would be extra nice if we updated the docs to include this flag and notes on its use.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment, and updated docs with an example.

var doProject = request.opMetadataProjection[key];
if (! doProject ) {
continue;
}

metadataProjection[key] = request.op.m[key];
}

return metadataProjection;
};


SubmitRequest.prototype.retry = function(callback) {
this.retries++;
if (this.maxRetries != null && this.retries > this.maxRetries) {
Expand Down Expand Up @@ -233,6 +266,8 @@ SubmitRequest.prototype._addSnapshotMeta = function() {
meta.ctime = this.start;
} else if (this.op.del) {
this.op.m.data = this.snapshot.data;
// break circular dependency if snapshot data already contains metadata
delete this.op.m.data._m;
}
meta.mtime = this.start;
};
Expand Down
186 changes: 186 additions & 0 deletions test/client/submit.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ var numberType = require('./number-type');
types.register(deserializedType.type);
types.register(deserializedType.type2);
types.register(numberType.type);
var util = require('../util');
var errorHandler = util.errorHandler;

module.exports = function() {
describe('client submit', function() {
Expand Down Expand Up @@ -1210,5 +1212,189 @@ module.exports = function() {
});
});
});

describe('metadata projection', function() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've flexed metadata on ops that were broadcast over pubsub, but ops can also be fetched when submitting an op from a stale document (that isn't subscribed, and is behind the server); can we test that the ops we get in that case also have their metadata correctly set?

This would need us to tweak SubmitRequest.submit().

Also, can we add a test case for an unsubscribed client calling fetch after another remote client has changed the doc: this is where you'll need the backend._getSanitizedOps() to handle metadata projection for us.

it('passed metadata to connect', function(done) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this test has anything to do with this PR?

var metadata = {username: 'user'};

this.backend.use('connect', function(request, next) {
Object.assign(request.agent.custom, request.req);
next();
});

var connection = this.backend.connect(undefined, metadata);
connection.on('connected', function() {
expect(connection.agent.custom).eql(metadata);
done();
});
});

it('passed metadata to submit', function(done) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: I'm not sure this test has anything to do with this PR?

var metadata = {username: 'user'};

this.backend.use('connect', function(request, next) {
Object.assign(request.agent.custom, request.req);
next();
});

this.backend.use('submit', function(request) {
expect(request.agent.custom).eql(metadata);
done();
});

var connection = this.backend.connect(undefined, metadata);
var doc = null;
connection.on('connected', function() {
expect(connection.agent.custom).eql(metadata);
doc = connection.get('dogs', 'fido');
doc.create({name: 'fido'}, function() {
doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {source: 'trainer'}, errorHandler(done));
});
});
});

it('received local op without metadata', function(done) {
var metadata = {username: 'user'};

this.backend.use('connect', function(request, next) {
Object.assign(request.agent.custom, request.req);
next();
});

this.backend.use('submit', function(request, next) {
expect(request.agent.custom).eql(metadata);
Object.assign(request.op.m, request.agent.custom);
request.opMetadataProjection = {username: true};
next();
});

var connection = this.backend.connect(undefined, metadata);
var doc = null;
connection.on('connected', function() {
expect(connection.agent.custom).eql(metadata);
doc = connection.get('dogs', 'fido');
doc.create({name: 'fido'}, function() {
doc.on('op', function(op, source, src, context) {
if (src) {
return;
}
expect(context.op.m).equal(undefined);
done();
});
doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {source: 'trainer'}, errorHandler(function() {}));
});
});
});

it('concurrent changes', function(done) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please simplify this test case to just have two clients? I'm not sure there's a vast amount of benefit in having so many clients; it just makes the test harder to read. Would be nice to get rid of the for loops.

this.backend.use('connect', function(request, next) {
expect(request.req).to.have.property('username');
Object.assign(request.agent.custom, request.req);
next();
});

this.backend.use('submit', function(request, next) {
expect(request.agent.custom).to.have.property('username');
Object.assign(request.op.m, request.agent.custom);
request.opMetadataProjection = {username: true};
next();
});

this.backend.use('apply', function(request, next) {
Object.assign(request.snapshot.m, request.op.m);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unneeded?

expect(request.op.m).to.have.property('username');
next();
});

this.backend.use('commit', function(request, next) {
expect(request.op.m).to.have.property('username');
next();
});

this.backend.use('afterWrite', function(request, next) {
expect(request.op.m).to.have.property('username');
next();
});

var subscriberCount = 10;
var subscriberOpCount = 10;

var metadatas = [];
for (var i = 0; i < subscriberCount; i++) {
metadatas[i] = {username: 'user-'+i};
}

var ops = [];
for (var i = 0; i < subscriberCount; i++) {
ops[i] = [];
for (var j = 0; j < subscriberOpCount; j++) {
ops[i].push({p: ['tricks '+i+' '+j], oi: 1});
}
}

var docs = [];

function submitOps() {
for (var j = 0; j < subscriberOpCount; j++) {
for (var i = 0; i < subscriberCount; i++) {
var doc = docs[i];
doc.submitOp([ops[i][j]], {source: 'src-'+i}, errorHandler(doneAfter));
}
}
}

function validateAndDone() {
var firstDoc = docs[0];
// validate that all documents across connections are in sync
for (var i = 1; i < subscriberCount; i++) {
var doc = docs[i];
expect(doc.data).eql(firstDoc.data);
}
done();
};

var submitOpsAfter = util.callAfter(subscriberCount - 1, submitOps);
var doneAfter = util.callAfter((subscriberCount * subscriberCount * subscriberOpCount) - 1, validateAndDone);

function getDoc(callback) {
var thisDoc = this;
thisDoc.fetch(function() {
if (!thisDoc.data) {
return thisDoc.create({}, function() {
thisDoc.subscribe(callback);
});
}
thisDoc.subscribe(callback);
});
}

for (var i = 0; i < subscriberCount; i++) {
var metadata = metadatas[i];

var connection = this.backend.connect(undefined, Object.assign({}, metadata));
connection.__test_metadata = Object.assign({}, metadata);
connection.__test_id = i;

connection.on('connected', function() {
var thisConnection = this;

expect(thisConnection.agent.custom).eql(thisConnection.__test_metadata);

thisConnection.doc = docs[thisConnection.__test_id] = thisConnection.get('dogs', 'fido');

thisConnection.doc.on('op', function(op, source, src, context) {
if (!src || !context) { // If I am the source there is no metadata to check
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we send metadata back to the original client? The server might add context the sender wants (and which remote clients will get).

What's your use-case? Does it work without the original op submitter getting their metadata?

We could add the meta to the op acknowledgement.

return doneAfter();
}
var id = op[0].p[0].split(' ')[1];
expect(context.op.m).eql(metadatas[id]);
doneAfter();
});

getDoc.bind(thisConnection.doc)(submitOpsAfter);
});
}
});
});
});
};