Skip to content

Commit 4226868

Browse files
refactoring for ranges control (#7758)
1 parent 0a33939 commit 4226868

File tree

11 files changed

+352
-178
lines changed

11 files changed

+352
-178
lines changed

ydb/core/tx/columnshard/engines/predicate/container.cpp

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -115,59 +115,61 @@ bool TPredicateContainer::CrossRanges(const TPredicateContainer& ext) {
115115
}
116116
}
117117

118-
std::optional<NKikimr::NOlap::TPredicateContainer> TPredicateContainer::BuildPredicateFrom(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo) {
118+
TConclusion<NKikimr::NOlap::TPredicateContainer> TPredicateContainer::BuildPredicateFrom(
119+
std::shared_ptr<NOlap::TPredicate> object, const std::shared_ptr<arrow::Schema>& pkSchema) {
119120
if (!object || object->Empty()) {
120121
return TPredicateContainer(NArrow::ECompareType::GREATER_OR_EQUAL);
121122
} else {
122123
if (!object->Good()) {
123124
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "not good 'from' predicate");
124-
return {};
125+
return TConclusionStatus::Fail("not good 'from' predicate");
125126
}
126127
if (!object->IsFrom()) {
127128
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "'from' predicate not is from");
128-
return {};
129+
return TConclusionStatus::Fail("'from' predicate not is from");
129130
}
130-
if (indexInfo) {
131+
if (pkSchema) {
131132
auto cNames = object->ColumnNames();
132133
i32 countSortingFields = 0;
133-
for (i32 i = 0; i < indexInfo->GetReplaceKey()->num_fields(); ++i) {
134-
if (i < (int)cNames.size() && cNames[i] == indexInfo->GetReplaceKey()->field(i)->name()) {
134+
for (i32 i = 0; i < pkSchema->num_fields(); ++i) {
135+
if (i < (int)cNames.size() && cNames[i] == pkSchema->field(i)->name()) {
135136
++countSortingFields;
136137
} else {
137138
break;
138139
}
139140
}
140-
Y_ABORT_UNLESS(countSortingFields == object->Batch->num_columns());
141+
AFL_VERIFY(countSortingFields == object->Batch->num_columns())("count", countSortingFields)("object", object->Batch->num_columns());
141142
}
142-
return TPredicateContainer(object, indexInfo ? ExtractKey(*object, indexInfo->GetReplaceKey()) : nullptr);
143+
return TPredicateContainer(object, pkSchema ? ExtractKey(*object, pkSchema) : nullptr);
143144
}
144145
}
145146

146-
std::optional<NKikimr::NOlap::TPredicateContainer> TPredicateContainer::BuildPredicateTo(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo) {
147+
TConclusion<TPredicateContainer> TPredicateContainer::BuildPredicateTo(
148+
std::shared_ptr<TPredicate> object, const std::shared_ptr<arrow::Schema>& pkSchema) {
147149
if (!object || object->Empty()) {
148150
return TPredicateContainer(NArrow::ECompareType::LESS_OR_EQUAL);
149151
} else {
150152
if (!object->Good()) {
151153
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "not good 'to' predicate");
152-
return {};
154+
return TConclusionStatus::Fail("not good 'to' predicate");
153155
}
154156
if (!object->IsTo()) {
155157
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "'to' predicate not is to");
156-
return {};
158+
return TConclusionStatus::Fail("'to' predicate not is to");
157159
}
158-
if (indexInfo) {
160+
if (pkSchema) {
159161
auto cNames = object->ColumnNames();
160162
i32 countSortingFields = 0;
161-
for (i32 i = 0; i < indexInfo->GetReplaceKey()->num_fields(); ++i) {
162-
if (i < (int)cNames.size() && cNames[i] == indexInfo->GetReplaceKey()->field(i)->name()) {
163+
for (i32 i = 0; i < pkSchema->num_fields(); ++i) {
164+
if (i < (int)cNames.size() && cNames[i] == pkSchema->field(i)->name()) {
163165
++countSortingFields;
164166
} else {
165167
break;
166168
}
167169
}
168170
Y_ABORT_UNLESS(countSortingFields == object->Batch->num_columns());
169171
}
170-
return TPredicateContainer(object, indexInfo ? TPredicateContainer::ExtractKey(*object, indexInfo->GetReplaceKey()) : nullptr);
172+
return TPredicateContainer(object, pkSchema ? TPredicateContainer::ExtractKey(*object, pkSchema) : nullptr);
171173
}
172174
}
173175

ydb/core/tx/columnshard/engines/predicate/container.h

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
#pragma once
22
#include "predicate.h"
3+
34
#include <ydb/core/formats/arrow/arrow_filter.h>
45
#include <ydb/core/formats/arrow/replace_key.h>
6+
57
#include <ydb/library/accessor/accessor.h>
8+
69
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
10+
711
#include <optional>
812

913
namespace NKikimr::NOlap {
@@ -45,6 +49,9 @@ class TPredicateContainer {
4549
}
4650

4751
public:
52+
NArrow::ECompareType GetCompareType() const {
53+
return CompareType;
54+
}
4855

4956
const std::shared_ptr<NArrow::TReplaceKey>& GetReplaceKey() const {
5057
return ReplaceKey;
@@ -55,8 +62,8 @@ class TPredicateContainer {
5562
}
5663

5764
template <class TArrayColumn>
58-
std::optional<typename TArrayColumn::value_type> Get(const ui32 colIndex, const ui32 rowIndex,
59-
const std::optional<typename TArrayColumn::value_type> defaultValue = {}) const {
65+
std::optional<typename TArrayColumn::value_type> Get(
66+
const ui32 colIndex, const ui32 rowIndex, const std::optional<typename TArrayColumn::value_type> defaultValue = {}) const {
6067
if (!Object) {
6168
return defaultValue;
6269
} else {
@@ -80,13 +87,15 @@ class TPredicateContainer {
8087
return TPredicateContainer(NArrow::ECompareType::GREATER_OR_EQUAL);
8188
}
8289

83-
static std::optional<TPredicateContainer> BuildPredicateFrom(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo);
90+
static TConclusion<TPredicateContainer> BuildPredicateFrom(
91+
std::shared_ptr<NOlap::TPredicate> object, const std::shared_ptr<arrow::Schema>& pkSchema);
8492

8593
static TPredicateContainer BuildNullPredicateTo() {
8694
return TPredicateContainer(NArrow::ECompareType::LESS_OR_EQUAL);
8795
}
8896

89-
static std::optional<TPredicateContainer> BuildPredicateTo(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo);
97+
static TConclusion<TPredicateContainer> BuildPredicateTo(
98+
std::shared_ptr<NOlap::TPredicate> object, const std::shared_ptr<arrow::Schema>& pkSchema);
9099

91100
NKikimr::NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const {
92101
if (!Object) {
@@ -96,4 +105,4 @@ class TPredicateContainer {
96105
}
97106
};
98107

99-
}
108+
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/predicate/filter.cpp

Lines changed: 128 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
#include "filter.h"
2+
3+
#include <ydb/core/formats/arrow/serializer/native.h>
4+
25
#include <ydb/library/actors/core/log.h>
36

47
namespace NKikimr::NOlap {
@@ -14,43 +17,50 @@ NKikimr::NArrow::TColumnFilter TPKRangesFilter::BuildFilter(const arrow::Datum&
1417
return result;
1518
}
1619

17-
bool TPKRangesFilter::Add(std::shared_ptr<NOlap::TPredicate> f, std::shared_ptr<NOlap::TPredicate> t, const TIndexInfo* indexInfo) {
20+
TConclusionStatus TPKRangesFilter::Add(
21+
std::shared_ptr<NOlap::TPredicate> f, std::shared_ptr<NOlap::TPredicate> t, const std::shared_ptr<arrow::Schema>& pkSchema) {
1822
if ((!f || f->Empty()) && (!t || t->Empty())) {
19-
return true;
23+
return TConclusionStatus::Success();
24+
}
25+
auto fromContainerConclusion = TPredicateContainer::BuildPredicateFrom(f, pkSchema);
26+
if (fromContainerConclusion.IsFail()) {
27+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "incorrect from container")(
28+
"from", fromContainerConclusion.GetErrorMessage());
29+
return fromContainerConclusion;
2030
}
21-
auto fromContainer = TPredicateContainer::BuildPredicateFrom(f, indexInfo);
22-
auto toContainer = TPredicateContainer::BuildPredicateTo(t, indexInfo);
23-
if (!fromContainer || !toContainer) {
24-
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "incorrect from/to containers")("from", !!fromContainer)("to", !!toContainer);
25-
return false;
31+
auto toContainerConclusion = TPredicateContainer::BuildPredicateTo(t, pkSchema);
32+
if (toContainerConclusion.IsFail()) {
33+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "incorrect to container")(
34+
"from", toContainerConclusion.GetErrorMessage());
35+
return toContainerConclusion;
2636
}
2737
if (SortedRanges.size() && !FakeRanges) {
2838
if (ReverseFlag) {
29-
if (fromContainer->CrossRanges(SortedRanges.front().GetPredicateTo())) {
39+
if (fromContainerConclusion->CrossRanges(SortedRanges.front().GetPredicateTo())) {
3040
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "not sorted sequence");
31-
return false;
41+
return TConclusionStatus::Fail("not sorted sequence");
3242
}
3343
} else {
34-
if (fromContainer->CrossRanges(SortedRanges.back().GetPredicateTo())) {
44+
if (fromContainerConclusion->CrossRanges(SortedRanges.back().GetPredicateTo())) {
3545
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "not sorted sequence");
36-
return false;
46+
return TConclusionStatus::Fail("not sorted sequence");
3747
}
3848
}
3949
}
40-
auto pkRangeFilter = TPKRangeFilter::Build(std::move(*fromContainer), std::move(*toContainer));
41-
if (!pkRangeFilter) {
42-
return false;
50+
auto pkRangeFilterConclusion = TPKRangeFilter::Build(fromContainerConclusion.DetachResult(), toContainerConclusion.DetachResult());
51+
if (pkRangeFilterConclusion.IsFail()) {
52+
return pkRangeFilterConclusion;
4353
}
4454
if (FakeRanges) {
4555
FakeRanges = false;
4656
SortedRanges.clear();
4757
}
4858
if (ReverseFlag) {
49-
SortedRanges.emplace_front(std::move(*pkRangeFilter));
59+
SortedRanges.emplace_front(pkRangeFilterConclusion.DetachResult());
5060
} else {
51-
SortedRanges.emplace_back(std::move(*pkRangeFilter));
61+
SortedRanges.emplace_back(pkRangeFilterConclusion.DetachResult());
5262
}
53-
return true;
63+
return TConclusionStatus::Success();
5464
}
5565

5666
TString TPKRangesFilter::DebugString() const {
@@ -84,6 +94,15 @@ bool TPKRangesFilter::IsPortionInUsage(const TPortionInfo& info) const {
8494
return SortedRanges.empty();
8595
}
8696

97+
bool TPKRangesFilter::CheckPoint(const NArrow::TReplaceKey& point) const {
98+
for (auto&& i : SortedRanges) {
99+
if (i.CheckPoint(point)) {
100+
return true;
101+
}
102+
}
103+
return SortedRanges.empty();
104+
}
105+
87106
TPKRangeFilter::EUsageClass TPKRangesFilter::IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end) const {
88107
for (auto&& i : SortedRanges) {
89108
switch (i.IsPortionInPartialUsage(start, end)) {
@@ -99,11 +118,101 @@ TPKRangeFilter::EUsageClass TPKRangesFilter::IsPortionInPartialUsage(const NArro
99118
}
100119

101120
TPKRangesFilter::TPKRangesFilter(const bool reverse)
102-
: ReverseFlag(reverse)
103-
{
121+
: ReverseFlag(reverse) {
104122
auto range = TPKRangeFilter::Build(TPredicateContainer::BuildNullPredicateFrom(), TPredicateContainer::BuildNullPredicateTo());
105123
Y_ABORT_UNLESS(range);
106124
SortedRanges.emplace_back(*range);
107125
}
108126

127+
std::shared_ptr<arrow::RecordBatch> TPKRangesFilter::SerializeToRecordBatch(const std::shared_ptr<arrow::Schema>& pkSchema) const {
128+
auto fullSchema = NArrow::TStatusValidator::GetValid(
129+
pkSchema->AddField(pkSchema->num_fields(), std::make_shared<arrow::Field>(".ydb_operation_type", arrow::uint32())));
130+
auto builders = NArrow::MakeBuilders(fullSchema, SortedRanges.size() * 2);
131+
for (auto&& i : SortedRanges) {
132+
for (ui32 idx = 0; idx < (ui32)pkSchema->num_fields(); ++idx) {
133+
if (idx < i.GetPredicateFrom().GetReplaceKey()->Size()) {
134+
AFL_VERIFY(NArrow::Append(
135+
*builders[idx], i.GetPredicateFrom().GetReplaceKey()->Column(idx), i.GetPredicateFrom().GetReplaceKey()->GetPosition()));
136+
} else {
137+
NArrow::TStatusValidator::Validate(builders[idx]->AppendNull());
138+
}
139+
}
140+
NArrow::Append<arrow::UInt32Type>(*builders[pkSchema->num_fields()], (ui32)i.GetPredicateFrom().GetCompareType());
141+
142+
for (ui32 idx = 0; idx < (ui32)pkSchema->num_fields(); ++idx) {
143+
if (idx < i.GetPredicateTo().GetReplaceKey()->Size()) {
144+
AFL_VERIFY(NArrow::Append(
145+
*builders[idx], i.GetPredicateTo().GetReplaceKey()->Column(idx), i.GetPredicateTo().GetReplaceKey()->GetPosition()));
146+
} else {
147+
NArrow::TStatusValidator::Validate(builders[idx]->AppendNull());
148+
}
149+
}
150+
NArrow::Append<arrow::UInt32Type>(*builders[pkSchema->num_fields()], (ui32)i.GetPredicateTo().GetCompareType());
151+
}
152+
return arrow::RecordBatch::Make(fullSchema, SortedRanges.size() * 2, NArrow::Finish(std::move(builders)));
109153
}
154+
155+
std::shared_ptr<NKikimr::NOlap::TPKRangesFilter> TPKRangesFilter::BuildFromRecordBatchLines(
156+
const std::shared_ptr<arrow::RecordBatch>& batch, const bool reverse) {
157+
std::shared_ptr<TPKRangesFilter> result = std::make_shared<TPKRangesFilter>(reverse);
158+
for (ui32 i = 0; i < batch->num_rows(); ++i) {
159+
auto batchRow = batch->Slice(i, 1);
160+
auto pFrom = std::make_shared<NOlap::TPredicate>(NKernels::EOperation::GreaterEqual, batchRow);
161+
auto pTo = std::make_shared<NOlap::TPredicate>(NKernels::EOperation::LessEqual, batchRow);
162+
result->Add(pFrom, pTo, batch->schema()).Validate();
163+
}
164+
return result;
165+
}
166+
167+
std::shared_ptr<NKikimr::NOlap::TPKRangesFilter> TPKRangesFilter::BuildFromRecordBatchFull(
168+
const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& pkSchema, const bool reverse) {
169+
std::shared_ptr<TPKRangesFilter> result = std::make_shared<TPKRangesFilter>(reverse);
170+
auto pkBatch = NArrow::TColumnOperator().Adapt(batch, pkSchema).DetachResult();
171+
auto c = batch->GetColumnByName(".ydb_operation_type");
172+
AFL_VERIFY(c);
173+
AFL_VERIFY(c->type_id() == arrow::Type::UINT32);
174+
auto cUi32 = static_pointer_cast<arrow::UInt32Array>(c);
175+
for (ui32 i = 0; i < batch->num_rows();) {
176+
std::shared_ptr<NOlap::TPredicate> pFrom;
177+
std::shared_ptr<NOlap::TPredicate> pTo;
178+
{
179+
auto batchRow = TPredicate::CutNulls(batch->Slice(i, 1));
180+
NKernels::EOperation op = (NKernels::EOperation)cUi32->Value(i);
181+
if (op == NKernels::EOperation::GreaterEqual || op == NKernels::EOperation::Greater) {
182+
pFrom = std::make_shared<NOlap::TPredicate>(op, batchRow);
183+
} else if (op == NKernels::EOperation::Equal) {
184+
pFrom = std::make_shared<NOlap::TPredicate>(NKernels::EOperation::GreaterEqual, batchRow);
185+
} else {
186+
AFL_VERIFY(false);
187+
}
188+
if (op != NKernels::EOperation::Equal) {
189+
++i;
190+
}
191+
}
192+
{
193+
auto batchRow = TPredicate::CutNulls(batch->Slice(i, 1));
194+
NKernels::EOperation op = (NKernels::EOperation)cUi32->Value(i);
195+
if (op == NKernels::EOperation::LessEqual || op == NKernels::EOperation::Less) {
196+
pTo = std::make_shared<NOlap::TPredicate>(op, batchRow);
197+
} else if (op == NKernels::EOperation::Equal) {
198+
pTo = std::make_shared<NOlap::TPredicate>(NKernels::EOperation::LessEqual, batchRow);
199+
} else {
200+
AFL_VERIFY(false);
201+
}
202+
}
203+
result->Add(pFrom, pTo, pkSchema).Validate();
204+
}
205+
return result;
206+
}
207+
208+
std::shared_ptr<NKikimr::NOlap::TPKRangesFilter> TPKRangesFilter::BuildFromString(
209+
const TString& data, const std::shared_ptr<arrow::Schema>& pkSchema, const bool reverse) {
210+
auto batch = NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TNativeSerializer().Deserialize(data));
211+
return BuildFromRecordBatchFull(batch, pkSchema, reverse);
212+
}
213+
214+
TString TPKRangesFilter::SerializeToString(const std::shared_ptr<arrow::Schema>& pkSchema) const {
215+
return NArrow::NSerialization::TNativeSerializer().SerializeFull(SerializeToRecordBatch(pkSchema));
216+
}
217+
218+
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/predicate/filter.h

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,15 @@ class TPKRangesFilter {
99
bool FakeRanges = true;
1010
std::deque<TPKRangeFilter> SortedRanges;
1111
bool ReverseFlag = false;
12+
1213
public:
1314
TPKRangesFilter(const bool reverse);
1415

16+
[[nodiscard]] TConclusionStatus Add(
17+
std::shared_ptr<NOlap::TPredicate> f, std::shared_ptr<NOlap::TPredicate> t, const std::shared_ptr<arrow::Schema>& pkSchema);
18+
std::shared_ptr<arrow::RecordBatch> SerializeToRecordBatch(const std::shared_ptr<arrow::Schema>& pkSchema) const;
19+
TString SerializeToString(const std::shared_ptr<arrow::Schema>& pkSchema) const;
20+
1521
bool IsEmpty() const {
1622
return SortedRanges.empty() || FakeRanges;
1723
}
@@ -39,11 +45,10 @@ class TPKRangesFilter {
3945

4046
bool IsPortionInUsage(const TPortionInfo& info) const;
4147
TPKRangeFilter::EUsageClass IsPortionInPartialUsage(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end) const;
48+
bool CheckPoint(const NArrow::TReplaceKey& point) const;
4249

4350
NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const;
4451

45-
[[nodiscard]] bool Add(std::shared_ptr<NOlap::TPredicate> f, std::shared_ptr<NOlap::TPredicate> t, const TIndexInfo* indexInfo);
46-
4752
std::set<std::string> GetColumnNames() const {
4853
std::set<std::string> result;
4954
for (auto&& i : SortedRanges) {
@@ -57,6 +62,30 @@ class TPKRangesFilter {
5762
TString DebugString() const;
5863

5964
std::set<ui32> GetColumnIds(const TIndexInfo& indexInfo) const;
65+
66+
static std::shared_ptr<TPKRangesFilter> BuildFromRecordBatchLines(const std::shared_ptr<arrow::RecordBatch>& batch, const bool reverse);
67+
68+
static std::shared_ptr<TPKRangesFilter> BuildFromRecordBatchFull(
69+
const std::shared_ptr<arrow::RecordBatch>& batch, const std::shared_ptr<arrow::Schema>& pkSchema, const bool reverse);
70+
static std::shared_ptr<TPKRangesFilter> BuildFromString(
71+
const TString& data, const std::shared_ptr<arrow::Schema>& pkSchema, const bool reverse);
72+
73+
template <class TProto>
74+
static TConclusion<TPKRangesFilter> BuildFromProto(const TProto& proto, const bool reverse, const std::vector<TNameTypeInfo>& ydbPk) {
75+
TPKRangesFilter result(reverse);
76+
for (auto& protoRange : proto.GetRanges()) {
77+
TSerializedTableRange range(protoRange);
78+
auto fromPredicate = std::make_shared<TPredicate>();
79+
auto toPredicate = std::make_shared<TPredicate>();
80+
TSerializedTableRange serializedRange(protoRange);
81+
std::tie(*fromPredicate, *toPredicate) = TPredicate::DeserializePredicatesRange(serializedRange, ydbPk);
82+
auto status = result.Add(fromPredicate, toPredicate, NArrow::TStatusValidator::GetValid(NArrow::MakeArrowSchema(ydbPk)));
83+
if (status.IsFail()) {
84+
return status;
85+
}
86+
}
87+
return result;
88+
}
6089
};
6190

6291
}

0 commit comments

Comments
 (0)