Skip to content

Commit bd967a0

Browse files
authored
do not use basic statistics if it is not fully gathered in schemeshard (#11291)
1 parent f6b4b02 commit bd967a0

File tree

6 files changed

+194
-22
lines changed

6 files changed

+194
-22
lines changed

ydb/core/protos/statistics.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ message TPathEntry {
1313
optional uint64 RowCount = 2;
1414
optional uint64 BytesSize = 3;
1515
optional bool IsColumnTable = 4;
16+
optional bool AreStatsFull = 5;
1617
}
1718

1819
message TSchemeShardStats {
1920
repeated TPathEntry Entries = 1;
21+
optional bool AreAllStatsFull = 2;
2022
}
2123

2224
// SS -> SA

ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,72 @@ struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase {
2121
<< ", stats size# " << stats.size());
2222

2323
NIceDb::TNiceDb db(txc.DB);
24-
db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update(
25-
NIceDb::TUpdate<Schema::BaseStatistics::Stats>(stats));
2624

27-
Self->BaseStatistics[schemeShardId] = stats;
25+
NKikimrStat::TSchemeShardStats statRecord;
26+
Y_PROTOBUF_SUPPRESS_NODISCARD statRecord.ParseFromString(stats);
27+
28+
// if statistics is sent from schemeshard for the first time or
29+
// AreAllStatsFull field is not set (schemeshard is working on previous code version) or
30+
// statistics is full for all tables
31+
// then persist incoming statistics without changes
32+
if (!Self->BaseStatistics.contains(schemeShardId) ||
33+
!statRecord.HasAreAllStatsFull() || statRecord.GetAreAllStatsFull())
34+
{
35+
db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update(
36+
NIceDb::TUpdate<Schema::BaseStatistics::Stats>(stats));
37+
Self->BaseStatistics[schemeShardId] = stats;
38+
39+
} else {
40+
NKikimrStat::TSchemeShardStats oldStatRecord;
41+
const auto& oldStats = Self->BaseStatistics[schemeShardId];
42+
Y_PROTOBUF_SUPPRESS_NODISCARD oldStatRecord.ParseFromString(oldStats);
43+
44+
struct TOldStats {
45+
ui64 RowCount = 0;
46+
ui64 BytesSize = 0;
47+
};
48+
THashMap<TPathId, TOldStats> oldStatsMap;
49+
50+
for (const auto& entry : oldStatRecord.GetEntries()) {
51+
auto& oldEntry = oldStatsMap[PathIdFromPathId(entry.GetPathId())];
52+
oldEntry.RowCount = entry.GetRowCount();
53+
oldEntry.BytesSize = entry.GetBytesSize();
54+
}
55+
56+
NKikimrStat::TSchemeShardStats newStatRecord;
57+
for (const auto& entry : statRecord.GetEntries()) {
58+
auto* newEntry = newStatRecord.AddEntries();
59+
*newEntry->MutablePathId() = entry.GetPathId();
60+
newEntry->SetIsColumnTable(entry.GetIsColumnTable());
61+
newEntry->SetAreStatsFull(entry.GetAreStatsFull());
62+
63+
if (entry.GetAreStatsFull()) {
64+
newEntry->SetRowCount(entry.GetRowCount());
65+
newEntry->SetBytesSize(entry.GetBytesSize());
66+
} else {
67+
auto oldIter = oldStatsMap.find(PathIdFromPathId(entry.GetPathId()));
68+
if (oldIter != oldStatsMap.end()) {
69+
newEntry->SetRowCount(oldIter->second.RowCount);
70+
newEntry->SetBytesSize(oldIter->second.BytesSize);
71+
} else {
72+
newEntry->SetRowCount(0);
73+
newEntry->SetBytesSize(0);
74+
}
75+
}
76+
}
77+
78+
TString newStats;
79+
Y_PROTOBUF_SUPPRESS_NODISCARD newStatRecord.SerializeToString(&newStats);
80+
81+
db.Table<Schema::BaseStatistics>().Key(schemeShardId).Update(
82+
NIceDb::TUpdate<Schema::BaseStatistics::Stats>(newStats));
83+
Self->BaseStatistics[schemeShardId] = newStats;
84+
}
2885

2986
if (!Self->EnableColumnStatistics) {
3087
return true;
3188
}
3289

33-
NKikimrStat::TSchemeShardStats statRecord;
34-
Y_PROTOBUF_SUPPRESS_NODISCARD statRecord.ParseFromString(stats);
35-
3690
auto& oldPathIds = Self->ScheduleTraversalsBySchemeShard[schemeShardId];
3791
std::unordered_set<TPathId> newPathIds;
3892

ydb/core/statistics/service/ut/ut_basic_statistics.cpp

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
#include <ydb/core/statistics/ut_common/ut_common.h>
22

33
#include <ydb/library/actors/testlib/test_runtime.h>
4+
#include <ydb/core/testlib/actors/block_events.h>
45

56
#include <ydb/core/statistics/events.h>
67
#include <ydb/core/statistics/service/service.h>
7-
8-
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
9-
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
10-
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
11-
12-
#include <thread>
8+
#include <ydb/core/tx/datashard/datashard.h>
139

1410
namespace NKikimr {
1511
namespace NStat {
@@ -75,6 +71,29 @@ void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId
7571
}
7672
}
7773

74+
ui64 GetRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId) {
75+
auto statServiceId = NStat::MakeStatServiceID(runtime.GetNodeId(nodeIndex));
76+
NStat::TRequest req;
77+
req.PathId = pathId;
78+
79+
auto evGet = std::make_unique<TEvStatistics::TEvGetStatistics>();
80+
evGet->StatType = NStat::EStatType::SIMPLE;
81+
evGet->StatRequests.push_back(req);
82+
83+
auto sender = runtime.AllocateEdgeActor(nodeIndex);
84+
runtime.Send(statServiceId, sender, evGet.release(), nodeIndex, true);
85+
auto evResult = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvGetStatisticsResult>(sender);
86+
87+
UNIT_ASSERT(evResult);
88+
UNIT_ASSERT(evResult->Get());
89+
UNIT_ASSERT(evResult->Get()->StatResponses.size() == 1);
90+
91+
auto rsp = evResult->Get()->StatResponses[0];
92+
auto stat = rsp.Simple;
93+
94+
return stat.RowCount;
95+
}
96+
7897
} // namespace
7998

