[DO NOT MERGE!] Fix Arrow arena problem #20728

Closed · wants to merge 1 commit
72 changes: 46 additions & 26 deletions yql/essentials/minikql/mkql_alloc.cpp
@@ -7,9 +7,9 @@

#include <tuple>

namespace NKikimr {
namespace NKikimr::NMiniKQL {

namespace NMiniKQL {
static ui8 ZeroSizeObject alignas(ArrowAlignment)[0];

constexpr ui64 ArrowSizeForArena = (TAllocState::POOL_PAGE_SIZE >> 2);

@@ -268,7 +268,14 @@ void TPagedArena::Clear() noexcept {
}
}

namespace {

void* MKQLArrowAllocateOnArena(ui64 size) {
Y_ENSURE(size);
// If size is zero we can get into trouble when `page->Offset == page->Size`:
// a zero-size request would return a `ptr` pointing just past the current page,
// and taking the page start of such a pointer yields the next page, which may be unmapped or unrelated to `ptr`.
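// Illustrative numbers (assuming 64 KiB pool pages): for a full page at 0x10000,
// `page->Offset == page->Size`, so a zero-size request would yield
// ptr == (ui8*)page + page->Offset + sizeof(TMkqlArrowHeader) == 0x20000,
// and TAllocState::GetPageStart(ptr) would resolve to 0x20000, i.e. the next page rather than `page`.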

TAllocState* state = TlsAllocState;
Y_ENSURE(state);

@@ -287,8 +294,8 @@ void* MKQLArrowAllocateOnArena(ui64 size) {

page = (TMkqlArrowHeader*)GetAlignedPage();
NYql::NUdf::SanitizerMakeRegionAccessible(page, sizeof(TMkqlArrowHeader));
page->Offset = sizeof(TMkqlArrowHeader);
page->Size = pageSize;
page->Offset = 0;
page->Size = pageSize - sizeof(TMkqlArrowHeader); // for consistency with CleanupArrowList()
page->UseCount = 1;

if (state->EnableArrowTracking) {
@@ -299,14 +306,20 @@ void* MKQLArrowAllocateOnArena(ui64 size) {
}
}

void* ptr = (ui8*)page + page->Offset;
void* ptr = (ui8*)page + page->Offset + sizeof(TMkqlArrowHeader);
page->Offset += alignedSize;
++page->UseCount;

Y_DEBUG_ABORT_UNLESS(TAllocState::GetPageStart(ptr) == page);

return ptr;
}

namespace {
void* MKQLArrowAllocateImpl(ui64 size) {
if (Y_UNLIKELY(size == 0)) {
return reinterpret_cast<void*>(ZeroSizeObject);
}

if (Y_LIKELY(!TAllocState::IsDefaultAllocatorUsed())) {
if (size <= ArrowSizeForArena) {
return MKQLArrowAllocateOnArena(size);
@@ -345,28 +358,14 @@ void* MKQLArrowAllocateImpl(ui64 size) {
header->Size = size;
return header + 1;
}
} // namespace

void* MKQLArrowAllocate(ui64 size) {
auto sizeWithRedzones = NYql::NUdf::GetSizeToAlloc(size);
void* mem = MKQLArrowAllocateImpl(sizeWithRedzones);
return NYql::NUdf::WrapPointerWithRedZones(mem, sizeWithRedzones);
}

void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) {
auto res = MKQLArrowAllocate(size);
memcpy(res, mem, Min(prevSize, size));
MKQLArrowFree(mem, prevSize);
return res;
}

void MKQLArrowFreeOnArena(const void* ptr) {
auto* page = (TMkqlArrowHeader*)TAllocState::GetPageStart(ptr);
if (page->UseCount.fetch_sub(1) == 1) {
if (!page->Entry.IsUnlinked()) {
TAllocState* state = TlsAllocState;
Y_ENSURE(state);
state->OffloadFree(page->Size);
state->OffloadFree(page->Size + sizeof(TMkqlArrowHeader));
page->Entry.Unlink();

auto it = state->ArrowBuffers.find(page);
@@ -380,8 +379,12 @@ void MKQLArrowFreeOnArena(const void* ptr) {
return;
}

namespace {
void MKQLArrowFreeImpl(const void* mem, ui64 size) {
if (Y_UNLIKELY(mem == reinterpret_cast<const void*>(ZeroSizeObject))) {
Y_DEBUG_ABORT_UNLESS(size == 0);
return;
}

if (Y_LIKELY(!TAllocState::IsDefaultAllocatorUsed())) {
if (size <= ArrowSizeForArena) {
return MKQLArrowFreeOnArena(mem);
@@ -409,15 +412,34 @@ void MKQLArrowFreeImpl(const void* mem, ui64 size) {

ReleaseAlignedPage(header, fullSize);
}

} // namespace

void* MKQLArrowAllocate(ui64 size) {
auto sizeWithRedzones = NYql::NUdf::GetSizeToAlloc(size);
void* mem = MKQLArrowAllocateImpl(sizeWithRedzones);
return NYql::NUdf::WrapPointerWithRedZones(mem, sizeWithRedzones);
}

void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) {
auto res = MKQLArrowAllocate(size);
memcpy(res, mem, Min(prevSize, size));
MKQLArrowFree(mem, prevSize);
return res;
}

void MKQLArrowFree(const void* mem, ui64 size) {
mem = NYql::NUdf::UnwrapPointerWithRedZones(mem, size);
auto sizeWithRedzones = NYql::NUdf::GetSizeToAlloc(size);
return MKQLArrowFreeImpl(mem, sizeWithRedzones);
}

void MKQLArrowUntrack(const void* mem, ui64 size) {
if (Y_UNLIKELY(mem == reinterpret_cast<const void*>(ZeroSizeObject))) {
Y_DEBUG_ABORT_UNLESS(size == 0);
return;
}

mem = NYql::NUdf::GetOriginalAllocatedObject(mem, size);
TAllocState* state = TlsAllocState;
Y_ENSURE(state);
@@ -437,7 +459,7 @@ void MKQLArrowUntrack(const void* mem, ui64 size) {
if (!page->Entry.IsUnlinked()) {
page->Entry.Unlink(); // unlink page immediately so we don't accidentally free untracked memory within `TAllocState`
state->ArrowBuffers.erase(it);
state->OffloadFree(page->Size);
state->OffloadFree(page->Size + sizeof(TMkqlArrowHeader));
}

return;
@@ -459,6 +481,4 @@ void MKQLArrowUntrack(const void* mem, ui64 size) {
}
}

} // NMiniKQL

} // NKikimr
} // namespace NKikimr::NMiniKQL
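
Note (not part of the diff): a minimal, self-contained sketch of the zero-size sentinel pattern the patch introduces. Names and details are simplified and partly hypothetical: ZeroSizeSentinel stands in for ZeroSizeObject, plain malloc/free stand in for the arena, red-zone and tracking machinery, and the sentinel is a one-byte array instead of a zero-length one for portability.

#include <cassert>
#include <cstdint>
#include <cstdlib>

// Distinguished address handed out for zero-size allocations so the arena is never asked for 0 bytes.
alignas(64) static std::uint8_t ZeroSizeSentinel[1];

void* ArrowAllocate(std::uint64_t size) {
    if (size == 0) {
        return ZeroSizeSentinel;  // never touches the real allocator
    }
    return std::malloc(size);     // the real code goes to the arena or aligned pages instead
}

void ArrowFree(void* mem, std::uint64_t size) {
    if (mem == ZeroSizeSentinel) {
        assert(size == 0);        // the sentinel is only ever returned for size 0
        return;                   // nothing was allocated, so nothing to release
    }
    std::free(mem);
}

int main() {
    void* empty = ArrowAllocate(0);
    void* buf = ArrowAllocate(128);
    ArrowFree(buf, 128);
    ArrowFree(empty, 0);          // no-op, but call sites stay uniform
}

The untrack path gets the same early return in the patch (see MKQLArrowUntrack above). Separately, the diff makes `page->Offset` and `page->Size` count payload bytes only, so the amount reported via OffloadFree adds `sizeof(TMkqlArrowHeader)` back and sums to the full page size.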