Skip to content

Commit e079321

Browse files
kungaCyberROFL
andcommitted
Unify scan data bytes accounting with KQP in vector index build (#19794)
Co-authored-by: Ilnaz Nizametdinov <i.nizametdinov@gmail.com>
1 parent 791e87d commit e079321

File tree

12 files changed

+171
-135
lines changed

12 files changed

+171
-135
lines changed

ydb/core/tx/datashard/buffer_data.h

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,67 @@
11
#pragma once
22

3-
#include "ydb/core/scheme/scheme_tablecell.h"
4-
#include "ydb/core/tx/datashard/upload_stats.h"
5-
#include "ydb/core/tx/tx_proxy/upload_rows.h"
3+
#include "scan_common.h"
4+
#include <ydb/core/scheme/scheme_tablecell.h>
5+
#include <ydb/core/tx/tx_proxy/upload_rows.h>
66

77
namespace NKikimr::NDataShard {
88

9-
class TBufferData: public IStatHolder, public TNonCopyable {
9+
// TODO: move to tx/datashard/build_index/
10+
class TBufferData: public TNonCopyable {
1011
public:
1112
TBufferData()
1213
: Rows{std::make_shared<NTxProxy::TUploadRows>()} {
1314
}
1415

15-
ui64 GetRows() const override final {
16+
std::shared_ptr<NTxProxy::TUploadRows> GetRowsData() const {
17+
return Rows;
18+
}
19+
20+
ui64 GetRows() const {
1621
return Rows->size();
1722
}
1823

19-
std::shared_ptr<NTxProxy::TUploadRows> GetRowsData() const {
20-
return Rows;
24+
bool IsEmpty() const {
25+
return Rows->empty();
2126
}
2227

23-
ui64 GetBytes() const override final {
24-
return ByteSize;
28+
ui64 GetBufferBytes() const {
29+
return BufferBytes;
30+
}
31+
32+
ui64 GetRowCellBytes() const {
33+
return RowCellBytes;
2534
}
2635

2736
void FlushTo(TBufferData& other) {
28-
Y_ABORT_UNLESS(this != &other);
29-
Y_ABORT_UNLESS(other.IsEmpty());
37+
Y_ENSURE(this != &other);
38+
Y_ENSURE(other.IsEmpty());
3039
other.Rows.swap(Rows);
31-
other.ByteSize = std::exchange(ByteSize, 0);
40+
other.BufferBytes = std::exchange(BufferBytes, 0);
41+
other.RowCellBytes = std::exchange(RowCellBytes, 0);
3242
other.LastKey = std::exchange(LastKey, {});
3343
}
3444

3545
void Clear() {
3646
Rows->clear();
37-
ByteSize = 0;
47+
BufferBytes = 0;
48+
RowCellBytes = 0;
3849
LastKey = {};
3950
}
4051

41-
void AddRow(TSerializedCellVec&& rowKey, TString&& rowValue, TSerializedCellVec&& originalKey = {}) {
42-
Rows->emplace_back(std::move(rowKey), std::move(rowValue));
43-
ByteSize += Rows->back().first.GetBuffer().size() + Rows->back().second.size();
44-
LastKey = std::move(originalKey);
52+
void AddRow(TConstArrayRef<TCell> rowKey, TConstArrayRef<TCell> rowValue, TConstArrayRef<TCell> originalKey = {}) {
53+
AddRow(rowKey, rowValue, TSerializedCellVec::Serialize(rowValue), originalKey);
4554
}
4655

47-
bool IsEmpty() const {
48-
return Rows->empty();
49-
}
50-
51-
size_t Size() const {
52-
return Rows->size();
56+
void AddRow(TConstArrayRef<TCell> rowKey, TConstArrayRef<TCell> rowValue, TString&& rowValueSerialized, TConstArrayRef<TCell> originalKey = {}) {
57+
RowCellBytes += CountRowCellBytes(rowKey, rowValue);
58+
Rows->emplace_back(TSerializedCellVec{rowKey}, std::move(rowValueSerialized));
59+
BufferBytes += Rows->back().first.GetBuffer().size() + Rows->back().second.size();
60+
LastKey = TSerializedCellVec{originalKey};
5361
}
5462

55-
bool HasReachedLimits(size_t rowsLimit, ui64 bytesLimit) const {
56-
return Rows->size() > rowsLimit || ByteSize > bytesLimit;
63+
bool HasReachedLimits(const TIndexBuildScanSettings& scanSettings) const {
64+
return Rows->size() > scanSettings.GetMaxBatchRows() || BufferBytes > scanSettings.GetMaxBatchBytes();
5765
}
5866

5967
auto&& ExtractLastKey() {
@@ -66,7 +74,8 @@ class TBufferData: public IStatHolder, public TNonCopyable {
6674

6775
private:
6876
std::shared_ptr<NTxProxy::TUploadRows> Rows;
69-
ui64 ByteSize = 0;
77+
ui64 BufferBytes = 0;
78+
ui64 RowCellBytes = 0;
7079
TSerializedCellVec LastKey;
7180
};
7281

ydb/core/tx/datashard/build_index/common_helper.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
#pragma once
22

3+
#include <ydb/core/tx/datashard/buffer_data.h>
34
#include <ydb/core/tx/datashard/datashard_impl.h>
45
#include <ydb/core/tx/datashard/scan_common.h>
6+
#include <ydb/core/tx/datashard/upload_stats.h>
57
#include <ydb/library/actors/core/log.h>
68

79
namespace NKikimr::NDataShard {
@@ -53,7 +55,7 @@ class TBatchRowsUploader
5355
}
5456

5557
UploadRows += Uploading.Buffer.GetRows();
56-
UploadBytes += Uploading.Buffer.GetBytes();
58+
UploadBytes += Uploading.Buffer.GetRowCellBytes();
5759
Uploading.Buffer.Clear();
5860
RetryCount = 0;
5961

@@ -68,7 +70,7 @@ class TBatchRowsUploader
6870
{
6971
bool hasReachedLimit = false;
7072
for (auto& [_, dst] : Destinations) {
71-
if (HasReachedLimits(dst.Buffer, ScanSettings)) {
73+
if (dst.Buffer.HasReachedLimits(ScanSettings)) {
7274
hasReachedLimit = true;
7375
break;
7476
}
@@ -88,7 +90,7 @@ class TBatchRowsUploader
8890

8991
hasReachedLimit = false;
9092
for (auto& [_, dst] : Destinations) {
91-
if (HasReachedLimits(dst.Buffer, ScanSettings)) {
93+
if (dst.Buffer.HasReachedLimits(ScanSettings)) {
9294
hasReachedLimit = true;
9395
break;
9496
}
@@ -174,7 +176,7 @@ class TBatchRowsUploader
174176
TStringBuilder result;
175177

176178
if (Uploading) {
177-
result << "UploadTable: " << Uploading.Table << " UploadBuf size: " << Uploading.Buffer.Size() << " RetryCount: " << RetryCount;
179+
result << "UploadTable: " << Uploading.Table << " UploadBuf size: " << Uploading.Buffer.GetBufferBytes() << " RetryCount: " << RetryCount;
178180
}
179181

180182
return result;
@@ -187,7 +189,7 @@ class TBatchRowsUploader
187189
return true;
188190
}
189191

190-
if (!destination.Buffer.IsEmpty() && (!byLimit || HasReachedLimits(destination.Buffer, ScanSettings))) {
192+
if (!destination.Buffer.IsEmpty() && (!byLimit || destination.Buffer.HasReachedLimits(ScanSettings))) {
191193
Uploading.Table = destination.Table;
192194
Uploading.Types = destination.Types;
193195
destination.Buffer.FlushTo(Uploading.Buffer);

ydb/core/tx/datashard/build_index/kmeans_helper.cpp

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ void AddRowToLevel(TBufferData& buffer, TClusterId parent, TClusterId child, con
4242
std::array<TCell, 1> data;
4343
data[0] = TCell{embedding};
4444

45-
buffer.AddRow(TSerializedCellVec{pk}, TSerializedCellVec::Serialize(data));
45+
buffer.AddRow(pk, data);
4646
}
4747

4848
void AddRowToData(TBufferData& buffer, TClusterId parent, TArrayRef<const TCell> sourcePk,
@@ -53,14 +53,11 @@ void AddRowToData(TBufferData& buffer, TClusterId parent, TArrayRef<const TCell>
5353
EnsureNoPostingParentFlag(parent);
5454
}
5555

56-
std::array<TCell, 1> cells;
57-
cells[0] = TCell::Make(parent);
58-
auto pk = TSerializedCellVec::Serialize(cells);
59-
TSerializedCellVec::UnsafeAppendCells(sourcePk, pk);
56+
TVector<TCell> pk(::Reserve(sourcePk.size() + 1));
57+
pk.push_back(TCell::Make(parent));
58+
pk.insert(pk.end(), sourcePk.begin(), sourcePk.end());
6059

61-
buffer.AddRow(TSerializedCellVec{std::move(pk)},
62-
TSerializedCellVec::Serialize(dataColumns),
63-
TSerializedCellVec{origKey});
60+
buffer.AddRow(pk, dataColumns, origKey);
6461
}
6562

6663
TTags MakeScanTags(const TUserTable& table, const TProtoStringType& embedding,

ydb/core/tx/datashard/build_index/local_kmeans.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ class TLocalKMeansScan: public TActor<TLocalKMeansScan>, public NTable::IScan {
217217
// LOG_T("Feed " << Debug());
218218

219219
++ReadRows;
220-
ReadBytes += CountBytes(key, row);
220+
ReadBytes += CountRowCellBytes(key, *row);
221221

222222
if (PrefixColumns && Prefix && !TCellVectorsEquals{}(Prefix.GetCells(), key.subspan(0, PrefixColumns))) {
223223
if (!FinishPrefix()) {
@@ -234,8 +234,8 @@ class TLocalKMeansScan: public TActor<TLocalKMeansScan>, public NTable::IScan {
234234
}
235235

236236
if (IsFirstPrefixFeed && IsPrefixRowsValid) {
237-
PrefixRows.AddRow(TSerializedCellVec{key}, TSerializedCellVec::Serialize(*row));
238-
if (HasReachedLimits(PrefixRows, ScanSettings)) {
237+
PrefixRows.AddRow(key, *row);
238+
if (PrefixRows.HasReachedLimits(ScanSettings)) {
239239
PrefixRows.Clear();
240240
IsPrefixRowsValid = false;
241241
}
@@ -339,7 +339,7 @@ class TLocalKMeansScan: public TActor<TLocalKMeansScan>, public NTable::IScan {
339339
IsFirstPrefixFeed = false;
340340

341341
if (IsPrefixRowsValid) {
342-
LOG_T("FinishPrefix not finished, manually feeding " << PrefixRows.Size() << " saved rows " << Debug());
342+
LOG_T("FinishPrefix not finished, manually feeding " << PrefixRows.GetRows() << " saved rows " << Debug());
343343
for (ui64 iteration = 0; ; iteration++) {
344344
for (const auto& [key, row_] : *PrefixRows.GetRowsData()) {
345345
TSerializedCellVec row(row_);

ydb/core/tx/datashard/build_index/prefix_kmeans.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ class TPrefixKMeansScan: public TActor<TPrefixKMeansScan>, public NTable::IScan
250250
// LOG_T("Feed " << Debug());
251251

252252
++ReadRows;
253-
ReadBytes += CountBytes(key, row);
253+
ReadBytes += CountRowCellBytes(key, *row);
254254

255255
if (Prefix && !TCellVectorsEquals{}(Prefix.GetCells(), key.subspan(0, PrefixColumns))) {
256256
if (!FinishPrefix()) {
@@ -263,16 +263,16 @@ class TPrefixKMeansScan: public TActor<TPrefixKMeansScan>, public NTable::IScan
263263
Prefix = TSerializedCellVec{key.subspan(0, PrefixColumns)};
264264

265265
// write {Prefix..., Parent} row to PrefixBuf:
266-
auto pk = TSerializedCellVec::Serialize(Prefix.GetCells());
267-
std::array<TCell, 1> cells;
268-
cells[0] = TCell::Make(Parent);
269-
TSerializedCellVec::UnsafeAppendCells(cells, pk);
270-
PrefixBuf->AddRow(TSerializedCellVec{std::move(pk)}, TSerializedCellVec::Serialize({}));
266+
TVector<TCell> pk(::Reserve(Prefix.GetCells().size() + 1));
267+
pk.insert(pk.end(), Prefix.GetCells().begin(), Prefix.GetCells().end());
268+
pk.push_back(TCell::Make(Parent));
269+
270+
PrefixBuf->AddRow(pk, {});
271271
}
272272

273273
if (IsFirstPrefixFeed && IsPrefixRowsValid) {
274-
PrefixRows.AddRow(TSerializedCellVec{key}, TSerializedCellVec::Serialize(*row));
275-
if (HasReachedLimits(PrefixRows, ScanSettings)) {
274+
PrefixRows.AddRow(key, *row);
275+
if (PrefixRows.HasReachedLimits(ScanSettings)) {
276276
PrefixRows.Clear();
277277
IsPrefixRowsValid = false;
278278
}
@@ -385,7 +385,7 @@ class TPrefixKMeansScan: public TActor<TPrefixKMeansScan>, public NTable::IScan
385385
IsFirstPrefixFeed = false;
386386

387387
if (IsPrefixRowsValid) {
388-
LOG_T("FinishPrefix not finished, manually feeding " << PrefixRows.Size() << " saved rows " << Debug());
388+
LOG_T("FinishPrefix not finished, manually feeding " << PrefixRows.GetRows() << " saved rows " << Debug());
389389
for (ui64 iteration = 0; ; iteration++) {
390390
for (const auto& [key, row_] : *PrefixRows.GetRowsData()) {
391391
TSerializedCellVec row(row_);

ydb/core/tx/datashard/build_index/reshuffle_kmeans.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ class TReshuffleKMeansScan: public TActor<TReshuffleKMeansScan>, public NTable::
191191
// LOG_T("Feed " << Debug());
192192

193193
++ReadRows;
194-
ReadBytes += CountBytes(key, row);
194+
ReadBytes += CountRowCellBytes(key, *row);
195195

196196
Feed(key, *row);
197197

ydb/core/tx/datashard/build_index/sample_k.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
135135
// LOG_T("Feed " << Debug());
136136

137137
++ReadRows;
138-
ReadBytes += CountBytes(key, row);
138+
ReadBytes += CountRowCellBytes(key, *row);
139139

140140
Sampler.Add([&row](){
141141
return TSerializedCellVec::Serialize(*row);

ydb/core/tx/datashard/build_index/secondary_index.cpp

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ class TBuildScanUpload: public TActor<TBuildScanUpload<Activity>>, public NTable
155155

156156
addRow();
157157

158-
if (!HasReachedLimits(ReadBuf, ScanSettings)) {
158+
if (!ReadBuf.HasReachedLimits(ScanSettings)) {
159159
return EScan::Feed;
160160
}
161161

@@ -271,8 +271,8 @@ class TBuildScanUpload: public TActor<TBuildScanUpload<Activity>>, public NTable
271271
return TStringBuilder() << "TBuildIndexScan TabletId: " << DataShardId << " Id: " << BuildIndexId
272272
<< ", requested range: " << DebugPrintRange(KeyTypes, RequestedRange.ToTableRange(), *AppData()->TypeRegistry)
273273
<< ", last acked point: " << DebugPrintPoint(KeyTypes, LastUploadedKey.GetCells(), *AppData()->TypeRegistry)
274-
<< Stats.ToString()
275-
<< UploadStatus.ToString();
274+
<< " " << Stats.ToString()
275+
<< " " << UploadStatus.ToString();
276276
}
277277

278278
EScan PageFault() noexcept override {
@@ -327,7 +327,7 @@ class TBuildScanUpload: public TActor<TBuildScanUpload<Activity>>, public NTable
327327
UploadStatus.Issues.AddIssues(ev->Get()->Issues);
328328

329329
if (UploadStatus.IsSuccess()) {
330-
Stats.Aggr(&WriteBuf);
330+
Stats.Aggr(WriteBuf.GetRows(), WriteBuf.GetRowCellBytes());
331331
LastUploadedKey = WriteBuf.ExtractLastKey();
332332

333333
//send progress
@@ -340,15 +340,16 @@ class TBuildScanUpload: public TActor<TBuildScanUpload<Activity>>, public NTable
340340
// TODO(mbkkt) ReleaseBuffer isn't possible, we use LastUploadedKey for logging
341341
progress->Record.SetLastKeyAck(LastUploadedKey.GetBuffer());
342342
progress->Record.SetRowsDelta(WriteBuf.GetRows());
343-
progress->Record.SetBytesDelta(WriteBuf.GetBytes());
343+
// TODO: use GetRowCellBytes method?
344+
progress->Record.SetBytesDelta(WriteBuf.GetBufferBytes());
344345
WriteBuf.Clear();
345346

346347
progress->Record.SetStatus(NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS);
347348
UploadStatusToMessage(progress->Record);
348349

349350
this->Send(ProgressActorId, progress.Release());
350351

351-
if (HasReachedLimits(ReadBuf, ScanSettings)) {
352+
if (ReadBuf.HasReachedLimits(ScanSettings)) {
352353
ReadBuf.FlushTo(WriteBuf);
353354
Upload();
354355
}
@@ -420,14 +421,15 @@ class TBuildIndexScan final: public TBuildScanUpload<NKikimrServices::TActivity:
420421
const auto rowCells = *row;
421422

422423
ReadBuf.AddRow(
423-
TSerializedCellVec(rowCells.Slice(0, TargetDataColumnPos)),
424-
TSerializedCellVec::Serialize(rowCells.Slice(TargetDataColumnPos)),
425-
TSerializedCellVec(key));
424+
rowCells.Slice(0, TargetDataColumnPos),
425+
rowCells.Slice(TargetDataColumnPos),
426+
key);
426427
});
427428
}
428429
};
429430

430431
class TBuildColumnsScan final: public TBuildScanUpload<NKikimrServices::TActivity::BUILD_COLUMNS_SCAN_ACTOR> {
432+
TVector<TCell> Value;
431433
TString ValueSerialized;
432434

433435
public:
@@ -446,21 +448,19 @@ class TBuildColumnsScan final: public TBuildScanUpload<NKikimrServices::TActivit
446448
UploadMode = NTxProxy::EUploadRowsMode::UpsertIfExists;
447449

448450
TMemoryPool valueDataPool(256);
449-
TVector<TCell> cells;
450451
TString err;
451-
Y_ABORT_UNLESS(BuildExtraColumns(cells, columnBuildSettings, err, valueDataPool));
452-
ValueSerialized = TSerializedCellVec::Serialize(cells);
452+
Y_ABORT_UNLESS(BuildExtraColumns(Value, columnBuildSettings, err, valueDataPool));
453+
ValueSerialized = TSerializedCellVec::Serialize(Value);
453454
}
454455

455456
EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept final {
456457
return FeedImpl(key, row, [&] {
457-
TSerializedCellVec pk(key);
458-
auto pkTarget = pk;
459-
auto valueTarget = ValueSerialized;
458+
auto valueSerializedCopy = ValueSerialized;
460459
ReadBuf.AddRow(
461-
std::move(pkTarget),
462-
std::move(valueTarget),
463-
std::move(pk));
460+
key,
461+
Value,
462+
std::move(valueSerializedCopy),
463+
key);
464464
});
465465
}
466466
};

ydb/core/tx/datashard/scan_common.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@ TColumnsTypes GetAllTypes(const TUserTable& tableInfo) {
2929
return result;
3030
}
3131

32-
ui64 CountBytes(TArrayRef<const TCell> key, const NTable::TRowState& row) {
32+
// amount of read data is calculated based only on row cells as it is done in KQP part
33+
// complex scan read bytes metric would be too hard to be explained for users
34+
ui64 CountRowCellBytes(TConstArrayRef<TCell> key, TConstArrayRef<TCell> value) {
3335
ui64 bytes = 0;
3436
for (auto& cell : key) {
3537
bytes += cell.Size();
3638
}
37-
for (auto& cell : *row) {
39+
for (auto& cell : value) {
3840
bytes += cell.Size();
3941
}
4042
return bytes;

0 commit comments

Comments
 (0)