8099
Y_UNIT_TEST_SUITE(BasicStatistics) {
@@ -183,6 +202,58 @@ Y_UNIT_TEST_SUITE(BasicStatistics) {
183202
ValidateRowCount(runtime, 1, pathId2, 6);
184203
}
185204

205+
void TestNotFullStatistics(TTestEnv& env, size_t expectedRowCount) {
206+
auto& runtime = *env.GetServer().GetRuntime();
207+
208+
auto pathId = ResolvePathId(runtime, "/Root/Database/Table");
209+
210+
TBlockEvents<TEvDataShard::TEvPeriodicTableStats> block(runtime);
211+
runtime.WaitFor("TEvPeriodicTableStats", [&]{ return block.size() >= 3; });
212+
block.Unblock(3);
213+
214+
bool firstStatsToSA = false;
215+
auto statsObserver1 = runtime.AddObserver<TEvStatistics::TEvSchemeShardStats>([&](auto&){
216+
firstStatsToSA = true;
217+
});
218+
runtime.WaitFor("TEvSchemeShardStats 1", [&]{ return firstStatsToSA; });
219+
220+
UNIT_ASSERT(GetRowCount(runtime, 1, pathId) == 0);
221+
222+
block.Unblock();
223+
block.Stop();
224+
225+
bool secondStatsToSA = false;
226+
auto statsObserver2 = runtime.AddObserver<TEvStatistics::TEvSchemeShardStats>([&](auto&){
227+
secondStatsToSA = true;
228+
});
229+
runtime.WaitFor("TEvSchemeShardStats 2", [&]{ return secondStatsToSA; });
230+
231+
bool propagate = false;
232+
auto propagateObserver = runtime.AddObserver<TEvStatistics::TEvPropagateStatistics>([&](auto&){
233+
propagate = true;
234+
});
235+
runtime.WaitFor("TEvPropagateStatistics", [&]{ return propagate; });
236+
237+
UNIT_ASSERT(GetRowCount(runtime, 1, pathId) == expectedRowCount);
238+
}
239+
240+
Y_UNIT_TEST(NotFullStatisticsDatashard) {
241+
TTestEnv env(1, 1);
242+
243+
CreateDatabase(env, "Database");
244+
CreateUniformTable(env, "Database", "Table");
245+
246+
TestNotFullStatistics(env, 4);
247+
}
248+
249+
Y_UNIT_TEST(NotFullStatisticsColumnshard) {
250+
TTestEnv env(1, 1);
251+
252+
CreateDatabase(env, "Database");
253+
CreateColumnStoreTable(env, "Database", "Table", 4);
254+
255+
TestNotFullStatistics(env, 1000);
256+
}
186257
}
187258

