Skip to content

Commit d3b4eab

Browse files
galon1MongoDB Bot
authored andcommitted
SERVER-89614 SERVER-91753 Add enum to track $out cleanup state to properly drop buckets collection during interrupt (#31968)
GitOrigin-RevId: f8582b723ddbdc1f93c7d193691d5790bfb87f76
1 parent bf8f6cb commit d3b4eab

File tree

8 files changed

+289
-58
lines changed

8 files changed

+289
-58
lines changed

jstests/aggregation/bugs/server3253.js

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
// server-3253 Unsharded support for $out
99
load('jstests/aggregation/extras/utils.js');
1010

11-
var input = db.server3253_in;
12-
var inputDoesntExist = db.server3253_doesnt_exist;
13-
var output = db.server3253_out;
14-
var cappedOutput = db.server3253_out_capped;
11+
const testDb = db.getSiblingDB("server3253");
12+
var input = testDb.server3253_in;
13+
var inputDoesntExist = testDb.server3253_doesnt_exist;
14+
var output = testDb.server3253_out;
15+
var cappedOutput = testDb.server3253_out_capped;
1516

1617
input.drop();
1718
inputDoesntExist.drop(); // never created
@@ -45,8 +46,8 @@ function test(input, pipeline, expected) {
4546
}
4647

4748
function listCollections(name) {
48-
var collectionInfosCursor = db.runCommand("listCollections", {filter: {name: name}});
49-
return new DBCommandCursor(db, collectionInfosCursor).toArray();
49+
var collectionInfosCursor = testDb.runCommand("listCollections", {filter: {name: name}});
50+
return new DBCommandCursor(testDb, collectionInfosCursor).toArray();
5051
}
5152

5253
input.insert({_id: 1});
@@ -91,7 +92,7 @@ test(input, [{$project: {c: {$concat: ["hello there ", "_id"]}}}], [
9192
]);
9293

9394
cappedOutput.drop();
94-
db.createCollection(cappedOutput.getName(), {capped: true, size: 2});
95+
testDb.createCollection(cappedOutput.getName(), {capped: true, size: 2});
9596
assertErrorCode(input, {$out: cappedOutput.getName()}, 17152);
9697

9798
// ensure everything works even if input doesn't exist.

jstests/concurrency/fsm_workloads/timeseries_agg_out.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
* does_not_support_transactions,
1515
* does_not_support_stepdowns,
1616
* requires_fcv_70,
17-
* featureFlagAggOutTimeseries
17+
* # `convertToCapped` is not supported in serverless.
18+
* command_not_supported_in_serverless,
1819
* ]
1920
*/
2021
load('jstests/concurrency/fsm_workloads/agg_out.js'); // for $super state functions

jstests/concurrency/fsm_workloads/timeseries_agg_out_interrupt_cleanup.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
* does_not_support_stepdowns,
1212
* uses_curop_agg_stage,
1313
* requires_fcv_70,
14-
* featureFlagAggOutTimeseries
1514
* ]
1615
*/
1716
load('jstests/concurrency/fsm_libs/extend_workload.js'); // for extendWorkload
@@ -58,7 +57,9 @@ var $config = extendWorkload($config, function($config, $super) {
5857
"command.drop": {
5958
$exists: false
6059
} // Exclude 'drop' command from the filter to make sure that we don't kill the the
61-
// drop command which is responsible for dropping the temporary collection.
60+
// drop command which is responsible for dropping the temporary collection in the
61+
// destructor. This won't prevent any drop commands run internally (with the same
62+
// operation context) by $out, such as in renameCollection.
6263
});
6364
};
6465

