Skip to content

Commit 6ee50a5

Browse files
uzhastiknepal
andauthored
Make sure we have properly aligned arrow buffers after deserializatio… (ydb-platform#9424)
Co-authored-by: Andrey Neporada <aneporada@ydb.tech>
1 parent d4d3395 commit 6ee50a5

File tree

2 files changed

+38
-12
lines changed

2 files changed

+38
-12
lines changed

ydb/library/yql/minikql/computation/mkql_block_transport.cpp

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <ydb/library/yql/minikql/mkql_type_builder.h>
55
#include <ydb/library/yql/public/udf/arrow/block_reader.h>
6+
#include <ydb/library/yql/public/udf/arrow/memory_pool.h>
67
#include <ydb/library/yql/utils/rope_over_buffer.h>
78
#include <ydb/library/yql/utils/yql_panic.h>
89

@@ -25,22 +26,35 @@ std::shared_ptr<arrow::Buffer> MakeEmptyBuffer() {
2526
return std::make_shared<arrow::Buffer>(nullptr, 0);
2627
}
2728

29+
bool HasArrrowAlignment(const void* buf) {
30+
return AlignUp(buf, NYql::NUdf::ArrowMemoryAlignment) == buf;
31+
}
32+
2833
std::shared_ptr<arrow::Buffer> MakeZeroBuffer(size_t byteLen) {
34+
using namespace NYql::NUdf;
35+
if (!byteLen) {
36+
return MakeEmptyBuffer();
37+
}
38+
2939
constexpr size_t NullWordCount = (MaxBlockSizeInBytes + sizeof(ui64) - 1) / sizeof(ui64);
30-
static ui64 nulls[NullWordCount] = { 0 };
31-
if (byteLen <= sizeof(nulls)) {
32-
return std::make_shared<arrow::Buffer>(reinterpret_cast<const ui8*>(nulls), byteLen);
40+
constexpr size_t ExtraAlignWords = (ArrowMemoryAlignment > sizeof(ui64)) ? (ArrowMemoryAlignment / sizeof(ui64) - 1) : 0;
41+
static const ui64 nulls[NullWordCount + ExtraAlignWords] = { 0 };
42+
43+
// round all buffer length to 64 bytes
44+
size_t capacity = AlignUp(byteLen, size_t(64));
45+
if (capacity <= NullWordCount * sizeof(ui64)) {
46+
return std::make_shared<arrow::Buffer>(AlignUp(reinterpret_cast<const ui8*>(nulls), ArrowMemoryAlignment), byteLen);
3347
}
3448

35-
size_t wordCount = (byteLen + sizeof(ui64) - 1) / sizeof(ui64);
36-
std::shared_ptr<ui64[]> buf(new ui64[wordCount]);
37-
std::fill(buf.get(), buf.get() + wordCount, 0);
38-
return std::make_shared<TOwnedArrowBuffer>(TContiguousSpan{ reinterpret_cast<const char*>(buf.get()), byteLen }, buf);
49+
auto result = AllocateResizableBuffer(byteLen, GetYqlMemoryPool());
50+
ARROW_OK(result->Resize(byteLen));
51+
std::memset(result->mutable_data(), 0, byteLen);
52+
return result;
3953
}
4054

4155
std::shared_ptr<arrow::Buffer> MakeZeroBitmap(size_t bitCount) {
4256
// align up 8 byte boundary
43-
size_t byteCount = ((bitCount + 63u) & ~size_t(63u)) >> 3;
57+
size_t byteCount = AlignUp(bitCount, size_t(64)) >> 3;
4458
return MakeZeroBuffer(byteCount);
4559
}
4660

@@ -57,7 +71,7 @@ void StoreNullsSizes(const arrow::ArrayData& data, const IBlockSerializer::TMeta
5771
}
5872

5973
const ui64 desiredOffset = data.offset % 8;
60-
size_t nullBytes = (((size_t)data.length + desiredOffset + 7) & ~7ull) >> 3;
74+
size_t nullBytes = AlignUp((size_t)data.length + desiredOffset, size_t(8)) >> 3;
6175
metaSink(nullBytes);
6276
}
6377

@@ -72,7 +86,7 @@ void StoreNulls(const arrow::ArrayData& data, TRope& dst) {
7286
return;
7387
}
7488
const ui64 desiredOffset = data.offset % 8;
75-
size_t nullBytes = (((size_t)data.length + desiredOffset + 7) & ~7ull) >> 3;
89+
size_t nullBytes = AlignUp((size_t)data.length + desiredOffset, size_t(8)) >> 3;
7690
YQL_ENSURE(desiredOffset <= (size_t)data.offset);
7791
YQL_ENSURE((data.offset - desiredOffset) % 8 == 0);
7892
const char* nulls = data.GetValues<char>(0, 0) + (data.offset - desiredOffset) / 8;
@@ -85,17 +99,26 @@ void LoadBufferSize(const IBlockDeserializer::TMetadataSource& metaSource, TMayb
8599
}
86100

87101
std::shared_ptr<arrow::Buffer> LoadBuffer(TRope& source, TMaybe<ui64> size) {
102+
using namespace NYql::NUdf;
88103
YQL_ENSURE(size.Defined(), "Buffer size is not loaded");
89104
if (!*size) {
90-
return std::make_shared<arrow::Buffer>(nullptr, 0);
105+
return MakeEmptyBuffer();
91106
}
92107

93108
YQL_ENSURE(source.size() >= *size, "Premature end of data");
94109
auto owner = std::make_shared<TRope>(source.Begin(), source.Begin() + *size);
95110
source.EraseFront(*size);
96111

97112
owner->Compact();
98-
return std::make_shared<TOwnedArrowBuffer>(owner->GetContiguousSpan(), owner);
113+
auto span = owner->GetContiguousSpan();
114+
if (HasArrrowAlignment(span.Data())) {
115+
return std::make_shared<TOwnedArrowBuffer>(span, owner);
116+
}
117+
118+
auto result = AllocateResizableBuffer(span.Size(), NYql::NUdf::GetYqlMemoryPool());
119+
ARROW_OK(result->Resize((int64_t)span.Size()));
120+
std::memcpy(result->mutable_data(), span.Data(), span.Size());
121+
return result;
99122
}
100123

101124
std::shared_ptr<arrow::Buffer> LoadNullsBitmap(TRope& source, TMaybe<ui64> nullCount, TMaybe<ui64> bitmapSize) {

ydb/library/yql/public/udf/arrow/memory_pool.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
namespace NYql {
77
namespace NUdf {
88

9+
constexpr size_t ArrowMemoryAlignment = 64;
10+
static_assert((ArrowMemoryAlignment & (ArrowMemoryAlignment - 1)) == 0, "ArrowMemoryAlignment should be power of 2");
11+
912
#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 37)
1013
arrow::MemoryPool* GetYqlMemoryPool();
1114
#else

0 commit comments

Comments
 (0)