Skip to content

Commit 95f4df7

Browse files
authored
Datashard follower stats in sys_view top_partitions (#10849)
1 parent c536f4b commit 95f4df7

File tree

12 files changed

+352
-88
lines changed

12 files changed

+352
-88
lines changed

ydb/core/sys_view/common/schema.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,7 @@ struct Schema : NIceDb::Schema {
470470
struct RowCount : Column<9, NScheme::NTypeIds::Uint64> {};
471471
struct IndexSize : Column<10, NScheme::NTypeIds::Uint64> {};
472472
struct InFlightTxCount : Column<11, NScheme::NTypeIds::Uint32> {};
473+
struct FollowerId : Column<12, NScheme::NTypeIds::Uint32> {};
473474

474475
using TKey = TableKey<IntervalEnd, Rank>;
475476
using TColumns = TableColumns<
@@ -483,7 +484,8 @@ struct Schema : NIceDb::Schema {
483484
DataSize,
484485
RowCount,
485486
IndexSize,
486-
InFlightTxCount>;
487+
InFlightTxCount,
488+
FollowerId>;
487489
};
488490

489491
struct QuerySessions : Table<13> {

ydb/core/sys_view/partition_stats/partition_stats.cpp

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,19 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
7676
const auto& domainKey = ev->Get()->DomainKey;
7777
const auto& pathId = ev->Get()->PathId;
7878

79+
SVLOG_T("TEvSysView::TEvSetPartitioning: domainKey " << domainKey
80+
<< " pathId " << pathId
81+
<< " path " << ev->Get()->Path
82+
<< " ShardIndices size " << ev->Get()->ShardIndices.size());
83+
7984
auto& tables = DomainTables[domainKey];
8085
auto tableFound = tables.Stats.find(pathId);
8186
if (tableFound != tables.Stats.end()) {
8287
auto& table = tableFound->second;
8388

8489
auto& oldPartitions = table.Partitions;
8590
std::unordered_map<TShardIdx, TPartitionStats> newPartitions;
86-
std::unordered_set<TShardIdx> overloaded;
91+
std::set<TOverloadedFollower> overloaded;
8792

8893
for (auto shardIdx : ev->Get()->ShardIndices) {
8994
auto old = oldPartitions.find(shardIdx);
@@ -92,7 +97,7 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
9297

9398
for (const auto& followerStat: old->second.FollowerStats) {
9499
if (IsPartitionOverloaded(followerStat.second))
95-
overloaded.insert(shardIdx);
100+
overloaded.insert({shardIdx, followerStat.first});
96101
}
97102
}
98103
}
@@ -148,12 +153,13 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
148153

149154
auto& followerStats = partitionStats.FollowerStats[followerId];
150155

156+
TOverloadedFollower overloadedFollower = {shardIdx, followerId};
151157
if (IsPartitionOverloaded(newStats)) {
152-
tables.Overloaded[pathId].insert(shardIdx);
158+
tables.Overloaded[pathId].insert(overloadedFollower);
153159
} else {
154160
auto overloadedFound = tables.Overloaded.find(pathId);
155161
if (overloadedFound != tables.Overloaded.end()) {
156-
overloadedFound->second.erase(shardIdx);
162+
overloadedFound->second.erase(overloadedFollower);
157163
if (overloadedFound->second.empty()) {
158164
tables.Overloaded.erase(pathId);
159165
}
@@ -373,15 +379,16 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
373379
struct TPartition {
374380
TPathId PathId;
375381
TShardIdx ShardIdx;
382+
ui32 FollowerId;
376383
double CPUCores;
377384
};
378385
std::vector<TPartition> sorted;
379386

380-
for (const auto& [pathId, shardIndices] : domainTables.Overloaded) {
381-
for (const auto& shardIdx : shardIndices) {
387+
for (const auto& [pathId, overloadedFollowers] : domainTables.Overloaded) {
388+
for (const TOverloadedFollower& overloadedFollower : overloadedFollowers) {
382389
const auto& table = domainTables.Stats[pathId];
383-
const auto& partition = table.Partitions.at(shardIdx).FollowerStats.at(0);
384-
sorted.emplace_back(TPartition{pathId, shardIdx, partition.GetCPUCores()});
390+
const auto& partition = table.Partitions.at(overloadedFollower.ShardIdx).FollowerStats.at(overloadedFollower.FollowerId);
391+
sorted.emplace_back(TPartition{pathId, overloadedFollower.ShardIdx, overloadedFollower.FollowerId, partition.GetCPUCores()});
385392
}
386393
}
387394

@@ -395,18 +402,21 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
395402
auto sendEvent = MakeHolder<TEvSysView::TEvSendTopPartitions>();
396403
for (const auto& entry : sorted) {
397404
const auto& table = domainTables.Stats[entry.PathId];
398-
const auto& partition = table.Partitions.at(entry.ShardIdx).FollowerStats.at(0);
405+
const auto& followerStats = table.Partitions.at(entry.ShardIdx).FollowerStats;
406+
const auto& partition = followerStats.at(entry.FollowerId);
407+
const auto& leaderPartition = followerStats.at(0);
399408

400409
auto* result = sendEvent->Record.AddPartitions();
401410
result->SetTabletId(partition.GetTabletId());
402411
result->SetPath(table.Path);
403412
result->SetPeakTimeUs(nowUs);
404413
result->SetCPUCores(partition.GetCPUCores());
405414
result->SetNodeId(partition.GetNodeId());
406-
result->SetDataSize(partition.GetDataSize());
407-
result->SetRowCount(partition.GetRowCount());
408-
result->SetIndexSize(partition.GetIndexSize());
415+
result->SetDataSize(leaderPartition.GetDataSize());
416+
result->SetRowCount(leaderPartition.GetRowCount());
417+
result->SetIndexSize(leaderPartition.GetIndexSize());
409418
result->SetInFlightTxCount(partition.GetInFlightTxCount());
419+
result->SetFollowerId(partition.GetFollowerId());
410420

411421
if (++count == TOP_PARTITIONS_COUNT) {
412422
break;
@@ -438,8 +448,7 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
438448
}
439449

440450
bool IsPartitionOverloaded(const NKikimrSysView::TPartitionStats& stats) const {
441-
return stats.GetCPUCores() >= OverloadedPartitionBound
442-
&& !stats.GetFollowerId();
451+
return stats.GetCPUCores() >= OverloadedPartitionBound;
443452
}
444453

445454
private:
@@ -452,8 +461,10 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
452461
double OverloadedPartitionBound = 0.7;
453462
TDuration ProcessOverloadedInterval = TDuration::Seconds(15);
454463

464+
typedef ui32 TFollowerId;
465+
455466
struct TPartitionStats {
456-
std::unordered_map<ui32, NKikimrSysView::TPartitionStats> FollowerStats;
467+
std::unordered_map<TFollowerId, NKikimrSysView::TPartitionStats> FollowerStats;
457468
};
458469

459470
struct TTableStats {
@@ -462,9 +473,22 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
462473
TString Path;
463474
};
464475

476+
struct TOverloadedFollower {
477+
TShardIdx ShardIdx;
478+
TFollowerId FollowerId;
479+
480+
bool operator<(const TOverloadedFollower &other) const {
481+
return std::tie(ShardIdx, FollowerId) < std::tie(other.ShardIdx, other.FollowerId);
482+
}
483+
484+
bool operator==(const TOverloadedFollower &other) const {
485+
return std::tie(ShardIdx, FollowerId) == std::tie(other.ShardIdx, other.FollowerId);
486+
}
487+
};
488+
465489
struct TDomainTables {
466490
std::map<TPathId, TTableStats> Stats;
467-
std::unordered_map<TPathId, std::unordered_set<TShardIdx>> Overloaded;
491+
std::unordered_map<TPathId, std::set<TOverloadedFollower>> Overloaded;
468492
};
469493
std::unordered_map<TPathId, TDomainTables> DomainTables;
470494

ydb/core/sys_view/partition_stats/top_partitions.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ struct TTopPartitionsExtractorMap :
5858
insert({S::InFlightTxCount::ColumnId, [] (const E& entry) {
5959
return TCell::Make<ui32>(entry.GetInfo().GetInFlightTxCount());
6060
}});
61+
insert({S::FollowerId::ColumnId, [] (const E& entry) {
62+
return TCell::Make<ui32>(entry.GetInfo().GetFollowerId());
63+
}});
6164
}
6265
};
6366

ydb/core/sys_view/processor/processor_impl.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,10 @@ void TSysViewProcessor::Reset(NIceDb::TNiceDb& db, const TActorContext& ctx) {
283283

284284
auto clearPartitionTop = [&] (NKikimrSysView::EStatsType type, TPartitionTop& top) {
285285
for (const auto& partition : top) {
286-
db.Table<Schema::IntervalPartitionTops>().Key((ui32)type, partition->GetTabletId()).Delete();
286+
if (partition->GetFollowerId() == 0)
287+
db.Table<Schema::IntervalPartitionTops>().Key((ui32)type, partition->GetTabletId()).Delete();
288+
else
289+
db.Table<Schema::IntervalPartitionFollowerTops>().Key((ui32)type, partition->GetTabletId(), partition->GetFollowerId()).Delete();
287290
}
288291
top.clear();
289292
};

ydb/core/sys_view/processor/schema.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,18 @@ struct TProcessorSchema : NIceDb::Schema {
9595
using TColumns = TableColumns<TypeCol, TabletId, Data>;
9696
};
9797

98+
struct IntervalPartitionFollowerTops : Table<19> {
99+
struct TypeCol : Column<1, NScheme::NTypeIds::Uint32> {
100+
static TString GetColumnName(const TString&) { return "Type"; }
101+
};
102+
struct TabletId : Column<2, NScheme::NTypeIds::Uint64> {};
103+
struct FollowerId : Column<3, NScheme::NTypeIds::Uint32> {};
104+
struct Data : Column<4, NScheme::NTypeIds::String> {};
105+
106+
using TKey = TableKey<TypeCol, TabletId, FollowerId>;
107+
using TColumns = TableColumns<TypeCol, TabletId, FollowerId, Data>;
108+
};
109+
98110
#define RESULT_PARTITION_TABLE(TableName, TableID) \
99111
struct TableName : Table<TableID> { \
100112
struct IntervalEnd : Column<1, NScheme::NTypeIds::Timestamp> {}; \
@@ -127,6 +139,7 @@ struct TProcessorSchema : NIceDb::Schema {
127139
TopByRequestUnitsOneMinute,
128140
TopByRequestUnitsOneHour,
129141
IntervalPartitionTops,
142+
IntervalPartitionFollowerTops,
130143
TopPartitionsOneMinute,
131144
TopPartitionsOneHour
132145
>;

ydb/core/sys_view/processor/tx_init.cpp

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,47 @@ struct TSysViewProcessor::TTxInit : public TTxBase {
8484
return true;
8585
};
8686

87+
template <typename S>
88+
bool LoadIntervalPartitionTops(NIceDb::TNiceDb& db) {
89+
auto rowset = db.Table<S>().Range().Select();
90+
if (!rowset.IsReady()) {
91+
return false;
92+
}
93+
94+
size_t partCount = 0;
95+
while (!rowset.EndOfSet()) {
96+
ui32 type = rowset.template GetValue<typename S::TypeCol>();
97+
TString data = rowset.template GetValue<typename S::Data>();
98+
99+
if (data) {
100+
auto partition = MakeHolder<NKikimrSysView::TTopPartitionsInfo>();
101+
Y_PROTOBUF_SUPPRESS_NODISCARD partition->ParseFromString(data);
102+
103+
switch ((NKikimrSysView::EStatsType)type) {
104+
case NKikimrSysView::TOP_PARTITIONS_ONE_MINUTE:
105+
Self->PartitionTopMinute.emplace_back(std::move(partition));
106+
break;
107+
case NKikimrSysView::TOP_PARTITIONS_ONE_HOUR:
108+
Self->PartitionTopHour.emplace_back(std::move(partition));
109+
break;
110+
default:
111+
SVLOG_CRIT("[" << Self->TabletID() << "] ignoring unexpected partition stats type: " << type);
112+
}
113+
++partCount;
114+
}
115+
116+
if (!rowset.Next()) {
117+
return false;
118+
}
119+
}
120+
121+
SVLOG_D("[" << Self->TabletID() << "] Loading results: "
122+
<< "table# " << S::TableId
123+
<< ", partCount count# " << partCount);
124+
125+
return true;
126+
}
127+
87128

88129
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
89130
SVLOG_D("[" << Self->TabletID() << "] TTxInit::Execute");
@@ -107,6 +148,7 @@ struct TSysViewProcessor::TTxInit : public TTxBase {
107148
auto reqUnitsOneMinuteRowset = db.Table<Schema::TopByRequestUnitsOneMinute>().Range().Select();
108149
auto reqUnitsOneHourRowset = db.Table<Schema::TopByRequestUnitsOneHour>().Range().Select();
109150
auto intervalPartitionTopsRowset = db.Table<Schema::IntervalPartitionTops>().Range().Select();
151+
auto intervalPartitionFollowerTopsRowset = db.Table<Schema::IntervalPartitionFollowerTops>().Range().Select();
110152
auto topPartitionsOneMinuteRowset = db.Table<Schema::TopPartitionsOneMinute>().Range().Select();
111153
auto topPartitionsOneHourRowset = db.Table<Schema::TopPartitionsOneHour>().Range().Select();
112154

@@ -126,6 +168,7 @@ struct TSysViewProcessor::TTxInit : public TTxBase {
126168
!reqUnitsOneMinuteRowset.IsReady() ||
127169
!reqUnitsOneHourRowset.IsReady() ||
128170
!intervalPartitionTopsRowset.IsReady() ||
171+
!intervalPartitionFollowerTopsRowset.IsReady() ||
129172
!topPartitionsOneMinuteRowset.IsReady() ||
130173
!topPartitionsOneHourRowset.IsReady())
131174
{
@@ -390,37 +433,10 @@ struct TSysViewProcessor::TTxInit : public TTxBase {
390433
Self->PartitionTopHour.clear();
391434
Self->PartitionTopHour.reserve(TOP_PARTITIONS_COUNT);
392435

393-
auto rowset = db.Table<Schema::IntervalPartitionTops>().Range().Select();
394-
if (!rowset.IsReady()) {
436+
if (!LoadIntervalPartitionTops<Schema::IntervalPartitionTops>(db))
437+
return false;
438+
if (!LoadIntervalPartitionTops<Schema::IntervalPartitionFollowerTops>(db))
395439
return false;
396-
}
397-
398-
size_t partCount = 0;
399-
while (!rowset.EndOfSet()) {
400-
ui32 type = rowset.GetValue<Schema::IntervalPartitionTops::TypeCol>();
401-
TString data = rowset.GetValue<Schema::IntervalPartitionTops::Data>();
402-
403-
if (data) {
404-
auto partition = MakeHolder<NKikimrSysView::TTopPartitionsInfo>();
405-
Y_PROTOBUF_SUPPRESS_NODISCARD partition->ParseFromString(data);
406-
407-
switch ((NKikimrSysView::EStatsType)type) {
408-
case NKikimrSysView::TOP_PARTITIONS_ONE_MINUTE:
409-
Self->PartitionTopMinute.emplace_back(std::move(partition));
410-
break;
411-
case NKikimrSysView::TOP_PARTITIONS_ONE_HOUR:
412-
Self->PartitionTopHour.emplace_back(std::move(partition));
413-
break;
414-
default:
415-
SVLOG_CRIT("[" << Self->TabletID() << "] ignoring unexpected partition stats type: " << type);
416-
}
417-
++partCount;
418-
}
419-
420-
if (!rowset.Next()) {
421-
return false;
422-
}
423-
}
424440

425441
auto compare = [] (const auto& l, const auto& r) {
426442
return l->GetCPUCores() == r->GetCPUCores() ?
@@ -429,9 +445,6 @@ struct TSysViewProcessor::TTxInit : public TTxBase {
429445

430446
std::sort(Self->PartitionTopMinute.begin(), Self->PartitionTopMinute.end(), compare);
431447
std::sort(Self->PartitionTopHour.begin(), Self->PartitionTopHour.end(), compare);
432-
433-
SVLOG_D("[" << Self->TabletID() << "] Loading interval partition tops: "
434-
<< "partition count# " << partCount);
435448
}
436449

437450
// TopPartitions...

0 commit comments

Comments
 (0)