Skip to content

Commit 8a7394e

Browse files
authored
Fix balancing of partitions for special reading sessions (#10779)
1 parent 22600a9 commit 8a7394e

File tree

3 files changed

+102
-23
lines changed

3 files changed

+102
-23
lines changed

ydb/core/persqueue/read_balancer__balancing.cpp

Lines changed: 62 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,12 @@ bool TPartitionFamily::Reset(ETargetStatus targetStatus, const TActorContext& ct
260260
GetPrefix() << " has been released for merge but target family is not exists.");
261261
return true;
262262
}
263-
Consumer.MergeFamilies(it->second.get(), this, ctx);
263+
auto* targetFamily = it->second.get();
264+
if (targetFamily->CanAttach(Partitions) && targetFamily->CanAttach(WantedPartitions)) {
265+
Consumer.MergeFamilies(targetFamily, this, ctx);
266+
} else {
267+
WantedPartitions.clear();
268+
}
264269

265270
return true;
266271
}
@@ -477,6 +482,23 @@ bool TPartitionFamily::PossibleForBalance(TSession* session) {
477482
return session->Pipe != LastPipe;
478483
}
479484

485+
template<typename TCollection>
486+
bool TPartitionFamily::CanAttach(const TCollection& partitionsIds) {
487+
if (partitionsIds.empty()) {
488+
return true;
489+
}
490+
491+
if (Consumer.WithCommonSessions) {
492+
return true;
493+
}
494+
495+
return AnyOf(SpecialSessions, [&](const auto& s) {
496+
return s.second->AllPartitionsReadable(partitionsIds);
497+
});
498+
}
499+
500+
template bool TPartitionFamily::CanAttach(const std::unordered_set<ui32>& partitionsIds);
501+
template bool TPartitionFamily::CanAttach(const std::vector<ui32>& partitionsIds);
480502

481503
void TPartitionFamily::ClassifyPartitions() {
482504
auto [activePartitionCount, inactivePartitionCount] = ClassifyPartitions(Partitions);
@@ -586,6 +608,7 @@ TConsumer::TConsumer(TBalancer& balancer, const TString& consumerName)
586608
: Balancer(balancer)
587609
, ConsumerName(consumerName)
588610
, NextFamilyId(0)
611+
, WithCommonSessions(false)
589612
, BalanceScheduled(false)
590613
{
591614
}
@@ -881,6 +904,7 @@ void TConsumer::RegisterReadingSession(TSession* session, const TActorContext& c
881904
}
882905
} else {
883906
OrderedSessions.reset();
907+
WithCommonSessions = true;
884908
}
885909
}
886910

@@ -901,6 +925,9 @@ void TConsumer::UnregisterReadingSession(TSession* session, const TActorContext&
901925
Sessions.erase(session->Pipe);
902926
if (!session->WithGroups()) {
903927
OrderedSessions.reset();
928+
WithCommonSessions = AnyOf(Sessions, [](const auto s) {
929+
return !s.second->WithGroups();
930+
});
904931
}
905932

906933
for (auto* family : Snapshot(Families)) {
@@ -920,6 +947,11 @@ void TConsumer::UnregisterReadingSession(TSession* session, const TActorContext&
920947
}
921948
}
922949
}
950+
951+
if (!family->CanAttach(family->WantedPartitions)) {
952+
targetStatus = TPartitionFamily::ETargetStatus::Destroy;
953+
}
954+
923955
if (family->Reset(targetStatus, ctx)) {
924956
UnreadableFamilies[family->Id] = family;
925957
FamiliesRequireBalancing.erase(family->Id);
@@ -1020,34 +1052,41 @@ bool TConsumer::ProccessReadingFinished(ui32 partitionId, bool wasInactive, cons
10201052
});
10211053

