Skip to content

Commit 268516f

Browse files
authored
Fix wrong active and inactive partition number (#10712)
1 parent 7286db0 commit 268516f

File tree

3 files changed

+63
-14
lines changed

3 files changed

+63
-14
lines changed

ydb/core/persqueue/read_balancer__balancing.cpp

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,9 @@ void TPartitionFamily::InactivatePartition(ui32 partitionId) {
396396
}
397397

398398
void TPartitionFamily::ChangePartitionCounters(ssize_t active, ssize_t inactive) {
399+
Y_VERIFY_DEBUG((ssize_t)ActivePartitionCount + active >= 0);
400+
Y_VERIFY_DEBUG((ssize_t)InactivePartitionCount + inactive >= 0);
401+
399402
ActivePartitionCount += active;
400403
InactivePartitionCount += inactive;
401404

@@ -985,7 +988,7 @@ bool TConsumer::SetCommittedState(ui32 partitionId, ui32 generation, ui64 cookie
985988
return Partitions[partitionId].SetCommittedState(generation, cookie);
986989
}
987990

988-
bool TConsumer::ProccessReadingFinished(ui32 partitionId, const TActorContext& ctx) {
991+
bool TConsumer::ProccessReadingFinished(ui32 partitionId, bool wasInactive, const TActorContext& ctx) {
989992
if (!ScalingSupport()) {
990993
return false;
991994
}
@@ -996,7 +999,9 @@ bool TConsumer::ProccessReadingFinished(ui32 partitionId, const TActorContext& c
996999
if (!family) {
9971000
return false;
9981001
}
999-
family->InactivatePartition(partitionId);
1002+
if (!wasInactive) {
1003+
family->InactivatePartition(partitionId);
1004+
}
10001005

10011006
if (!family->IsLonely() && partition.Commited) {
10021007
if (BreakUpFamily(family, partitionId, false, ctx)) {
@@ -1065,8 +1070,13 @@ void TConsumer::StartReading(ui32 partitionId, const TActorContext& ctx) {
10651070
}
10661071

10671072
auto* partition = GetPartition(partitionId);
1073+
// NOTE(review): this branch only logs and then falls through. The next added
// statement in this hunk calls partition->IsInactive() unconditionally, so a
// null result from GetPartition() would be dereferenced. An early `return;`
// after the log looks necessary - confirm against the full function body.
if (!partition) {
1074+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
1075+
GetPrefix() << "Reading of the partition " << partitionId << " was started by " << ConsumerName << ".");
1076+
}
10681077

1069-
if (partition && partition->StartReading()) {
1078+
auto wasInactive = partition->IsInactive();
1079+
if (partition->StartReading()) {
10701080
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
10711081
GetPrefix() << "Reading of the partition " << partitionId << " was started by " << ConsumerName << ". We stop reading from child partitions.");
10721082

@@ -1080,7 +1090,9 @@ void TConsumer::StartReading(ui32 partitionId, const TActorContext& ctx) {
10801090
return;
10811091
}
10821092

1083-
family->ActivatePartition(partitionId);
1093+
if (wasInactive) {
1094+
family->ActivatePartition(partitionId);
1095+
}
10841096

10851097
// We release all of the children's partitions because we don't start reading the partition from EndOffset
10861098
GetPartitionGraph().Travers(partitionId, [&](ui32 partitionId) {
@@ -1097,8 +1109,6 @@ void TConsumer::StartReading(ui32 partitionId, const TActorContext& ctx) {
10971109
return true;
10981110
});
10991111
} else {
1100-
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
1101-
GetPrefix() << "Reading of the partition " << partitionId << " was started by " << ConsumerName << ".");
11021112
}
11031113
}
11041114

@@ -1139,7 +1149,7 @@ void TConsumer::FinishReading(TEvPersQueue::TEvReadingPartitionFinishedRequest::
11391149
GetPrefix() << "Reading of the partition " << partitionId << " was finished by " << r.GetConsumer()
11401150
<< ", firstMessage=" << r.GetStartedReadingFromEndOffset() << ", " << GetSdkDebugString0(r.GetScaleAwareSDK()));
11411151

1142-
if (ProccessReadingFinished(partitionId, ctx)) {
1152+
if (ProccessReadingFinished(partitionId, false, ctx)) {
11431153
ScheduleBalance(ctx);
11441154
}
11451155
} else if (!partition.IsInactive()) {
@@ -1540,11 +1550,12 @@ bool TBalancer::SetCommittedState(const TString& consumerName, ui32 partitionId,
15401550
return false;
15411551
}
15421552

1553+
auto wasInactive = consumer->IsInactive(partitionId);
15431554
if (consumer->SetCommittedState(partitionId, generation, cookie)) {
15441555
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
15451556
GetPrefix() << "The offset of the partition " << partitionId << " was commited by " << consumerName);
15461557

1547-
if (consumer->ProccessReadingFinished(partitionId, ctx)) {
1558+
if (consumer->ProccessReadingFinished(partitionId, wasInactive, ctx)) {
15481559
consumer->ScheduleBalance(ctx);
15491560
}
15501561

ydb/core/persqueue/read_balancer__balancing.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ struct TConsumer {
226226
bool Unlock(const TActorId& sender, ui32 partitionId, const TActorContext& ctx);
227227

228228
bool SetCommittedState(ui32 partitionId, ui32 generation, ui64 cookie);
229-
bool ProccessReadingFinished(ui32 partitionId, const TActorContext& ctx);
229+
bool ProccessReadingFinished(ui32 partitionId, bool wasInactive, const TActorContext& ctx);
230230
void StartReading(ui32 partitionId, const TActorContext& ctx);
231231
void FinishReading(TEvPersQueue::TEvReadingPartitionFinishedRequest::TPtr& ev, const TActorContext& ctx);
232232

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -937,7 +937,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
937937
return balancerTabletId;
938938
}
939939

940-
void SplitPartition(TTopicSdkTestSetup& setup, const TString& topicPath, ui32 partitionId) {
940+
void SplitPartitionRB(TTopicSdkTestSetup& setup, const TString& topicPath, ui32 partitionId) {
941941
auto balancerTabletId = GetBalancerTabletId(setup, topicPath);
942942
auto edge = setup.GetRuntime().AllocateEdgeActor();
943943
setup.GetRuntime().SendToPipe(balancerTabletId, edge, new TEvPQ::TEvPartitionScaleStatusChanged(partitionId, NKikimrPQ::EScaleStatus::NEED_SPLIT));
@@ -968,8 +968,6 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
968968
auto tableClient = setup.MakeTableClient();
969969
auto session = tableClient.CreateSession().GetValueSync().GetSession();
970970

971-
setup.GetServer().AnnoyingClient->MkDir("/Root", "dir");
972-
973971
ExecuteQuery(session, R"(
974972
--!syntax_v1
975973
CREATE TOPIC `/Root/dir/origin`
@@ -980,7 +978,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
980978
)");
981979

982980
AssertPartitionCount(setup, "/Root/dir/origin", 1);
983-
SplitPartition(setup, "/Root/dir/origin", 0);
981+
SplitPartitionRB(setup, "/Root/dir/origin", 0);
984982
WaitAndAssertPartitionCount(setup, "/Root/dir/origin", 3);
985983
}
986984

@@ -1009,10 +1007,50 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
10091007
)");
10101008

10111009
AssertPartitionCount(setup, "/Root/origin/feed", 1);
1012-
SplitPartition(setup, "/Root/origin/feed/streamImpl", 0);
1010+
SplitPartitionRB(setup, "/Root/origin/feed/streamImpl", 0);
10131011
WaitAndAssertPartitionCount(setup, "/Root/origin/feed", 3);
10141012
}
10151013

1014+
// Scenario: after partition 0 is split, a session that reads and commits partition 0
// is followed by two more sessions, each restricted to a single partition (1 and 2),
// and each of them must be assigned exactly that partition.
// NOTE(review): partitions 1 and 2 are presumably the children produced by the split -
// confirm against the SplitPartition helper, which is defined elsewhere.
Y_UNIT_TEST(BalancingAfterSplit_sessionsWithPartition) {
1015+
TTopicSdkTestSetup setup = CreateSetup();
1016+
setup.CreateTopicWithAutoscale(TEST_TOPIC, TEST_CONSUMER, 1, 100);
1017+
1018+
TTopicClient client = setup.MakeClient();
1019+
1020+
// Write a single message before the split; the write must be accepted.
auto writeSession = CreateWriteSession(client, "producer-1", 0);
1021+
UNIT_ASSERT(writeSession->Write(Msg("message_1.1", 2)));
1022+
1023+
ui64 txId = 1023;
1024+
// Presumably splits partition 0 at boundary "a" using the next transaction id - TODO confirm helper semantics.
SplitPartition(setup, ++txId, 0, "a");
1025+
1026+
// Session-0 is limited to partition 0, commits manually and expects exactly one message.
auto readSession0 = CreateTestReadSession({ .Name="Session-0", .Setup=setup, .Sdk = SdkVersion::Topic, .ExpectedMessagesCount = 1, .AutoCommit = false, .Partitions = {0}, .AutoPartitioningSupport = true });
1027+
1028+
readSession0->WaitAndAssertPartitions({0}, "Must read partition 0");
1029+
readSession0->WaitAllMessages();
1030+
1031+
1032+
// Poll up to 10 times, sleeping one second between attempts, until the session reports
// an ended-partition event; commit once and stop polling. If no event ever arrives the
// loop simply ends without committing and without failing the test at this point.
for(size_t i = 0; i < 10; ++i) {
1033+
auto events = readSession0->GetEndedPartitionEvents();
1034+
if (events.empty()) {
1035+
Sleep(TDuration::Seconds(1));
1036+
continue;
1037+
}
1038+
readSession0->Commit();
1039+
break;
1040+
}
1041+
1042+
// A session restricted to partition 1 must be assigned partition 1.
auto readSession1 = CreateTestReadSession({ .Name="Session-1", .Setup=setup, .Sdk = SdkVersion::Topic, .AutoCommit = false, .Partitions = {1}, .AutoPartitioningSupport = true });
1043+
readSession1->WaitAndAssertPartitions({1}, "Must read partition 1");
1044+
1045+
// A session restricted to partition 2 must be assigned partition 2.
auto readSession2 = CreateTestReadSession({ .Name="Session-2", .Setup=setup, .Sdk = SdkVersion::Topic, .AutoCommit = false, .Partitions = {2}, .AutoPartitioningSupport = true });
1046+
readSession2->WaitAndAssertPartitions({2}, "Must read partition 2");
1047+
1048+
// Close the writer and all readers.
writeSession->Close();
1049+
readSession0->Close();
1050+
readSession1->Close();
1051+
readSession2->Close();
1052+
}
1053+
10161054
Y_UNIT_TEST(MidOfRange) {
10171055
auto AsString = [](std::vector<ui16> vs) {
10181056
TStringBuilder a;

0 commit comments

Comments
 (0)