Skip to content

Commit 0912518

Browse files
authored
find bounds in CS containers with duplicates (#16723)
1 parent 8f3a9de commit 0912518

File tree

9 files changed

+127
-40
lines changed

9 files changed

+127
-40
lines changed

ydb/core/formats/arrow/reader/merger.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllPa
214214
return result;
215215
}
216216

217-
void TMergePartialStream::SkipToLowerBound(const TSortableBatchPosition& pos, const bool include) {
217+
void TMergePartialStream::SkipToBound(const TSortableBatchPosition& pos, const bool lower) {
218218
if (SortHeap.Empty()) {
219219
return;
220220
}
@@ -224,13 +224,13 @@ void TMergePartialStream::SkipToLowerBound(const TSortableBatchPosition& pos, co
224224
if (cmpResult == std::partial_ordering::greater) {
225225
break;
226226
}
227-
if (cmpResult == std::partial_ordering::equivalent && include) {
227+
if (cmpResult == std::partial_ordering::equivalent && lower) {
228228
break;
229229
}
230230
const TSortableBatchPosition::TFoundPosition skipPos = SortHeap.MutableCurrent().SkipToLower(pos);
231231
AFL_DEBUG(NKikimrServices::ARROW_HELPER)("pos", pos.DebugJson().GetStringRobust())("heap", SortHeap.Current().GetKeyColumns().DebugJson().GetStringRobust());
232232
if (skipPos.IsEqual()) {
233-
if (!include && !SortHeap.MutableCurrent().Next()) {
233+
if (!lower && !SortHeap.MutableCurrent().Next()) {
234234
SortHeap.RemoveTop();
235235
} else {
236236
SortHeap.UpdateTop();

ydb/core/formats/arrow/reader/merger.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class TMergePartialStream {
5353
}
5454

5555
void PutControlPoint(const TSortableBatchPosition& point, const bool deepCopy);
56-
void SkipToLowerBound(const TSortableBatchPosition& pos, const bool include);
56+
void SkipToBound(const TSortableBatchPosition& pos, const bool lower);
5757

5858
void SetPossibleSameVersion(const bool value) {
5959
PossibleSameVersionFlag = value;

ydb/core/formats/arrow/reader/position.cpp

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,51 +18,51 @@ NJson::TJsonValue TSortableBatchPosition::DebugJson() const {
1818
return result;
1919
}
2020

21-
std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(TRWSortableBatchPosition& position,
22-
const ui64 posStartExt, const ui64 posFinishExt, const TSortableBatchPosition& forFound, const bool greater) {
21+
std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindBound(TRWSortableBatchPosition& position,
22+
const ui64 posStartExt, const ui64 posFinishExt, const TSortableBatchPosition& forFound, const bool upper) {
2323
ui64 posStart = posStartExt;
2424
ui64 posFinish = posFinishExt;
2525
auto guard = position.CreateAsymmetricAccessGuard();
26+
const auto cond = upper ?
27+
[](const std::partial_ordering cmp) {
28+
return cmp == std::partial_ordering::greater;
29+
} :
30+
[](const std::partial_ordering cmp) {
31+
return cmp == std::partial_ordering::greater || cmp == std::partial_ordering::equivalent;
32+
};
33+
2634
{
2735
AFL_VERIFY(guard.InitSortingPosition(posStart));
2836
auto cmp = position.Compare(forFound);
29-
if (cmp == std::partial_ordering::greater) {
30-
return TFoundPosition::Greater(posStart);
31-
} else if (cmp == std::partial_ordering::equivalent) {
32-
return TFoundPosition::Equal(posStart);
37+
if (cond(cmp)) {
38+
return TFoundPosition(posStart, cmp);
3339
}
3440
}
3541
{
3642
AFL_VERIFY(guard.InitSortingPosition(posFinish));
3743
auto cmp = position.Compare(forFound);
38-
if (cmp == std::partial_ordering::less) {
39-
return TFoundPosition::Less(posFinish);
40-
} else if (cmp == std::partial_ordering::equivalent) {
41-
return TFoundPosition::Equal(posFinish);
44+
if (!cond(cmp)) {
45+
return std::nullopt;
4246
}
4347
}
44-
while (posFinish > posStart + 1) {
48+
while (posFinish != posStart + 1) {
49+
AFL_VERIFY(posFinish > posStart + 1)("finish", posFinish)("start", posStart);
4550
AFL_VERIFY(guard.InitSortingPosition(0.5 * (posStart + posFinish)));
4651
const auto comparision = position.Compare(forFound);
47-
if (comparision == std::partial_ordering::less) {
48-
posStart = position.Position;
49-
} else if (comparision == std::partial_ordering::greater) {
52+
if (cond(comparision)) {
5053
posFinish = position.Position;
5154
} else {
52-
return TFoundPosition::Equal(position.Position);
55+
posStart = position.Position;
5356
}
5457
}
55-
AFL_VERIFY(posFinish != posStart);
56-
if (greater) {
57-
AFL_VERIFY(guard.InitSortingPosition(posFinish));
58-
return TFoundPosition::Greater(posFinish);
59-
} else {
60-
AFL_VERIFY(guard.InitSortingPosition(posStart));
61-
return TFoundPosition::Less(posStart);
62-
}
58+
AFL_VERIFY(posFinish == posStart + 1)("finish", posFinish)("start", posStart);
59+
AFL_VERIFY(guard.InitSortingPosition(posFinish));
60+
const auto comparision = position.Compare(forFound);
61+
AFL_VERIFY(cond(comparision));
62+
return TFoundPosition(posFinish, comparision);
6363
}
6464

65-
std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch,
65+
std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindBound(const std::shared_ptr<arrow::RecordBatch>& batch,
6666
const TSortableBatchPosition& forFound, const bool greater, const std::optional<ui32> includedStartPosition) {
6767
if (!batch || !batch->num_rows()) {
6868
return {};
@@ -77,12 +77,11 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
7777
}
7878

7979
TRWSortableBatchPosition position = forFound.BuildRWPosition(batch, posStart);
80-
return FindPosition(position, posStart, posFinish, forFound, greater);
80+
return FindBound(position, posStart, posFinish, forFound, greater);
8181
}
8282

8383
NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition(const bool needData, const bool deepCopy) const {
84-
return TRWSortableBatchPosition(Position, RecordsCount, ReverseSort,
85-
deepCopy ? Sorting->BuildCopy(Position) : Sorting,
84+
return TRWSortableBatchPosition(Position, RecordsCount, ReverseSort, deepCopy ? Sorting->BuildCopy(Position) : Sorting,
8685
(needData && Data) ? (deepCopy ? Data->BuildCopy(Position) : Data) : nullptr);
8786
}
8887

@@ -96,9 +95,15 @@ NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::Build
9695
}
9796

9897
TSortableBatchPosition::TFoundPosition TRWSortableBatchPosition::SkipToLower(const TSortableBatchPosition& forFound) {
98+
AFL_VERIFY(RecordsCount);
9999
const ui32 posStart = Position;
100-
auto pos = FindPosition(*this, posStart, ReverseSort ? 0 : (RecordsCount - 1), forFound, true);
101-
AFL_VERIFY(pos)("cursor", DebugJson())("found", forFound.DebugJson());
100+
AFL_VERIFY(!ReverseSort)("reason", "unimplemented");
101+
auto pos = FindBound(*this, posStart, RecordsCount - 1, forFound, false);
102+
if (!pos) {
103+
auto guard = CreateAsymmetricAccessGuard();
104+
AFL_VERIFY(guard.InitSortingPosition(RecordsCount - 1));
105+
return TFoundPosition(RecordsCount - 1, Compare(forFound));
106+
}
102107
if (ReverseSort) {
103108
AFL_VERIFY(Position <= posStart)("pos", Position)("pos_skip", pos->GetPosition())("reverse", true);
104109
} else {
@@ -133,8 +138,7 @@ TSortableScanData::TSortableScanData(
133138
BuildPosition(position);
134139
}
135140

136-
TSortableScanData::TSortableScanData(
137-
const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch) {
141+
TSortableScanData::TSortableScanData(const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch) {
138142
for (auto&& c : batch->columns()) {
139143
Columns.emplace_back(std::make_shared<NAccessor::TTrivialArray>(c));
140144
}

ydb/core/formats/arrow/reader/position.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,15 +314,24 @@ class TSortableBatchPosition {
314314
static TFoundPosition Equal(const ui32 pos) {
315315
return TFoundPosition(pos);
316316
}
317+
318+
TFoundPosition(const ui32 pos, const std::partial_ordering cmp)
319+
: Position(pos) {
320+
if (cmp == std::partial_ordering::less) {
321+
GreaterIfNotEqual = false;
322+
} else if (cmp == std::partial_ordering::greater) {
323+
GreaterIfNotEqual = true;
324+
}
325+
}
317326
};
318327

319328
[[nodiscard]] bool IsAvailablePosition(const i64 position) const {
320329
return 0 <= position && position < RecordsCount;
321330
}
322331

323-
static std::optional<TFoundPosition> FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound,
332+
static std::optional<TFoundPosition> FindBound(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound,
324333
const bool needGreater, const std::optional<ui32> includedStartPosition);
325-
static std::optional<TSortableBatchPosition::TFoundPosition> FindPosition(TRWSortableBatchPosition& position, const ui64 posStart,
334+
static std::optional<TSortableBatchPosition::TFoundPosition> FindBound(TRWSortableBatchPosition& position, const ui64 posStart,
326335
const ui64 posFinish, const TSortableBatchPosition& forFound, const bool greater);
327336

328337
const TSortableScanData& GetData() const {
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#include <ydb/core/formats/arrow/reader/position.h>
2+
3+
#include <library/cpp/testing/unittest/registar.h>
4+
5+
namespace NKikimr::NArrow {
6+
7+
Y_UNIT_TEST_SUITE(SortableBatchPosition) {
8+
Y_UNIT_TEST(FindPosition) {
9+
std::shared_ptr<arrow::RecordBatch> data;
10+
std::shared_ptr<arrow::Schema> schema =
11+
std::make_shared<arrow::Schema>(arrow::Schema({ std::make_shared<arrow::Field>("class", std::make_shared<arrow::StringType>()),
12+
std::make_shared<arrow::Field>("name", std::make_shared<arrow::StringType>()) }));
13+
{
14+
std::unique_ptr<arrow::RecordBatchBuilder> batchBuilder;
15+
UNIT_ASSERT(arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool(), &batchBuilder).ok());
16+
17+
UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("a").ok());
18+
UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("a").ok());
19+
UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("a").ok());
20+
UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("a").ok());
21+
UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("c").ok());
22+
UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("c").ok());
23+
24+
UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("a").ok());
25+
UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("a").ok());
26+
UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("c").ok());
27+
UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("c").ok());
28+
UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("a").ok());
29+
UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("c").ok());
30+
31+
UNIT_ASSERT(batchBuilder->Flush(&data).ok());
32+
}
33+
34+
std::shared_ptr<arrow::RecordBatch> search;
35+
{
36+
std::unique_ptr<arrow::RecordBatchBuilder> batchBuilder;
37+
UNIT_ASSERT(arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool(), &batchBuilder).ok());
38+
UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(0)->Append("a").ok());
39+
UNIT_ASSERT(batchBuilder->GetFieldAs<arrow::TypeTraits<arrow::StringType>::BuilderType>(1)->Append("c").ok());
40+
UNIT_ASSERT(batchBuilder->Flush(&search).ok());
41+
}
42+
43+
NMerger::TSortableBatchPosition searchPosition(search, 0, false);
44+
{
45+
auto findPosition = NMerger::TSortableBatchPosition::FindBound(data, searchPosition, false, std::nullopt);
46+
UNIT_ASSERT(!!findPosition);
47+
UNIT_ASSERT_VALUES_EQUAL(findPosition->GetPosition(), 2);
48+
}
49+
50+
{
51+
auto findPosition = NMerger::TSortableBatchPosition::FindBound(data, searchPosition, true, std::nullopt);
52+
UNIT_ASSERT(!!findPosition);
53+
UNIT_ASSERT_VALUES_EQUAL(findPosition->GetPosition(), 4);
54+
}
55+
}
56+
}
57+
58+
} // namespace NKikimr::NArrow

ydb/core/formats/arrow/ut/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ SRCS(
3333
ut_dictionary.cpp
3434
ut_column_filter.cpp
3535
ut_hash.cpp
36+
ut_reader.cpp
3637
)
3738

3839
END()

ydb/core/kqp/ut/olap/helpers/get_value.cpp

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,24 @@ void PrintValue(IOutputStream& out, const NYdb::TValue& v) {
6161
out << value.GetBool();
6262
break;
6363
}
64+
case NYdb::EPrimitiveType::String:
65+
{
66+
out << value.GetString();
67+
break;
68+
}
69+
case NYdb::EPrimitiveType::Json:
70+
{
71+
out << value.GetJson();
72+
break;
73+
}
74+
case NYdb::EPrimitiveType::JsonDocument:
75+
{
76+
out << value.GetJsonDocument();
77+
break;
78+
}
6479
default:
6580
{
66-
UNIT_ASSERT_C(false, "PrintValue not iplemented for this type");
81+
UNIT_ASSERT_C(false, TStringBuilder() << "PrintValue not iplemented for this type: " << (ui64)value.GetPrimitiveType());
6782
}
6883
}
6984
}

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1071,7 +1071,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
10711071
for (const auto& r : rows) {
10721072
TInstant ts = GetTimestamp(r.at("timestamp"));
10731073
UNIT_ASSERT_GE_C(ts, tsPrev, "result is not sorted in ASC order");
1074-
UNIT_ASSERT(results.erase(ts.GetValue()));
1074+
UNIT_ASSERT_C(results.erase(ts.GetValue()), Sprintf("%d", ts.GetValue()));
10751075
tsPrev = ts;
10761076
}
10771077
UNIT_ASSERT(rows.size() == 6);

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() {
142142
}
143143
}
144144
Merger->PutControlPoint(MergingContext->GetFinish(), false);
145-
Merger->SkipToLowerBound(MergingContext->GetStart(), MergingContext->GetIncludeStart());
145+
Merger->SkipToBound(MergingContext->GetStart(), MergingContext->GetIncludeStart());
146146
const ui32 originalSourcesCount = Sources.size();
147147
Sources.clear();
148148

0 commit comments

Comments
 (0)