Skip to content

Commit 6d04fb7

Browse files
authored
use binsearch to find predicate bound in portion (#16867)
1 parent d98dbe4 commit 6d04fb7

File tree

12 files changed

+55
-247
lines changed

12 files changed

+55
-247
lines changed

ydb/core/formats/arrow/arrow_filter.cpp

Lines changed: 0 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -16,127 +16,6 @@ namespace NKikimr::NArrow {
1616

1717
#define Y_VERIFY_OK(status) Y_ABORT_UNLESS(status.ok(), "%s", status.ToString().c_str())
1818

19-
namespace {
20-
enum class ECompareResult : i8 {
21-
LESS = -1,
22-
BORDER = 0,
23-
GREATER = 1
24-
};
25-
26-
template <typename TArray>
27-
inline auto GetValue(const std::shared_ptr<TArray>& array, int pos) {
28-
return array->GetView(pos);
29-
}
30-
31-
template <typename T>
32-
inline void UpdateCompare(const T& value, const T& border, ECompareResult& res) {
33-
if (res == ECompareResult::BORDER) {
34-
if constexpr (std::is_same_v<T, arrow::util::string_view>) {
35-
size_t minSize = (value.size() < border.size()) ? value.size() : border.size();
36-
int cmp = memcmp(value.data(), border.data(), minSize);
37-
if (cmp < 0) {
38-
res = ECompareResult::LESS;
39-
} else if (cmp > 0) {
40-
res = ECompareResult::GREATER;
41-
} else {
42-
UpdateCompare(value.size(), border.size(), res);
43-
}
44-
} else {
45-
if (value < border) {
46-
res = ECompareResult::LESS;
47-
} else if (value > border) {
48-
res = ECompareResult::GREATER;
49-
}
50-
}
51-
}
52-
}
53-
54-
template <typename TArray, typename T>
55-
bool CompareImpl(const std::shared_ptr<arrow::Array>& column, const T& border, std::vector<NArrow::ECompareResult>& rowsCmp) {
56-
bool hasBorder = false;
57-
ECompareResult* res = &rowsCmp[0];
58-
auto array = std::static_pointer_cast<TArray>(column);
59-
60-
for (int i = 0; i < array->length(); ++i, ++res) {
61-
UpdateCompare(GetValue(array, i), border, *res);
62-
hasBorder = hasBorder || (*res == ECompareResult::BORDER);
63-
}
64-
return !hasBorder;
65-
}
66-
67-
template <typename TArray, typename T>
68-
bool CompareImpl(const std::shared_ptr<arrow::ChunkedArray>& column, const T& border, std::vector<NArrow::ECompareResult>& rowsCmp) {
69-
bool hasBorder = false;
70-
ECompareResult* res = &rowsCmp[0];
71-
72-
for (auto& chunk : column->chunks()) {
73-
auto array = std::static_pointer_cast<TArray>(chunk);
74-
75-
for (int i = 0; i < chunk->length(); ++i, ++res) {
76-
UpdateCompare(GetValue(array, i), border, *res);
77-
hasBorder = hasBorder || (*res == ECompareResult::BORDER);
78-
}
79-
}
80-
return !hasBorder;
81-
}
82-
83-
/// @return true in case we have no borders in compare: no need for future keys, allow early exit
84-
template <typename TArray>
85-
bool Compare(const arrow::Datum& column, const std::shared_ptr<arrow::Array>& borderArray, std::vector<NArrow::ECompareResult>& rowsCmp) {
86-
auto border = GetValue(std::static_pointer_cast<TArray>(borderArray), 0);
87-
88-
switch (column.kind()) {
89-
case arrow::Datum::ARRAY:
90-
return CompareImpl<TArray>(column.make_array(), border, rowsCmp);
91-
case arrow::Datum::CHUNKED_ARRAY:
92-
return CompareImpl<TArray>(column.chunked_array(), border, rowsCmp);
93-
default:
94-
break;
95-
}
96-
Y_ABORT_UNLESS(false);
97-
return false;
98-
}
99-
100-
bool SwitchCompare(const arrow::Datum& column, const std::shared_ptr<arrow::Array>& border, std::vector<NArrow::ECompareResult>& rowsCmp) {
101-
Y_ABORT_UNLESS(border->length() == 1);
102-
103-
// first time it's empty
104-
if (rowsCmp.empty()) {
105-
rowsCmp.resize(column.length(), ECompareResult::BORDER);
106-
}
107-
108-
return SwitchArrayType(column, [&](const auto& type) -> bool {
109-
using TWrap = std::decay_t<decltype(type)>;
110-
using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
111-
return Compare<TArray>(column, border, rowsCmp);
112-
});
113-
}
114-
115-
template <typename T>
116-
void CompositeCompare(std::shared_ptr<T> some, std::shared_ptr<arrow::RecordBatch> borderBatch, std::vector<NArrow::ECompareResult>& rowsCmp) {
117-
AFL_VERIFY(some);
118-
AFL_VERIFY(borderBatch);
119-
auto key = borderBatch->schema()->fields();
120-
AFL_VERIFY(key.size());
121-
122-
for (size_t i = 0; i < key.size(); ++i) {
123-
auto& field = key[i];
124-
auto typeId = field->type()->id();
125-
auto column = some->GetColumnByName(field->name());
126-
std::shared_ptr<arrow::Array> border = borderBatch->GetColumnByName(field->name());
127-
AFL_VERIFY(column)("schema1", some->schema()->ToString())("schema2", borderBatch->schema()->ToString())("f", field->name());
128-
AFL_VERIFY(border)("schema1", some->schema()->ToString())("schema2", borderBatch->schema()->ToString())("f", field->name());
129-
AFL_VERIFY(some->schema()->GetFieldByName(field->name())->type()->id() == typeId)("schema1", some->schema()->ToString())(
130-
"schema2", borderBatch->schema()->ToString())("f", field->name());
131-
132-
if (SwitchCompare(column, border, rowsCmp)) {
133-
break; // early exit in case we have all rows compared: no borders, can omit key tail
134-
}
135-
}
136-
}
137-
138-
} // namespace
139-
14019
TColumnFilter::TSlicesIterator::TSlicesIterator(const TColumnFilter& owner, const std::optional<ui32> start, const std::optional<ui32> count)
14120
: Owner(owner)
14221
, StartIndex(start)
@@ -307,61 +186,6 @@ ui32 TColumnFilter::CrossSize(const ui32 s1, const ui32 f1, const ui32 s2, const
307186
return f - s;
308187
}
309188

310-
NKikimr::NArrow::TColumnFilter TColumnFilter::MakePredicateFilter(
311-
const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType) {
312-
std::vector<ECompareResult> cmps;
313-
314-
switch (datum.kind()) {
315-
case arrow::Datum::ARRAY:
316-
Y_ABORT_UNLESS(border.kind() == arrow::Datum::ARRAY);
317-
SwitchCompare(datum, border.make_array(), cmps);
318-
break;
319-
case arrow::Datum::CHUNKED_ARRAY:
320-
Y_ABORT_UNLESS(border.kind() == arrow::Datum::ARRAY);
321-
SwitchCompare(datum, border.make_array(), cmps);
322-
break;
323-
case arrow::Datum::RECORD_BATCH:
324-
Y_ABORT_UNLESS(border.kind() == arrow::Datum::RECORD_BATCH);
325-
CompositeCompare(datum.record_batch(), border.record_batch(), cmps);
326-
break;
327-
case arrow::Datum::TABLE:
328-
Y_ABORT_UNLESS(border.kind() == arrow::Datum::RECORD_BATCH);
329-
CompositeCompare(datum.table(), border.record_batch(), cmps);
330-
break;
331-
default:
332-
Y_ABORT_UNLESS(false);
333-
break;
334-
}
335-
336-
std::vector<bool> bits;
337-
bits.reserve(cmps.size());
338-
339-
switch (compareType) {
340-
case ECompareType::LESS:
341-
for (size_t i = 0; i < cmps.size(); ++i) {
342-
bits.emplace_back(cmps[i] < ECompareResult::BORDER);
343-
}
344-
break;
345-
case ECompareType::LESS_OR_EQUAL:
346-
for (size_t i = 0; i < cmps.size(); ++i) {
347-
bits.emplace_back(cmps[i] <= ECompareResult::BORDER);
348-
}
349-
break;
350-
case ECompareType::GREATER:
351-
for (size_t i = 0; i < cmps.size(); ++i) {
352-
bits.emplace_back(cmps[i] > ECompareResult::BORDER);
353-
}
354-
break;
355-
case ECompareType::GREATER_OR_EQUAL:
356-
for (size_t i = 0; i < cmps.size(); ++i) {
357-
bits.emplace_back(cmps[i] >= ECompareResult::BORDER);
358-
}
359-
break;
360-
}
361-
362-
return NArrow::TColumnFilter(std::move(bits));
363-
}
364-
365189
template <class TData>
366190
void ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const TColumnFilter::TApplyContext& context) {
367191
if (!batch || !batch->num_rows()) {

ydb/core/formats/arrow/arrow_filter.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -266,9 +266,6 @@ class TColumnFilter {
266266
TColumnFilter And(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT;
267267
TColumnFilter Or(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT;
268268

269-
// It makes a filter using composite predicate
270-
static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType);
271-
272269
class TApplyContext {
273270
private:
274271
YDB_READONLY_DEF(std::optional<ui32>, StartPos);

ydb/core/formats/arrow/ut/ut_arrow.cpp

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -480,21 +480,6 @@ std::vector<TDataRow> TestRows() {
480480
return rows;
481481
}
482482

483-
bool CheckFilter(const std::vector<bool>& f, size_t count, bool value) {
484-
for (size_t i = 0; i < f.size(); ++i) {
485-
if (i < count) {
486-
if (f[i] != value) {
487-
return false;
488-
}
489-
} else {
490-
if (f[i] == value) {
491-
return false;
492-
}
493-
}
494-
}
495-
return true;
496-
}
497-
498483
std::shared_ptr<arrow::Table> MakeTable1000() {
499484
TDataRowTableBuilder builder;
500485

@@ -672,38 +657,6 @@ Y_UNIT_TEST_SUITE(ArrowTest) {
672657
}
673658
}
674659

675-
Y_UNIT_TEST(KeyComparison) {
676-
auto table = MakeTable1000();
677-
678-
std::shared_ptr<arrow::RecordBatch> border; // {2, 3, 4}
679-
{
680-
arrow::ScalarVector scalars{
681-
std::make_shared<arrow::Int8Scalar>(2),
682-
std::make_shared<arrow::Int16Scalar>(3),
683-
std::make_shared<arrow::Int32Scalar>(4),
684-
};
685-
686-
std::vector<std::shared_ptr<arrow::Array>> columns;
687-
for (auto scalar : scalars) {
688-
auto res = arrow::MakeArrayFromScalar(*scalar, 1);
689-
UNIT_ASSERT(res.ok());
690-
columns.push_back(*res);
691-
}
692-
693-
border = arrow::RecordBatch::Make(table->schema(), 1, columns);
694-
}
695-
696-
const NArrow::TColumnFilter lt = NArrow::TColumnFilter::MakePredicateFilter(table, border, NArrow::ECompareType::LESS);
697-
const NArrow::TColumnFilter le = NArrow::TColumnFilter::MakePredicateFilter(table, border, NArrow::ECompareType::LESS_OR_EQUAL);
698-
const NArrow::TColumnFilter gt = NArrow::TColumnFilter::MakePredicateFilter(table, border, NArrow::ECompareType::GREATER);
699-
const NArrow::TColumnFilter ge = NArrow::TColumnFilter::MakePredicateFilter(table, border, NArrow::ECompareType::GREATER_OR_EQUAL);
700-
701-
UNIT_ASSERT(CheckFilter(lt.BuildSimpleFilter(), 234, true));
702-
UNIT_ASSERT(CheckFilter(le.BuildSimpleFilter(), 235, true));
703-
UNIT_ASSERT(CheckFilter(gt.BuildSimpleFilter(), 235, false));
704-
UNIT_ASSERT(CheckFilter(ge.BuildSimpleFilter(), 234, false));
705-
}
706-
707660
Y_UNIT_TEST(SortWithCompositeKey) {
708661
std::shared_ptr<arrow::Table> table = Shuffle(MakeTable1000());
709662

ydb/core/tx/columnshard/engines/predicate/container.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,4 +179,36 @@ TConclusion<TPredicateContainer> TPredicateContainer::BuildPredicateTo(
179179
}
180180
}
181181

182+
NArrow::TColumnFilter TPredicateContainer::BuildFilter(const std::shared_ptr<NArrow::TGeneralContainer>& data) const {
183+
if (!Object) {
184+
auto result = NArrow::TColumnFilter::BuildAllowFilter();
185+
result.Add(true, data->GetRecordsCount());
186+
return result;
187+
}
188+
if (!data->GetRecordsCount()) {
189+
return NArrow::TColumnFilter::BuildAllowFilter();
190+
}
191+
auto sortingFields = Object->Batch->schema()->field_names();
192+
auto position = NArrow::NMerger::TRWSortableBatchPosition(data, 0, sortingFields, {}, false);
193+
const auto border = NArrow::NMerger::TSortableBatchPosition(Object->Batch, 0, sortingFields, {}, false);
194+
const bool needUppedBound = CompareType == NArrow::ECompareType::LESS_OR_EQUAL || CompareType == NArrow::ECompareType::GREATER;
195+
const auto findBound = position.FindBound(position, 0, data->GetRecordsCount() - 1, border, needUppedBound);
196+
const ui64 rowsBeforeBound = findBound ? findBound->GetPosition() : data->GetRecordsCount();
197+
198+
auto filter = NArrow::TColumnFilter::BuildAllowFilter();
199+
switch (CompareType) {
200+
case NArrow::ECompareType::LESS:
201+
case NArrow::ECompareType::LESS_OR_EQUAL:
202+
filter.Add(true, rowsBeforeBound);
203+
filter.Add(false, data->GetRecordsCount() - rowsBeforeBound);
204+
break;
205+
case NArrow::ECompareType::GREATER:
206+
case NArrow::ECompareType::GREATER_OR_EQUAL:
207+
filter.Add(false, rowsBeforeBound);
208+
filter.Add(true, data->GetRecordsCount() - rowsBeforeBound);
209+
break;
210+
}
211+
return filter;
182212
}
213+
214+
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/predicate/container.h

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
#pragma once
22
#include "predicate.h"
33

4+
#include <ydb/core/formats/arrow/accessor/abstract/accessor.h>
45
#include <ydb/core/formats/arrow/arrow_filter.h>
6+
#include <ydb/core/formats/arrow/common/container.h>
7+
#include <ydb/core/formats/arrow/reader/position.h>
58

69
#include <ydb/library/accessor/accessor.h>
710
#include <ydb/library/conclusion/result.h>
@@ -115,12 +118,7 @@ class TPredicateContainer {
115118
static TConclusion<TPredicateContainer> BuildPredicateTo(
116119
std::shared_ptr<NOlap::TPredicate> object, const std::shared_ptr<arrow::Schema>& pkSchema);
117120

118-
NKikimr::NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const {
119-
if (!Object) {
120-
return NArrow::TColumnFilter::BuildAllowFilter();
121-
}
122-
return NArrow::TColumnFilter::MakePredicateFilter(data, Object->Batch, CompareType);
123-
}
121+
NArrow::TColumnFilter BuildFilter(const std::shared_ptr<NArrow::TGeneralContainer>& data) const;
124122
};
125123

126124
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/predicate/filter.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@
77

88
namespace NKikimr::NOlap {
99

10-
NKikimr::NArrow::TColumnFilter TPKRangesFilter::BuildFilter(const arrow::Datum& data) const {
10+
NKikimr::NArrow::TColumnFilter TPKRangesFilter::BuildFilter(const std::shared_ptr<NArrow::TGeneralContainer>& data) const {
1111
if (SortedRanges.empty()) {
1212
return NArrow::TColumnFilter::BuildAllowFilter();
1313
}
14-
NArrow::TColumnFilter result = SortedRanges.front().BuildFilter(data);
15-
for (ui32 i = 1; i < SortedRanges.size(); ++i) {
16-
result = result.Or(SortedRanges[i].BuildFilter(data));
14+
15+
auto result = NArrow::TColumnFilter::BuildDenyFilter();
16+
for (const auto& range : SortedRanges) {
17+
result = result.Or(range.BuildFilter(data));
1718
}
1819
return result;
1920
}

ydb/core/tx/columnshard/engines/predicate/filter.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class TPKRangesFilter {
6464
TPKRangeFilter::EUsageClass GetUsageClass(const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& end) const;
6565
bool CheckPoint(const NArrow::TReplaceKey& point) const;
6666

67-
NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const;
67+
NArrow::TColumnFilter BuildFilter(const std::shared_ptr<NArrow::TGeneralContainer>& data) const;
6868

6969
std::set<std::string> GetColumnNames() const {
7070
std::set<std::string> result;

ydb/core/tx/columnshard/engines/predicate/range.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ std::set<std::string> TPKRangeFilter::GetColumnNames() const {
3434
return result;
3535
}
3636

37-
NKikimr::NArrow::TColumnFilter TPKRangeFilter::BuildFilter(const arrow::Datum& data) const {
38-
NArrow::TColumnFilter result = PredicateTo.BuildFilter(data);
37+
NArrow::TColumnFilter TPKRangeFilter::BuildFilter(const std::shared_ptr<NArrow::TGeneralContainer>& data) const {
38+
auto result = PredicateTo.BuildFilter(data);
3939
return result.And(PredicateFrom.BuildFilter(data));
4040
}
4141

ydb/core/tx/columnshard/engines/predicate/range.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class TPKRangeFilter {
3636

3737
static TConclusion<TPKRangeFilter> Build(TPredicateContainer&& from, TPredicateContainer&& to);
3838

39-
NArrow::TColumnFilter BuildFilter(const arrow::Datum& data) const;
39+
NArrow::TColumnFilter BuildFilter(const std::shared_ptr<NArrow::TGeneralContainer>& data) const;
4040

4141
bool IsUsed(const TPortionInfo& info) const;
4242
bool CheckPoint(const NArrow::TReplaceKey& point) const;

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@ namespace NKikimr::NOlap::NReader::NPlain {
1414

1515
TConclusion<bool> TPredicateFilter::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
1616
auto filter = source->GetContext()->GetReadMetadata()->GetPKRangesFilter().BuildFilter(
17-
source->GetStageData().GetTable()->ToTable(source->GetContext()->GetReadMetadata()->GetPKRangesFilter().GetColumnIds(
18-
source->GetContext()->GetReadMetadata()->GetResultSchema()->GetIndexInfo()),
19-
source->GetContext()->GetCommonContext()->GetResolver(), true));
17+
source->GetStageData().GetTable()->ToGeneralContainer(source->GetContext()->GetCommonContext()->GetResolver(),
18+
source->GetContext()->GetReadMetadata()->GetPKRangesFilter().GetColumnIds(
19+
source->GetContext()->GetReadMetadata()->GetResultSchema()->GetIndexInfo()),
20+
true));
2021
source->MutableStageData().AddFilter(filter);
2122
return true;
2223
}

0 commit comments

Comments
 (0)