188259
} // NSysView

ydb/core/tx/schemeshard/schemeshard_impl.cpp

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7405,7 +7405,7 @@ void TSchemeShard::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr&
74057405
StatisticsAggregatorId = TTabletId(entry.DomainInfo->Params.GetStatisticsAggregator());
74067406
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
74077407
"Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult, StatisticsAggregatorId=" << StatisticsAggregatorId
7408-
<< ", at schemeshard: " << TabletID());
7408+
<< ", at schemeshard: " << TabletID());
74097409
ConnectToSA();
74107410
}
74117411
}
@@ -7414,13 +7414,16 @@ void TSchemeShard::Handle(TEvPrivate::TEvSendBaseStatsToSA::TPtr&, const TActorC
74147414
TDuration delta = SendBaseStatsToSA();
74157415
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
74167416
"Schedule next SendBaseStatsToSA in " << delta
7417-
<< ", at schemeshard: " << TabletID());
7417+
<< ", at schemeshard: " << TabletID());
74187418
ctx.Schedule(delta, new TEvPrivate::TEvSendBaseStatsToSA());
74197419
}
74207420

74217421
void TSchemeShard::InitializeStatistics(const TActorContext& ctx) {
74227422
ResolveSA();
7423-
ctx.Schedule(TDuration::Seconds(30), new TEvPrivate::TEvSendBaseStatsToSA());
7423+
// since columnshard statistics is now sent once in a minute,
7424+
// we expect that in most cases we will gather full stats
7425+
// before sending them to StatisticsAggregator
7426+
ctx.Schedule(TDuration::Seconds(120), new TEvPrivate::TEvSendBaseStatsToSA());
74247427
}
74257428

