Skip to content

Commit 02c8589

Browse files
authored
PQRB HTTP app for autopartitioning diagnostic and fix of error code (#7775)
1 parent 989a628 commit 02c8589

File tree

7 files changed

+194
-134
lines changed

7 files changed

+194
-134
lines changed

ydb/core/persqueue/read_balancer.cpp

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,6 @@ bool TPersQueueReadBalancer::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr e
187187

188188
TString TPersQueueReadBalancer::GenerateStat() {
189189
auto& metrics = AggregatedStats.Metrics;
190-
auto balancerStatistcs = Balancer->GetStatistics();
191190

192191
TStringStream str;
193192
HTML(str) {
@@ -214,9 +213,9 @@ TString TPersQueueReadBalancer::GenerateStat() {
214213
LI() {
215214
str << "<a href=\"#partitions\" data-toggle=\"tab\">Partitions</a>";
216215
}
217-
for (auto& consumer : balancerStatistcs.Consumers) {
216+
for (auto& [consumerName, _] : Balancer->GetConsumers()) {
218217
LI() {
219-
str << "<a href=\"#c_" << EncodeAnchor(consumer.ConsumerName) << "\" data-toggle=\"tab\">" << NPersQueue::ConvertOldConsumerName(consumer.ConsumerName) << "</a>";
218+
str << "<a href=\"#c_" << EncodeAnchor(consumerName) << "\" data-toggle=\"tab\">" << NPersQueue::ConvertOldConsumerName(consumerName) << "</a>";
220219
}
221220
}
222221
}
@@ -245,7 +244,7 @@ TString TPersQueueReadBalancer::GenerateStat() {
245244
TABLE_CLASS("properties") {
246245
CAPTION() { str << "Statistics"; }
247246
TABLEBODY() {
248-
property("Active pipes", balancerStatistcs.Sessions.size());
247+
property("Active pipes", Balancer->GetSessions().size());
249248
property("Active partitions", NumActiveParts);
250249
property("Total data size", AggregatedStats.TotalDataSize);
251250
property("Reserve size", PartitionReserveSize());
@@ -264,22 +263,59 @@ TString TPersQueueReadBalancer::GenerateStat() {
264263
}
265264

266265
DIV_CLASS_ID("tab-pane fade", "partitions") {
266+
auto partitionAnchor = [&](const ui32 partitionId) {
267+
return TStringBuilder() << "P" << partitionId;
268+
};
269+
267270
TABLE_CLASS("table") {
268271
TABLEHEAD() {
269272
TABLER() {
270-
TABLEH() {str << "partition";}
271-
TABLEH() { str << "tabletId";}
272-
TABLEH() { str << "Size";}
273+
TABLEH() { str << "Partition"; }
274+
TABLEH() { str << "Status"; }
275+
TABLEH() { str << "TabletId"; }
276+
TABLEH() { str << "Parents"; }
277+
TABLEH() { str << "Children"; }
278+
TABLEH() { str << "Size"; }
273279
}
274280
}
275281
TABLEBODY() {
276282
for (auto& [partitionId, partitionInfo] : PartitionsInfo) {
277283
const auto& stats = AggregatedStats.Stats[partitionId];
284+
const auto* node = PartitionGraph.GetPartition(partitionId);
285+
TString style = node && node->Children.empty() ? "text-success" : "text-muted";
278286

279287
TABLER() {
280-
TABLED() { str << partitionId;}
288+
TABLED() {
289+
DIV_CLASS_ID(style, partitionAnchor(partitionId)) {
290+
str << partitionId;
291+
}
292+
}
293+
TABLED() {
294+
if (node) {
295+
str << (node->Children.empty() ? "Active" : "Inactive");
296+
if (node->IsRoot()) {
297+
str << " (root)";
298+
}
299+
}
300+
}
281301
TABLED() { HREF(TStringBuilder() << "?TabletID=" << partitionInfo.TabletId) { str << partitionInfo.TabletId; } }
282-
TABLED() { str << stats.DataSize;}
302+
TABLED() {
303+
if (node) {
304+
for (auto* parent : node->Parents) {
305+
HREF("#" + partitionAnchor(parent->Id)) { str << parent->Id; }
306+
str << ", ";
307+
}
308+
}
309+
}
310+
TABLED() {
311+
if (node) {
312+
for (auto* child : node->Children) {
313+
HREF("#" + partitionAnchor(child->Id)) { str << child->Id; }
314+
str << ", ";
315+
}
316+
}
317+
}
318+
TABLED() { str << stats.DataSize; }
283319
}
284320
}
285321
}

ydb/core/persqueue/read_balancer__balancing.cpp

Lines changed: 10 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,10 @@ void TPartitionFamily::Merge(TPartitionFamily* other) {
411411
other->RootPartitions.clear();
412412

413413
WantedPartitions.insert(other->WantedPartitions.begin(), other->WantedPartitions.end());
414-
WantedPartitions.clear();
414+
other->WantedPartitions.clear();
415+
416+
LockedPartitions.insert(other->LockedPartitions.begin(), other->LockedPartitions.end());
417+
other->LockedPartitions.clear();
415418

416419
ChangePartitionCounters(other->ActivePartitionCount, other->InactivePartitionCount);
417420
other->ChangePartitionCounters(-other->ActivePartitionCount, -other->InactivePartitionCount);
@@ -1467,50 +1470,15 @@ TConsumer* TBalancer::GetConsumer(const TString& consumerName) {
14671470
return it->second.get();
14681471
}
14691472

1470-
const TStatistics TBalancer::GetStatistics() const {
1471-
TStatistics result;
1472-
1473-
result.Consumers.reserve(Consumers.size());
1474-
for (auto& [_, consumer] : Consumers) {
1475-
result.Consumers.push_back(TStatistics::TConsumerStatistics());
1476-
auto& c = result.Consumers.back();
1477-
1478-
c.ConsumerName = consumer->ConsumerName;
1479-
c.Partitions.reserve(GetPartitionsInfo().size());
1480-
for (auto [partitionId, partitionInfo] : GetPartitionsInfo()) {
1481-
c.Partitions.push_back(TStatistics::TConsumerStatistics::TPartitionStatistics());
1482-
auto& p = c.Partitions.back();
1483-
p.PartitionId = partitionId;
1484-
p.TabletId = partitionInfo.TabletId;
1485-
1486-
auto* family = consumer->FindFamily(partitionId);
1487-
if (family && family->Session && family->LockedPartitions.contains(partitionId)) {
1488-
p.Session = family->Session->SessionName;
1489-
p.State = 1;
1490-
}
1491-
}
1492-
}
1493-
1494-
size_t readablePartitionCount = 0;
1495-
1496-
result.Sessions.reserve(Sessions.size());
1497-
for (auto& [_, session] : Sessions) {
1498-
result.Sessions.push_back(TStatistics::TSessionStatistics());
1499-
auto& s = result.Sessions.back();
1500-
s.Session = session->SessionName;
1501-
s.ActivePartitionCount = session->ActivePartitionCount;
1502-
s.InactivePartitionCount = session->InactivePartitionCount;
1503-
s.SuspendedPartitionCount = session->ReleasingPartitionCount;
1504-
s.TotalPartitionCount = s.ActivePartitionCount + s.InactivePartitionCount;
1505-
1506-
readablePartitionCount += s.TotalPartitionCount;
1507-
}
1508-
1509-
result.FreePartitions = GetPartitionsInfo().size() - readablePartitionCount;
1473+
const std::unordered_map<TString, std::unique_ptr<TConsumer>>& TBalancer::GetConsumers() const {
1474+
return Consumers;
1475+
}
15101476

1511-
return result;
1477+
const std::unordered_map<TActorId, std::unique_ptr<TSession>>& TBalancer::GetSessions() const {
1478+
return Sessions;
15121479
}
15131480

1481+
15141482
void TBalancer::UpdateConfig(std::vector<ui32> addedPartitions, std::vector<ui32> deletedPartitions, const TActorContext& ctx) {
15151483
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
15161484
GetPrefix() << "updating configuration. Deleted partitions [" << JoinRange(", ", deletedPartitions.begin(), deletedPartitions.end())

ydb/core/persqueue/read_balancer__balancing.h

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -285,32 +285,6 @@ struct TSession {
285285
TString DebugStr() const;
286286
};
287287

288-
struct TStatistics {
289-
struct TConsumerStatistics {
290-
struct TPartitionStatistics {
291-
ui32 PartitionId;
292-
ui64 TabletId = 0;
293-
ui32 State = 0;
294-
TString Session;
295-
};
296-
297-
TString ConsumerName;
298-
std::vector<TPartitionStatistics> Partitions;
299-
};
300-
301-
struct TSessionStatistics {
302-
TString Session;
303-
size_t ActivePartitionCount;
304-
size_t InactivePartitionCount;
305-
size_t SuspendedPartitionCount;
306-
size_t TotalPartitionCount;
307-
};
308-
309-
std::vector<TConsumerStatistics> Consumers;
310-
std::vector<TSessionStatistics> Sessions;
311-
312-
size_t FreePartitions;
313-
};
314288

315289
class TBalancer {
316290
friend struct TConsumer;
@@ -328,7 +302,8 @@ class TBalancer {
328302
i32 GetLifetimeSeconds() const;
329303

330304
TConsumer* GetConsumer(const TString& consumerName);
331-
const TStatistics GetStatistics() const;
305+
const std::unordered_map<TString, std::unique_ptr<TConsumer>>& GetConsumers() const;
306+
const std::unordered_map<TActorId, std::unique_ptr<TSession>>& GetSessions() const;
332307

333308
void UpdateConfig(std::vector<ui32> addedPartitions, std::vector<ui32> deletedPartitions, const TActorContext& ctx);
334309
bool SetCommittedState(const TString& consumer, ui32 partitionId, ui32 generation, ui64 cookie, const TActorContext& ctx);

0 commit comments

Comments
 (0)