Skip to content

Commit 800705f

Browse files
authored
YQ-3943 Fix TopicPartitionsLimitPerNode limit (#12302)
1 parent 9216a5b commit 800705f

File tree

2 files changed

+59
-30
lines changed

2 files changed

+59
-30
lines changed

ydb/core/fq/libs/row_dispatcher/coordinator.cpp

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -52,22 +52,40 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
5252

5353
const ui64 PrintStatePeriodSec = 300;
5454

55-
struct TPartitionKey {
55+
struct TTopicKey {
5656
TString Endpoint;
5757
TString Database;
5858
TString TopicName;
59-
ui64 PartitionId;
6059

6160
size_t Hash() const noexcept {
6261
ui64 hash = std::hash<TString>()(Endpoint);
6362
hash = CombineHashes<ui64>(hash, std::hash<TString>()(Database));
6463
hash = CombineHashes<ui64>(hash, std::hash<TString>()(TopicName));
64+
return hash;
65+
}
66+
bool operator==(const TTopicKey& other) const {
67+
return Endpoint == other.Endpoint && Database == other.Database
68+
&& TopicName == other.TopicName;
69+
}
70+
};
71+
72+
struct TTopicKeyHash {
73+
int operator()(const TTopicKey& k) const {
74+
return k.Hash();
75+
}
76+
};
77+
78+
struct TPartitionKey {
79+
TTopicKey Topic;
80+
ui64 PartitionId;
81+
82+
size_t Hash() const noexcept {
83+
ui64 hash = Topic.Hash();
6584
hash = CombineHashes<ui64>(hash, std::hash<ui64>()(PartitionId));
6685
return hash;
6786
}
6887
bool operator==(const TPartitionKey& other) const {
69-
return Endpoint == other.Endpoint && Database == other.Database
70-
&& TopicName == other.TopicName && PartitionId == other.PartitionId;
88+
return Topic == other.Topic && PartitionId == other.PartitionId;
7189
}
7290
};
7391

@@ -156,7 +174,7 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
156174
const TString Tenant;
157175
TMap<NActors::TActorId, RowDispatcherInfo> RowDispatchers;
158176
THashMap<TPartitionKey, TActorId, TPartitionKeyHash> PartitionLocations;
159-
THashMap<TString, TTopicInfo> TopicsInfo;
177+
THashMap<TTopicKey, TTopicInfo, TTopicKeyHash> TopicsInfo;
160178
std::unordered_map<TActorId, TCoordinatorRequest> PendingReadActors;
161179
TCoordinatorMetrics Metrics;
162180
THashSet<TActorId> InterconnectSessions;
@@ -196,7 +214,7 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
196214

197215
void AddRowDispatcher(NActors::TActorId actorId, bool isLocal);
198216
void PrintInternalState();
199-
TTopicInfo& GetOrCreateTopicInfo(const TString& topicName);
217+
TTopicInfo& GetOrCreateTopicInfo(const TTopicKey& topic);
200218
std::optional<TActorId> GetAndUpdateLocation(const TPartitionKey& key); // std::nullopt if TopicPartitionsLimitPerNode reached
201219
bool ComputeCoordinatorRequest(TActorId readActorId, const TCoordinatorRequest& request);
202220
void UpdatePendingReadActors();
@@ -288,12 +306,12 @@ void TActorCoordinator::PrintInternalState() {
288306

289307
str << "\nLocations:\n";
290308
for (auto& [key, actorId] : PartitionLocations) {
291-
str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicName << ", partId " << key.PartitionId << ", row dispatcher actor id: " << actorId << "\n";
309+
str << " " << key.Topic.Endpoint << " / " << key.Topic.Database << " / " << key.Topic.TopicName << ", partId " << key.PartitionId << ", row dispatcher actor id: " << actorId << "\n";
292310
}
293311

294312
str << "\nPending partitions:\n";
295-
for (const auto& [topicName, topicInfo] : TopicsInfo) {
296-
str << " " << topicName << ", pending partitions: " << topicInfo.PendingPartitions.size() << "\n";
313+
for (const auto& [topic, topicInfo] : TopicsInfo) {
314+
str << " " << topic.TopicName << " (" << topic.Endpoint << "), pending partitions: " << topicInfo.PendingPartitions.size() << "\n";
297315
}
298316

299317
LOG_ROW_DISPATCHER_DEBUG(str.Str());
@@ -337,18 +355,18 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPt
337355
Metrics.IsActive->Set(isActive);
338356
}
339357

340-
TActorCoordinator::TTopicInfo& TActorCoordinator::GetOrCreateTopicInfo(const TString& topicName) {
341-
const auto it = TopicsInfo.find(topicName);
358+
TActorCoordinator::TTopicInfo& TActorCoordinator::GetOrCreateTopicInfo(const TTopicKey& topic) {
359+
const auto it = TopicsInfo.find(topic);
342360
if (it != TopicsInfo.end()) {
343361
return it->second;
344362
}
345-
return TopicsInfo.insert({topicName, TTopicInfo(Metrics, topicName)}).first->second;
363+
return TopicsInfo.insert({topic, TTopicInfo(Metrics, topic.TopicName)}).first->second;
346364
}
347365

348366
std::optional<TActorId> TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key) {
349367
Y_ENSURE(!PartitionLocations.contains(key));
350368

351-
auto& topicInfo = GetOrCreateTopicInfo(key.TopicName);
369+
auto& topicInfo = GetOrCreateTopicInfo(key.Topic);
352370

353371
TActorId bestLocation;
354372
ui64 bestNumberPartitions = std::numeric_limits<ui64>::max();
@@ -391,17 +409,14 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt
391409
UpdateInterconnectSessions(ev->InterconnectSession);
392410

393411
TStringStream str;
394-
str << "TEvCoordinatorRequest from " << ev->Sender.ToString() << ", " << source.GetTopicPath() << ", partIds: ";
395-
for (auto& partitionId : ev->Get()->Record.GetPartitionId()) {
396-
str << partitionId << ", ";
397-
}
398-
LOG_ROW_DISPATCHER_DEBUG(str.Str());
412+
LOG_ROW_DISPATCHER_INFO("TEvCoordinatorRequest from " << ev->Sender.ToString() << ", " << source.GetTopicPath() << ", partIds: " << JoinSeq(", ", ev->Get()->Record.GetPartitionId()));
399413
Metrics.IncomingRequests->Inc();
400414

401415
TCoordinatorRequest request = {.Cookie = ev->Cookie, .Record = ev->Get()->Record};
402416
if (ComputeCoordinatorRequest(ev->Sender, request)) {
403417
PendingReadActors.erase(ev->Sender);
404418
} else {
419+
LOG_ROW_DISPATCHER_INFO("All nodes are overloaded, add request into pending queue");
405420
// All nodes are overloaded, add request into pending queue
406421
// We save only last request from each read actor
407422
PendingReadActors[ev->Sender] = request;
@@ -416,7 +431,8 @@ bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TC
416431
bool hasPendingPartitions = false;
417432
TMap<NActors::TActorId, TSet<ui64>> tmpResult;
418433
for (auto& partitionId : request.Record.GetPartitionId()) {
419-
TPartitionKey key{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), partitionId};
434+
TTopicKey topicKey{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath()};
435+
TPartitionKey key {topicKey, partitionId};
420436
auto locationIt = PartitionLocations.find(key);
421437
NActors::TActorId rowDispatcherId;
422438
if (locationIt != PartitionLocations.end()) {

ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class TFixture : public NUnitTest::TBaseFixture {
3535

3636
NConfig::TRowDispatcherCoordinatorConfig config;
3737
config.SetCoordinationNodePath("RowDispatcher");
38+
config.SetTopicPartitionsLimitPerNode(1);
3839
auto& database = *config.MutableDatabase();
3940
database.SetEndpoint("YDB_ENDPOINT");
4041
database.SetDatabase("YDB_DATABASE");
@@ -59,12 +60,12 @@ class TFixture : public NUnitTest::TBaseFixture {
5960
}
6061

6162
NYql::NPq::NProto::TDqPqTopicSource BuildPqTopicSourceSettings(
62-
TString topic)
63+
TString endpoint, TString topic)
6364
{
6465
NYql::NPq::NProto::TDqPqTopicSource settings;
6566
settings.SetTopicPath(topic);
6667
settings.SetConsumerName("PqConsumer");
67-
settings.SetEndpoint("Endpoint");
68+
settings.SetEndpoint(endpoint);
6869
settings.MutableToken()->SetName("token");
6970
settings.SetDatabase("Database");
7071
return settings;
@@ -84,9 +85,9 @@ class TFixture : public NUnitTest::TBaseFixture {
8485
//UNIT_ASSERT(eventHolder.Get() != nullptr);
8586
}
8687

87-
void MockRequest(NActors::TActorId readActorId, TString topicName, const std::vector<ui64>& partitionId) {
88+
void MockRequest(NActors::TActorId readActorId, TString endpoint, TString topicName, const std::vector<ui64>& partitionId) {
8889
auto event = new NFq::TEvRowDispatcher::TEvCoordinatorRequest(
89-
BuildPqTopicSourceSettings(topicName),
90+
BuildPqTopicSourceSettings(endpoint, topicName),
9091
partitionId);
9192
Runtime.Send(new NActors::IEventHandle(Coordinator, readActorId, event));
9293
}
@@ -107,8 +108,6 @@ class TFixture : public NUnitTest::TBaseFixture {
107108
NActors::TActorId RowDispatcher2Id;
108109
NActors::TActorId ReadActor1;
109110
NActors::TActorId ReadActor2;
110-
111-
NYql::NPq::NProto::TDqPqTopicSource Source1 = BuildPqTopicSourceSettings("Source1");
112111
};
113112

114113
Y_UNIT_TEST_SUITE(CoordinatorTests) {
@@ -121,17 +120,17 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) {
121120
Ping(id);
122121
}
123122

124-
MockRequest(ReadActor1, "topic1", {0});
123+
MockRequest(ReadActor1, "endpoint", "topic1", {0});
125124
auto result1 = ExpectResult(ReadActor1);
126125

127-
MockRequest(ReadActor2, "topic1", {0});
126+
MockRequest(ReadActor2, "endpoint", "topic1", {0});
128127
auto result2 = ExpectResult(ReadActor2);
129128

130129
UNIT_ASSERT(result1.PartitionsSize() == 1);
131130
UNIT_ASSERT(result2.PartitionsSize() == 1);
132131
UNIT_ASSERT(google::protobuf::util::MessageDifferencer::Equals(result1, result2));
133132

134-
MockRequest(ReadActor2, "topic1", {1});
133+
MockRequest(ReadActor2, "endpoint", "topic1", {1});
135134
auto result3 = ExpectResult(ReadActor2);
136135

137136
TActorId actualRowDispatcher1 = ActorIdFromProto(result1.GetPartitions(0).GetActorId());
@@ -151,15 +150,29 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) {
151150
auto newDispatcher2Id = Runtime.AllocateEdgeActor(1);
152151
Ping(newDispatcher2Id);
153152

154-
MockRequest(ReadActor1, "topic1", {0});
153+
MockRequest(ReadActor1, "endpoint", "topic1", {0});
155154
auto result4 = ExpectResult(ReadActor1);
156155

157-
MockRequest(ReadActor2, "topic1", {1});
156+
MockRequest(ReadActor2, "endpoint", "topic1", {1});
158157
auto result5 = ExpectResult(ReadActor2);
159158

160159
UNIT_ASSERT(!google::protobuf::util::MessageDifferencer::Equals(result1, result4)
161160
|| !google::protobuf::util::MessageDifferencer::Equals(result3, result5));
162161
}
162+
163+
Y_UNIT_TEST_F(RouteTwoTopicWichSameName, TFixture) {
164+
ExpectCoordinatorChangesSubscribe();
165+
TSet<NActors::TActorId> rowDispatcherIds{RowDispatcher1Id, RowDispatcher2Id, LocalRowDispatcherId};
166+
for (auto id : rowDispatcherIds) {
167+
Ping(id);
168+
}
169+
170+
MockRequest(ReadActor1, "endpoint1", "topic1", {0, 1, 2});
171+
ExpectResult(ReadActor1);
172+
173+
MockRequest(ReadActor2, "endpoint2", "topic1", {3});
174+
ExpectResult(ReadActor2);
175+
}
163176
}
164177

165178
}

0 commit comments

Comments
 (0)