Skip to content

Commit e7b25a7

Browse files
authored
Randomize order of sessions (#8359)
1 parent 01f56bd commit e7b25a7

File tree

4 files changed

+49
-22
lines changed

4 files changed

+49
-22
lines changed

ydb/core/persqueue/read_balancer__balancing.cpp

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@
66
namespace NKikimr::NPQ::NBalancing {
77

88

9+
struct LowLoadSessionComparator {
10+
bool operator()(const TSession* lhs, const TSession* rhs) const;
11+
};
12+
13+
using TLowLoadOrderedSessions = std::set<TSession*, LowLoadSessionComparator>;
14+
15+
16+
917
//
1018
// TPartition
1119
//
@@ -868,6 +876,8 @@ void TConsumer::RegisterReadingSession(TSession* session, const TActorContext& c
868876
CreateFamily({partitionId}, ctx);
869877
}
870878
}
879+
} else {
880+
OrderedSessions.reset();
871881
}
872882
}
873883

@@ -886,6 +896,9 @@ std::vector<TPartitionFamily*> Snapshot(const std::unordered_map<size_t, const s
886896
void TConsumer::UnregisterReadingSession(TSession* session, const TActorContext& ctx) {
887897
auto pipe = session->Pipe;
888898
Sessions.erase(session->Pipe);
899+
if (!session->WithGroups()) {
900+
OrderedSessions.reset();
901+
}
889902

890903
for (auto* family : Snapshot(Families)) {
891904
auto special = family->SpecialSessions.erase(pipe);
@@ -1156,11 +1169,11 @@ void TConsumer::ScheduleBalance(const TActorContext& ctx) {
11561169
ctx.Send(Balancer.TopicActor.SelfId(), new TEvPQ::TEvBalanceConsumer(ConsumerName));
11571170
}
11581171

1159-
TOrderedSessions OrderSessions(
1172+
TLowLoadOrderedSessions OrderSessions(
11601173
const std::unordered_map<TActorId, TSession*>& values,
11611174
std::function<bool (const TSession*)> predicate = [](const TSession*) { return true; }
11621175
) {
1163-
TOrderedSessions result;
1176+
TLowLoadOrderedSessions result;
11641177
for (auto& [_, v] : values) {
11651178
if (predicate(v)) {
11661179
result.insert(v);
@@ -1244,7 +1257,7 @@ void TConsumer::Balance(const TActorContext& ctx) {
12441257
}
12451258
}
12461259

1247-
TOrderedSessions commonSessions = OrderSessions(Sessions, [](auto* session) {
1260+
TLowLoadOrderedSessions commonSessions = OrderSessions(Sessions, [](auto* session) {
12481261
return !session->WithGroups();
12491262
});
12501263

@@ -1253,7 +1266,7 @@ void TConsumer::Balance(const TActorContext& ctx) {
12531266
auto families = OrderFamilies(UnreadableFamilies);
12541267
for (auto it = families.rbegin(); it != families.rend(); ++it) {
12551268
auto* family = *it;
1256-
TOrderedSessions specialSessions;
1269+
TLowLoadOrderedSessions specialSessions;
12571270
auto& sessions = (family->IsCommon()) ? commonSessions : (specialSessions = OrderSessions(family->SpecialSessions));
12581271

12591272
auto sit = sessions.begin();
@@ -1297,7 +1310,11 @@ void TConsumer::Balance(const TActorContext& ctx) {
12971310
GetPrefix() << "start rebalancing. familyCount=" << familyCount << ", sessionCount=" << commonSessions.size()
12981311
<< ", desiredFamilyCount=" << desiredFamilyCount << ", allowPlusOne=" << allowPlusOne);
12991312

1300-
for (auto it = commonSessions.rbegin(); it != commonSessions.rend(); ++it) {
1313+
if (!OrderedSessions) {
1314+
OrderedSessions.emplace();
1315+
OrderedSessions->insert(commonSessions.begin(), commonSessions.end());
1316+
}
1317+
for (auto it = OrderedSessions->begin(); it != OrderedSessions->end(); ++it) {
13011318
auto* session = *it;
13021319
auto targerFamilyCount = desiredFamilyCount + (allowPlusOne ? 1 : 0);
13031320
auto families = OrderFamilies(session->Families);
@@ -1308,7 +1325,7 @@ void TConsumer::Balance(const TActorContext& ctx) {
13081325
}
13091326
}
13101327

1311-
if (allowPlusOne && session->ActiveFamilyCount > desiredFamilyCount) {
1328+
if (allowPlusOne) {
13121329
--allowPlusOne;
13131330
}
13141331
}
@@ -1397,7 +1414,8 @@ TSession::TSession(const TActorId& pipe)
13971414
, InactivePartitionCount(0)
13981415
, ReleasingPartitionCount(0)
13991416
, ActiveFamilyCount(0)
1400-
, ReleasingFamilyCount(0) {
1417+
, ReleasingFamilyCount(0)
1418+
, Order(RandomNumber<size_t>()) {
14011419
}
14021420

14031421
bool TSession::WithGroups() const { return !Partitions.empty(); }
@@ -1850,18 +1868,23 @@ bool TPartitionFamilyComparator::operator()(const TPartitionFamily* lhs, const T
18501868
}
18511869

18521870
bool SessionComparator::operator()(const TSession* lhs, const TSession* rhs) const {
1871+
if (lhs->Order != rhs->Order) {
1872+
return lhs->Order < rhs->Order;
1873+
}
1874+
return lhs->SessionName < rhs->SessionName;
1875+
}
1876+
1877+
1878+
bool LowLoadSessionComparator::operator()(const TSession* lhs, const TSession* rhs) const {
18531879
if (lhs->ActiveFamilyCount != rhs->ActiveFamilyCount) {
18541880
return lhs->ActiveFamilyCount < rhs->ActiveFamilyCount;
18551881
}
1856-
if (lhs->ActivePartitionCount != rhs->ActivePartitionCount) {
1857-
return lhs->ActivePartitionCount < rhs->ActivePartitionCount;
1858-
}
1859-
if (lhs->InactivePartitionCount != rhs->InactivePartitionCount) {
1860-
return lhs->InactivePartitionCount < rhs->InactivePartitionCount;
1861-
}
18621882
if (lhs->Partitions.size() != rhs->Partitions.size()) {
18631883
return lhs->Partitions.size() < rhs->Partitions.size();
18641884
}
1885+
if (lhs->Order != rhs->Order) {
1886+
return lhs->Order < rhs->Order;
1887+
}
18651888
return lhs->SessionName < rhs->SessionName;
18661889
}
18671890

ydb/core/persqueue/read_balancer__balancing.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ struct TConsumer {
185185
std::unordered_map<ui32, TPartitionFamily*> PartitionMapping;
186186
// All reading sessions in which the family is currently being read.
187187
std::unordered_map<TActorId, TSession*> Sessions;
188+
std::optional<TOrderedSessions> OrderedSessions;
188189

189190
// Families is not reading now.
190191
std::unordered_map<size_t, TPartitionFamily*> UnreadableFamilies;
@@ -276,6 +277,8 @@ struct TSession {
276277
// The partition families that are being read by this session.
277278
std::unordered_map<size_t, TPartitionFamily*> Families;
278279

280+
size_t Order;
281+
279282
// true if client connected to read from concret partitions
280283
bool WithGroups() const;
281284

ydb/services/deprecated/persqueue_v0/grpc_pq_read.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ void TPQReadService::TSession::SendEvent(IEventBase* ev) {
154154
void TPQReadService::TSession::CreateActor(std::unique_ptr<NPersQueue::TTopicsListController>&& topicsHandler) {
155155
auto classifier = Proxy->GetClassifier();
156156

157+
auto g(Guard(Lock));
157158
auto* actor = new TReadSessionActor(this, *topicsHandler, Cookie, SchemeCache, NewSchemeCache, Counters,
158159
classifier ? classifier->ClassifyAddress(GetPeerName()) : "unknown");
159160
ui32 poolId = Proxy->ActorSystem->AppData<::NKikimr::TAppData>()->UserPoolId;
@@ -204,10 +205,10 @@ ui64 TPQReadService::NextCookie() {
204205

205206
void TPQReadService::ReleaseSession(ui64 cookie) {
206207
auto g(Guard(Lock));
207-
bool erased = Sessions.erase(cookie);
208-
if (erased)
208+
if (Sessions.erase(cookie)) {
209+
g.Release();
209210
ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0,0,-1,0));
210-
211+
}
211212
}
212213

213214
void TPQReadService::CheckClusterChange(const TString& localCluster, const bool) {

ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,9 @@ void TPQWriteServiceImpl::TSession::SendEvent(IEventBase* ev) {
124124
std::unique_ptr<IEventBase> e;
125125
e.reset(ev);
126126

127-
TGuard<TSpinLock> lock(Lock);
127+
auto lock(Guard(Lock));
128128
if (ActorId) {
129+
lock.Release();
129130
Proxy->ActorSystem->Send(ActorId, e.release());
130131
}
131132
}
@@ -161,11 +162,10 @@ ui64 TPQWriteServiceImpl::NextCookie() {
161162

162163

163164
void TPQWriteServiceImpl::ReleaseSession(TSessionRef session) {
164-
with_lock (Lock) {
165-
bool erased = Sessions.erase(session->GetCookie());
166-
if (erased) {
167-
ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0, 0, -1, 0));
168-
}
165+
auto lock(Guard(Lock));
166+
if (Sessions.erase(session->GetCookie())) {
167+
lock.Release();
168+
ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0, 0, -1, 0));
169169
}
170170
}
171171

0 commit comments

Comments
 (0)