Skip to content

Commit 6fc18f8

Browse files
Speed up SIMPLE scanner (#12164)
1 parent 1cf8c98 commit 6fc18f8

36 files changed

+633
-904
lines changed

ydb/core/formats/arrow/accessor/plain/accessor.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,10 @@ class TChunkAccessor {
4343
return (ui64)ChunkedArray->num_chunks();
4444
}
4545
ui64 GetChunkLength(const ui32 idx) const {
46-
return (ui64)ChunkedArray->chunk(idx)->length();
46+
return (ui64)ChunkedArray->chunks()[idx]->length();
4747
}
4848
void OnArray(const ui32 idx, const ui32 startPosition) const {
49-
const auto& arr = ChunkedArray->chunk(idx);
50-
*Result = IChunkedArray::TLocalDataAddress(arr, startPosition, idx);
49+
*Result = IChunkedArray::TLocalDataAddress(ChunkedArray->chunk(idx), startPosition, idx);
5150
}
5251
};
5352

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1397,11 +1397,8 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
13971397
for (auto&& i : PortionsByPath) {
13981398
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("size", i.second.size())("path_id", i.first);
13991399
for (auto&& p : i.second) {
1400-
{
1401-
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
1402-
if (!rowset.IsReady()) {
1403-
reask = true;
1404-
}
1400+
if (!p->GetSchema(Self->GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex())->GetIndexesCount()) {
1401+
continue;
14051402
}
14061403
{
14071404
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
@@ -1433,8 +1430,8 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
14331430
}
14341431
}
14351432
}
1436-
{
1437-
std::vector<NOlap::TIndexChunkLoadContext> indexes;
1433+
std::vector<NOlap::TIndexChunkLoadContext> indexes;
1434+
if (p->GetSchema(Self->GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex())->GetIndexesCount()) {
14381435
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
14391436
if (!rowset.IsReady()) {
14401437
return false;
@@ -1445,8 +1442,8 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
14451442
return false;
14461443
}
14471444
}
1448-
constructor.SetIndexes(std::move(indexes));
14491445
}
1446+
constructor.SetIndexes(std::move(indexes));
14501447
FetchedAccessors.emplace_back(std::move(constructor));
14511448
i.second.pop_back();
14521449
}

ydb/core/tx/columnshard/data_accessor/request.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,11 @@ class TPathFetchingState {
145145
AFL_VERIFY(Portions.emplace(portion->GetPortionId(), portion).second);
146146
}
147147

