Skip to content

Commit b44f11a

Browse files
denis631MongoDB Bot
authored andcommitted
SERVER-95976 Introduce "matchCollectionUUIDForUpdateLookup" parameter in the change stream stage (#33332)
GitOrigin-RevId: 41d5ad6de82fa64a746aedc16c8f1a672bbf526b
1 parent 4fd0c45 commit b44f11a

32 files changed

+470
-216
lines changed

etc/backports_required_for_multiversion_tests.yml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,22 @@ last-continuous:
573573
ticket: SERVER-95670
574574
- test_file: jstests/sharding/query/owning_shard_expression.js
575575
ticket: SERVER-95670
576+
- test_file: jstests/core/write/update/upsert_duplicate_key_retry_collation.js
577+
ticket: SERVER-84089
578+
- test_file: jstests/change_streams/update_lookup_before_ddl.js
579+
ticket: SERVER-95976
580+
- test_file: jstests/change_streams/lookup_post_image.js
581+
ticket: SERVER-95976
582+
- test_file: jstests/sharding/change_stream_update_lookup_collation.js
583+
ticket: SERVER-95976
584+
- test_file: jstests/concurrency/fsm_workloads/upsert_unique_index_collation.js
585+
ticket: SERVER-94156
586+
- test_file: jstests/core/index/geo/geo_2d_point_near_zero.js
587+
ticket: SERVER-92930
588+
- test_file: jstests/replsets/sync_source_clears_post_failure.js
589+
ticket: SERVER-91960
590+
- test_file: jstests/sharding/catalog_cache_refresh_with_persisted_collection_cache_corrupted.js
591+
ticket: SERVER-95807
576592
- test_file: jstests/sharding/timeseries_query_extended_range.js
577593
ticket: SERVER-73641
578594
- test_file: jstests/core/timeseries/timeseries_index_partial.js
@@ -1206,6 +1222,22 @@ last-lts:
12061222
ticket: SERVER-95670
12071223
- test_file: jstests/sharding/query/owning_shard_expression.js
12081224
ticket: SERVER-95670
1225+
- test_file: jstests/core/write/update/upsert_duplicate_key_retry_collation.js
1226+
ticket: SERVER-84089
1227+
- test_file: jstests/change_streams/update_lookup_before_ddl.js
1228+
ticket: SERVER-95976
1229+
- test_file: jstests/change_streams/lookup_post_image.js
1230+
ticket: SERVER-95976
1231+
- test_file: jstests/sharding/change_stream_update_lookup_collation.js
1232+
ticket: SERVER-95976
1233+
- test_file: jstests/concurrency/fsm_workloads/upsert_unique_index_collation.js
1234+
ticket: SERVER-94156
1235+
- test_file: jstests/core/index/geo/geo_2d_point_near_zero.js
1236+
ticket: SERVER-92930
1237+
- test_file: jstests/replsets/sync_source_clears_post_failure.js
1238+
ticket: SERVER-91960
1239+
- test_file: jstests/sharding/catalog_cache_refresh_with_persisted_collection_cache_corrupted.js
1240+
ticket: SERVER-95807
12091241
- test_file: jstests/sharding/timeseries_query_extended_range.js
12101242
ticket: SERVER-73641
12111243
- test_file: jstests/core/timeseries/timeseries_index_partial.js

jstests/change_streams/lookup_post_image.js

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,11 @@ assertCreateCollection(db, coll.getName());
188188
// collection.
189189
assert.commandWorked(coll.insert({_id: "fullDocument is lookup 2"}));
190190

191+
// If this test is running with secondary read preference, it's necessary for the insert
192+
// to propagate to all secondary nodes and be available for majority reads before we can
193+
// assume looking up the document will succeed.
194+
FixtureHelpers.awaitLastOpCommitted(db);
195+
191196
// After a collection has been dropped and re-created, verify a change stream can be created with
192197
// 'fullDocument: updateLookup' using a resume token from before the collection was dropped.
193198
cursor = cst.startWatchingChanges({
@@ -205,13 +210,13 @@ assert.eq(latestChange.operationType, "insert");
205210
assert(latestChange.hasOwnProperty("fullDocument"));
206211
assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"});
207212

208-
// The next entry is the 'update' operation. Confirm that the next entry's post-image is null
209-
// because the original collection (i.e. the collection that the 'update' was applied to) has
210-
// been dropped and the new incarnation of the collection has a different UUID.
213+
// The next entry is the 'update' operation. Confirm that the next entry's post-image is not-null
214+
// even though the original collection has been dropped and the new incarnation of the collection
215+
// with the same name has a different UUID.
211216
latestChange = cst.getOneChange(cursor);
212217
assert.eq(latestChange.operationType, "update");
213218
assert(latestChange.hasOwnProperty("fullDocument"));
214-
assert.eq(latestChange.fullDocument, null);
219+
assert.eq(latestChange.fullDocument, {"_id": "fullDocument is lookup 2"});
215220

216221
jsTestLog("Testing full document lookup with a real getMore");
217222
assert.commandWorked(coll.insert({_id: "getMoreEnabled"}));

jstests/change_streams/oplog_rewrite/change_stream_null_existence_eq_rewrite_test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ const startPoint = db.getMongo().watch().getResumeToken();
2222
const numDocs = 8;
2323

2424
// Generate a write workload for the change stream to consume.
25-
generateChangeStreamWriteWorkload(testDB, collName, numDocs);
25+
generateChangeStreamWriteWorkload(testDB, collName, numDocs, false /* includeInvalidatingEvents */);
2626

2727
// Function to generate a list of all paths to be tested from those observed in the event stream.
2828
function traverseEvent(event, outputMap, prefixPath = "") {
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/**
2+
* Tests that the change stream with 'fullDocument: updateLookup' option performs the lookup only by
3+
* nss by default and does an additional collection UUID check when
4+
* 'matchCollectionUUIDForUpdateLookup: true' option is set.
5+
*/
6+
load("jstests/libs/change_stream_util.js"); // For 'ChangeStreamTest'.
7+
load("jstests/libs/collection_drop_recreate.js"); // For 'assertDropCollection',
8+
// 'assertCreateCollection.
9+
load("jstests/libs/fixture_helpers.js"); // For 'FixtureHelpers'.
10+
11+
function assertUpdateLookupResultBeforeOp(op, changeStreamOptions) {
12+
jsTest.log(`Running change stream with update lookup test after '${op}' occurred with ${
13+
tojson(changeStreamOptions)}`);
14+
15+
const collNameA = "collA";
16+
const collNameB = "collB";
17+
let cst;
18+
19+
function tearDown() {
20+
cst.cleanUp();
21+
assertDropCollection(db, collNameA);
22+
assertDropCollection(db, collNameB);
23+
}
24+
25+
assertCreateCollection(db, collNameA);
26+
cst = new ChangeStreamTest(db);
27+
28+
let cursor = cst.startWatchingChanges({
29+
pipeline: [{$changeStream: {...changeStreamOptions, fullDocument: "updateLookup"}}],
30+
collection: collNameA
31+
});
32+
33+
// Insert 'doc' into 'collA' and ensure it is seen in the change stream.
34+
const doc = {_id: 0, a: 1};
35+
assert.commandWorked(db.getCollection(collNameA).insert(doc));
36+
let expected = {
37+
documentKey: {_id: doc._id},
38+
fullDocument: doc,
39+
ns: {db: "test", coll: collNameA},
40+
operationType: "insert",
41+
};
42+
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
43+
44+
// Update the 'doc' in order to generate the update event.
45+
assert.commandWorked(db.getCollection(collNameA).update({_id: doc._id}, {$inc: {a: 1}}));
46+
const updatedDocInCollA = {...doc, a: 2};
47+
48+
// In case where a change stream is opened with the 'checkUUIDOnUpdateLookup' flag set to true,
49+
// no 'fullDocument' should be returned to the user as the collection on which 'updateLookup'
50+
// has been performed has a different UUID from the collection on which change stream has been
51+
// opened. In case of the flag being set to false or not present return the latest document on
52+
// the collection with the same name.
53+
let expectedFullDocument;
54+
if (op === 'renameCollection') {
55+
// Rename collection collA -> collB.
56+
assert.commandWorked(db.getCollection(collNameA).renameCollection(collNameB));
57+
58+
// Create new collection with the old name, "collA", (yet UUID will be different) and insert
59+
// document with the same id.
60+
assertCreateCollection(db, collNameA);
61+
const newDocInNewCollA = {
62+
...doc,
63+
b: "extra field in the new document in the new collection"
64+
};
65+
assert.commandWorked(db.getCollection(collNameA).insert(newDocInNewCollA));
66+
expectedFullDocument =
67+
changeStreamOptions.matchCollectionUUIDForUpdateLookup ? null : newDocInNewCollA;
68+
} else if (op === 'shardCollection') {
69+
if (FixtureHelpers.isSharded(db.getCollection(collNameA))) {
70+
jsTest.log("Early exit, because collection 'collA' is already sharded");
71+
tearDown();
72+
return;
73+
}
74+
75+
// Sharding the unsharded collection does not change its UUID, therefore regardless of
76+
// 'matchCollectionUUIDForUpdateLookup' being set or not, we should observe the
77+
// 'updatedDocInCollA' on updateLookup.
78+
assert.commandWorked(db.adminCommand({
79+
shardCollection: db.getCollection(collNameA).getFullName(),
80+
key: {_id: 1},
81+
}));
82+
expectedFullDocument = updatedDocInCollA;
83+
} else if (op === 'reshardCollection') {
84+
if (!FixtureHelpers.isSharded(db.getCollection(collNameA))) {
85+
jsTest.log("Early exit, because collection 'collA' is not sharded");
86+
tearDown();
87+
return;
88+
}
89+
90+
// Reshard the collection in order to generate the new collection with the same name, but
91+
// different UUID.
92+
assert.commandWorked(db.adminCommand({
93+
reshardCollection: db.getCollection(collNameA).getFullName(),
94+
key: {_id: 1},
95+
numInitialChunks: 2
96+
}));
97+
expectedFullDocument =
98+
changeStreamOptions.matchCollectionUUIDForUpdateLookup ? null : updatedDocInCollA;
99+
}
100+
101+
expected = {
102+
documentKey: {_id: doc._id},
103+
fullDocument: expectedFullDocument,
104+
ns: {db: "test", coll: collNameA},
105+
operationType: "update",
106+
};
107+
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
108+
109+
tearDown();
110+
}
111+
112+
assertUpdateLookupResultBeforeOp('renameCollection', {});
113+
assertUpdateLookupResultBeforeOp('renameCollection', {matchCollectionUUIDForUpdateLookup: false});
114+
assertUpdateLookupResultBeforeOp('renameCollection', {matchCollectionUUIDForUpdateLookup: true});
115+
116+
if (FixtureHelpers.isMongos(db)) {
117+
assertUpdateLookupResultBeforeOp('shardCollection', {});
118+
assertUpdateLookupResultBeforeOp('shardCollection',
119+
{matchCollectionUUIDForUpdateLookup: false});
120+
assertUpdateLookupResultBeforeOp('shardCollection', {matchCollectionUUIDForUpdateLookup: true});
121+
122+
assertUpdateLookupResultBeforeOp('reshardCollection', {});
123+
assertUpdateLookupResultBeforeOp('reshardCollection',
124+
{matchCollectionUUIDForUpdateLookup: false});
125+
assertUpdateLookupResultBeforeOp('reshardCollection',
126+
{matchCollectionUUIDForUpdateLookup: true});
127+
}

jstests/noPassthrough/aggregation_cursor_invalidations.js

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ res = assert.commandWorked(testDB.runCommand({
134134
from: foreignCollection.getName(),
135135
localField: 'local',
136136
foreignField: 'foreign',
137+
pipeline: [{$project: {a: 1}}],
137138
as: 'results',
138139
}
139140
},
@@ -146,10 +147,8 @@ res = assert.commandWorked(testDB.runCommand({
146147
foreignCollection.drop();
147148
getMoreCollName = res.cursor.ns.substr(res.cursor.ns.indexOf('.') + 1);
148149
res = testDB.runCommand({getMore: res.cursor.id, collection: getMoreCollName});
149-
assert.commandFailedWithCode(
150-
res,
151-
[ErrorCodes.QueryPlanKilled, ErrorCodes.NamespaceNotFound],
152-
'expected getMore to fail when the foreign collection has been dropped');
150+
assert.commandWorked(res,
151+
'expected getMore to succeed despite the foreign collection being dropped');
153152

154153
// Make sure the cursors were cleaned up.
155154
assertNoOpenCursorsOnSourceCollection();
@@ -214,10 +213,9 @@ res = assert.commandWorked(testDB.runCommand({
214213
foreignCollection.drop();
215214
getMoreCollName = res.cursor.ns.substr(res.cursor.ns.indexOf('.') + 1);
216215
res = testDB.runCommand({getMore: res.cursor.id, collection: getMoreCollName});
217-
assert.commandFailedWithCode(
218-
res,
219-
ErrorCodes.NamespaceNotFound,
220-
'expected getMore to fail when the foreign collection has been dropped');
216+
assert.commandWorked(res,
217+
'expected getMore to succeed despite the foreign collection being dropped');
218+
221219
// Make sure the cursors were cleaned up.
222220
assertNoOpenCursorsOnSourceCollection();
223221

@@ -246,10 +244,8 @@ res = assert.commandWorked(testDB.runCommand({
246244
foreignCollection.drop();
247245
getMoreCollName = res.cursor.ns.substr(res.cursor.ns.indexOf('.') + 1);
248246
res = testDB.runCommand({getMore: res.cursor.id, collection: getMoreCollName});
249-
assert.commandFailedWithCode(
250-
res,
251-
ErrorCodes.NamespaceNotFound,
252-
'expected getMore to fail when the foreign collection has been dropped');
247+
assert.commandWorked(res,
248+
'expected getMore to succeed despite the foreign collection being dropped');
253249

254250
// Make sure the cursors were cleaned up.
255251
assertNoOpenCursorsOnSourceCollection();

jstests/sharding/change_stream_update_lookup_collation.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ for (let nextDocKey of [{shardKey: "abc", _id: "abc_1"}, {shardKey: "ABC", _id:
101101
assert.eq(next.documentKey, nextDocKey, tojson(next));
102102
assert.docEq(Object.merge(nextDocKey, {updatedCount: 1}), next.fullDocument);
103103
}
104-
assert.eq(numIdIndexUsages(st.rs0.getPrimary()), idIndexUsagesPreIteration.shard0 + 1);
105-
assert.eq(numIdIndexUsages(st.rs1.getPrimary()), idIndexUsagesPreIteration.shard1 + 1);
104+
assert.eq(numIdIndexUsages(st.rs0.getPrimary()), idIndexUsagesPreIteration.shard0);
105+
assert.eq(numIdIndexUsages(st.rs1.getPrimary()), idIndexUsagesPreIteration.shard1);
106106

107107
changeStream.close();
108108

@@ -147,8 +147,8 @@ for (let nextDocKey of [{shardKey: "ABC", _id: "abc_1"}, {shardKey: "abc", _id:
147147
assert.eq(next.documentKey, nextDocKey, tojson(next));
148148
assert.docEq(Object.merge(nextDocKey, {updatedCount: 2}), next.fullDocument);
149149
}
150-
assert.eq(numIdIndexUsages(st.rs0.getPrimary()), idIndexUsagesPreIteration.shard0 + 1);
151-
assert.eq(numIdIndexUsages(st.rs1.getPrimary()), idIndexUsagesPreIteration.shard1 + 1);
150+
assert.eq(numIdIndexUsages(st.rs0.getPrimary()), idIndexUsagesPreIteration.shard0);
151+
assert.eq(numIdIndexUsages(st.rs1.getPrimary()), idIndexUsagesPreIteration.shard1);
152152

153153
strengthOneChangeStream.close();
154154

src/mongo/db/pipeline/document_source_change_stream.idl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,13 @@ structs:
185185
"updateDescription" field in update events will be omitted.
186186
query_shape: literal
187187

188+
matchCollectionUUIDForUpdateLookup:
189+
type: optionalBool
190+
description: An internal flag that ensures that updateLookup is performed on the
191+
collection where UUID matches the one from the corresponding change
192+
event. In case of a mismatch, null document will be returned.
193+
query_shape: literal
194+
188195
DocumentSourceChangeStreamOplogMatchSpec:
189196
strict: true
190197
description: A document used to specify the $_internalChangeStreamOplogMatch stage of an

src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,20 @@ boost::optional<Document> DocumentSourceChangeStreamAddPostImage::lookupLatestPo
204204
// Update lookup queries sent from mongoS to shards are allowed to use speculative majority
205205
// reads. Even if the lookup itself succeeded, it may not have returned any results if the
206206
// document was deleted in the time since the update op.
207-
invariant(resumeTokenData.uuid);
208-
return pExpCtx->mongoProcessInterface->lookupSingleDocument(
209-
pExpCtx, nss, *resumeTokenData.uuid, documentKey, std::move(readConcern));
207+
tassert(9797601, "UUID should be present in the resume token", resumeTokenData.uuid);
208+
try {
209+
// In case we are running $changeStreams and are performing updateLookup, we do not pass
210+
// 'collectionUUID' to avoid any UUID validation on the target collection. UUID of the
211+
// target collection should only be checked if 'matchCollectionUUIDForUpdateLookup' flag has
212+
// been passed.
213+
auto collectionUUID = pExpCtx->changeStreamSpec->getMatchCollectionUUIDForUpdateLookup()
214+
? boost::optional<UUID>(*resumeTokenData.uuid)
215+
: boost::none;
216+
return pExpCtx->mongoProcessInterface->lookupSingleDocument(
217+
pExpCtx, nss, std::move(collectionUUID), documentKey, std::move(readConcern));
218+
} catch (const ExceptionFor<ErrorCodes::TooManyMatchingDocuments>& ex) {
219+
uasserted(ErrorCodes::ChangeStreamFatalError, ex.what());
220+
}
210221
}
211222

212223
Value DocumentSourceChangeStreamAddPostImage::doSerialize(const SerializationOptions& opts) const {

src/mongo/db/pipeline/document_source_change_stream_add_post_image_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldErrorIfDocumentKeyIsNot
326326
std::make_unique<MockMongoInterface>(std::move(foreignCollection));
327327

328328
ASSERT_THROWS_CODE(
329-
lookupChangeStage->getNext(), AssertionException, ErrorCodes::TooManyMatchingDocuments);
329+
lookupChangeStage->getNext(), AssertionException, ErrorCodes::ChangeStreamFatalError);
330330
}
331331

332332
TEST_F(DocumentSourceChangeStreamAddPostImageTest, ShouldPropagatePauses) {

src/mongo/db/pipeline/document_source_change_stream_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ struct MockMongoInterface final : public ExecutableStubMongoProcessInterface {
171171
boost::optional<Document> lookupSingleDocument(
172172
const boost::intrusive_ptr<ExpressionContext>& expCtx,
173173
const NamespaceString& nss,
174-
UUID collectionUUID,
174+
boost::optional<UUID> collectionUUID,
175175
const Document& documentKey,
176176
boost::optional<BSONObj> readConcern) final {
177177
Matcher matcher(documentKey.toBson(), expCtx);

0 commit comments

Comments
 (0)