Skip to content

Commit 4f9dad1

Browse files
committed
Protobuf TMeteringStats (#19818)
1 parent fc03836 commit 4f9dad1

18 files changed

+447
-269
lines changed

ydb/core/protos/index_builder.proto

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ message TIndexBuildScanSettings {
2828
optional uint32 MaxBatchRetries = 3 [ default = 50 ];
2929
}
3030

31+
message TMeteringStats {
32+
optional uint64 UploadRows = 1;
33+
optional uint64 UploadBytes = 2;
34+
optional uint64 ReadRows = 3;
35+
optional uint64 ReadBytes = 4;
36+
}
37+
3138
message TIndexBuildSettings {
3239
optional string source_path = 1;
3340
optional Ydb.Table.TableIndex index = 2;
@@ -140,6 +147,6 @@ message TEvUploadSampleKResponse {
140147
optional Ydb.StatusIds.StatusCode UploadStatus = 2;
141148
repeated Ydb.Issue.IssueMessage Issues = 3;
142149

143-
optional uint64 UploadRows = 4;
144-
optional uint64 UploadBytes = 5;
150+
reserved 4 to 5;
151+
optional NKikimrIndexBuilder.TMeteringStats MeteringStats = 6;
145152
}

ydb/core/protos/tx_datashard.proto

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1510,14 +1510,14 @@ message TEvSampleKResponse {
15101510
optional NKikimrIndexBuilder.EBuildStatus Status = 4;
15111511
repeated Ydb.Issue.IssueMessage Issues = 5;
15121512

1513-
optional uint64 ReadRows = 6;
1514-
optional uint64 ReadBytes = 7;
1515-
15161513
optional uint64 RequestSeqNoGeneration = 8;
15171514
optional uint64 RequestSeqNoRound = 9;
15181515

15191516
repeated uint64 Probabilities = 10;
15201517
repeated bytes Rows = 11;
1518+
1519+
reserved 6 to 7;
1520+
optional NKikimrIndexBuilder.TMeteringStats MeteringStats = 12;
15211521
}
15221522

15231523
enum EKMeansState {
@@ -1577,10 +1577,8 @@ message TEvLocalKMeansResponse {
15771577
optional NKikimrIndexBuilder.EBuildStatus Status = 6;
15781578
repeated Ydb.Issue.IssueMessage Issues = 7;
15791579

1580-
optional uint64 UploadRows = 8;
1581-
optional uint64 UploadBytes = 9;
1582-
optional uint64 ReadRows = 10;
1583-
optional uint64 ReadBytes = 11;
1580+
reserved 8 to 11;
1581+
optional NKikimrIndexBuilder.TMeteringStats MeteringStats = 12;
15841582
}
15851583

15861584
message TEvReshuffleKMeansRequest {
@@ -1626,10 +1624,8 @@ message TEvReshuffleKMeansResponse {
16261624
optional NKikimrIndexBuilder.EBuildStatus Status = 6;
16271625
repeated Ydb.Issue.IssueMessage Issues = 7;
16281626

1629-
optional uint64 UploadRows = 8;
1630-
optional uint64 UploadBytes = 9;
1631-
optional uint64 ReadRows = 10;
1632-
optional uint64 ReadBytes = 11;
1627+
reserved 8 to 11;
1628+
optional NKikimrIndexBuilder.TMeteringStats MeteringStats = 12;
16331629
}
16341630

16351631
message TEvRecomputeKMeansRequest {
@@ -1666,12 +1662,12 @@ message TEvRecomputeKMeansResponse {
16661662
optional NKikimrIndexBuilder.EBuildStatus Status = 6;
16671663
repeated Ydb.Issue.IssueMessage Issues = 7;
16681664

1669-
optional uint64 ReadRows = 8;
1670-
optional uint64 ReadBytes = 9;
1671-
16721665
// recomputed clusters and cluster sizes (row counts for every cluster)
16731666
repeated bytes Clusters = 10;
16741667
repeated uint64 ClusterSizes = 11;
1668+
1669+
reserved 8 to 9;
1670+
optional NKikimrIndexBuilder.TMeteringStats MeteringStats = 12;
16751671
}
16761672

16771673
message TEvPrefixKMeansRequest {
@@ -1720,10 +1716,8 @@ message TEvPrefixKMeansResponse {
17201716
optional NKikimrIndexBuilder.EBuildStatus Status = 6;
17211717
repeated Ydb.Issue.IssueMessage Issues = 7;
17221718

1723-
optional uint64 UploadRows = 8;
1724-
optional uint64 UploadBytes = 9;
1725-
optional uint64 ReadRows = 10;
1726-
optional uint64 ReadBytes = 11;
1719+
reserved 8 to 11;
1720+
optional NKikimrIndexBuilder.TMeteringStats MeteringStats = 12;
17271721
}
17281722

17291723
message TEvCdcStreamScanRequest {

ydb/core/tx/datashard/build_index/common_helper.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,9 @@ class TBatchRowsUploader
142142
UploaderId = {};
143143
}
144144

145-
response.SetUploadRows(UploadRows);
146-
response.SetUploadBytes(UploadBytes);
145+
response.MutableMeteringStats()->SetUploadRows(UploadRows);
146+
response.MutableMeteringStats()->SetUploadBytes(UploadBytes);
147+
147148
if (HasBuildError) {
148149
response.SetStatus(NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR);
149150
} else if (abort != NTable::EAbort::None) {

ydb/core/tx/datashard/build_index/local_kmeans.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,8 @@ class TLocalKMeansScan: public TActor<TLocalKMeansScan>, public NTable::IScan {
164164
TAutoPtr<IDestructable> Finish(EAbort abort) noexcept final
165165
{
166166
auto& record = Response->Record;
167-
record.SetReadRows(ReadRows);
168-
record.SetReadBytes(ReadBytes);
167+
record.MutableMeteringStats()->SetReadRows(ReadRows);
168+
record.MutableMeteringStats()->SetReadBytes(ReadBytes);
169169

170170
Uploader.Finish(record, abort);
171171

ydb/core/tx/datashard/build_index/prefix_kmeans.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,8 @@ class TPrefixKMeansScan: public TActor<TPrefixKMeansScan>, public NTable::IScan
197197
TAutoPtr<IDestructable> Finish(EAbort abort) noexcept final
198198
{
199199
auto& record = Response->Record;
200-
record.SetReadRows(ReadRows);
201-
record.SetReadBytes(ReadBytes);
200+
record.MutableMeteringStats()->SetReadRows(ReadRows);
201+
record.MutableMeteringStats()->SetReadBytes(ReadBytes);
202202

203203
Uploader.Finish(record, abort);
204204

ydb/core/tx/datashard/build_index/recompute_kmeans.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ class TRecomputeKMeansScan: public TActor<TRecomputeKMeansScan>, public NTable::
100100
TAutoPtr<IDestructable> Finish(EAbort abort) noexcept final
101101
{
102102
auto& record = Response->Record;
103-
record.SetReadRows(ReadRows);
104-
record.SetReadBytes(ReadBytes);
103+
record.MutableMeteringStats()->SetReadRows(ReadRows);
104+
record.MutableMeteringStats()->SetReadBytes(ReadBytes);
105105

106106
if (abort != EAbort::None) {
107107
record.SetStatus(NKikimrIndexBuilder::EBuildStatus::ABORTED);

ydb/core/tx/datashard/build_index/reshuffle_kmeans.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,8 @@ class TReshuffleKMeansScan: public TActor<TReshuffleKMeansScan>, public NTable::
136136
TAutoPtr<IDestructable> Finish(EAbort abort) noexcept final
137137
{
138138
auto& record = Response->Record;
139-
record.SetReadRows(ReadRows);
140-
record.SetReadBytes(ReadBytes);
139+
record.MutableMeteringStats()->SetReadRows(ReadRows);
140+
record.MutableMeteringStats()->SetReadBytes(ReadBytes);
141141

142142
Uploader.Finish(record, abort);
143143

ydb/core/tx/datashard/build_index/sample_k.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,8 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
169169

170170
TAutoPtr<IDestructable> Finish(EAbort abort) noexcept final {
171171
auto& record = Response->Record;
172-
record.SetReadRows(ReadRows);
173-
record.SetReadBytes(ReadBytes);
172+
record.MutableMeteringStats()->SetReadRows(ReadRows);
173+
record.MutableMeteringStats()->SetReadBytes(ReadBytes);
174174

175175
if (HasBuildError) {
176176
record.SetStatus(NKikimrIndexBuilder::EBuildStatus::BUILD_ERROR);

ydb/core/tx/schemeshard/schemeshard__monitoring.cpp

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -850,26 +850,28 @@ struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBas
850850

851851
<< "LockTxId: " << info.LockTxId << Endl
852852
<< "LockTxStatus: " << NKikimrScheme::EStatus_Name(info.LockTxStatus) << Endl
853-
<< "LockTxDone " << (info.LockTxDone ? "DONE" : "not done") << Endl
853+
<< "LockTxDone: " << (info.LockTxDone ? "DONE" : "not done") << Endl
854854

855855
<< "InitiateTxId: " << info.InitiateTxId << Endl
856856
<< "InitiateTxStatus: " << NKikimrScheme::EStatus_Name(info.InitiateTxStatus) << Endl
857-
<< "InitiateTxDone " << (info.InitiateTxDone ? "DONE" : "not done") << Endl
857+
<< "InitiateTxDone: " << (info.InitiateTxDone ? "DONE" : "not done") << Endl
858858

859859
<< "ApplyTxId: " << info.ApplyTxId << Endl
860860
<< "ApplyTxStatus: " << NKikimrScheme::EStatus_Name(info.ApplyTxStatus) << Endl
861-
<< "ApplyTxDone " << (info.ApplyTxDone ? "DONE" : "not done") << Endl
861+
<< "ApplyTxDone: " << (info.ApplyTxDone ? "DONE" : "not done") << Endl
862862

863863
<< "UnlockTxId: " << info.UnlockTxId << Endl
864864
<< "UnlockTxStatus: " << NKikimrScheme::EStatus_Name(info.UnlockTxStatus) << Endl
865-
<< "UnlockTxDone " << (info.UnlockTxDone ? "DONE" : "not done") << Endl
865+
<< "UnlockTxDone: " << (info.UnlockTxDone ? "DONE" : "not done") << Endl
866866

867867
<< "SnapshotStep: " << info.SnapshotStep << Endl
868-
<< "SnapshotTxId: " << info.SnapshotTxId << Endl
869-
870-
<< "Processed: " << info.Processed << Endl
871-
<< "Billed: " << info.Billed << Endl;
868+
<< "SnapshotTxId: " << info.SnapshotTxId << Endl;
872869

870+
TString requestUnitsExplain;
871+
ui64 requestUnits = TRUCalculator::Calculate(info.Processed, requestUnitsExplain);
872+
str << "Processed: " << info.Processed.ShortDebugString() << Endl
873+
<< "Request Units: " << requestUnits << " (" << requestUnitsExplain << ")" << Endl
874+
<< "Billed: " << info.Billed.ShortDebugString() << Endl;
873875
}
874876

875877
auto getKeyTypes = [&](TPathId pathId) {
@@ -954,7 +956,7 @@ struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBas
954956
str << Self->Generation() << ":" << status.SeqNoRound;
955957
}
956958
TABLED() {
957-
str << status.Processed;
959+
str << status.Processed.ShortDebugString();
958960
}
959961
}
960962
str << "\n";

ydb/core/tx/schemeshard/schemeshard_billing_helpers.cpp

Lines changed: 59 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,37 +6,65 @@
66

77
namespace NKikimr::NSchemeShard {
88

9-
TBillingStats::TBillingStats(ui64 uploadRows, ui64 uploadBytes, ui64 readRows, ui64 readBytes)
10-
: UploadRows{uploadRows}
11-
, UploadBytes{uploadBytes}
12-
, ReadRows{readRows}
13-
, ReadBytes{readBytes}
14-
{
9+
TMeteringStats operator + (const TMeteringStats& value, const TMeteringStats& other) {
10+
TMeteringStats result = value;
11+
result += other;
12+
return result;
1513
}
1614

17-
TBillingStats TBillingStats::operator -(const TBillingStats &other) const {
18-
Y_ENSURE(UploadRows >= other.UploadRows);
19-
Y_ENSURE(UploadBytes >= other.UploadBytes);
20-
Y_ENSURE(ReadRows >= other.ReadRows);
21-
Y_ENSURE(ReadBytes >= other.ReadBytes);
15+
TMeteringStats operator - (const TMeteringStats& value, const TMeteringStats& other) {
16+
TMeteringStats result = value;
17+
result -= other;
18+
return result;
19+
}
20+
21+
TMeteringStats& operator += (TMeteringStats& value, const TMeteringStats& other) {
22+
value.SetUploadRows(value.GetUploadRows() + other.GetUploadRows());
23+
value.SetUploadBytes(value.GetUploadBytes() + other.GetUploadBytes());
24+
value.SetReadRows(value.GetReadRows() + other.GetReadRows());
25+
value.SetReadBytes(value.GetReadBytes() + other.GetReadBytes());
26+
return value;
27+
}
28+
29+
TMeteringStats& operator -= (TMeteringStats& value, const TMeteringStats& other) {
30+
const auto safeSub = [](ui64 x, ui64 y) {
31+
if (Y_LIKELY(x >= y)) {
32+
return x - y;
33+
}
34+
Y_ASSERT(false);
35+
return 0ul;
36+
};
37+
38+
value.SetUploadRows(safeSub(value.GetUploadRows(), other.GetUploadRows()));
39+
value.SetUploadBytes(safeSub(value.GetUploadBytes(), other.GetUploadBytes()));
40+
value.SetReadRows(safeSub(value.GetReadRows(), other.GetReadRows()));
41+
value.SetReadBytes(safeSub(value.GetReadBytes(), other.GetReadBytes()));
42+
return value;
43+
}
2244

23-
return {UploadRows - other.UploadRows, UploadBytes - other.UploadBytes,
24-
ReadRows - other.ReadRows, ReadBytes - other.ReadBytes};
45+
void TMeteringStatsHelper::TryFixOldFormat(TMeteringStats& value) {
46+
// old format: assign upload to read
47+
if (value.GetReadRows() == 0 && value.GetUploadRows() != 0) {
48+
value.SetReadRows(value.GetUploadRows());
49+
value.SetReadBytes(value.GetUploadBytes());
50+
}
2551
}
2652

27-
TBillingStats TBillingStats::operator +(const TBillingStats &other) const {
28-
return {UploadRows + other.UploadRows, UploadBytes + other.UploadBytes,
29-
ReadRows + other.ReadRows, ReadBytes + other.ReadBytes};
53+
TMeteringStats TMeteringStatsHelper::ZeroValue() {
54+
// this method the only purpose is to beautifully print zero stats instead of empty protobuf or with missing fields
55+
TMeteringStats value;
56+
value.SetUploadRows(0);
57+
value.SetUploadBytes(0);
58+
value.SetReadRows(0);
59+
value.SetReadBytes(0);
60+
return value;
3061
}
3162

32-
TString TBillingStats::ToString() const {
33-
return TStringBuilder()
34-
<< "{"
35-
<< " upload rows: " << UploadRows
36-
<< ", upload bytes: " << UploadBytes
37-
<< ", read rows: " << ReadRows
38-
<< ", read bytes: " << ReadBytes
39-
<< " }";
63+
bool TMeteringStatsHelper::IsZero(TMeteringStats& value) {
64+
return value.GetUploadRows() == 0
65+
&& value.GetUploadBytes() == 0
66+
&& value.GetReadRows() == 0
67+
&& value.GetReadBytes() == 0;
4068
}
4169

4270
ui64 TRUCalculator::ReadTable(ui64 bytes) {
@@ -56,11 +84,15 @@ ui64 TRUCalculator::BulkUpsert(ui64 bytes, ui64 rows) {
5684
return (Max(rows, (bytes + 1_KB - 1) / 1_KB) + 1) / 2;
5785
}
5886

59-
ui64 TRUCalculator::Calculate(const TBillingStats& stats) {
87+
ui64 TRUCalculator::Calculate(const TMeteringStats& stats, TString& explain) {
6088
// The cost of building an index is the sum of the cost of ReadTable from the source table and BulkUpsert to the index table.
6189
// https://yandex.cloud/en-ru/docs/ydb/pricing/ru-special#secondary-index
62-
return TRUCalculator::ReadTable(stats.GetReadBytes())
63-
+ TRUCalculator::BulkUpsert(stats.GetUploadBytes(), stats.GetUploadRows());
90+
ui64 readTable = TRUCalculator::ReadTable(stats.GetReadBytes());
91+
ui64 bulkUpsert = TRUCalculator::BulkUpsert(stats.GetUploadBytes(), stats.GetUploadRows());
92+
explain = TStringBuilder()
93+
<< "ReadTable: " << readTable
94+
<< ", BulkUpsert: " << bulkUpsert;
95+
return readTable + bulkUpsert;
6496
}
6597

6698
}

0 commit comments

Comments
 (0)