Skip to content

Commit cc183d2

Browse files
ilezhankin authored and
GrigoriyPA committed
Implement and use a page arena for small Arrow allocations
commit_hash:2bcb57a12fbb750db7b33872e2cfbec66bdf6405
1 parent 9cdc396 commit cc183d2

File tree

5 files changed

+156
-5
lines changed

5 files changed

+156
-5
lines changed

yql/essentials/minikql/aligned_page_pool.cpp

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,40 @@ void* GetAlignedPage(ui64 size) {
742742
return mem;
743743
}
744744

745+
template<typename TMmap>
746+
void* GetAlignedPage() {
747+
const auto size = TAlignedPagePool::POOL_PAGE_SIZE;
748+
auto& globalPool = TGlobalPools<TMmap, false>::Instance();
749+
750+
if (auto* page = globalPool.Get(0).GetPage()) {
751+
return page;
752+
}
753+
754+
auto allocSize = size * 2;
755+
void* unalignedPtr = globalPool.DoMmap(allocSize);
756+
if (Y_UNLIKELY(MAP_FAILED == unalignedPtr)) {
757+
TStringStream mmaps;
758+
const auto lastError = LastSystemError();
759+
if (lastError == ENOMEM) {
760+
mmaps << GetMemoryMapsString();
761+
}
762+
763+
ythrow yexception() << "Mmap failed to allocate " << allocSize << " bytes: "
764+
<< LastSystemErrorText(lastError) << mmaps.Str();
765+
}
766+
767+
void* page = AlignUp(unalignedPtr, size);
768+
769+
// Unmap unaligned prefix before offset and tail after aligned page
770+
const size_t offset = (intptr_t)page - (intptr_t)unalignedPtr;
771+
if (Y_UNLIKELY(offset)) {
772+
globalPool.DoMunmap(unalignedPtr, offset);
773+
globalPool.DoMunmap((ui8*)page + size, size - offset);
774+
}
775+
776+
return page;
777+
}
778+
745779
template<typename TMmap>
746780
void ReleaseAlignedPage(void* mem, ui64 size) {
747781
size = AlignUp(size, SYS_PAGE_SIZE);
@@ -760,6 +794,11 @@ void ReleaseAlignedPage(void* mem, ui64 size) {
760794
TGlobalPools<TMmap, true>::Instance().DoMunmap(mem, size);
761795
}
762796

797+
template<typename TMmap>
798+
void ReleaseAlignedPage(void* ptr) {
799+
TGlobalPools<TMmap, false>::Instance().PushPage(0, ptr);
800+
}
801+
763802
template<typename TMmap>
764803
i64 GetTotalMmapedBytes() {
765804
return TGlobalPools<TMmap, true>::Instance().GetTotalMmappedBytes() + TGlobalPools<TMmap, false>::Instance().GetTotalMmappedBytes();
@@ -782,10 +821,18 @@ template void* GetAlignedPage<>(ui64);
782821
template void* GetAlignedPage<TFakeAlignedMmap>(ui64);
783822
template void* GetAlignedPage<TFakeUnalignedMmap>(ui64);
784823

824+
// Explicit instantiations of the parameterless GetAlignedPage() for the default
// TMmap and for TFakeAlignedMmap / TFakeUnalignedMmap.
template void* GetAlignedPage<>();
825+
template void* GetAlignedPage<TFakeAlignedMmap>();
826+
template void* GetAlignedPage<TFakeUnalignedMmap>();
827+
785828
template void ReleaseAlignedPage<>(void*,ui64);
786829
template void ReleaseAlignedPage<TFakeAlignedMmap>(void*,ui64);
787830
template void ReleaseAlignedPage<TFakeUnalignedMmap>(void*,ui64);
788831

832+
// Explicit instantiations of the single-argument ReleaseAlignedPage(void*) for
// the default TMmap and for TFakeAlignedMmap / TFakeUnalignedMmap.
template void ReleaseAlignedPage<>(void*);
833+
template void ReleaseAlignedPage<TFakeAlignedMmap>(void*);
834+
template void ReleaseAlignedPage<TFakeUnalignedMmap>(void*);
835+
789836
size_t GetMemoryMapsCount() {
790837
size_t lineCount = 0;
791838
TString line;

yql/essentials/minikql/aligned_page_pool.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,9 +308,15 @@ using TAlignedPagePool = TAlignedPagePoolImpl<>;
308308
template<typename TMmap = TSystemMmap>
309309
void* GetAlignedPage(ui64 size);
310310

311+
// Parameterless overload: returns a single page of
// TAlignedPagePool::POOL_PAGE_SIZE bytes.
template<typename TMmap = TSystemMmap>
312+
void* GetAlignedPage();
313+
311314
template<typename TMmap = TSystemMmap>
312315
void ReleaseAlignedPage(void* mem, ui64 size);
313316

317+
// Single-page overload: pushes `mem` to level 0 of the
// TGlobalPools<TMmap, false> instance.
template<typename TMmap = TSystemMmap>
318+
void ReleaseAlignedPage(void* mem);
319+
314320
template<typename TMmap = TSystemMmap>
315321
i64 GetTotalMmapedBytes();
316322
template<typename TMmap = TSystemMmap>

yql/essentials/minikql/computation/mkql_block_transport.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace {
1414
using NYql::TChunkedBuffer;
1515

1616
// Untracks the memory behind `owner` via MKQLArrowUntrack and wraps the
// [data, data + size) range into a TChunkedBuffer that holds `owner`.
TChunkedBuffer MakeChunkedBufferAndUntrack(const std::shared_ptr<const arrow::Buffer>& owner, const char* data, size_t size) {
17-
MKQLArrowUntrack(owner->data());
17+
// The buffer capacity is passed as the size argument; MKQLArrowUntrack compares
// that size with ArrowSizeForArena to pick its arena or non-arena branch.
// NOTE(review): presumably capacity() equals the size originally requested from the allocator - confirm.
MKQLArrowUntrack(owner->data(), owner->capacity());
1818
return TChunkedBuffer(TStringBuf{data, size}, owner);
1919
}
2020

yql/essentials/minikql/mkql_alloc.cpp

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ namespace NKikimr {
77

88
namespace NMiniKQL {
99

10+
// Size threshold for the page arena: a quarter of TAllocState::POOL_PAGE_SIZE.
// Arrow requests of at most this many bytes are routed to the arena functions.
constexpr ui64 ArrowSizeForArena = (TAllocState::POOL_PAGE_SIZE >> 2);
11+
1012
Y_POD_THREAD(TAllocState*) TlsAllocState;
1113

1214
TAllocPageHeader TAllocState::EmptyPageHeader = { 0, 0, 0, 0, nullptr, nullptr };
@@ -94,6 +96,10 @@ void TAllocState::KillAllBoxed() {
9496
OffloadedBlocksRoot.InitLinks();
9597
}
9698

99+
if (CurrentArrowPages) {
100+
MKQLArrowFree(CurrentArrowPages, 0);
101+
CurrentArrowPages = nullptr;
102+
}
97103
CleanupArrowList(&ArrowBlocksRoot);
98104

99105
#ifndef NDEBUG
@@ -253,7 +259,49 @@ void TPagedArena::Clear() noexcept {
253259
}
254260
}
255261

262+
/// Serves a small Arrow allocation from the page arena of the thread-local
/// TAllocState.
///
/// The arena is one page of TAllocState::POOL_PAGE_SIZE bytes that starts with
/// a TMkqlArrowHeader. Chunks are carved out sequentially at header->Offset.
/// header->UseCount holds one reference for the arena itself plus one per
/// returned chunk. When the current page cannot fit the request, the arena
/// drops its own reference to it and starts a fresh page.
///
/// @param size requested number of bytes, rounded up to ArrowAlignment. The
///             visible caller MKQLArrowAllocate only passes sizes up to
///             ArrowSizeForArena. A zero-size request still occupies one
///             ArrowAlignment-sized slot.
/// @return pointer to the chunk; it always lies inside the arena page.
/// @throws yexception (from Y_ENSURE) when no TlsAllocState is set; anything
///         thrown by OffloadAlloc or GetAlignedPage propagates as well.
void* MKQLArrowAllocateOnArena(ui64 size) {
    TAllocState* state = TlsAllocState;
    Y_ENSURE(state);

    // Never hand out a zero-length chunk: on an exactly full page
    // (Offset == Size) it would point one byte past the page, and
    // TAllocState::GetPageStart() on that address resolves to a different
    // page, so a later free would touch a foreign header.
    const auto alignedSize = AlignUp(size ? size : ui64(1), ArrowAlignment);
    auto& page = state->CurrentArrowPages;

    if (Y_UNLIKELY(!page || page->Offset + alignedSize > page->Size)) {
        const auto pageSize = TAllocState::POOL_PAGE_SIZE;

        if (state->EnableArrowTracking) {
            state->OffloadAlloc(pageSize);
        }

        if (page) {
            // Drop the arena's own reference to the exhausted page.
            MKQLArrowFree(page, 0);
            // Forget the page right away: if GetAlignedPage() below throws,
            // the state must not keep a pointer whose reference is already gone.
            page = nullptr;
        }

        page = (TMkqlArrowHeader*)GetAlignedPage();
        page->Offset = sizeof(TMkqlArrowHeader);
        page->Size = pageSize;
        page->UseCount = 1; // the arena's own reference

        if (state->EnableArrowTracking) {
            page->Entry.Link(&state->ArrowBlocksRoot);
            Y_ENSURE(state->ArrowBuffers.insert(page).second);
        } else {
            page->Entry.Clear();
        }
    }

    void* ptr = (ui8*)page + page->Offset;
    page->Offset += alignedSize;
    ++page->UseCount; // one reference per chunk handed out

    return ptr;
}
299+
256300
void* MKQLArrowAllocate(ui64 size) {
301+
if (size <= ArrowSizeForArena) {
302+
return MKQLArrowAllocateOnArena(size);
303+
}
304+
257305
TAllocState* state = TlsAllocState;
258306
Y_ENSURE(state);
259307
auto fullSize = size + sizeof(TMkqlArrowHeader);
@@ -276,6 +324,9 @@ void* MKQLArrowAllocate(ui64 size) {
276324
#endif
277325

278326
auto* header = (TMkqlArrowHeader*)ptr;
327+
header->Offset = 0;
328+
header->UseCount = 0;
329+
279330
if (state->EnableArrowTracking) {
280331
header->Entry.Link(&state->ArrowBlocksRoot);
281332
Y_ENSURE(state->ArrowBuffers.insert(header + 1).second);
@@ -294,7 +345,31 @@ void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) {
294345
return res;
295346
}
296347

348+
void MKQLArrowFreeOnArena(const void* ptr) {
349+
auto* page = (TMkqlArrowHeader*)TAllocState::GetPageStart(ptr);
350+
if (page->UseCount.fetch_sub(1) == 1) {
351+
if (!page->Entry.IsUnlinked()) {
352+
TAllocState* state = TlsAllocState;
353+
Y_ENSURE(state);
354+
state->OffloadFree(page->Size);
355+
page->Entry.Unlink();
356+
357+
auto it = state->ArrowBuffers.find(page);
358+
Y_ENSURE(it != state->ArrowBuffers.end());
359+
state->ArrowBuffers.erase(it);
360+
}
361+
362+
ReleaseAlignedPage(page);
363+
}
364+
365+
return;
366+
}
367+
297368
void MKQLArrowFree(const void* mem, ui64 size) {
369+
if (size <= ArrowSizeForArena) {
370+
return MKQLArrowFreeOnArena(mem);
371+
}
372+
298373
auto fullSize = size + sizeof(TMkqlArrowHeader);
299374
auto header = ((TMkqlArrowHeader*)mem) - 1;
300375
if (!header->Entry.IsUnlinked()) {
@@ -318,19 +393,37 @@ void MKQLArrowFree(const void* mem, ui64 size) {
318393
ReleaseAlignedPage(header, fullSize);
319394
}
320395

321-
void MKQLArrowUntrack(const void* mem) {
396+
void MKQLArrowUntrack(const void* mem, ui64 size) {
322397
TAllocState* state = TlsAllocState;
323398
Y_ENSURE(state);
324399
if (!state->EnableArrowTracking) {
325400
return;
326401
}
327402

403+
if (size <= ArrowSizeForArena) {
404+
auto* page = (TMkqlArrowHeader*)TAllocState::GetPageStart(mem);
405+
406+
auto it = state->ArrowBuffers.find(page);
407+
if (it == state->ArrowBuffers.end()) {
408+
return;
409+
}
410+
411+
if (!page->Entry.IsUnlinked()) {
412+
page->Entry.Unlink(); // unlink page immediately so we don't accidentally free untracked memory within `TAllocState`
413+
state->ArrowBuffers.erase(it);
414+
state->OffloadFree(page->Size);
415+
}
416+
417+
return;
418+
}
419+
328420
auto it = state->ArrowBuffers.find(mem);
329421
if (it == state->ArrowBuffers.end()) {
330422
return;
331423
}
332424

333-
auto header = ((TMkqlArrowHeader*)mem) - 1;
425+
auto* header = ((TMkqlArrowHeader*)mem) - 1;
426+
Y_ENSURE(header->UseCount == 0);
334427
if (!header->Entry.IsUnlinked()) {
335428
header->Entry.Unlink();
336429
auto fullSize = header->Size + sizeof(TMkqlArrowHeader);

yql/essentials/minikql/mkql_alloc.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ constexpr ui32 MaxPageUserData = TAlignedPagePool::POOL_PAGE_SIZE - sizeof(TAllo
4141

4242
static_assert(sizeof(TAllocPageHeader) % MKQL_ALIGNMENT == 0, "Incorrect size of header");
4343

44+
struct TMkqlArrowHeader;
45+
4446
struct TAllocState : public TAlignedPagePool
4547
{
4648
struct TListEntry {
@@ -90,6 +92,7 @@ struct TAllocState : public TAlignedPagePool
9092
TListEntry GlobalPAllocList;
9193
TListEntry* CurrentPAllocList;
9294
TListEntry ArrowBlocksRoot;
95+
TMkqlArrowHeader* CurrentArrowPages = nullptr; // page arena for small arrow allocations
9396
std::unordered_set<const void*> ArrowBuffers;
9497
bool EnableArrowTracking = true;
9598

@@ -186,7 +189,9 @@ constexpr size_t ArrowAlignment = 64;
186189
// Header stored in front of Arrow allocations; a static_assert right after the
// struct pins its size to ArrowAlignment bytes.
struct TMkqlArrowHeader {
187190
// List entry; linked into TAllocState::ArrowBlocksRoot when Arrow tracking is enabled.
TAllocState::TListEntry Entry;
188191
// Stored size; arena pages store TAllocState::POOL_PAGE_SIZE here.
ui64 Size;
189-
char Padding[ArrowAlignment - sizeof(TAllocState::TListEntry) - sizeof(ui64)];
192+
// Arena pages: byte offset from the page start of the next chunk to hand out.
// Set to 0 for allocations made outside the arena.
ui64 Offset;
193+
// Arena pages: reference count (the arena itself plus one per chunk handed out).
// Set to 0 for allocations made outside the arena.
std::atomic<ui64> UseCount;
194+
// Fills the remainder of the header up to ArrowAlignment bytes.
char Padding[ArrowAlignment - sizeof(TAllocState::TListEntry) - sizeof(ui64) - sizeof(ui64) - sizeof(std::atomic<ui64>)];
190195
};
191196

192197
static_assert(sizeof(TMkqlArrowHeader) == ArrowAlignment);
@@ -441,7 +446,7 @@ inline void MKQLUnregisterObject(NUdf::TBoxedValue* value) noexcept {
441446
void* MKQLArrowAllocate(ui64 size);
442447
void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size);
443448
void MKQLArrowFree(const void* mem, ui64 size);
444-
void MKQLArrowUntrack(const void* mem);
449+
void MKQLArrowUntrack(const void* mem, ui64 size);
445450

446451
template <const EMemorySubPool MemoryPoolExt = EMemorySubPool::Default>
447452
struct TWithMiniKQLAlloc {

0 commit comments

Comments
 (0)