Skip to content

Commit b2b16e8

Browse files
speed up register blob idx (#9581)
1 parent dbe09f9 commit b2b16e8

File tree

4 files changed

+116
-32
lines changed

4 files changed

+116
-32
lines changed

ydb/core/kqp/ut/olap/sys_view_ut.cpp

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
2828
auto selectQuery = TString(R"(
2929
SELECT PathId, Kind, TabletId, Sum(Rows) as Rows
3030
FROM `/Root/olapStore/.sys/store_primary_index_portion_stats`
31+
WHERE Activity == 1
3132
GROUP BY PathId, Kind, TabletId
3233
ORDER BY TabletId, Kind, PathId
3334
)");
@@ -61,13 +62,14 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
6162
WriteTestData(kikimr, "/Root/olapStore/olapTable_1", 0, 1000000 + i * 10000, 1000);
6263
WriteTestData(kikimr, "/Root/olapStore/olapTable_2", 0, 1000000 + i * 10000, 2000);
6364
}
64-
csController->WaitCompactions(TDuration::Seconds(10));
65+
csController->WaitCompactions(TDuration::Seconds(5));
6566

6667
auto tableClient = kikimr.GetTableClient();
6768
{
6869
auto selectQuery = TString(R"(
6970
SELECT PathId, Kind, TabletId
7071
FROM `/Root/olapStore/olapTable_1/.sys/primary_index_stats`
72+
WHERE Activity = 1
7173
GROUP BY PathId, TabletId, Kind
7274
ORDER BY PathId, TabletId, Kind
7375
)");
@@ -82,6 +84,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
8284
auto selectQuery = TString(R"(
8385
SELECT PathId, Kind, TabletId
8486
FROM `/Root/olapStore/olapTable_2/.sys/primary_index_stats`
87+
WHERE Activity = 1
8588
GROUP BY PathId, TabletId, Kind
8689
ORDER BY PathId, TabletId, Kind
8790
)");
@@ -168,10 +171,10 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
168171
helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, `SERIALIZER.CLASS_NAME`=`ARROW_SERIALIZER`, `COMPRESSION.TYPE`=`zstd`);");
169172
csController->WaitCompactions(TDuration::Seconds(10));
170173
}
171-
const ui64 rawBytesUnpack = rawBytesUnpack1PK - rawBytesPK1;
172-
const ui64 bytesUnpack = bytesUnpack1PK - bytesPK1;
173-
const ui64 rawBytesPack = rawBytesPackAndUnpack2PK - rawBytesUnpack1PK - rawBytesPK1;
174-
const ui64 bytesPack = bytesPackAndUnpack2PK - bytesUnpack1PK - bytesPK1;
174+
const i64 rawBytesUnpack = rawBytesUnpack1PK - rawBytesPK1;
175+
const i64 bytesUnpack = bytesUnpack1PK - bytesPK1;
176+
const i64 rawBytesPack = rawBytesPackAndUnpack2PK - rawBytesUnpack1PK - rawBytesPK1;
177+
const i64 bytesPack = bytesPackAndUnpack2PK - bytesUnpack1PK - bytesPK1;
175178
TStringBuilder result;
176179
result << "unpacked data: " << rawBytesUnpack << " / " << bytesUnpack << Endl;
177180
result << "packed data: " << rawBytesPack << " / " << bytesPack << Endl;
@@ -292,6 +295,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
292295
ui64 rawBytes1;
293296
ui64 bytes1;
294297
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
298+
csController->SetSmallSizeDetector(Max<ui32>());
295299
auto settings = TKikimrSettings().SetWithSampleTables(false);
296300
TKikimrRunner kikimr(settings);
297301
Tests::NCommon::TLoggerInit(kikimr).Initialize();
@@ -308,6 +312,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
308312
helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=pk_int_max, TYPE=MAX, FEATURES=`{\"column_name\" : \"pk_int\"}`);");
309313
helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);");
310314
csController->WaitActualization(TDuration::Seconds(40));
315+
csController->WaitCompactions(TDuration::Seconds(5));
311316
{
312317
ui64 rawBytes2;
313318
ui64 bytes2;
@@ -316,7 +321,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
316321
AFL_VERIFY(bytes2 < bytes1 * 0.5)("f1", bytes1)("f2", bytes2);
317322
std::vector<NJson::TJsonValue> stats;
318323
helper.GetStats(stats, true);
319-
AFL_VERIFY(stats.size() == 3);
324+
AFL_VERIFY(stats.size() == 3)("count", stats.size());
320325
for (auto&& i : stats) {
321326
AFL_VERIFY(i.IsArray());
322327
AFL_VERIFY(i.GetArraySafe().size() == 1);
@@ -396,6 +401,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
396401
auto selectQuery = TString(R"(
397402
SELECT SUM(BlobRangeSize) as Bytes, SUM(Rows) as Rows, PathId, TabletId
398403
FROM `/Root/olapStore/.sys/store_primary_index_stats`
404+
WHERE Activity == 1
399405
GROUP BY PathId, TabletId
400406
ORDER BY Bytes
401407
)");
@@ -409,6 +415,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
409415
auto selectQuery = TString(R"(
410416
SELECT Sum(Rows) as Rows, Kind, Sum(ColumnRawBytes) as RawBytes, PathId
411417
FROM `/Root/olapStore/.sys/store_primary_index_portion_stats`
418+
WHERE Activity == 1
412419
GROUP BY Kind, PathId
413420
ORDER BY PathId, Kind, Rows
414421
)");
@@ -529,6 +536,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
529536
auto selectQuery = TString(R"(
530537
SELECT PathId, Kind, TabletId, Sum(BlobRangeSize) as Bytes
531538
FROM `/Root/olapStore/.sys/store_primary_index_stats`
539+
WHERE Activity == 1
532540
GROUP BY PathId, Kind, TabletId
533541
ORDER BY PathId, Kind, TabletId;
534542
)");
@@ -542,6 +550,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
542550
auto selectQuery = TString(R"(
543551
SELECT PathId, Kind, TabletId, Sum(BlobRangeSize) as Bytes
544552
FROM `/Root/olapStore/.sys/store_primary_index_stats`
553+
WHERE Activity == 1
545554
GROUP BY PathId, Kind, TabletId
546555
ORDER BY PathId, Kind, TabletId;
547556
)");
@@ -569,6 +578,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
569578
SELECT PathId, Kind, TabletId
570579
FROM `/Root/olapStore/.sys/store_primary_index_stats`
571580
WHERE Kind IN ('SPLIT_COMPACTED', 'INACTIVE', 'EVICTED', 'INSERTED')
581+
AND Activity == 1
572582
GROUP BY PathId, Kind, TabletId
573583
ORDER BY PathId, Kind, TabletId;
574584
)");
@@ -700,6 +710,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
700710
auto selectQuery = TString(R"(
701711
SELECT PathId, TabletId, Kind
702712
FROM `/Root/olapStore/.sys/store_primary_index_stats`
713+
WHERE Activity == 1
703714
GROUP BY PathId, TabletId, Kind
704715
)");
705716

@@ -715,6 +726,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
715726
count(distinct(Kind)) as KindsCount,
716727
count(distinct(TabletId)) as TabletsCount
717728
FROM `/Root/olapStore/.sys/store_primary_index_stats`
729+
WHERE Activity == 1
718730
)");
719731

