Skip to content

Commit 84098b0

Browse files
zinalazevaykin
authored andcommitted
stream lookup: GetRangePartitioning() optimization (#13180)
1 parent c99a4c8 commit 84098b0

File tree

2 files changed

+28
-12
lines changed

2 files changed

+28
-12
lines changed

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,26 @@ std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpSt
2222

2323
YQL_ENSURE(partitionInfo);
2424

25+
// Binary search of the index to start with.
26+
size_t idxStart = 0;
27+
size_t idxFinish = partitionInfo->size();
28+
while ((idxFinish - idxStart) > 1) {
29+
size_t idxCur = (idxFinish + idxStart) / 2;
30+
const auto& partCur = (*partitionInfo)[idxCur].Range->EndKeyPrefix.GetCells();
31+
YQL_ENSURE(partCur.size() <= keyColumnTypes.size());
32+
int cmp = CompareTypedCellVectors(partCur.data(), range.From.data(), keyColumnTypes.data(),
33+
std::min(partCur.size(), range.From.size()));
34+
if (cmp < 0) {
35+
idxStart = idxCur;
36+
} else {
37+
idxFinish = idxCur;
38+
}
39+
}
40+
2541
std::vector<TCell> minusInf(keyColumnTypes.size());
2642

2743
std::vector<std::pair<ui64, TOwnedTableRange>> rangePartition;
28-
for (size_t idx = 0; idx < partitionInfo->size(); ++idx) {
44+
for (size_t idx = idxStart; idx < partitionInfo->size(); ++idx) {
2945
TTableRange partitionRange{
3046
idx == 0 ? minusInf : (*partitionInfo)[idx - 1].Range->EndKeyPrefix.GetCells(),
3147
idx == 0 ? true : !(*partitionInfo)[idx - 1].Range->IsInclusive,
@@ -108,6 +124,12 @@ TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSetti
108124
column.GetTypeInfo().GetPgTypeMod()
109125
});
110126
}
127+
128+
KeyColumnTypes.resize(KeyColumns.size());
129+
for (const auto& [_, columnInfo] : KeyColumns) {
130+
YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(KeyColumnTypes.size()));
131+
KeyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType;
132+
}
111133
}
112134

113135
TKqpStreamLookupWorker::~TKqpStreamLookupWorker() {
@@ -121,16 +143,6 @@ TTableId TKqpStreamLookupWorker::GetTableId() const {
121143
return TableId;
122144
}
123145

124-
std::vector<NScheme::TTypeInfo> TKqpStreamLookupWorker::GetKeyColumnTypes() const {
125-
std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size());
126-
for (const auto& [_, columnInfo] : KeyColumns) {
127-
YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(keyColumnTypes.size()));
128-
keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType;
129-
}
130-
131-
return keyColumnTypes;
132-
}
133-
134146
class TKqpLookupRows : public TKqpStreamLookupWorker {
135147
public:
136148
TKqpLookupRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,

ydb/core/kqp/runtime/kqp_stream_lookup_worker.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ class TKqpStreamLookupWorker {
5252

5353
virtual std::string GetTablePath() const;
5454
virtual TTableId GetTableId() const;
55-
virtual std::vector<NScheme::TTypeInfo> GetKeyColumnTypes() const;
55+
56+
const std::vector<NScheme::TTypeInfo>& GetKeyColumnTypes() const {
57+
return KeyColumnTypes;
58+
}
5659

5760
virtual void AddInputRow(NUdf::TUnboxedValue inputRow) = 0;
5861
virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery,
@@ -74,6 +77,7 @@ class TKqpStreamLookupWorker {
7477
std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
7578
std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns;
7679
std::vector<TSysTables::TTableColumnInfo> Columns;
80+
std::vector<NScheme::TTypeInfo> KeyColumnTypes;
7781
};
7882

7983
std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,

0 commit comments

Comments
 (0)