
Commit f22b617

Moved blocks splitting into channels

1 parent: 0ce9793

16 files changed, +333 -47 lines

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 3 additions & 0 deletions

@@ -86,6 +86,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
     std::atomic<ui64> MkqlLightProgramMemoryLimit = 0;
     std::atomic<ui64> MkqlHeavyProgramMemoryLimit = 0;
     std::atomic<ui64> MinChannelBufferSize = 0;
+    std::atomic<ui64> ChannelChunkSizeLimit = 48_MB;
     std::atomic<ui64> MinMemAllocSize = 8_MB;
     std::atomic<ui64> MinMemFreeSize = 32_MB;

@@ -106,6 +107,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
         MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
         MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
         MinChannelBufferSize.store(config.GetMinChannelBufferSize());
+        ChannelChunkSizeLimit.store(config.GetChannelChunkSizeLimit());
         MinMemAllocSize.store(config.GetMinMemAllocSize());
         MinMemFreeSize.store(config.GetMinMemFreeSize());
     }

@@ -142,6 +144,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {

         memoryLimits.ChannelBufferSize = std::max<ui32>(estimation.ChannelBufferMemoryLimit / std::max<ui32>(1, inputChannelsCount), MinChannelBufferSize.load());
         memoryLimits.OutputChunkMaxSize = args.OutputChunkMaxSize;
+        memoryLimits.ChunkSizeLimit = ChannelChunkSizeLimit.load();
         AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "channel_info")
             ("ch_size", estimation.ChannelBufferMemoryLimit)
             ("ch_count", estimation.ChannelBuffersCount)
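The clamp above is easy to misread, so here is a minimal sketch of the same arithmetic with made-up numbers (the constants below are illustrative and not taken from the commit or the YDB defaults):

#include <algorithm>
#include <cstdint>

// Hypothetical numbers: 64 MB of estimated channel-buffer memory spread over
// 16 input channels would give 4 MB per channel, but the 8 MB floor
// (MinChannelBufferSize) wins. ChannelChunkSizeLimit is passed through as-is.
int main() {
    const uint64_t channelBufferMemoryLimit = 64ull << 20; // estimation.ChannelBufferMemoryLimit
    const uint32_t inputChannelsCount = 16;
    const uint64_t minChannelBufferSize = 8ull << 20;      // MinChannelBufferSize.load()

    const uint64_t channelBufferSize = std::max<uint64_t>(
        channelBufferMemoryLimit / std::max<uint32_t>(1u, inputChannelsCount),
        minChannelBufferSize); // == 8 MB

    return channelBufferSize == (8ull << 20) ? 0 : 1;
}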

ydb/core/protos/table_service_config.proto

Lines changed: 1 addition & 0 deletions

@@ -22,6 +22,7 @@ message TTableServiceConfig {
     }
     optional uint32 ComputeActorsCount = 1 [default = 10000];
     optional uint64 ChannelBufferSize = 2 [default = 8388608]; // 8 MB
+    optional uint64 ChannelChunkSizeLimit = 30 [default = 50331648]; // 48 MB
     reserved 3;
     optional uint64 MkqlLightProgramMemoryLimit = 4 [default = 1048576]; // 1 MiB
     optional uint64 MkqlHeavyProgramMemoryLimit = 5 [default = 31457280]; // 30 MB

ydb/library/yql/dq/actors/compute/dq_compute_actor.h

Lines changed: 1 addition & 0 deletions

@@ -376,6 +376,7 @@ struct TComputeMemoryLimits {
     ui64 MinMemAllocSize = 30_MB;
     ui64 MinMemFreeSize = 30_MB;
     ui64 OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
+    ui64 ChunkSizeLimit = 48_MB;
     TMaybe<ui8> ArrayBufferMinFillPercentage; // Used by DqOutputHashPartitionConsumer

     IMemoryQuotaManager::TPtr MemoryQuotaManager;

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 2 additions & 0 deletions

@@ -1089,6 +1089,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>

         Channels->SetOutputChannelPeer(channelUpdate.GetId(), peer);
         outputChannel->HasPeer = true;
+        outputChannel->Channel->UpdateSettings({.IsLocalChannel = peer.NodeId() == this->SelfId().NodeId()});
+        Cerr << "-------------------------------- TDqComputeActorBase::HandleExecuteBase(TEvChannelsInfo), channel src " << peer.NodeId() << ", dst " << this->SelfId().NodeId() << "\n";

         continue;
     }

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 2 additions & 1 deletion

@@ -207,10 +207,11 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
         TDqTaskRunnerMemoryLimits limits;
         limits.ChannelBufferSize = this->MemoryLimits.ChannelBufferSize;
         limits.OutputChunkMaxSize = this->MemoryLimits.OutputChunkMaxSize;
+        limits.ChunkSizeLimit = this->MemoryLimits.ChunkSizeLimit;

         if (!limits.OutputChunkMaxSize) {
             limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
-        }
+        }

         if (this->Task.GetEnableSpilling()) {
             TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback()));

ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp

Lines changed: 199 additions & 3 deletions

@@ -1,11 +1,16 @@
 #include "dq_arrow_helpers.h"

 #include <cstddef>
-#include <yql/essentials/public/udf/udf_value.h>
-#include <yql/essentials/minikql/defs.h>
+#include <yql/essentials/minikql/computation/mkql_block_trimmer.h>
 #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
+#include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
+#include <yql/essentials/minikql/defs.h>
 #include <yql/essentials/minikql/mkql_node.h>
+#include <yql/essentials/public/udf/arrow/memory_pool.h>
+#include <yql/essentials/public/udf/arrow/util.h>
+#include <yql/essentials/public/udf/udf_value.h>

+#include <ydb/library/formats/arrow/size_calcer.h>
 #include <ydb/library/yverify_stream/yverify_stream.h>
 #include <ydb/public/lib/scheme_types/scheme_type_id.h>

@@ -942,6 +947,197 @@ std::shared_ptr<arrow::Array> DeserializeArray(const std::string& blob, std::sha
     return (*batch)->column(0);
 }

+// Block splitter
+
+namespace {
+
+class TBlockSplitter : public IBlockSplitter {
+    class TItem {
+    public:
+        TItem(TBlockSplitter& self, const NUdf::TUnboxedValuePod* values)
+            : Self(self)
+        {
+            Data.reserve(Self.Width);
+            ArraysIdx.reserve(Self.Width);
+            for (ui64 i = 0; i < Self.Width; ++i) {
+                auto datum = TBlockSplitter::ExtractDatum(values[i]);
+                if (datum.is_scalar()) {
+                    ScalarsSize += NUdf::GetSizeOfDatumInBytes(datum);
+                } else {
+                    ArraysIdx.emplace_back(i);
+                }
+                if (i == Self.BlockLenIndex) {
+                    NumberRows = datum.scalar_as<arrow::UInt64Scalar>().value;
+                }
+                Data.emplace_back(std::move(datum));
+            }
+            UpdateArraysSize();
+        }
+
+        TItem(TBlockSplitter& self, std::vector<arrow::Datum>&& data, const std::vector<ui64>& arraysIdx, ui64 numberRows, ui64 scalarsSize)
+            : Self(self)
+            , Data(std::move(data))
+            , ArraysIdx(arraysIdx)
+            , NumberRows(numberRows)
+            , ScalarsSize(scalarsSize)
+        {
+            UpdateArraysSize();
+        }
+
+        ui64 GetNumberRows() const {
+            return NumberRows;
+        }
+
+        ui64 GetSize() const {
+            return ScalarsSize + ArraysSize;
+        }
+
+        std::vector<arrow::Datum> ExtractData() {
+            std::vector<arrow::Datum> result(std::move(Data));
+            for (ui64 i : ArraysIdx) {
+                result[i] = Self.GetColumnTrimmer(i).Trim(result[i].array());
+            }
+            result[Self.BlockLenIndex] = arrow::Datum(std::make_shared<arrow::UInt64Scalar>(NumberRows));
+            return result; // return the trimmed datums, not the moved-from member
+        }
+
+        TItem PopBack(ui64 length) {
+            std::vector<arrow::Datum> backData = Data;
+            const auto offset = NumberRows - length;
+            for (ui64 i : ArraysIdx) {
+                const auto& array = *Data[i].array();
+                Data[i] = arrow::Datum(array.Slice(0, offset));
+                backData[i] = arrow::Datum(array.Slice(offset, length));
+            }
+
+            NumberRows -= length;
+            UpdateArraysSize();
+
+            return TItem(Self, std::move(backData), ArraysIdx, length, ScalarsSize);
+        }
+
+    private:
+        void UpdateArraysSize() {
+            ArraysSize = 0;
+            for (ui64 i : ArraysIdx) {
+                ArraysSize += NKikimr::NArrow::GetArrayDataSize(Data[i].make_array());
+            }
+        }
+
+    private:
+        TBlockSplitter& Self;
+        std::vector<arrow::Datum> Data;
+        std::vector<ui64> ArraysIdx;
+        ui64 NumberRows = 0;
+        ui64 ScalarsSize = 0;
+        ui64 ArraysSize = 0;
+    };
+
+public:
+    TBlockSplitter(const TType* type, ui64 width, ui32 blockLenIndex, bool isLegacyBlock, ui64 chunkSizeLimit, arrow::MemoryPool* pool)
+        : Type(type)
+        , Width(width)
+        , BlockLenIndex(blockLenIndex)
+        , IsLegacyBlock(isLegacyBlock)
+        , ChunkSizeLimit(chunkSizeLimit)
+        , ArrowPool(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
+        , BlockTrimmers(width) // one lazily created trimmer slot per column
+    {}
+
+    bool ShouldSplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) const override {
+        values = UnwrapItem(values, count);
+
+        ui64 itemSize = 0;
+        for (size_t i = 0; i < Width; ++i) {
+            itemSize += NUdf::GetSizeOfDatumInBytes(ExtractDatum(values[i]));
+        }
+        return itemSize > ChunkSizeLimit;
+    }
+
+    std::vector<std::vector<arrow::Datum>> SplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) override {
+        values = UnwrapItem(values, count);
+
+        SplitStack.clear();
+        SplitStack.emplace_back(*this, values);
+        std::vector<std::vector<arrow::Datum>> result;
+
+        const auto estimatedSize = SplitStack.back().GetSize() / ChunkSizeLimit;
+        result.reserve(estimatedSize);
+        SplitStack.reserve(estimatedSize);
+        while (!SplitStack.empty()) {
+            auto item = std::move(SplitStack.back());
+            SplitStack.pop_back();
+
+            while (item.GetSize() > ChunkSizeLimit) {
+                if (item.GetNumberRows() <= 1) {
+                    throw yexception() << "Row size in block is " << item.GetSize() << ", that is larger than allowed limit " << ChunkSizeLimit;
+                }
+                SplitStack.emplace_back(item.PopBack(item.GetNumberRows() / 2));
+            }
+
+            result.emplace_back(item.ExtractData());
+        }
+
+        return result;
+    }
+
+private:
+    const NUdf::TUnboxedValuePod* UnwrapItem(const NUdf::TUnboxedValuePod* values, ui32 count) const {
+        if (IsLegacyBlock) {
+            MKQL_ENSURE(count == 1, "Invalid width");
+            values = ExtractLegacyBlock(*values);
+            count = Width;
+        } else {
+            MKQL_ENSURE(count == Width, "Invalid width");
+        }
+        return values;
+    }
+
+    static arrow::Datum ExtractDatum(const NUdf::TUnboxedValuePod& value) {
+        arrow::Datum datum = TArrowBlock::From(value).GetDatum();
+        MKQL_ENSURE(datum.is_array() || datum.is_scalar(), "Expecting array or scalar");
+        return datum;
+    }
+
+    IBlockTrimmer& GetColumnTrimmer(ui64 index) {
+        MKQL_ENSURE(index < Width, "Invalid index");
+        if (!BlockTrimmers[index]) {
+            BlockTrimmers[index] = MakeBlockTrimmer(TTypeInfoHelper(), GetColumnType(index), &ArrowPool);
+        }
+        return *BlockTrimmers[index];
+    }
+
+    const TType* GetColumnType(ui64 index) const {
+        MKQL_ENSURE(index < Width, "Invalid index");
+        return IsLegacyBlock ? static_cast<const TStructType*>(Type)->GetMemberType(index) :
+                               static_cast<const TMultiType*>(Type)->GetElementType(index);
+    }
+
+private:
+    const TType* Type;
+    const ui64 Width;
+    const ui32 BlockLenIndex;
+    const bool IsLegacyBlock;
+    const ui64 ChunkSizeLimit;
+    arrow::MemoryPool& ArrowPool;
+
+    std::vector<IBlockTrimmer::TPtr> BlockTrimmers;
+    std::vector<TItem> SplitStack;
+};
+
+} // namespace
+
+IBlockSplitter::TPtr CreateBlockSplitter(const TType* type, ui64 chunkSizeLimit, arrow::MemoryPool* pool) {
+    TVector<const TBlockType*> items;
+    ui32 blockLenIndex = 0;
+    bool isLegacyBlock = false;
+    if (IsLegacyStructBlock(type, blockLenIndex, items)) {
+        isLegacyBlock = true;
+    } else if (!IsMultiBlock(type, blockLenIndex, items)) {
+        return nullptr;
+    }
+
+    return MakeIntrusive<TBlockSplitter>(type, items.size(), blockLenIndex, isLegacyBlock, chunkSizeLimit, pool); // pass the block type as the first constructor argument
+}
+
 } // namespace NArrow
 } // namespace NYql
-
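The splitter above repeatedly gives up the back half of an oversized item onto SplitStack and keeps working on the front half until every piece fits under ChunkSizeLimit; a single row that still exceeds the limit raises an error. A minimal sketch of that strategy reduced to plain byte counts (SplitByHalving and the numbers in the usage comment are illustrative, not part of the commit):

#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical model of TBlockSplitter::SplitItem: split `rows` rows of
// `bytesPerRow` bytes each into chunks of at most `limit` bytes.
std::vector<uint64_t> SplitByHalving(uint64_t rows, uint64_t bytesPerRow, uint64_t limit) {
    std::vector<uint64_t> stack{rows};   // plays the role of SplitStack
    std::vector<uint64_t> chunks;        // row counts of the produced chunks, in row order
    while (!stack.empty()) {
        uint64_t cur = stack.back();
        stack.pop_back();
        while (cur * bytesPerRow > limit) {
            if (cur <= 1) {
                // mirrors the yexception thrown when a single row exceeds the limit
                throw std::runtime_error("single row exceeds the chunk size limit");
            }
            stack.push_back(cur / 2);    // PopBack: the back half goes onto the stack
            cur -= cur / 2;              // keep working on the front half
        }
        chunks.push_back(cur);
    }
    return chunks;
}

// Usage: 100 rows of 2 MB each (200 MB total) with a 48 MB limit come out as
// eight chunks of 12-13 rows (24-26 MB each), preserving the original row order.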

ydb/library/yql/dq/runtime/dq_arrow_helpers.h

Lines changed: 10 additions & 0 deletions

@@ -84,6 +84,16 @@ std::shared_ptr<arrow::Array> DeserializeArray(const std::string& blob, std::sha
 */
 void AppendElement(NYql::NUdf::TUnboxedValue value, arrow::ArrayBuilder* builder, const NKikimr::NMiniKQL::TType* type);

+class IBlockSplitter : public TThrRefBase {
+public:
+    using TPtr = TIntrusivePtr<IBlockSplitter>;
+
+    virtual bool ShouldSplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) const = 0;
+
+    virtual std::vector<std::vector<arrow::Datum>> SplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) = 0;
+};
+
+IBlockSplitter::TPtr CreateBlockSplitter(const NKikimr::NMiniKQL::TType* type, ui64 chunkSizeLimit, arrow::MemoryPool* pool = nullptr);

 } // NArrow
 } // NYql
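One plausible way a caller could wire this interface in front of channel serialization; a sketch only, assuming hypothetical SerializeAndSend/SerializeAndSendAsIs sinks and using 48_MB as a stand-in for the configured ChannelChunkSizeLimit (the commit itself does not add this call site):

// Sketch: split oversized wide blocks before serializing them into a channel.
void PushBlock(const NKikimr::NMiniKQL::TType* outputType,
               const NYql::NUdf::TUnboxedValuePod* values, ui32 width) {
    // Normally created once per output and cached; inlined here for brevity.
    // Returns nullptr when outputType is not a (legacy or wide) block type.
    NYql::NArrow::IBlockSplitter::TPtr splitter =
        NYql::NArrow::CreateBlockSplitter(outputType, 48_MB);

    if (splitter && splitter->ShouldSplitItem(values, width)) {
        // Each chunk stays under the limit and preserves row order.
        for (auto& chunk : splitter->SplitItem(values, width)) {
            SerializeAndSend(chunk);          // hypothetical sink
        }
    } else {
        SerializeAndSendAsIs(values, width);  // hypothetical sink
    }
}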
