Skip to content

Commit ba6d934

Browse files
authored
Add sample k collect for first level of vector index table (#7412)
1 parent 7059c21 commit ba6d934

24 files changed

+955
-105
lines changed

ydb/core/protos/index_builder.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,14 @@ enum EBuildStatus {
117117
BUILD_ERROR = 5;
118118
BAD_REQUEST = 6;
119119
}
120+
121+
message TEvUploadSampleKResponse {
122+
optional uint64 Id = 1;
123+
124+
optional Ydb.StatusIds.StatusCode UploadStatus = 2;
125+
repeated Ydb.Issue.IssueMessage Issues = 3;
126+
127+
optional uint64 RowsDelta = 4;
128+
optional uint64 BytesDelta = 5;
129+
}
130+

ydb/core/protos/out/out.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#include <ydb/public/api/protos/ydb_table.pb.h>
2+
13
#include <ydb/core/protos/blobstorage.pb.h>
24
#include <ydb/core/protos/blobstorage_vdisk_internal.pb.h>
35
#include <ydb/core/protos/blobstorage_vdisk_config.pb.h>
@@ -251,3 +253,7 @@ Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvAnalyzeStatusResponse_EStatus, stream, valu
251253
Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvStatisticsResponse::EStatus, stream, value) {
252254
stream << NKikimrStat::TEvStatisticsResponse::EStatus_Name(value);
253255
}
256+
257+
Y_DECLARE_OUT_SPEC(, Ydb::Table::IndexBuildState_State, stream, value) {
258+
stream << IndexBuildState_State_Name(value);
259+
}

ydb/core/protos/tx_datashard.proto

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1451,16 +1451,20 @@ message TEvSampleKRequest {
14511451
optional NKikimrProto.TPathID PathId = 3;
14521452

14531453
optional uint64 K = 4;
1454+
// For now we use tablet id as seed value, but it's not convinient for tests
14541455
optional uint64 Seed = 5;
1456+
// We want to sample small instead of large probabilites
1457+
// because size of message will be smaller (varint)
1458+
optional uint64 MaxProbability = 6;
14551459

1456-
repeated string Columns = 6;
1457-
optional NKikimrTx.TKeyRange KeyRange = 7;
1460+
repeated string Columns = 7;
1461+
optional NKikimrTx.TKeyRange KeyRange = 8;
14581462

1459-
optional uint64 SnapshotTxId = 8;
1460-
optional uint64 SnapshotStep = 9;
1463+
optional uint64 SnapshotTxId = 9;
1464+
optional uint64 SnapshotStep = 10;
14611465

1462-
optional uint64 SeqNoGeneration = 10;
1463-
optional uint64 SeqNoRound = 11;
1466+
optional uint64 SeqNoGeneration = 11;
1467+
optional uint64 SeqNoRound = 12;
14641468
}
14651469

