Skip to content

Commit 9e29569

Browse files
krutishah139MongoDB Bot
authored andcommitted
SERVER-87433 Handle failover behavior for ReshardingRecipientService during abort (#32278)
GitOrigin-RevId: 8304e51dbf0bfae3710b8932cbb00231499724fd
1 parent 12ca55a commit 9e29569

10 files changed

+473
-74
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* Simulates a failover prior to removing the recipient doc while resharding is aborting from an
3+
* unrecoverable error on the donor. Resharding should abort successfully after stepUp.
4+
*
5+
* See BF-32038 for more details.
6+
* @tags: [
7+
* requires_fcv_70,
8+
* ]
9+
*/
10+
(function() {
11+
"use strict";
12+
13+
load("jstests/libs/discover_topology.js");
14+
load("jstests/libs/fail_point_util.js");
15+
load("jstests/sharding/libs/resharding_test_fixture.js");
16+
17+
const reshardingTest = new ReshardingTest({numDonors: 2, numRecipients: 2});
18+
reshardingTest.setup();
19+
20+
const donorShardNames = reshardingTest.donorShardNames;
21+
const recipientShardNames = reshardingTest.recipientShardNames;
22+
23+
const sourceCollection = reshardingTest.createShardedCollection({
24+
ns: "reshardingDb.coll",
25+
shardKeyPattern: {oldKey: 1},
26+
chunks: [
27+
{min: {oldKey: MinKey}, max: {oldKey: 10}, shard: donorShardNames[0]},
28+
{min: {oldKey: 10}, max: {oldKey: MaxKey}, shard: donorShardNames[1]},
29+
],
30+
});
31+
const mongos = sourceCollection.getMongo();
32+
const topology = DiscoverTopology.findConnectedNodes(mongos);
33+
const donor = new Mongo(topology.shards[donorShardNames[0]].primary);
34+
const recipient = new Mongo(topology.shards[recipientShardNames[0]].primary);
35+
36+
const reshardingDonorFailsBeforeObtainingTimestampFp =
37+
configureFailPoint(donor, "reshardingDonorFailsBeforeObtainingTimestamp");
38+
const hangBeforeRemovingRecipientDocFp =
39+
configureFailPoint(recipient, "removeRecipientDocFailpoint");
40+
41+
reshardingTest.withReshardingInBackground(
42+
{
43+
newShardKeyPattern: {newKey: 1},
44+
newChunks: [
45+
{min: {newKey: MinKey}, max: {newKey: 10}, shard: recipientShardNames[0]},
46+
{min: {newKey: 10}, max: {newKey: MaxKey}, shard: recipientShardNames[1]},
47+
],
48+
},
49+
() => {
50+
hangBeforeRemovingRecipientDocFp.wait();
51+
52+
const recipientDoc =
53+
recipient.getCollection('config.localReshardingOperations.recipient').findOne({
54+
ns: "reshardingDb.coll"
55+
});
56+
assert(recipientDoc != null);
57+
assert(recipientDoc.mutableState.state === "done");
58+
assert(recipientDoc.mutableState.abortReason != null);
59+
assert(recipientDoc.mutableState.abortReason.code === ErrorCodes.ReshardCollectionAborted);
60+
61+
reshardingTest.stepUpNewPrimaryOnShard(recipientShardNames[0]);
62+
const recipientRS = reshardingTest.getReplSetForShard(recipientShardNames[0]);
63+
recipientRS.awaitSecondaryNodes();
64+
recipientRS.awaitReplication();
65+
reshardingTest.retryOnceOnNetworkError(hangBeforeRemovingRecipientDocFp.off);
66+
},
67+
{expectedErrorCode: ErrorCodes.InternalError});
68+
69+
reshardingTest.teardown();
70+
})();

src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,5 +252,46 @@ TEST_F(ReshardingCoordinatorObserverTest, onDonorsReportedMinFetchTimestamp) {
252252

253253
reshardingObserver->interrupt(Status{ErrorCodes::Interrupted, "interrupted"});
254254
}
255+
256+
TEST_F(ReshardingCoordinatorObserverTest, onAllParticipantsReportingAbortReason) {
257+
auto reshardingObserver = std::make_shared<ReshardingCoordinatorObserver>();
258+
auto futRecipient = reshardingObserver->awaitAllRecipientsDone();
259+
auto futDonor = reshardingObserver->awaitAllDonorsDone();
260+
ASSERT_FALSE(futRecipient.isReady());
261+
ASSERT_FALSE(futDonor.isReady());
262+
263+
auto abortReason = Status{ErrorCodes::InternalError, "We gotta abort"};
264+
auto abortReasonFromCoordinator =
265+
Status{ErrorCodes::ReshardCollectionAborted, "Recieved abort from coordinator"};
266+
267+
auto recipientShards0 =
268+
makeMockRecipientsInState(RecipientStateEnum::kAwaitingFetchTimestamp, Timestamp(1, 1));
269+
std::vector<DonorShardEntry> donorShards0{
270+
{resharding::makeDonorShard(
271+
ShardId{"s1"}, DonorStateEnum::kError, Timestamp(1, 1), abortReason)},
272+
{resharding::makeDonorShard(
273+
ShardId{"s2"}, DonorStateEnum::kPreparingToDonate, Timestamp(1, 1))}};
274+
275+
auto coordinatorDoc0 =
276+
makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards0, abortReason);
277+
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0);
278+
ASSERT_FALSE(futRecipient.isReady());
279+
ASSERT_FALSE(futDonor.isReady());
280+
281+
auto recipientShards1 = makeMockRecipientsInState(
282+
RecipientStateEnum::kDone, Timestamp(1, 1), abortReasonFromCoordinator);
283+
std::vector<DonorShardEntry> donorShards1{
284+
{resharding::makeDonorShard(
285+
ShardId{"s1"}, DonorStateEnum::kDone, Timestamp(1, 1), abortReason)},
286+
{resharding::makeDonorShard(
287+
ShardId{"s2"}, DonorStateEnum::kDone, Timestamp(1, 1), abortReasonFromCoordinator)}};
288+
289+
auto coordinatorDoc1 =
290+
makeCoordinatorDocWithRecipientsAndDonors(recipientShards1, donorShards1, abortReason);
291+
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1);
292+
ASSERT_TRUE(futRecipient.isReady());
293+
ASSERT_TRUE(futDonor.isReady());
294+
}
295+
255296
} // namespace
256297
} // namespace mongo

