Skip to content

Commit 26afa4e

Browse files
authored
Make sure we have properly aligned arrow buffers after deserialization (#9179)
1 parent bce2fdd commit 26afa4e

File tree

2 files changed

+38
-12
lines changed

2 files changed

+38
-12
lines changed

ydb/library/yql/minikql/computation/mkql_block_transport.cpp

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <ydb/library/yql/minikql/mkql_type_builder.h>
55
#include <ydb/library/yql/public/udf/arrow/block_reader.h>
6+
#include <ydb/library/yql/public/udf/arrow/memory_pool.h>
67
#include <ydb/library/yql/utils/rope_over_buffer.h>
78
#include <ydb/library/yql/utils/yql_panic.h>
89

@@ -30,22 +31,35 @@ std::shared_ptr<arrow::Buffer> MakeEmptyBuffer() {
3031
return std::make_shared<arrow::Buffer>(nullptr, 0);
3132
}
3233

34+
bool HasArrrowAlignment(const void* buf) {
35+
return AlignUp(buf, NYql::NUdf::ArrowMemoryAlignment) == buf;
36+
}
37+
3338
std::shared_ptr<arrow::Buffer> MakeZeroBuffer(size_t byteLen) {
39+
using namespace NYql::NUdf;
40+
if (!byteLen) {
41+
return MakeEmptyBuffer();
42+
}
43+
3444
constexpr size_t NullWordCount = (MaxBlockSizeInBytes + sizeof(ui64) - 1) / sizeof(ui64);
35-
static ui64 nulls[NullWordCount] = { 0 };
36-
if (byteLen <= sizeof(nulls)) {
37-
return std::make_shared<arrow::Buffer>(reinterpret_cast<const ui8*>(nulls), byteLen);
45+
constexpr size_t ExtraAlignWords = (ArrowMemoryAlignment > sizeof(ui64)) ? (ArrowMemoryAlignment / sizeof(ui64) - 1) : 0;
46+
static const ui64 nulls[NullWordCount + ExtraAlignWords] = { 0 };
47+
48+
// round all buffer length to 64 bytes
49+
size_t capacity = AlignUp(byteLen, size_t(64));
50+
if (capacity <= NullWordCount * sizeof(ui64)) {
51+
return std::make_shared<arrow::Buffer>(AlignUp(reinterpret_cast<const ui8*>(nulls), ArrowMemoryAlignment), byteLen);
3852
}
3953

40-
size_t wordCount = (byteLen + sizeof(ui64) - 1) / sizeof(ui64);
41-
std::shared_ptr<ui64[]> buf(new ui64[wordCount]);
42-
std::fill(buf.get(), buf.get() + wordCount, 0);
43-
return std::make_shared<TOwnedArrowBuffer>(TContiguousSpan{ reinterpret_cast<const char*>(buf.get()), byteLen }, buf);
54+
auto result = AllocateResizableBuffer(byteLen, GetYqlMemoryPool());
55+
ARROW_OK(result->Resize(byteLen));
56+
std::memset(result->mutable_data(), 0, byteLen);
57+
return result;
4458
}
4559

4660
std::shared_ptr<arrow::Buffer> MakeZeroBitmap(size_t bitCount) {
4761
// align up 8 byte boundary
48-
size_t byteCount = ((bitCount + 63u) & ~size_t(63u)) >> 3;
62+
size_t byteCount = AlignUp(bitCount, size_t(64)) >> 3;
4963
return MakeZeroBuffer(byteCount);
5064
}
5165

@@ -62,7 +76,7 @@ void StoreNullsSizes(const arrow::ArrayData& data, const IBlockSerializer::TMeta
6276
}
6377

6478
const ui64 desiredOffset = data.offset % 8;
65-
size_t nullBytes = (((size_t)data.length + desiredOffset + 7) & ~7ull) >> 3;
79+
size_t nullBytes = AlignUp((size_t)data.length + desiredOffset, size_t(8)) >> 3;
6680
metaSink(nullBytes);
6781
}
6882

@@ -77,7 +91,7 @@ void StoreNulls(const arrow::ArrayData& data, TRope& dst) {
7791
return;
7892
}
7993
const ui64 desiredOffset = data.offset % 8;
80-
size_t nullBytes = (((size_t)data.length + desiredOffset + 7) & ~7ull) >> 3;
94+
size_t nullBytes = AlignUp((size_t)data.length + desiredOffset, size_t(8)) >> 3;
8195
YQL_ENSURE(desiredOffset <= (size_t)data.offset);
8296
YQL_ENSURE((data.offset - desiredOffset) % 8 == 0);
8397
const char* nulls = data.GetValues<char>(0, 0) + (data.offset - desiredOffset) / 8;
@@ -90,17 +104,26 @@ void LoadBufferSize(const IBlockDeserializer::TMetadataSource& metaSource, TMayb
90104
}
91105

92106
std::shared_ptr<arrow::Buffer> LoadBuffer(TRope& source, TMaybe<ui64> size) {
107+
using namespace NYql::NUdf;
93108
YQL_ENSURE(size.Defined(), "Buffer size is not loaded");
94109
if (!*size) {
95-
return std::make_shared<arrow::Buffer>(nullptr, 0);
110+
return MakeEmptyBuffer();
96111
}
97112

98113
YQL_ENSURE(source.size() >= *size, "Premature end of data");
99114
auto owner = std::make_shared<TRope>(source.Begin(), source.Begin() + *size);
100115
source.EraseFront(*size);
101116

102117
owner->Compact();
103-
return std::make_shared<TOwnedArrowBuffer>(owner->GetContiguousSpan(), owner);
118+
auto span = owner->GetContiguousSpan();
119+
if (HasArrrowAlignment(span.Data())) {
120+
return std::make_shared<TOwnedArrowBuffer>(span, owner);
121+
}
122+
123+
auto result = AllocateResizableBuffer(span.Size(), NYql::NUdf::GetYqlMemoryPool());
124+
ARROW_OK(result->Resize((int64_t)span.Size()));
125+
std::memcpy(result->mutable_data(), span.Data(), span.Size());
126+
return result;
104127
}
105128

106129
std::shared_ptr<arrow::Buffer> LoadNullsBitmap(TRope& source, TMaybe<ui64> nullCount, TMaybe<ui64> bitmapSize) {

ydb/library/yql/public/udf/arrow/memory_pool.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
namespace NYql {
77
namespace NUdf {
88

9+
constexpr size_t ArrowMemoryAlignment = 64;
10+
static_assert((ArrowMemoryAlignment & (ArrowMemoryAlignment - 1)) == 0, "ArrowMemoryAlignment should be power of 2");
11+
912
#if UDF_ABI_COMPATIBILITY_VERSION_CURRENT >= UDF_ABI_COMPATIBILITY_VERSION(2, 37)
1013
arrow::MemoryPool* GetYqlMemoryPool();
1114
#else

0 commit comments

Comments
 (0)