14661470
message TEvSampleKResponse {
@@ -1472,13 +1476,14 @@ message TEvSampleKResponse {
14721476
optional NKikimrIndexBuilder.EBuildStatus Status = 4;
14731477
repeated Ydb.Issue.IssueMessage Issues = 5;
14741478

1475-
optional NKikimrTx.TKeyRange RequestedKeyRange = 6;
1479+
optional uint64 RowsDelta = 6;
1480+
optional uint64 BytesDelta = 7;
14761481

1477-
optional uint64 RequestSeqNoGeneration = 7;
1478-
optional uint64 RequestSeqNoRound = 8;
1482+
optional uint64 RequestSeqNoGeneration = 8;
1483+
optional uint64 RequestSeqNoRound = 9;
14791484

1480-
repeated uint64 Probabilities = 9;
1481-
repeated bytes Rows = 10;
1485+
repeated uint64 Probabilities = 10;
1486+
repeated bytes Rows = 11;
14821487
}
14831488

14841489
message TEvCdcStreamScanRequest {

ydb/core/scheme/scheme_tablecell.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,7 @@ namespace {
122122
if (!SerializeCellVecInit(cells, resultBuffer, resultCells))
123123
return;
124124

125-
size_t size = sizeof(ui16);
126-
for (auto& cell : cells)
127-
size += sizeof(TCellHeader) + cell.Size();
125+
auto size = TSerializedCellVec::SerializedSize(cells);
128126

129127
resultBuffer.ReserveAndResize(size);
130128
char* resultBufferData = resultBuffer.Detach();
@@ -254,6 +252,14 @@ TString TSerializedCellVec::Serialize(TConstArrayRef<TCell> cells) {
254252
return result;
255253
}
256254

255+
size_t TSerializedCellVec::SerializedSize(TConstArrayRef<TCell> cells) {
256+
size_t size = sizeof(ui16) + sizeof(TCellHeader) * cells.size();
257+
for (auto& cell : cells) {
258+
size += cell.Size();
259+
}
260+
return size;
261+
}
262+
257263
bool TSerializedCellVec::DoTryParse(const TString& data) {
258264
return TryDeserializeCellVec(data, Buf, Cells);
259265
}

ydb/core/scheme/scheme_tablecell.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,8 @@ class TSerializedCellVec {
567567

568568
static TString Serialize(TConstArrayRef<TCell> cells);
569569

570+
static size_t SerializedSize(TConstArrayRef<TCell> cells);
571+
570572
const TString &GetBuffer() const { return Buf; }
571573

572574
TString ReleaseBuffer() {

ydb/core/tx/columnshard/defs.cpp

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,6 @@
22

33
namespace NKikimr::NColumnShard {
44

5-
namespace {
6-
const constexpr ui64 MAX_BLOB_SIZE_LIMIT = 8 * 1024 * 1024;
7-
ui64 MaxBlobSize = MAX_BLOB_SIZE_LIMIT;
8-
}
9-
10-
ui64 TLimits::GetMaxBlobSize() {
11-
return MaxBlobSize;
12-
}
13-
14-
ui64 TLimits::GetBlobSizeLimit() {
15-
return MAX_BLOB_SIZE_LIMIT;
16-
}
175

186
void TLimits::SetMaxBlobSize(const ui64 value) {
197
Y_ABORT_UNLESS(value <= MAX_BLOB_SIZE_LIMIT);

ydb/core/tx/columnshard/defs.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,23 @@ namespace NKikimr::NColumnShard {
1515
using TLogThis = TCtorLogger<NKikimrServices::TX_COLUMNSHARD>;
1616

1717
struct TLimits {
18+
private:
19+
static constexpr ui64 MAX_BLOB_SIZE_LIMIT = 8 * 1024 * 1024;
20+
static inline ui64 MaxBlobSize = MAX_BLOB_SIZE_LIMIT;
21+
22+
public:
1823
static constexpr const ui32 MIN_SMALL_BLOBS_TO_INSERT = 200;
1924
static constexpr const ui32 MIN_BYTES_TO_INSERT = 4 * 1024 * 1024;
2025
static constexpr const ui64 MAX_BYTES_TO_INSERT = 16 * 1024 * 1024;
2126
static constexpr const ui32 MAX_TX_RECORDS = 100000;
2227
static constexpr const ui64 MAX_BLOBS_TO_DELETE = NOlap::TCompactionLimits::MAX_BLOBS_TO_DELETE;
2328

24-
static ui64 GetBlobSizeLimit();
25-
static ui64 GetMaxBlobSize();
29+
static constexpr ui64 GetBlobSizeLimit() {
30+
return MAX_BLOB_SIZE_LIMIT;
31+
}
32+
static ui64 GetMaxBlobSize() {
33+
return MaxBlobSize;
34+
}
2635
static void SetMaxBlobSize(const ui64 value);
2736

2837
class TMaxBlobSizeGuard: TNonCopyable {

ydb/core/tx/datashard/build_index.cpp

Lines changed: 3 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ namespace NKikimr::NDataShard {
2828
#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, stream)
2929

3030
using TColumnsTypes = THashMap<TString, NScheme::TTypeInfo>;
31-
using TTypes = TVector<std::pair<TString, Ydb::Type>>;
32-
using TRows = TVector<std::pair<TSerializedCellVec, TString>>;
31+
using TTypes = NTxProxy::TUploadTypes;
32+
using TRows = NTxProxy::TUploadRows;
3333

3434
static TColumnsTypes GetAllTypes(const TUserTable& tableInfo) {
3535
TColumnsTypes result;
@@ -119,42 +119,6 @@ bool BuildExtraColumns(TVector<TCell>& cells, const NKikimrIndexBuilder::TColumn
119119
return true;
120120
}
121121

122-
struct TStatus {
123-
Ydb::StatusIds::StatusCode StatusCode = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
124-
NYql::TIssues Issues;
125-
126-
bool IsNone() const {
127-
return StatusCode == Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
128-
}
129-
130-
bool IsSuccess() const {
131-
return StatusCode == Ydb::StatusIds::SUCCESS;
132-
}
133-
134-
bool IsRetriable() const {
135-
return StatusCode == Ydb::StatusIds::UNAVAILABLE || StatusCode == Ydb::StatusIds::OVERLOADED;
136-
}
137-
138-
TString ToString() const {
139-
return TStringBuilder()
140-
<< "Status {"
141-
<< " Code: " << Ydb::StatusIds_StatusCode_Name(StatusCode)
142-
<< " Issues: " << Issues.ToString()
143-
<< " }";
144-
}
145-
};
146-
147-
struct TUploadLimits {
148-
ui64 BatchRowsLimit = 500;
149-
ui64 BatchBytesLimit = 1u << 23; // 8MB
150-
ui32 MaxUploadRowsRetryCount = 50;
151-
ui32 BackoffCeiling = 3;
152-
153-
TDuration GetTimeoutBackouff(ui32 retryNo) const {
154-
return TDuration::Seconds(1u << Max(retryNo, BackoffCeiling));
155-
}
156-
};
157-
158122
class TBufferData: public IStatHolder, public TNonCopyable {
159123
public:
160124
TBufferData()
@@ -258,7 +222,7 @@ class TBuildScanUpload: public TActor<TBuildScanUpload<Activity>>, public NTable
258222
ui64 RetryCount = 0;
259223

260224
TUploadMonStats Stats = TUploadMonStats("tablets", "build_index_upload");
261-
TStatus UploadStatus;
225+
TUploadStatus UploadStatus;
262226

263227
TBuildScanUpload(ui64 buildIndexId,
264228
const TString& target,

ydb/core/tx/datashard/datashard_ut_sample_k.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ Y_UNIT_TEST_SUITE (TTxDataShardSampleKScan) {
6262
rec.SetSnapshotTxId(snapshot.TxId);
6363
rec.SetSnapshotStep(snapshot.Step);
6464

65+
rec.SetMaxProbability(std::numeric_limits<uint64_t>::max());
6566
rec.SetSeed(1337);
6667
if (!rec.HasK()) {
6768
rec.SetK(1);
@@ -104,6 +105,7 @@ Y_UNIT_TEST_SUITE (TTxDataShardSampleKScan) {
104105
rec.SetSnapshotTxId(snapshot.TxId);
105106
rec.SetSnapshotStep(snapshot.Step);
106107

108+
rec.SetMaxProbability(std::numeric_limits<uint64_t>::max());
107109
rec.SetSeed(seed);
108110
rec.SetK(k);
109111
};

ydb/core/tx/datashard/sample_k.cpp

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
3636
const TSerializedTableRange RequestedRange;
3737
const ui64 K;
3838

39-
IDriver* Driver = nullptr;
40-
4139
struct TProbability {
4240
ui64 P = 0;
4341
ui64 I = 0;
@@ -46,11 +44,18 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
4644
auto operator<=>(const TProbability&) const noexcept = default;
4745
};
4846

49-
TReallyFastRng32 Rng;
47+
ui64 RowsCount = 0;
48+
ui64 RowsBytes = 0;
49+
50+
// We are using binary heap, because we don't want to do batch processing here,
51+
// serialization is more expensive than compare
5052
ui64 MaxProbability = 0;
53+
TReallyFastRng32 Rng;
5154
std::vector<TProbability> MaxRows;
5255
std::vector<TString> DataRows;
5356

57+
IDriver* Driver = nullptr;
58+
5459
public:
5560
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
5661
return NKikimrServices::TActivity::SAMPLE_K_SCAN_ACTOR;
@@ -61,6 +66,7 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
6166
const TSerializedTableRange& range,
6267
ui64 k,
6368
ui64 seed,
69+
ui64 maxProbability,
6470
TProtoColumnsCRef columns,
6571
const TUserTable& tableInfo)
6672
: TActor(&TThis::StateWork)
@@ -71,8 +77,10 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
7177
, TableRange(tableInfo.Range)
7278
, RequestedRange(range)
7379
, K(k)
80+
, MaxProbability(maxProbability)
7481
, Rng(seed)
7582
{
83+
Y_ASSERT(MaxProbability != 0);
7684
}
7785

7886
~TSampleKScan() final = default;
@@ -109,19 +117,32 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
109117

110118
EScan Feed(TArrayRef<const TCell> key, const TRow& row) noexcept final {
111119
LOG_T("Feed key " << DebugPrintPoint(KeyTypes, key, *AppData()->TypeRegistry) << " " << Debug());
120+
++RowsCount;
121+
122+
const auto probability = GetProbability();
123+
if (probability > MaxProbability) {
124+
// TODO(mbkkt) it's not nice that we need to compute this, probably can be precomputed in TRow
125+
RowsBytes += TSerializedCellVec::SerializedSize(*row);
126+
return EScan::Feed;
127+
}
128+
129+
auto serialized = TSerializedCellVec::Serialize(*row);
130+
RowsBytes += serialized.size();
112131

113-
const auto probability = Rng.GenRand64();
114132
if (DataRows.size() < K) {
115133
MaxRows.push_back({probability, DataRows.size()});
116-
DataRows.emplace_back(TSerializedCellVec::Serialize(*row));
134+
DataRows.emplace_back(std::move(serialized));
117135
if (DataRows.size() == K) {
118136
std::make_heap(MaxRows.begin(), MaxRows.end());
119137
MaxProbability = MaxRows.front().P;
120138
}
121-
} else if (probability < MaxProbability) {
122-
ReplaceRow(row, probability);
139+
} else {
140+
ReplaceRow(std::move(serialized), probability);
123141
}
124142

143+
if (MaxProbability == 0) {
144+
return EScan::Final;
145+
}
125146
return EScan::Feed;
126147
}
127148

@@ -167,21 +188,35 @@ class TSampleKScan final: public TActor<TSampleKScan>, public NTable::IScan {
167188
}
168189
}
169190

170-
void ReplaceRow(const TRow& row, ui64 p) {
191+
void ReplaceRow(TString&& row, ui64 p) {
192+
// TODO(mbkkt) use tournament tree to make less compare and swaps
171193
std::pop_heap(MaxRows.begin(), MaxRows.end());
172-
DataRows[MaxRows.back().I] = TSerializedCellVec::Serialize(*row);
194+
DataRows[MaxRows.back().I] = std::move(row);
173195
MaxRows.back().P = p;
174196
std::push_heap(MaxRows.begin(), MaxRows.end());
175197
MaxProbability = MaxRows.front().P;
176198
}
177199

178200
void FillResponse() {
179201
std::sort(MaxRows.begin(), MaxRows.end());
202+
auto& record = Response->Record;
180203
for (auto& [p, i] : MaxRows) {
181-
Response->Record.AddProbabilities(p);
182-
Response->Record.AddRows(std::move(DataRows[i]));
204+
record.AddProbabilities(p);
205+
record.AddRows(std::move(DataRows[i]));
206+
}
207+
record.SetRowsDelta(RowsCount);
208+
record.SetBytesDelta(RowsBytes);
209+
record.SetStatus(NKikimrIndexBuilder::EBuildStatus::DONE);
210+
}
211+
212+
ui64 GetProbability() {
213+
while (true) {
214+
auto p = Rng.GenRand64();
215+
// We exclude max ui64 from generated probabilities, so we can use this value as initial max
216+
if (Y_LIKELY(p != std::numeric_limits<ui64>::max())) {
217+
return p;
218+
}
183219
}
184-
Response->Record.SetStatus(NKikimrIndexBuilder::EBuildStatus::DONE);
185220
}
186221
};
187222

@@ -317,6 +352,7 @@ void TDataShard::HandleSafe(TEvDataShard::TEvSampleKRequest::TPtr& ev, const TAc
317352
requestedRange,
318353
record.GetK(),
319354
record.GetSeed(),
355+
record.GetMaxProbability(),
320356
record.GetColumns(),
321357
userTable),
322358
ev->Cookie,

0 commit comments

Comments
 (0)