src/mongo/db/s/resharding/resharding_donor_service.cpp

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ MONGO_FAIL_POINT_DEFINE(reshardingPauseDonorBeforeCatalogCacheRefresh);
7575
MONGO_FAIL_POINT_DEFINE(reshardingPauseDonorAfterBlockingReads);
7676
MONGO_FAIL_POINT_DEFINE(reshardingDonorFailsAfterTransitionToDonatingOplogEntries);
7777
MONGO_FAIL_POINT_DEFINE(removeDonorDocFailpoint);
78+
MONGO_FAIL_POINT_DEFINE(reshardingDonorFailsBeforeObtainingTimestamp);
7879

7980
using namespace fmt::literals;
8081

@@ -119,18 +120,6 @@ Timestamp generateMinFetchTimestamp(OperationContext* opCtx, const NamespaceStri
119120
return generatedOpTime.getTimestamp();
120121
}
121122

122-
/**
123-
* Returns whether it is possible for the donor to be in 'state' when resharding will indefinitely
124-
* abort.
125-
*/
126-
bool inPotentialAbortScenario(const DonorStateEnum& state) {
127-
// Regardless of whether resharding will abort or commit, the donor will eventually reach state
128-
// kDone.
129-
// Additionally, if the donor is in state kError, it is guaranteed that the coordinator will
130-
// eventually begin the abort process.
131-
return state == DonorStateEnum::kError || state == DonorStateEnum::kDone;
132-
}
133-
134123
/**
135124
* Fulfills the promise if it is not already. Otherwise, does nothing.
136125
*/
@@ -146,6 +135,17 @@ void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp, Status error)
146135
}
147136
}
148137

138+
/**
139+
* Returns whether it is possible for the donor to be in 'state' when resharding will indefinitely
140+
* abort.
141+
*/
142+
bool inPotentialAbortScenario(const DonorStateEnum& state) {
143+
// Regardless of whether resharding will abort or commit, the donor will eventually reach state
144+
// kDone. Additionally, if the donor is in state kError, it is guaranteed that the coordinator
145+
// will eventually begin the abort process.
146+
return state == DonorStateEnum::kError || state == DonorStateEnum::kDone;
147+
}
148+
149149
class ExternalStateImpl : public ReshardingDonorService::DonorStateMachineExternalState {
150150
public:
151151
ShardId myShardId(ServiceContext* serviceContext) const override {
@@ -378,7 +378,7 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_finishReshardin
378378
}
379379

380380
// If aborted, the donor must be allowed to transition to done from any state.
381-
_transitionState(DonorStateEnum::kDone);
381+
_transitionToDone(aborted);
382382
}
383383