720732
auto rows = ExecuteScanQuery(tableClient, selectQuery);
@@ -727,6 +739,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
727739
auto selectQuery = TString(R"(
728740
SELECT PathId, count(*), sum(Rows), sum(BlobRangeSize), sum(RawBytes)
729741
FROM `/Root/olapStore/.sys/store_primary_index_stats`
742+
WHERE Activity == 1
730743
GROUP BY PathId
731744
ORDER BY PathId
732745
)");

ydb/core/tx/columnshard/engines/portions/constructor.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,45 @@ TPortionInfo TPortionInfoConstructor::Build(const bool needChunksNormalization)
3131
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("portion_id", GetPortionIdVerified());
3232
FullValidation();
3333

34+
if (BlobIdxs.size()) {
35+
auto itRecord = Records.begin();
36+
auto itIndex = Indexes.begin();
37+
auto itBlobIdx = BlobIdxs.begin();
38+
while (itRecord != Records.end() && itIndex != Indexes.end() && itBlobIdx != BlobIdxs.end()) {
39+
if (itRecord->GetAddress() < itIndex->GetAddress()) {
40+
AFL_VERIFY(itRecord->GetAddress() == itBlobIdx->GetAddress());
41+
itRecord->RegisterBlobIdx(itBlobIdx->GetBlobIdx());
42+
++itRecord;
43+
++itBlobIdx;
44+
} else if (itIndex->GetAddress() < itRecord->GetAddress()) {
45+
if (itIndex->HasBlobData()) {
46+
++itIndex;
47+
continue;
48+
}
49+
AFL_VERIFY(itIndex->GetAddress() == itBlobIdx->GetAddress());
50+
itIndex->RegisterBlobIdx(itBlobIdx->GetBlobIdx());
51+
++itIndex;
52+
++itBlobIdx;
53+
} else {
54+
AFL_VERIFY(false);
55+
}
56+
}
57+
for (; itRecord != Records.end() && itBlobIdx != BlobIdxs.end(); ++itRecord, ++itBlobIdx) {
58+
AFL_VERIFY(itRecord->GetAddress() == itBlobIdx->GetAddress());
59+
itRecord->RegisterBlobIdx(itBlobIdx->GetBlobIdx());
60+
}
61+
for (; itIndex != Indexes.end() && itBlobIdx != BlobIdxs.end(); ++itIndex) {
62+
if (itIndex->HasBlobData()) {
63+
continue;
64+
}
65+
AFL_VERIFY(itIndex->GetAddress() == itBlobIdx->GetAddress());
66+
itIndex->RegisterBlobIdx(itBlobIdx->GetBlobIdx());
67+
++itBlobIdx;
68+
}
69+
AFL_VERIFY(itRecord == Records.end());
70+
AFL_VERIFY(itBlobIdx == BlobIdxs.end());
71+
}
72+
3473
result.Indexes = Indexes;
3574
result.Records = Records;
3675
result.BlobIds = BlobIds;