148-
void AddAccessor(const TPortionDataAccessor& accessor) {
148+
void AddAccessor(
149+
const TPortionDataAccessor& accessor, const std::optional<std::set<ui32>>& columnIds, const std::optional<std::set<ui32>>& indexIds) {
149150
AFL_VERIFY(Stage == EFetchStage::Fetching);
150151
AFL_VERIFY(Portions.erase(accessor.GetPortionInfo().GetPortionId()));
151-
AFL_VERIFY(PortionAccessors.emplace(accessor.GetPortionInfo().GetPortionId(), accessor).second);
152+
AFL_VERIFY(PortionAccessors.emplace(accessor.GetPortionInfo().GetPortionId(), accessor.Extract(columnIds, indexIds)).second);
152153
if (Portions.empty()) {
153154
AFL_VERIFY(Stage == EFetchStage::Fetching);
154155
Stage = EFetchStage::Fetched;
@@ -176,8 +177,8 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat
176177
THashMap<ui64, TPathFetchingState> PathIdStatus;
177178
THashSet<ui64> PathIds;
178179
TDataAccessorsResult AccessorsByPathId;
179-
std::optional<std::vector<ui32>> ColumnIds;
180-
std::optional<std::vector<ui32>> IndexIds;
180+
YDB_READONLY_DEF(std::optional<std::set<ui32>>, ColumnIds);
181+
std::optional<std::set<ui32>> IndexIds;
181182

182183
TAtomicCounter PreparingCount = 0;
183184
TAtomicCounter FetchingCount = 0;
@@ -197,6 +198,11 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat
197198
}
198199

199200
public:
201+
void SetColumnIds(const std::set<ui32>& columnIds) {
202+
AFL_VERIFY(!ColumnIds);
203+
ColumnIds = columnIds;
204+
}
205+
200206
TString DebugString() const {
201207
TStringBuilder sb;
202208
sb << "request_id=" << RequestId << ";";
@@ -291,7 +297,7 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat
291297
{
292298
auto itStatus = PathIdStatus.find(pathId);
293299
AFL_VERIFY(itStatus != PathIdStatus.end());
294-
itStatus->second.AddAccessor(accessor);
300+
itStatus->second.AddAccessor(accessor, ColumnIds, IndexIds);
295301
if (itStatus->second.IsFinished()) {
296302
AFL_VERIFY(FetchingCount.Dec() >= 0);
297303
ReadyCount.Inc();

ydb/core/tx/columnshard/engines/portions/data_accessor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -770,7 +770,7 @@ TConclusion<std::shared_ptr<NArrow::TGeneralContainer>> TPortionDataAccessor::TP
770770
std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> columns;
771771
std::vector<std::shared_ptr<arrow::Field>> fields;
772772
for (auto&& i : Columns) {
773-
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("column", i.GetField()->ToString())("id", i.GetColumnId());
773+
// NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("column", i.GetField()->ToString())("column_id", i.GetColumnId());
774774
if (sequentialColumnIds.contains(i.GetColumnId())) {
775775
columns.emplace_back(i.AssembleForSeqAccess());
776776
} else {

ydb/core/tx/columnshard/engines/portions/data_accessor.h

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,61 @@ class TPortionDataAccessor {
4545
(Indexes ? (Indexes->size() * sizeof(TIndexChunk)) : 0);
4646
}
4747

48+
class TExtractContext {
49+
private:
50+
YDB_ACCESSOR_DEF(std::optional<std::set<ui32>>, ColumnIds);
51+
YDB_ACCESSOR_DEF(std::optional<std::set<ui32>>, IndexIds);
52+
53+
public:
54+
TExtractContext() = default;
55+
};
56+
57+
TPortionDataAccessor Extract(const std::optional<std::set<ui32>>& columnIds, const std::optional<std::set<ui32>>& indexIds) const {
58+
return Extract(TExtractContext().SetColumnIds(columnIds).SetIndexIds(indexIds));
59+
}
60+
61+
TPortionDataAccessor Extract(const TExtractContext& context) const {
62+
AFL_VERIFY(Records);
63+
std::vector<TColumnRecord> extractedRecords;
64+
if (context.GetColumnIds()) {
65+
auto itRec = Records->begin();
66+
auto itExt = context.GetColumnIds()->begin();
67+
while (itRec != Records->end() && itExt != context.GetColumnIds()->end()) {
68+
if (itRec->GetEntityId() == *itExt) {
69+
extractedRecords.emplace_back(*itRec);
70+
++itRec;
71+
} else if (itRec->GetEntityId() < *itExt) {
72+
++itRec;
73+
} else {
74+
++itExt;
75+
}
76+
}
77+
} else {
78+
extractedRecords = *Records;
79+
}
80+
81+
AFL_VERIFY(Indexes);
82+
std::vector<TIndexChunk> extractedIndexes;
83+
if (context.GetIndexIds()) {
84+
auto itIdx = Indexes->begin();
85+
auto itExt = context.GetIndexIds()->begin();
86+
while (itIdx != Indexes->end() && itExt != context.GetIndexIds()->end()) {
87+
if (itIdx->GetEntityId() == *itExt) {
88+
extractedIndexes.emplace_back(*itIdx);
89+
++itIdx;
90+
} else if (itIdx->GetEntityId() < *itExt) {
91+
++itIdx;
92+
} else {
93+
++itExt;
94+
}
95+
}
96+
} else {
97+
extractedIndexes = *Indexes;
98+
}
99+
100+
return TPortionDataAccessor(PortionInfo, std::move(extractedRecords), std::move(extractedIndexes), false);
101+
}
102+
48103
const std::vector<TColumnRecord>& TestGetRecords() const {
49104
AFL_VERIFY(Records);
50105
return std::move(*Records);

ydb/core/tx/columnshard/engines/reader/actor/actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc
7373
}
7474

7575
void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
76-
TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_SCAN) ("SelfId", SelfId())(
77-
"TabletId", TabletId)("ScanId", ScanId)("TxId", TxId)("ScanGen", ScanGen));
76+
// TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_SCAN) ("SelfId", SelfId())(
77+
// "TabletId", TabletId)("ScanId", ScanId)("TxId", TxId)("ScanGen", ScanGen));
7878
auto g = Stats->MakeGuard("bootstrap");
7979
ScanActorId = ctx.SelfID;
8080

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/columns_set.cpp renamed to ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#include <util/string/join.h>
33
#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>
44

5-
namespace NKikimr::NOlap::NReader::NPlain {
5+
namespace NKikimr::NOlap::NReader::NCommon {
66

77
TString TColumnsSet::DebugString() const {
88
return TStringBuilder() << "("

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/columns_set.h renamed to ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
#include <util/string/join.h>
88

9-
namespace NKikimr::NOlap::NReader::NSimple {
9+
namespace NKikimr::NOlap::NReader::NCommon {
1010

1111
enum class EMemType {
1212
Blob,
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#include "fetched_data.h"
2+
3+
#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
4+
#include <ydb/library/formats/arrow/common/validation.h>
5+
#include <ydb/library/formats/arrow/simple_arrays_cache.h>
6+
7+
namespace NKikimr::NOlap::NReader::NCommon {
8+
9+
void TFetchedData::SyncTableColumns(const std::vector<std::shared_ptr<arrow::Field>>& fields, const ISnapshotSchema& schema) {
10+
for (auto&& i : fields) {
11+
if (Table->GetSchema()->GetFieldByName(i->name())) {
12+
continue;
13+
}
14+
Table
15+
->AddField(i, std::make_shared<NArrow::NAccessor::TTrivialArray>(NArrow::TThreadSimpleArraysCache::Get(
16+
i->type(), schema.GetExternalDefaultValueVerified(i->name()), Table->num_rows())))
17+
.Validate();
18+
}
19+
}
20+
21+
} // namespace NKikimr::NOlap

0 commit comments

Comments
 (0)