Skip to content

Commit f28f11b

Browse files
Fixed split bug on pkversion lease during first request if split happens (#45363)
* Fixed split bug on pkversion lease during first request * Fixed changelog
1 parent 83cd8a6 commit f28f11b

File tree

5 files changed

+130
-2
lines changed

5 files changed

+130
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.implementation.changefeed.pkversion;
5+
6+
import com.azure.cosmos.CosmosAsyncContainer;
7+
import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException;
8+
import com.azure.cosmos.implementation.changefeed.CancellationTokenSource;
9+
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
10+
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
11+
import com.azure.cosmos.implementation.changefeed.Lease;
12+
import com.azure.cosmos.implementation.changefeed.LeaseCheckpointer;
13+
import com.azure.cosmos.implementation.changefeed.PartitionCheckpointer;
14+
import com.azure.cosmos.implementation.changefeed.ProcessorSettings;
15+
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
16+
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal;
17+
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
18+
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
19+
import com.azure.cosmos.implementation.changefeed.exceptions.FeedRangeGoneException;
20+
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
21+
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
22+
import com.fasterxml.jackson.databind.JsonNode;
23+
import org.mockito.Mockito;
24+
import org.testng.annotations.Test;
25+
import reactor.core.publisher.Flux;
26+
import reactor.test.StepVerifier;
27+
28+
import java.util.UUID;
29+
30+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
31+
32+
public class PartitionProcessorImplTests {
33+
34+
@Test
35+
public void partitionSplitHappenOnFirstRequest() {
36+
@SuppressWarnings("unchecked") ChangeFeedObserver<JsonNode> observerMock =
37+
(ChangeFeedObserver<JsonNode>) Mockito.mock(ChangeFeedObserver.class);
38+
ChangeFeedContextClient changeFeedContextClientMock = Mockito.mock(ChangeFeedContextClient.class);
39+
Mockito
40+
.when(changeFeedContextClientMock.createDocumentChangeFeedQuery(Mockito.any(), Mockito.any(),
41+
Mockito.any()))
42+
.thenReturn(Flux.error(new PartitionKeyRangeIsSplittingException()));
43+
44+
ChangeFeedState changeFeedState = this.getChangeFeedStateWithContinuationToken();
45+
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
46+
ProcessorSettings processorSettings = new ProcessorSettings(changeFeedState, containerMock);
47+
processorSettings.withMaxItemCount(10);
48+
49+
Lease leaseMock = Mockito.mock(ServiceItemLease.class);
50+
Mockito
51+
.when(leaseMock.getContinuationToken())
52+
.thenReturn(changeFeedState.getContinuation().getCurrentContinuationToken().getToken());
53+
54+
LeaseCheckpointer leaseCheckpointerMock = Mockito.mock(LeaseCheckpointer.class);
55+
PartitionCheckpointer partitionCheckpointer = new PartitionCheckpointerImpl(leaseCheckpointerMock, leaseMock);
56+
57+
PartitionProcessorImpl partitionProcessor = new PartitionProcessorImpl(
58+
observerMock,
59+
changeFeedContextClientMock,
60+
processorSettings,
61+
partitionCheckpointer,
62+
leaseMock,
63+
null
64+
);
65+
66+
StepVerifier.create(partitionProcessor.run(new CancellationTokenSource().getToken()))
67+
.verifyComplete();
68+
69+
RuntimeException runtimeException = partitionProcessor.getResultException();
70+
assertThat(runtimeException).isNotNull();
71+
assertThat(runtimeException).isInstanceOf(FeedRangeGoneException.class);
72+
FeedRangeGoneException feedRangeGoneException = (FeedRangeGoneException) runtimeException;
73+
assertThat(feedRangeGoneException.getLastContinuation()).isEqualTo(leaseMock.getContinuationToken());
74+
}
75+
76+
private ChangeFeedState getChangeFeedStateWithContinuationToken() {
77+
String containerRid = "/cols/" + UUID.randomUUID();
78+
String pkRangeId = UUID.randomUUID().toString();
79+
String continuationDummy = UUID.randomUUID().toString();
80+
String continuationJson = String.format(
81+
"{\"V\":1," +
82+
"\"Rid\":\"%s\"," +
83+
"\"Continuation\":[" +
84+
"{\"token\":\"%s\",\"range\":{\"min\":\"AA\",\"max\":\"BB\"}}," +
85+
"{\"token\":\"%s\",\"range\":{\"min\":\"CC\",\"max\":\"DD\"}}" +
86+
"]," +
87+
"\"PKRangeId\":\"%s\"}",
88+
containerRid,
89+
continuationDummy,
90+
continuationDummy,
91+
pkRangeId);
92+
93+
FeedRangePartitionKeyRangeImpl feedRange = new FeedRangePartitionKeyRangeImpl(pkRangeId);
94+
ChangeFeedStartFromInternal startFromSettings = ChangeFeedStartFromInternal.createFromNow();
95+
FeedRangeContinuation continuation = FeedRangeContinuation.convert(continuationJson);
96+
97+
ChangeFeedState state = new ChangeFeedStateV1(
98+
containerRid,
99+
feedRange,
100+
ChangeFeedMode.INCREMENTAL,
101+
startFromSettings,
102+
continuation);
103+
104+
return state;
105+
}
106+
}

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#### Breaking Changes
88

99
#### Bugs Fixed
10+
* Fixed an issue where child partition is getting overridden with null continuation token if a split happens during the first request of a parent partition. - See [PR 45363](https://github.com/Azure/azure-sdk-for-java/pull/45363)
1011

1112
#### Other Changes
1213

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionControllerImpl.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,17 @@ private Mono<Void> getWorkerJob(
162162
}
163163

164164
private Mono<Void> handleFeedRangeGone(Lease lease, String lastContinuationToken) {
165-
lease.setContinuationToken(lastContinuationToken);
165+
if (lastContinuationToken != null) {
166+
logger.warn("Lease with token {}: with owner {}: updated with last continuation token {}",
167+
lease.getLeaseToken(),
168+
lease.getOwner(),
169+
lastContinuationToken);
170+
lease.setContinuationToken(lastContinuationToken);
171+
} else {
172+
logger.warn("Continuation token not found for split for lease with token {}: with owner {}",
173+
lease.getLeaseToken(),
174+
lease.getOwner());
175+
}
166176

167177
return this.synchronizer.getFeedRangeGoneHandler(lease)
168178
.flatMap(partitionGoneHandler -> {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionControllerImpl.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,17 @@ private Mono<Void> getWorkerJob(
168168
}
169169

170170
private Mono<Void> handleSplit(Lease lease, String lastContinuationToken) {
171-
lease.setContinuationToken(lastContinuationToken);
171+
if (lastContinuationToken != null) {
172+
logger.warn("Partition {}, with owner: {}, updated with last continuation token: {}",
173+
lease.getLeaseToken(),
174+
lease.getOwner(),
175+
lastContinuationToken);
176+
lease.setContinuationToken(lastContinuationToken);
177+
} else {
178+
logger.warn("Continuation token not found for split for partition: {}, with owner: {}",
179+
lease.getLeaseToken(),
180+
lease.getOwner());
181+
}
172182
return this.synchronizer.splitPartition(lease)
173183
.flatMap(l -> {
174184
if (this.shouldSkipDirectLeaseAssignment) {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public PartitionProcessorImpl(ChangeFeedObserver<JsonNode> observer,
6969
this.settings = settings;
7070
this.checkpointer = checkPointer;
7171
this.lease = lease;
72+
this.lastServerContinuationToken = this.lease.getContinuationToken();
7273

7374
ChangeFeedState state = settings.getStartState();
7475
this.options = ModelBridgeInternal.createChangeFeedRequestOptionsForChangeFeedState(state);

0 commit comments

Comments
 (0)