ydb/core/tx/columnshard/engines/portions/constructor.h

Lines changed: 57 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,26 @@ class TPortionInfoConstructor {
2929
YDB_ACCESSOR_DEF(std::vector<TColumnRecord>, Records);
3030
std::vector<TUnifiedBlobId> BlobIds;
3131

32+
class TAddressBlobId {
33+
private:
34+
TChunkAddress Address;
35+
YDB_READONLY(TBlobRangeLink16::TLinkId, BlobIdx, 0);
36+
37+
public:
38+
const TChunkAddress& GetAddress() const {
39+
return Address;
40+
}
41+
42+
TAddressBlobId(const TChunkAddress& address, const TBlobRangeLink16::TLinkId blobIdx)
43+
: Address(address)
44+
, BlobIdx(blobIdx)
45+
{
46+
47+
}
48+
};
49+
std::vector<TAddressBlobId> BlobIdxs;
50+
bool NeedBlobIdxsSort = false;
51+
3252
public:
3353
void SetPortionId(const ui64 value) {
3454
AFL_VERIFY(value);
@@ -216,6 +236,16 @@ class TPortionInfoConstructor {
216236
return linkRange.RestoreRange(GetBlobId(linkRange.GetBlobIdxVerified()));
217237
}
218238

239+
const TBlobRange RestoreBlobRangeSlow(const TBlobRangeLink16& linkRange, const TChunkAddress& address) const {
240+
for (auto&& i : BlobIdxs) {
241+
if (i.GetAddress() == address) {
242+
return linkRange.RestoreRange(GetBlobId(i.GetBlobIdx()));
243+
}
244+
}
245+
AFL_VERIFY(false);
246+
return TBlobRange();
247+
}
248+
219249
const TUnifiedBlobId& GetBlobId(const TBlobRangeLink16::TLinkId linkId) const {
220250
AFL_VERIFY(linkId < BlobIds.size());
221251
return BlobIds[linkId];
@@ -226,19 +256,10 @@ class TPortionInfoConstructor {
226256
}
227257

228258
void RegisterBlobIdx(const TChunkAddress& address, const TBlobRangeLink16::TLinkId blobIdx) {
229-
for (auto&& i : Records) {
230-
if (i.GetColumnId() == address.GetEntityId() && i.GetChunkIdx() == address.GetChunkIdx()) {
231-
i.RegisterBlobIdx(blobIdx);
232-
return;
233-
}
259+
if (BlobIdxs.size() && address < BlobIdxs.back().GetAddress()) {
260+
NeedBlobIdxsSort = true;
234261
}
235-
for (auto&& i : Indexes) {
236-
if (i.GetIndexId() == address.GetEntityId() && i.GetChunkIdx() == address.GetChunkIdx()) {
237-
i.RegisterBlobIdx(blobIdx);
238-
return;
239-
}
240-
}
241-
AFL_VERIFY(false)("problem", "portion haven't address for blob registration")("address", address.DebugString());
262+
BlobIdxs.emplace_back(address, blobIdx);
242263
}
243264

244265
TString DebugString() const {
@@ -265,26 +286,37 @@ class TPortionInfoConstructor {
265286
std::sort(Indexes.begin(), Indexes.end(), pred);
266287
CheckChunksOrder(Indexes);
267288
}
289+
if (NeedBlobIdxsSort) {
290+
auto pred = [](const TAddressBlobId& l, const TAddressBlobId& r) {
291+
return l.GetAddress() < r.GetAddress();
292+
};
293+
std::sort(BlobIdxs.begin(), BlobIdxs.end(), pred);
294+
}
268295
}
269296

270297
void FullValidation() const {
271298
AFL_VERIFY(Records.size());
272299
CheckChunksOrder(Records);
273300
CheckChunksOrder(Indexes);
274-
std::set<ui32> blobIdxs;
275-
for (auto&& i : Records) {
276-
blobIdxs.emplace(i.GetBlobRange().GetBlobIdxVerified());
277-
}
278-
for (auto&& i : Indexes) {
279-
if (i.HasBlobRange()) {
280-
blobIdxs.emplace(i.GetBlobRangeVerified().GetBlobIdxVerified());
281-
}
282-
}
283-
if (BlobIds.size()) {
284-
AFL_VERIFY(BlobIds.size() == blobIdxs.size());
285-
AFL_VERIFY(BlobIds.size() == *blobIdxs.rbegin() + 1);
301+
if (BlobIdxs.size()) {
302+
AFL_VERIFY(BlobIdxs.size() <= Records.size() + Indexes.size())("blobs", BlobIdxs.size())("records", Records.size())(
303+
"indexes", Indexes.size());
286304
} else {
287-
AFL_VERIFY(blobIdxs.empty());
305+
std::set<ui32> blobIdxs;
306+
for (auto&& i : Records) {
307+
blobIdxs.emplace(i.GetBlobRange().GetBlobIdxVerified());
308+
}
309+
for (auto&& i : Indexes) {
310+
if (i.HasBlobRange()) {
311+
blobIdxs.emplace(i.GetBlobRangeVerified().GetBlobIdxVerified());
312+
}
313+
}
314+
if (BlobIds.size()) {
315+
AFL_VERIFY(BlobIds.size() == blobIdxs.size());
316+
AFL_VERIFY(BlobIds.size() == *blobIdxs.rbegin() + 1);
317+
} else {
318+
AFL_VERIFY(blobIdxs.empty());
319+
}
288320
}
289321
}
290322

ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ void AddIdsToBlobs(std::vector<TWritePortionInfoWithBlobsResult>& portions, NBlo
284284
blobsData.emplace(blobId, b.GetResultBlob());
285285
}
286286
for (auto&& rec : portion.GetPortionConstructor().GetRecords()) {
287-
auto range = portion.GetPortionConstructor().RestoreBlobRange(rec.BlobRange);
287+
auto range = portion.GetPortionConstructor().RestoreBlobRangeSlow(rec.BlobRange, rec.GetAddress());
288288
auto it = blobsData.find(range.BlobId);
289289
AFL_VERIFY(it != blobsData.end());
290290
const TString& data = it->second;

0 commit comments

Comments
 (0)