74267429
void TSchemeShard::ResolveSA() {
@@ -7494,30 +7497,56 @@ TDuration TSchemeShard::SendBaseStatsToSA() {
74947497
}
74957498

74967499
int count = 0;
7500+
bool areAllStatsFull = true;
74977501

74987502
NKikimrStat::TSchemeShardStats record;
74997503
for (const auto& [pathId, tableInfo] : Tables) {
7500-
const auto& aggregated = tableInfo->GetStats().Aggregated;
7504+
const auto& stats = tableInfo->GetStats();
7505+
const auto& aggregated = stats.Aggregated;
7506+
bool areStatsFull = stats.AreStatsFull();
7507+
75017508
auto* entry = record.AddEntries();
75027509
auto* entryPathId = entry->MutablePathId();
75037510
entryPathId->SetOwnerId(pathId.OwnerId);
75047511
entryPathId->SetLocalId(pathId.LocalPathId);
7505-
entry->SetRowCount(aggregated.RowCount);
7506-
entry->SetBytesSize(aggregated.DataSize);
7512+
entry->SetRowCount(areStatsFull ? aggregated.RowCount : 0);
7513+
entry->SetBytesSize(areStatsFull ? aggregated.DataSize : 0);
75077514
entry->SetIsColumnTable(false);
7515+
entry->SetAreStatsFull(areStatsFull);
7516+
areAllStatsFull = areAllStatsFull && areStatsFull;
7517+
75087518
++count;
75097519
}
7520+
75107521
auto columnTablesPathIds = ColumnTables.GetAllPathIds();
75117522
for (const auto& pathId : columnTablesPathIds) {
75127523
const auto& tableInfo = ColumnTables.GetVerified(pathId);
7513-
const auto& aggregated = tableInfo->Stats.Aggregated;
7524+
const auto& stats = tableInfo->GetStats();
7525+
const TTableAggregatedStats* aggregatedStats = nullptr;
7526+
7527+
// stats are stored differently for standalone and non-standalone column tables
7528+
if (tableInfo->IsStandalone()) {
7529+
aggregatedStats = &stats;
7530+
} else {
7531+
auto it = stats.TableStats.find(pathId);
7532+
if (it == stats.TableStats.end()) {
7533+
continue;
7534+
}
7535+
aggregatedStats = &it->second;
7536+
}
7537+
const auto& aggregated = aggregatedStats->Aggregated;
7538+
bool areStatsFull = aggregatedStats->AreStatsFull();
7539+
75147540
auto* entry = record.AddEntries();
75157541
auto* entryPathId = entry->MutablePathId();
75167542
entryPathId->SetOwnerId(pathId.OwnerId);
75177543
entryPathId->SetLocalId(pathId.LocalPathId);
7518-
entry->SetRowCount(aggregated.RowCount);
7519-
entry->SetBytesSize(aggregated.DataSize);
7544+
entry->SetRowCount(areStatsFull ? aggregated.RowCount : 0);
7545+
entry->SetBytesSize(areStatsFull ? aggregated.DataSize : 0);
75207546
entry->SetIsColumnTable(true);
7547+
entry->SetAreStatsFull(areStatsFull);
7548+
areAllStatsFull = areAllStatsFull && areStatsFull;
7549+
75217550
++count;
75227551
}
75237552

@@ -7528,8 +7557,9 @@ TDuration TSchemeShard::SendBaseStatsToSA() {
75287557
return TDuration::Seconds(30);
75297558
}
75307559

7560+
record.SetAreAllStatsFull(areAllStatsFull);
7561+
75317562
TString stats;
7532-
stats.clear();
75337563
Y_PROTOBUF_SUPPRESS_NODISCARD record.SerializeToString(&stats);
75347564

75357565
auto event = std::make_unique<NStat::TEvStatistics::TEvSchemeShardStats>();

ydb/core/tx/schemeshard/schemeshard_info_types.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1636,6 +1636,8 @@ void TTableInfo::SetPartitioning(TVector<TTableShardInfo>&& newPartitioning) {
16361636
TPartitionStats newAggregatedStats;
16371637
newAggregatedStats.PartCount = newPartitioning.size();
16381638
ui64 cpuTotal = 0;
1639+
THashSet<TShardIdx> newUpdatedStats;
1640+
16391641
for (const auto& np : newPartitioning) {
16401642
auto idx = np.ShardIdx;
16411643
auto& newStats(newPartitionStats[idx]);
@@ -1658,6 +1660,10 @@ void TTableInfo::SetPartitioning(TVector<TTableShardInfo>&& newPartitioning) {
16581660
newAggregatedStats.WriteThroughput += newStats.WriteThroughput;
16591661
newAggregatedStats.ReadIops += newStats.ReadIops;
16601662
newAggregatedStats.WriteIops += newStats.WriteIops;
1663+
1664+
if (Stats.PartitionStats.contains(idx) && Stats.UpdatedStats.contains(idx)) {
1665+
newUpdatedStats.insert(idx);
1666+
}
16611667
}
16621668
newAggregatedStats.SetCurrentRawCpuUsage(cpuTotal, AppData()->TimeProvider->Now());
16631669
newAggregatedStats.LastAccessTime = Stats.Aggregated.LastAccessTime;
@@ -1684,6 +1690,7 @@ void TTableInfo::SetPartitioning(TVector<TTableShardInfo>&& newPartitioning) {
16841690

16851691
Stats.PartitionStats.swap(newPartitionStats);
16861692
Stats.Aggregated = newAggregatedStats;
1693+
Stats.UpdatedStats.swap(newUpdatedStats);
16871694
Partitions.swap(newPartitioning);
16881695
PreserializedTablePartitions.clear();
16891696
PreserializedTablePartitionsNoKeys.clear();
@@ -1790,6 +1797,8 @@ void TTableAggregatedStats::UpdateShardStats(TShardIdx datashardIdx, const TPart
17901797
Aggregated.TxCompleteLag = Max(Aggregated.TxCompleteLag, ps.second.TxCompleteLag);
17911798
}
17921799
}
1800+
1801+
UpdatedStats.insert(datashardIdx);
17931802
}
17941803

17951804
void TAggregatedStats::UpdateTableStats(TShardIdx shardIdx, const TPathId& pathId, const TPartitionStats& newStats) {

ydb/core/tx/schemeshard/schemeshard_info_types.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,12 @@ struct TTableAggregatedStats {
328328
THashMap<TShardIdx, TPartitionStats> PartitionStats;
329329
size_t PartitionStatsUpdated = 0;
330330

331+
THashSet<TShardIdx> UpdatedStats;
332+
333+
bool AreStatsFull() const {
334+
return Aggregated.PartCount && UpdatedStats.size() == Aggregated.PartCount;
335+
}
336+
331337
void UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats);
332338
};
333339

0 commit comments

Comments
 (0)