jstests/core/timeseries/timeseries_out_non_sharded.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
* # We need a timeseries collection.
1010
* requires_timeseries,
1111
* requires_fcv_70,
12-
* featureFlagAggOutTimeseries
1312
* ]
1413
*/
1514
(function() {
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Tests that $out cleans up the buckets collections if interrupted after the rename, but before the
3+
* view is created.
4+
*
5+
* @tags: [
6+
* # We need a timeseries collection.
7+
* requires_timeseries,
8+
* requires_persistence,
9+
* ]
10+
*/
11+
(function() {
12+
"use strict";
13+
14+
load("jstests/libs/fail_point_util.js");
15+
load("jstests/libs/parallel_shell_helpers.js");
16+
17+
const st = new ShardingTest({shards: 2});
18+
19+
const dbName = jsTestName();
20+
const outCollName = 'out';
21+
const sourceCollName = 'foo';
22+
const outBucketsCollName = 'system.buckets.out';
23+
const testDB = st.s.getDB(dbName);
24+
const sourceDocument = {
25+
x: 1,
26+
t: ISODate()
27+
};
28+
const primary = st.shard0.shardName;
29+
assert.commandWorked(st.s.adminCommand({enableSharding: dbName, primaryShard: primary}));
30+
31+
function listCollections(collName) {
32+
return testDB.getCollectionNames().filter(coll => coll === collName);
33+
}
34+
35+
function resetCollections() {
36+
if (testDB[sourceCollName]) {
37+
assert(testDB[sourceCollName].drop());
38+
}
39+
if (testDB[outCollName]) {
40+
assert(testDB[outCollName].drop());
41+
}
42+
}
43+
44+
function killOp() {
45+
const adminDB = st.s.getDB("admin");
46+
const curOps = adminDB
47+
.aggregate([
48+
{$currentOp: {allUsers: true}},
49+
{
50+
$match: {
51+
"command.comment": "testComment",
52+
// The create coordinator issues fire and forget refreshes after
53+
// creating a collection. We filter these out to ensure we are
54+
// killing the correct operation.
55+
"command._flushRoutingTableCacheUpdates": {$exists: false}
56+
}
57+
}
58+
])
59+
.toArray();
60+
assert.eq(1, curOps.length, curOps);
61+
assert.commandWorked(adminDB.killOp(curOps[0].opid));
62+
}
63+
64+
function runOut(dbName, sourceCollName, targetCollName) {
65+
const cmdRes = db.getSiblingDB(dbName).runCommand({
66+
aggregate: sourceCollName,
67+
pipeline: [{$out: {db: dbName, coll: targetCollName, timeseries: {timeField: 't'}}}],
68+
cursor: {},
69+
comment: "testComment",
70+
});
71+
assert.commandFailed(cmdRes);
72+
}
73+
74+
function runOutAndInterrupt() {
75+
const fp = configureFailPoint(st.shard0.rs.getPrimary(),
76+
'outWaitAfterTempCollectionRenameBeforeView',
77+
{shouldCheckForInterrupt: true});
78+
79+
let outShell =
80+
startParallelShell(funWithArgs(runOut, dbName, sourceCollName, outCollName), st.s.port);
81+
82+
fp.wait();
83+
84+
// Verify that the buckets collection is created.
85+
let bucketCollections = listCollections(outBucketsCollName);
86+
assert.eq(1, bucketCollections.length, bucketCollections);
87+
88+
// Provoke failure.
89+
killOp();
90+
outShell();
91+
92+
// Assert that the temporary collections has been garbage collected.
93+
const tempCollections =
94+
testDB.getCollectionNames().filter(coll => coll.startsWith('tmp.agg_out'));
95+
const garbageCollectionEntries = st.s.getDB('config')['agg_temp_collections'].count();
96+
assert(tempCollections.length === 0 && garbageCollectionEntries === 0);
97+
}
98+
99+
// Validates $out should clean up the buckets collection if the command is interrupted before the
100+
// view is created.
101+
function testCreatingNewCollection() {
102+
resetCollections();
103+
104+
// Create source collection.
105+
assert.commandWorked(testDB.runCommand({create: sourceCollName}));
106+
assert.commandWorked(testDB[sourceCollName].insert(sourceDocument));
107+
108+
let bucketCollections = listCollections(outBucketsCollName);
109+
assert.eq(0, bucketCollections, bucketCollections);
110+
111+
// The output collection is a time-series collection and needs a view so $out will run on the
112+
// primary.
113+
runOutAndInterrupt();
114+
115+
// There should neither be a buckets collection nor a view.
116+
bucketCollections = listCollections(outBucketsCollName);
117+
assert.eq(0, bucketCollections.length, bucketCollections);
118+
let view = listCollections(outCollName);
119+
assert.eq(0, view.length, view);
120+
}
121+
122+
testCreatingNewCollection();
123+
124+
// Validates $out should not clean up the buckets collection if the command is interrupted when the
125+
// view exists.
126+
function testReplacingExistingCollection() {
127+
resetCollections();
128+
129+
// Create source collection.
130+
assert.commandWorked(testDB.runCommand({create: sourceCollName}));
131+
assert.commandWorked(testDB[sourceCollName].insert(sourceDocument));
132+
133+
// Create the time-series collection $out will replace.
134+
assert.commandWorked(testDB.runCommand({create: outCollName, timeseries: {timeField: "t"}}));
135+
assert.commandWorked(testDB[outCollName].insert({a: 1, t: ISODate()}));
136+
137+
let bucketCollections = listCollections(outBucketsCollName);
138+
assert.eq(1, bucketCollections.length, bucketCollections);
139+
140+
// The output collection is a time-series collection and needs a view so $out will run on the
141+
// primary.
142+
runOutAndInterrupt();
143+
144+
// There should be a buckets collection and a view.
145+
bucketCollections = listCollections(outBucketsCollName);
146+
assert.eq(1, bucketCollections.length, bucketCollections);
147+
let view = listCollections(outCollName);
148+
assert.eq(1, view.length, view);
149+
150+
// $out should replace the document inside the collection.
151+
assert.sameMembers(testDB[outCollName].find({}, {_id: 0}).toArray(), [sourceDocument]);
152+
}
153+
154+
testReplacingExistingCollection();
155+
156+
st.stop();
157+
}());

jstests/sharding/timeseries_out_sharded.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
* # We need a timeseries collection.
1111
* requires_timeseries,
1212
* requires_fcv_70,
13-
* featureFlagAggOutTimeseries
1413
* ]
1514
*/
1615
(function() {

0 commit comments

Comments
 (0)