Skip to content

Commit 8585364

Browse files
speed up scan initialization (#9086)
1 parent 89b94f9 commit 8585364

File tree

14 files changed

+159
-130
lines changed

14 files changed

+159
-130
lines changed

ydb/core/tx/columnshard/engines/insert_table/committed.h

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,27 +56,24 @@ class TCommittedBlob {
5656
YDB_READONLY(ui64, SchemaVersion, 0);
5757
YDB_READONLY(ui64, RecordsCount, 0);
5858
YDB_READONLY(bool, IsDelete, false);
59-
YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, First);
60-
YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, Last);
59+
NArrow::TReplaceKey First;
60+
NArrow::TReplaceKey Last;
6161
YDB_READONLY_DEF(NArrow::TSchemaSubset, SchemaSubset);
6262

6363
public:
64-
ui64 GetSize() const {
65-
return BlobRange.Size;
64+
const NArrow::TReplaceKey& GetFirst() const {
65+
return First;
6666
}
67-
68-
const NArrow::TReplaceKey& GetFirstVerified() const {
69-
Y_ABORT_UNLESS(First);
70-
return *First;
67+
const NArrow::TReplaceKey& GetLast() const {
68+
return Last;
7169
}
7270

73-
const NArrow::TReplaceKey& GetLastVerified() const {
74-
Y_ABORT_UNLESS(Last);
75-
return *Last;
71+
ui64 GetSize() const {
72+
return BlobRange.Size;
7673
}
7774

7875
TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const ui64 schemaVersion, const ui64 recordsCount,
79-
const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last, const bool isDelete,
76+
const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete,
8077
const NArrow::TSchemaSubset& subset)
8178
: BlobRange(blobRange)
8279
, WriteInfo(snapshot)
@@ -89,7 +86,7 @@ class TCommittedBlob {
8986
}
9087

9188
TCommittedBlob(const TBlobRange& blobRange, const TInsertWriteId writeId, const ui64 schemaVersion, const ui64 recordsCount,
92-
const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last, const bool isDelete,
89+
const NArrow::TReplaceKey& first, const NArrow::TReplaceKey& last, const bool isDelete,
9390
const NArrow::TSchemaSubset& subset)
9491
: BlobRange(blobRange)
9592
, WriteInfo(writeId)

ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) {
1313
dbTable.Insert(*dataPtr);
1414
return true;
1515
} else {
16+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_insertion");
1617
return false;
1718
}
1819
}
@@ -39,11 +40,15 @@ TInsertionSummary::TCounters TInsertTable::Commit(
3940
auto* pathInfo = Summary.GetPathInfoOptional(pathId);
4041
// There could be commit after drop: propose, drop, plan
4142
if (pathInfo && pathExists(pathId)) {
43+
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "commit_insertion")("path_id", data->GetPathId())(
44+
"blob_range", data->GetBlobRange().ToString());
4245
auto committed = data->Commit(planStep, txId);
4346
dbTable.Commit(committed);
4447

4548
pathInfo->AddCommitted(std::move(committed));
4649
} else {
50+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "abort_insertion")("path_id", data->GetPathId())(
51+
"blob_range", data->GetBlobRange().ToString());
4752
dbTable.Abort(*data);
4853
Summary.AddAborted(std::move(*data));
4954
}
@@ -58,6 +63,8 @@ void TInsertTable::Abort(IDbWrapper& dbTable, const THashSet<TInsertWriteId>& wr
5863
for (auto writeId : writeIds) {
5964
// There could be inconsistency with txs and writes in case of bugs. So we could find no record for writeId.
6065
if (std::optional<TInsertedData> data = Summary.ExtractInserted(writeId)) {
66+
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "abort_insertion")("path_id", data->GetPathId())(
67+
"blob_range", data->GetBlobRange().ToString())("write_id", writeId);
6168
dbTable.EraseInserted(*data);
6269
dbTable.Abort(*data);
6370
Summary.AddAborted(std::move(*data));
@@ -108,8 +115,8 @@ bool TInsertTable::Load(NIceDb::TNiceDb& db, IDbWrapper& dbTable, const TInstant
108115
return dbTable.Load(*this, loadTime);
109116
}
110117

111-
std::vector<TCommittedBlob> TInsertTable::Read(
112-
ui64 pathId, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot, const std::shared_ptr<arrow::Schema>& pkSchema) const {
118+
std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot,
119+
const std::shared_ptr<arrow::Schema>& pkSchema, const TPKRangesFilter* pkRangesFilter) const {
113120
const TPathInfo* pInfo = Summary.GetPathInfoOptional(pathId);
114121
if (!pInfo) {
115122
return {};
@@ -120,15 +127,23 @@ std::vector<TCommittedBlob> TInsertTable::Read(
120127

121128
for (const auto& data : pInfo->GetCommitted()) {
122129
if (lockId || data.GetSnapshot() <= reqSnapshot) {
130+
auto start = data.GetMeta().GetFirstPK(pkSchema);
131+
auto finish = data.GetMeta().GetLastPK(pkSchema);
132+
if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(start, finish) == TPKRangeFilter::EUsageClass::DontUsage) {
133+
continue;
134+
}
123135
result.emplace_back(TCommittedBlob(data.GetBlobRange(), data.GetSnapshot(), data.GetSchemaVersion(), data.GetMeta().GetNumRows(),
124-
data.GetMeta().GetFirstPK(pkSchema), data.GetMeta().GetLastPK(pkSchema),
125-
data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset()));
136+
start, finish, data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset()));
126137
}
127138
}
128139
if (lockId) {
129140
for (const auto& [writeId, data] : pInfo->GetInserted()) {
130-
result.emplace_back(TCommittedBlob(data.GetBlobRange(), writeId, data.GetSchemaVersion(), data.GetMeta().GetNumRows(),
131-
data.GetMeta().GetFirstPK(pkSchema), data.GetMeta().GetLastPK(pkSchema),
141+
auto start = data.GetMeta().GetFirstPK(pkSchema);
142+
auto finish = data.GetMeta().GetLastPK(pkSchema);
143+
if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(start, finish) == TPKRangeFilter::EUsageClass::DontUsage) {
144+
continue;
145+
}
146+
result.emplace_back(TCommittedBlob(data.GetBlobRange(), writeId, data.GetSchemaVersion(), data.GetMeta().GetNumRows(), start, finish,
132147
data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete, data.GetMeta().GetSchemaSubset()));
133148
}
134149
}

ydb/core/tx/columnshard/engines/insert_table/insert_table.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
#include <ydb/core/tx/columnshard/counters/insert_table.h>
1010

1111
namespace NKikimr::NOlap {
12-
12+
class TPKRangesFilter;
1313
class IDbWrapper;
1414

1515
/// Use one table for inserted and committed blobs:
@@ -57,6 +57,7 @@ class TInsertTableAccessor {
5757
return Summary.AddInserted(std::move(data), load);
5858
}
5959
bool AddAborted(TInsertedData&& data, const bool load) {
60+
AFL_VERIFY_DEBUG(!Summary.ExtractInserted(data.GetInsertWriteId()));
6061
if (load) {
6162
AddBlobLink(data.GetBlobRange().BlobId);
6263
}
@@ -114,8 +115,8 @@ class TInsertTable: public TInsertTableAccessor {
114115
void EraseAbortedOnExecute(IDbWrapper& dbTable, const TInsertedData& key, const std::shared_ptr<IBlobsDeclareRemovingAction>& blobsAction);
115116
void EraseAbortedOnComplete(const TInsertedData& key);
116117

117-
std::vector<TCommittedBlob> Read(
118-
ui64 pathId, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot, const std::shared_ptr<arrow::Schema>& pkSchema) const;
118+
std::vector<TCommittedBlob> Read(ui64 pathId, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot,
119+
const std::shared_ptr<arrow::Schema>& pkSchema, const TPKRangesFilter* pkRangesFilter) const;
119120
bool Load(NIceDb::TNiceDb& db, IDbWrapper& dbTable, const TInstant loadTime);
120121

121122
TInsertWriteId BuildNextWriteId(NTabletFlatExecutor::TTransactionContext& txc);

ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,9 @@ bool TInsertionSummary::HasCommitted(const TCommittedData& data) {
166166
const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData&& data, const bool load /*= false*/) {
167167
const TInsertWriteId writeId = data.GetInsertWriteId();
168168
Counters.Aborted.Add(data.BlobSize(), load);
169+
AFL_VERIFY_DEBUG(!Inserted.contains(writeId));
169170
auto insertInfo = Aborted.emplace(writeId, std::move(data));
170-
Y_ABORT_UNLESS(insertInfo.second);
171+
AFL_VERIFY(insertInfo.second)("write_id", writeId);
171172
return &insertInfo.first->second;
172173
}
173174

@@ -191,6 +192,7 @@ const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddInserted(TInsertedDat
191192
const ui32 dataSize = data.BlobSize();
192193
const ui64 pathId = data.GetPathId();
193194
auto insertInfo = Inserted.emplace(writeId, std::move(data));
195+
AFL_VERIFY_DEBUG(!Aborted.contains(writeId));
194196
if (insertInfo.second) {
195197
OnNewInserted(GetPathInfo(pathId), dataSize, load);
196198
return &insertInfo.first->second;

ydb/core/tx/columnshard/engines/portions/portion_info.cpp

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,38 +32,57 @@ std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const {
3232
return result;
3333
}
3434

35-
ui64 TPortionInfo::GetColumnRawBytes(const std::vector<ui32>& columnIds, const bool validation) const {
36-
return GetColumnRawBytes(std::set<ui32>(columnIds.begin(), columnIds.end()), validation);
35+
ui64 TPortionInfo::GetColumnRawBytes(const std::set<ui32>& entityIds, const bool validation) const {
36+
ui64 sum = 0;
37+
const auto aggr = [&](const TColumnRecord& r) {
38+
sum += r.GetMeta().GetRawBytes();
39+
};
40+
AggregateIndexChunksData(aggr, Records, &entityIds, validation);
41+
return sum;
3742
}
3843

39-
ui64 TPortionInfo::GetColumnRawBytes(const std::optional<std::set<ui32>>& entityIds, const bool validation) const {
44+
ui64 TPortionInfo::GetColumnBlobBytes(const std::set<ui32>& entityIds, const bool validation) const {
45+
ui64 sum = 0;
46+
const auto aggr = [&](const TColumnRecord& r) {
47+
sum += r.GetBlobRange().GetSize();
48+
};
49+
AggregateIndexChunksData(aggr, Records, &entityIds, validation);
50+
return sum;
51+
}
52+
53+
ui64 TPortionInfo::GetColumnRawBytes(const bool validation) const {
4054
ui64 sum = 0;
4155
const auto aggr = [&](const TColumnRecord& r) {
4256
sum += r.GetMeta().GetRawBytes();
4357
};
44-
AggregateIndexChunksData(aggr, Records, entityIds, validation);
58+
AggregateIndexChunksData(aggr, Records, nullptr, validation);
4559
return sum;
4660
}
4761

48-
ui64 TPortionInfo::GetColumnBlobBytes(const std::optional<std::set<ui32>>& entityIds, const bool validation) const {
62+
ui64 TPortionInfo::GetColumnBlobBytes(const bool validation) const {
4963
ui64 sum = 0;
5064
const auto aggr = [&](const TColumnRecord& r) {
5165
sum += r.GetBlobRange().GetSize();
5266
};
53-
AggregateIndexChunksData(aggr, Records, entityIds, validation);
67+
AggregateIndexChunksData(aggr, Records, nullptr, validation);
5468
return sum;
5569
}
5670

57-
ui64 TPortionInfo::GetColumnBlobBytes(const std::vector<ui32>& columnIds, const bool validation) const {
58-
return GetColumnBlobBytes(std::set<ui32>(columnIds.begin(), columnIds.end()), validation);
71+
ui64 TPortionInfo::GetIndexRawBytes(const std::set<ui32>& entityIds, const bool validation) const {
72+
ui64 sum = 0;
73+
const auto aggr = [&](const TIndexChunk& r) {
74+
sum += r.GetRawBytes();
75+
};
76+
AggregateIndexChunksData(aggr, Indexes, &entityIds, validation);
77+
return sum;
5978
}
6079

61-
ui64 TPortionInfo::GetIndexRawBytes(const std::optional<std::set<ui32>>& entityIds, const bool validation) const {
80+
ui64 TPortionInfo::GetIndexRawBytes(const bool validation) const {
6281
ui64 sum = 0;
6382
const auto aggr = [&](const TIndexChunk& r) {
6483
sum += r.GetRawBytes();
6584
};
66-
AggregateIndexChunksData(aggr, Indexes, entityIds, validation);
85+
AggregateIndexChunksData(aggr, Indexes, nullptr, validation);
6786
return sum;
6887
}
6988

ydb/core/tx/columnshard/engines/portions/portion_info.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class TPortionInfo {
9898
}
9999

100100
template <class TAggregator, class TChunkInfo>
101-
static void AggregateIndexChunksData(const TAggregator& aggr, const std::vector<TChunkInfo>& chunks, const std::optional<std::set<ui32>>& columnIds, const bool validation) {
101+
static void AggregateIndexChunksData(const TAggregator& aggr, const std::vector<TChunkInfo>& chunks, const std::set<ui32>* columnIds, const bool validation) {
102102
if (columnIds) {
103103
auto itColumn = columnIds->begin();
104104
auto itRecord = chunks.begin();
@@ -537,7 +537,8 @@ class TPortionInfo {
537537
return result;
538538
}
539539

540-
ui64 GetIndexRawBytes(const std::optional<std::set<ui32>>& columnIds = {}, const bool validation = true) const;
540+
ui64 GetIndexRawBytes(const std::set<ui32>& columnIds, const bool validation = true) const;
541+
ui64 GetIndexRawBytes(const bool validation = true) const;
541542
ui64 GetIndexBlobBytes() const noexcept {
542543
ui64 sum = 0;
543544
for (const auto& rec : Indexes) {
@@ -546,11 +547,11 @@ class TPortionInfo {
546547
return sum;
547548
}
548549

549-
ui64 GetColumnRawBytes(const std::vector<ui32>& columnIds, const bool validation = true) const;
550-
ui64 GetColumnRawBytes(const std::optional<std::set<ui32>>& columnIds = {}, const bool validation = true) const;
550+
ui64 GetColumnRawBytes(const std::set<ui32>& columnIds, const bool validation = true) const;
551+
ui64 GetColumnRawBytes(const bool validation = true) const;
551552

552-
ui64 GetColumnBlobBytes(const std::vector<ui32>& columnIds, const bool validation = true) const;
553-
ui64 GetColumnBlobBytes(const std::optional<std::set<ui32>>& columnIds = {}, const bool validation = true) const;
553+
ui64 GetColumnBlobBytes(const std::set<ui32>& columnIds, const bool validation = true) const;
554+
ui64 GetColumnBlobBytes(const bool validation = true) const;
554555

555556
ui64 GetTotalBlobBytes() const noexcept {
556557
return GetIndexBlobBytes() + GetColumnBlobBytes();

ydb/core/tx/columnshard/engines/predicate/range.cpp

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,25 +40,7 @@ NKikimr::NArrow::TColumnFilter TPKRangeFilter::BuildFilter(const arrow::Datum& d
4040
}
4141

4242
bool TPKRangeFilter::IsPortionInUsage(const TPortionInfo& info) const {
43-
if (const auto& from = PredicateFrom.GetReplaceKey()) {
44-
const auto& portionEnd = info.IndexKeyEnd();
45-
const int commonSize = std::min(from->Size(), portionEnd.Size());
46-
if (std::is_gt(from->ComparePartNotNull(portionEnd, commonSize))) {
47-
return false;
48-
}
49-
}
50-
51-
if (const auto& to = PredicateTo.GetReplaceKey()) {
52-
const auto& portionStart = info.IndexKeyStart();
53-
const int commonSize = std::min(to->Size(), portionStart.Size());
54-
if (std::is_lt(to->ComparePartNotNull(portionStart, commonSize))) {
55-
return false;
56-
}
57-
}
58-
// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("start", info.IndexKeyStart().DebugString())("end", info.IndexKeyEnd().DebugString())(
59-
// "from", PredicateFrom.DebugString())("to", PredicateTo.DebugString());
60-
61-
return true;
43+
return IsPortionInPartialUsage(info.IndexKeyStart(), info.IndexKeyEnd()) != TPKRangeFilter::EUsageClass::DontUsage;
6244
}
6345

6446
TPKRangeFilter::EUsageClass TPKRangeFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end) const {

ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ ISnapshotSchema::TPtr TReadMetadataBase::GetLoadSchemaVerified(const TPortionInf
2525

2626
std::vector<TCommittedBlob> TDataStorageAccessor::GetCommitedBlobs(const TReadDescription& readDescription,
2727
const std::shared_ptr<arrow::Schema>& pkSchema, const std::optional<ui64> lockId, const TSnapshot& reqSnapshot) const {
28-
return std::move(InsertTable->Read(readDescription.PathId, lockId, reqSnapshot, pkSchema));
28+
AFL_VERIFY(readDescription.PKRangesFilter);
29+
return std::move(InsertTable->Read(readDescription.PathId, lockId, reqSnapshot, pkSchema, &*readDescription.PKRangesFilter));
2930
}
3031

3132
} // namespace NKikimr::NOlap::NReader

ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ struct TReadMetadata : public TReadMetadataBase {
106106
return GetProgram().HasProcessingColumnIds();
107107
}
108108

109+
ui64 GetPathId() const {
110+
return PathId;
111+
}
112+
109113
std::shared_ptr<TSelectInfo> SelectInfo;
110114
NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
111115
std::vector<TCommittedBlob> CommittedBlobs;

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,16 @@ ui64 TSpecialReadContext::GetMemoryForSources(const THashMap<ui32, std::shared_p
3232
std::shared_ptr<TFetchingScript> TSpecialReadContext::GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source) {
3333
const bool needSnapshots = !source->GetExclusiveIntervalOnly() || ReadMetadata->GetRequestSnapshot() < source->GetRecordSnapshotMax() ||
3434
!source->IsSourceInMemory();
35-
bool partialUsageByPK = false;
36-
{
37-
const TPKRangeFilter::EUsageClass usage =
38-
ReadMetadata->GetPKRangesFilter().IsPortionInPartialUsage(source->GetStartReplaceKey(), source->GetFinishReplaceKey());
39-
switch (usage) {
35+
const bool partialUsageByPK = [&]() {
36+
switch (source->GetUsageClass()) {
4037
case TPKRangeFilter::EUsageClass::PartialUsage:
41-
partialUsageByPK = true;
42-
break;
38+
return true;
4339
case TPKRangeFilter::EUsageClass::DontUsage:
44-
partialUsageByPK = true;
45-
break;
40+
return true;
4641
case TPKRangeFilter::EUsageClass::FullUsage:
47-
partialUsageByPK = false;
48-
break;
42+
return false;
4943
}
50-
}
44+
}();
5145
const bool useIndexes = (IndexChecker ? source->HasIndexes(IndexChecker->GetIndexIds()) : false);
5246
const bool isWholeExclusiveSource = source->GetExclusiveIntervalOnly() && source->IsSourceInMemory();
5347
const bool hasDeletions = source->GetHasDeletions();

0 commit comments

Comments
 (0)