384384
{
@@ -611,6 +611,12 @@ void ReshardingDonorService::DonorStateMachine::
611611
_externalState->waitForCollectionFlush(opCtx.get(), _metadata.getTempReshardingNss());
612612
}
613613

614+
reshardingDonorFailsBeforeObtainingTimestamp.execute([&](const BSONObj& data) {
615+
auto errmsgElem = data["errmsg"];
616+
StringData errmsg = errmsgElem ? errmsgElem.checkAndGetStringData() : "Failing for test"_sd;
617+
uasserted(ErrorCodes::InternalError, errmsg);
618+
});
619+
614620
Timestamp minFetchTimestamp = [this] {
615621
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
616622
return generateMinFetchTimestamp(opCtx.get(), _metadata.getSourceNss());
@@ -847,12 +853,12 @@ void ReshardingDonorService::DonorStateMachine::_dropOriginalCollectionThenTrans
847853
opCtx.get(), _metadata.getSourceNss(), _metadata.getSourceUUID());
848854
}
849855

850-
_transitionState(DonorStateEnum::kDone);
856+
_transitionToDone(false /* aborted */);
851857
}
852858

853859
void ReshardingDonorService::DonorStateMachine::_transitionState(DonorStateEnum newState) {
854860
invariant(newState != DonorStateEnum::kDonatingInitialData &&
855-
newState != DonorStateEnum::kError);
861+
newState != DonorStateEnum::kError && newState != DonorStateEnum::kDone);
856862

857863
auto newDonorCtx = _donorCtx;
858864
newDonorCtx.setState(newState);
@@ -864,6 +870,10 @@ void ReshardingDonorService::DonorStateMachine::_transitionState(DonorShardConte
864870
auto oldState = _donorCtx.getState();
865871
auto newState = newDonorCtx.getState();
866872

873+
// The donor state machine enters the kError state on unrecoverable errors and so we don't
874+
// expect it to ever transition from kError except to kDone.
875+
invariant(oldState != DonorStateEnum::kError || newState == DonorStateEnum::kDone);
876+
867877
_updateDonorDocument(std::move(newDonorCtx));
868878

869879
_metrics->onStateTransition(oldState, newState);
@@ -894,6 +904,16 @@ void ReshardingDonorService::DonorStateMachine::_transitionToError(Status abortR
894904
_transitionState(std::move(newDonorCtx));
895905
}
896906

907+
void ReshardingDonorService::DonorStateMachine::_transitionToDone(bool aborted) {
908+
auto newDonorCtx = _donorCtx;
909+
newDonorCtx.setState(DonorStateEnum::kDone);
910+
if (aborted) {
911+
resharding::emplaceTruncatedAbortReasonIfExists(newDonorCtx,
912+
resharding::coordinatorAbortedError());
913+
}
914+
_transitionState(std::move(newDonorCtx));
915+
}
916+
897917
/**
898918
* Returns a query filter of the form
899919
* {
@@ -1069,6 +1089,13 @@ CancellationToken ReshardingDonorService::DonorStateMachine::_initAbortSource(
10691089
_abortSource = CancellationSource(stepdownToken);
10701090
}
10711091

1092+
if (_donorCtx.getState() == DonorStateEnum::kDone && _donorCtx.getAbortReason()) {
1093+
// A donor in state kDone with an abortReason is indication that the coordinator
1094+
// has persisted the decision and called abort on all participants. Canceling the
1095+
// _abortSource to avoid repeating the future chain.
1096+
_abortSource->cancel();
1097+
}
1098+
10721099
if (auto future = _coordinatorHasDecisionPersisted.getFuture(); future.isReady()) {
10731100
if (auto status = future.getNoThrow(); !status.isOK()) {
10741101
// onReshardingFieldsChanges() missed canceling _abortSource because _initAbortSource()

src/mongo/db/s/resharding/resharding_donor_service.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,9 @@ class ReshardingDonorService::DonorStateMachine final
204204
// Transitions the on-disk and in-memory state to DonorStateEnum::kError.
205205
void _transitionToError(Status abortReason);
206206

207+
// Transitions the on-disk and in-memory state to DonorStateEnum::kDone.
208+
void _transitionToDone(bool aborted);
209+
207210
BSONObj _makeQueryForCoordinatorUpdate(const ShardId& shardId, DonorStateEnum newState);
208211

209212
ExecutorFuture<void> _updateCoordinator(

0 commit comments

Comments
 (0)