Skip to content

Commit 33bdce5

Browse files
asymmetric access for sorted key on find position (#7177)
1 parent 13b5991 commit 33bdce5

File tree

3 files changed

+59
-23
lines changed

3 files changed

+59
-23
lines changed

ydb/core/formats/arrow/reader/batch_iterator.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ class TBatchIterator {
4444

4545
TBatchIterator(TRWSortableBatchPosition&& keyColumns)
4646
: ControlPointFlag(true)
47-
, KeyColumns(std::move(keyColumns)) {
47+
, KeyColumns(std::move(keyColumns))
48+
{
4849

4950
}
5051

ydb/core/formats/arrow/reader/position.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
2222
const ui64 posStartExt, const ui64 posFinishExt, const TSortableBatchPosition& forFound, const bool greater) {
2323
ui64 posStart = posStartExt;
2424
ui64 posFinish = posFinishExt;
25+
auto guard = position.CreateAsymmetricAccessGuard();
2526
{
26-
AFL_VERIFY(position.InitPosition(posStart));
27+
AFL_VERIFY(guard.InitSortingPosition(posStart));
2728
auto cmp = position.Compare(forFound);
2829
if (cmp == std::partial_ordering::greater) {
2930
return TFoundPosition::Greater(posStart);
@@ -32,7 +33,7 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
3233
}
3334
}
3435
{
35-
AFL_VERIFY(position.InitPosition(posFinish));
36+
AFL_VERIFY(guard.InitSortingPosition(posFinish));
3637
auto cmp = position.Compare(forFound);
3738
if (cmp == std::partial_ordering::less) {
3839
return TFoundPosition::Less(posFinish);
@@ -41,7 +42,7 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
4142
}
4243
}
4344
while (posFinish > posStart + 1) {
44-
Y_ABORT_UNLESS(position.InitPosition(0.5 * (posStart + posFinish)));
45+
AFL_VERIFY(guard.InitSortingPosition(0.5 * (posStart + posFinish)));
4546
const auto comparision = position.Compare(forFound);
4647
if (comparision == std::partial_ordering::less) {
4748
posStart = position.Position;
@@ -51,12 +52,12 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
5152
return TFoundPosition::Equal(position.Position);
5253
}
5354
}
54-
Y_ABORT_UNLESS(posFinish != posStart);
55+
AFL_VERIFY(posFinish != posStart);
5556
if (greater) {
56-
Y_ABORT_UNLESS(position.InitPosition(posFinish));
57+
AFL_VERIFY(guard.InitSortingPosition(posFinish));
5758
return TFoundPosition::Greater(posFinish);
5859
} else {
59-
Y_ABORT_UNLESS(position.InitPosition(posStart));
60+
AFL_VERIFY(guard.InitSortingPosition(posStart));
6061
return TFoundPosition::Less(posStart);
6162
}
6263
}
@@ -176,7 +177,7 @@ void TSortableScanData::BuildPosition(const ui64 position) {
176177
bool TSortableScanData::InitPosition(const ui64 position) {
177178
AFL_VERIFY(position < RecordsCount);
178179
if (position < FinishPosition && StartPosition <= position) {
179-
return false;
180+
return true;
180181
}
181182
LastInit = position;
182183
ui32 idx = 0;

ydb/core/formats/arrow/reader/position.h

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ class TSortableScanData {
152152

153153
void AppendPositionTo(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, const ui64 position, ui64* recordSize) const;
154154

155-
bool InitPosition(const ui64 position);
155+
[[nodiscard]] bool InitPosition(const ui64 position);
156156

157157
std::shared_ptr<arrow::Table> Slice(const ui64 offset, const ui64 count) const {
158158
std::vector<std::shared_ptr<arrow::ChunkedArray>> slicedArrays;
@@ -244,9 +244,8 @@ class TSortableBatchPosition {
244244
, RecordsCount(recordsCount)
245245
, ReverseSort(reverseSort)
246246
, Sorting(sorting)
247-
, Data(data)
248-
{
249-
247+
, Data(data) {
248+
AFL_VERIFY(IsAvailablePosition(Position));
250249
}
251250

252251
TSortableBatchPosition(const TRWSortableBatchPosition& source) = delete;
@@ -315,7 +314,12 @@ class TSortableBatchPosition {
315314
}
316315
};
317316

318-
static std::optional<TFoundPosition> FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool needGreater, const std::optional<ui32> includedStartPosition);
317+
[[nodiscard]] bool IsAvailablePosition(const i64 position) const {
318+
return 0 <= position && position < RecordsCount;
319+
}
320+
321+
static std::optional<TFoundPosition> FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound,
322+
const bool needGreater, const std::optional<ui32> includedStartPosition);
319323
static std::optional<TSortableBatchPosition::TFoundPosition> FindPosition(TRWSortableBatchPosition& position, const ui64 posStart, const ui64 posFinish, const TSortableBatchPosition& forFound, const bool greater);
320324

321325
const TSortableScanData& GetData() const {
@@ -487,7 +491,7 @@ class TIntervalPositions {
487491
void AddPosition(TSortableBatchPosition&& position, const bool includePositionToLeftInterval) {
488492
TIntervalPosition intervalPosition(std::move(position), includePositionToLeftInterval);
489493
AddPosition(std::move(intervalPosition));
490-
}
494+
}
491495

492496
void AddPosition(const TSortableBatchPosition& position, const bool includePositionToLeftInterval) {
493497
TIntervalPosition intervalPosition(position, includePositionToLeftInterval);
@@ -501,23 +505,53 @@ class TRWSortableBatchPosition: public TSortableBatchPosition, public TMoveOnly
501505
public:
502506
using TBase::TBase;
503507

504-
bool NextPosition(const i64 delta) {
508+
[[nodiscard]] bool NextPosition(const i64 delta) {
505509
return InitPosition(Position + delta);
506510
}
507511

508-
bool InitPosition(const i64 position) {
509-
if (position < RecordsCount && position >= 0) {
510-
Sorting->InitPosition(position);
511-
if (Data) {
512-
Data->InitPosition(position);
512+
[[nodiscard]] bool InitPosition(const i64 position) {
513+
if (!IsAvailablePosition(position)) {
514+
return false;
515+
}
516+
AFL_VERIFY(Sorting->InitPosition(position))("pos", position)("count", RecordsCount);
517+
if (Data) {
518+
AFL_VERIFY(Data->InitPosition(position))("pos", position)("count", RecordsCount);
519+
}
520+
Position = position;
521+
return true;
522+
}
523+
524+
class TAsymmetricPositionGuard: TNonCopyable {
525+
private:
526+
TRWSortableBatchPosition& Owner;
527+
public:
528+
TAsymmetricPositionGuard(TRWSortableBatchPosition& owner)
529+
: Owner(owner)
530+
{
531+
}
532+
533+
[[nodiscard]] bool InitSortingPosition(const i64 position) {
534+
if (!Owner.IsAvailablePosition(position)) {
535+
return false;
513536
}
514-
Position = position;
537+
AFL_VERIFY(Owner.Sorting->InitPosition(position));
538+
Owner.Position = position;
515539
return true;
516-
} else {
517-
return false;
518540
}
519541

542+
~TAsymmetricPositionGuard() {
543+
if (Owner.IsAvailablePosition(Owner.Position)) {
544+
if (Owner.Data) {
545+
AFL_VERIFY(Owner.Data->InitPosition(Owner.Position));
546+
}
547+
}
548+
}
549+
};
550+
551+
TAsymmetricPositionGuard CreateAsymmetricAccessGuard() {
552+
return TAsymmetricPositionGuard(*this);
520553
}
554+
521555
TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& forFound);
522556

523557
// (-inf, it1), [it1, it2), [it2, it3), ..., [itLast, +inf)

0 commit comments

Comments
 (0)