10221054
if (partition.NeedReleaseChildren()) {
1055+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
1056+
GetPrefix() << "Attache partitions [" << JoinRange(", ", newPartitions.begin(), newPartitions.end()) << "] to " << family->DebugStr());
10231057
for (auto id : newPartitions) {
1024-
auto* node = GetPartitionGraph().GetPartition(id);
1025-
bool allParentsMerged = true;
1026-
if (node->Parents.size() > 1) {
1027-
// The partition was obtained as a result of the merge.
1028-
for (auto* c : node->Parents) {
1029-
auto* other = FindFamily(c->Id);
1030-
if (!other) {
1031-
allParentsMerged = false;
1032-
continue;
1033-
}
1058+
if (family->CanAttach(std::vector{id})) {
1059+
auto* node = GetPartitionGraph().GetPartition(id);
1060+
bool allParentsMerged = true;
1061+
if (node->Parents.size() > 1) {
1062+
// The partition was obtained as a result of the merge.
1063+
for (auto* c : node->Parents) {
1064+
auto* other = FindFamily(c->Id);
1065+
if (!other) {
1066+
allParentsMerged = false;
1067+
continue;
1068+
}
10341069

1035-
if (other != family) {
1036-
auto [f, v] = MergeFamilies(family, other, ctx);
1037-
allParentsMerged = v;
1038-
family = f;
1070+
if (other != family) {
1071+
auto [f, v] = MergeFamilies(family, other, ctx);
1072+
allParentsMerged = v;
1073+
family = f;
1074+
}
10391075
}
10401076
}
1041-
}
10421077

1043-
if (allParentsMerged) {
1044-
auto* other = FindFamily(id);
1045-
if (other && other != family) {
1046-
auto [f, _] = MergeFamilies(family, other, ctx);
1047-
family = f;
1048-
} else {
1049-
family->AttachePartitions({id}, ctx);
1078+
if (allParentsMerged) {
1079+
auto* other = FindFamily(id);
1080+
if (other && other != family) {
1081+
auto [f, _] = MergeFamilies(family, other, ctx);
1082+
family = f;
1083+
} else {
1084+
family->AttachePartitions({id}, ctx);
1085+
}
10501086
}
1087+
} else {
1088+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
1089+
GetPrefix() << "Can't attache partition " << id << " to " << family->DebugStr());
10511090
}
10521091
}
10531092
} else {

ydb/core/persqueue/read_balancer__balancing.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ struct TPartitionFamily {
127127
void InactivatePartition(ui32 partitionId);
128128

129129
bool PossibleForBalance(TSession* session);
130+
template<typename TCollection>
131+
bool CanAttach(const TCollection& partitionsIds);
130132

131133
TString DebugStr() const;
132134

@@ -186,6 +188,7 @@ struct TConsumer {
186188
// All reading sessions in which the family is currently being read.
187189
std::unordered_map<TActorId, TSession*> Sessions;
188190
std::optional<TOrderedSessions> OrderedSessions;
191+
bool WithCommonSessions;
189192

190193
// Families is not reading now.
191194
std::unordered_map<size_t, TPartitionFamily*> UnreadableFamilies;

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,6 +1051,43 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
10511051
readSession2->Close();
10521052
}
10531053

1054+
Y_UNIT_TEST(ReBalancingAfterSplit_sessionsWithPartition) {
1055+
TTopicSdkTestSetup setup = CreateSetup();
1056+
setup.CreateTopicWithAutoscale(TEST_TOPIC, TEST_CONSUMER, 2, 100);
1057+
1058+
TTopicClient client = setup.MakeClient();
1059+
1060+
auto writeSession = CreateWriteSession(client, "producer-1", 0);
1061+
UNIT_ASSERT(writeSession->Write(Msg("message_1.1", 2)));
1062+
writeSession->Close();
1063+
1064+
ui64 txId = 1023;
1065+
SplitPartition(setup, ++txId, 0, "a");
1066+
1067+
auto readSession1 = CreateTestReadSession({ .Name="Session-1", .Setup=setup, .Sdk = SdkVersion::Topic, .AutoCommit = false, .Partitions = {1}, .AutoPartitioningSupport = true });
1068+
auto readSession0 = CreateTestReadSession({ .Name="Session-0", .Setup=setup, .Sdk = SdkVersion::Topic, .ExpectedMessagesCount = 1, .AutoCommit = false, .Partitions = {0}, .AutoPartitioningSupport = true });
1069+
1070+
readSession0->WaitAndAssertPartitions({0}, "Must read partition 0");
1071+
readSession0->WaitAllMessages();
1072+
1073+
for(size_t i = 0; i < 10; ++i) {
1074+
auto events = readSession0->GetEndedPartitionEvents();
1075+
if (events.empty()) {
1076+
Sleep(TDuration::Seconds(1));
1077+
continue;
1078+
}
1079+
readSession0->Commit();
1080+
break;
1081+
}
1082+
1083+
readSession0->Close();
1084+
1085+
readSession0 = CreateTestReadSession({ .Name="Session-0", .Setup=setup, .Sdk = SdkVersion::Topic, .AutoCommit = false, .Partitions = {0}, .AutoPartitioningSupport = true });
1086+
readSession0->WaitAndAssertPartitions({0}, "Must read partition 0 because no more readers of it");
1087+
1088+
readSession0->Close();
1089+
}
1090+
10541091
Y_UNIT_TEST(MidOfRange) {
10551092
auto AsString = [](std::vector<ui16> vs) {
10561093
TStringBuilder a;

0 commit comments

Comments
 (0)