Skip to content

Commit 71d6e59

Browse files
committed
Added blocks splitting into dq channels
1 parent 0ce9793 commit 71d6e59

13 files changed

+533
-40
lines changed

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
8686
std::atomic<ui64> MkqlLightProgramMemoryLimit = 0;
8787
std::atomic<ui64> MkqlHeavyProgramMemoryLimit = 0;
8888
std::atomic<ui64> MinChannelBufferSize = 0;
89+
std::atomic<ui64> ChannelChunkSizeLimit = 48_MB;
8990
std::atomic<ui64> MinMemAllocSize = 8_MB;
9091
std::atomic<ui64> MinMemFreeSize = 32_MB;
9192

@@ -106,6 +107,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
106107
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
107108
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
108109
MinChannelBufferSize.store(config.GetMinChannelBufferSize());
110+
ChannelChunkSizeLimit.store(config.GetChannelChunkSizeLimit());
109111
MinMemAllocSize.store(config.GetMinMemAllocSize());
110112
MinMemFreeSize.store(config.GetMinMemFreeSize());
111113
}
@@ -142,6 +144,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
142144

143145
memoryLimits.ChannelBufferSize = std::max<ui32>(estimation.ChannelBufferMemoryLimit / std::max<ui32>(1, inputChannelsCount), MinChannelBufferSize.load());
144146
memoryLimits.OutputChunkMaxSize = args.OutputChunkMaxSize;
147+
memoryLimits.ChunkSizeLimit = ChannelChunkSizeLimit.load();
145148
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "channel_info")
146149
("ch_size", estimation.ChannelBufferMemoryLimit)
147150
("ch_count", estimation.ChannelBuffersCount)

ydb/core/protos/table_service_config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ message TTableServiceConfig {
2222
}
2323
optional uint32 ComputeActorsCount = 1 [default = 10000];
2424
optional uint64 ChannelBufferSize = 2 [default = 8388608]; // 8 MB
25+
optional uint64 ChannelChunkSizeLimit = 30 [default = 50331648]; // 48 MB
2526
reserved 3;
2627
optional uint64 MkqlLightProgramMemoryLimit = 4 [default = 1048576]; // 1 MiB
2728
optional uint64 MkqlHeavyProgramMemoryLimit = 5 [default = 31457280]; // 30 MB

ydb/library/yql/dq/actors/compute/dq_compute_actor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ struct TComputeMemoryLimits {
376376
ui64 MinMemAllocSize = 30_MB;
377377
ui64 MinMemFreeSize = 30_MB;
378378
ui64 OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
379+
ui64 ChunkSizeLimit = 48_MB;
379380
TMaybe<ui8> ArrayBufferMinFillPercentage; // Used by DqOutputHashPartitionConsumer
380381

381382
IMemoryQuotaManager::TPtr MemoryQuotaManager;

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1089,6 +1089,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
10891089

10901090
Channels->SetOutputChannelPeer(channelUpdate.GetId(), peer);
10911091
outputChannel->HasPeer = true;
1092+
outputChannel->Channel->UpdateSettings({.IsLocalChannel = peer.NodeId() == this->SelfId().NodeId()});
10921093

10931094
continue;
10941095
}

ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,11 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
207207
TDqTaskRunnerMemoryLimits limits;
208208
limits.ChannelBufferSize = this->MemoryLimits.ChannelBufferSize;
209209
limits.OutputChunkMaxSize = this->MemoryLimits.OutputChunkMaxSize;
210+
limits.ChunkSizeLimit = this->MemoryLimits.ChunkSizeLimit;
210211

211212
if (!limits.OutputChunkMaxSize) {
212213
limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
213-
}
214+
}
214215

215216
if (this->Task.GetEnableSpilling()) {
216217
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback()));

ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp

Lines changed: 216 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
#include "dq_arrow_helpers.h"
22

33
#include <cstddef>
4-
#include <yql/essentials/public/udf/udf_value.h>
5-
#include <yql/essentials/minikql/defs.h>
4+
#include <yql/essentials/minikql/computation/mkql_block_trimmer.h>
65
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
6+
#include <yql/essentials/minikql/defs.h>
77
#include <yql/essentials/minikql/mkql_node.h>
8+
#include <yql/essentials/public/udf/arrow/defs.h>
9+
#include <yql/essentials/public/udf/arrow/memory_pool.h>
10+
#include <yql/essentials/public/udf/arrow/util.h>
11+
#include <yql/essentials/public/udf/udf_value.h>
812

13+
#include <ydb/library/formats/arrow/size_calcer.h>
914
#include <ydb/library/yverify_stream/yverify_stream.h>
1015
#include <ydb/public/lib/scheme_types/scheme_type_id.h>
1116

@@ -942,6 +947,214 @@ std::shared_ptr<arrow::Array> DeserializeArray(const std::string& blob, std::sha
942947
return (*batch)->column(0);
943948
}
944949

950+
// Block splitter
951+
952+
namespace {
953+
954+
class TBlockSplitter : public IBlockSplitter {
955+
class TItem {
956+
public:
957+
TItem(TBlockSplitter& self, const NUdf::TUnboxedValuePod* values)
958+
: Self(self)
959+
{
960+
Data.reserve(Self.Width);
961+
ArraysIdx.reserve(Self.Width);
962+
for (ui64 i = 0; i < Self.Width; ++i) {
963+
auto datum = TBlockSplitter::ExtractDatum(values[i]);
964+
if (datum.is_scalar()) {
965+
ScalarsSize += Self.GetDatumMemorySize(i, datum);
966+
} else {
967+
ArraysIdx.emplace_back(i);
968+
}
969+
Data.emplace_back(std::move(datum));
970+
}
971+
972+
NumberRows = Data.back().scalar_as<arrow::UInt64Scalar>().value;
973+
UpdateArraysSize();
974+
}
975+
976+
TItem(TBlockSplitter& self, std::vector<arrow::Datum>&& data, const std::vector<ui64>& arraysIdx, ui64 numberRows, ui64 scalarsSize)
977+
: Self(self)
978+
, Data(std::move(data))
979+
, ArraysIdx(arraysIdx)
980+
, NumberRows(numberRows)
981+
, ScalarsSize(scalarsSize)
982+
{
983+
UpdateArraysSize();
984+
}
985+
986+
ui64 GetNumberRows() const {
987+
return NumberRows;
988+
}
989+
990+
ui64 GetSize() const {
991+
return ScalarsSize + ArraysSize;
992+
}
993+
994+
std::vector<arrow::Datum> ExtractData() {
995+
std::vector<arrow::Datum> result(std::move(Data));
996+
for (ui64 i : ArraysIdx) {
997+
result[i] = Self.GetColumnTrimmer(i).Trim(result[i].array());
998+
}
999+
result.back() = arrow::Datum(std::make_shared<arrow::UInt64Scalar>(NumberRows));
1000+
return result;
1001+
}
1002+
1003+
TItem PopBack(ui64 length) {
1004+
MKQL_ENSURE(length <= NumberRows, "Can not pop more than number of rows");
1005+
std::vector<arrow::Datum> backData = Data;
1006+
for (ui64 i : ArraysIdx) {
1007+
auto array = Data[i].array();
1008+
Data[i] = NUdf::Chop(array, NumberRows - length);
1009+
backData[i] = array;
1010+
}
1011+
1012+
NumberRows -= length;
1013+
UpdateArraysSize();
1014+
1015+
return TItem(Self, std::move(backData), ArraysIdx, length, ScalarsSize);
1016+
}
1017+
1018+
private:
1019+
void UpdateArraysSize() {
1020+
ArraysSize = 0;
1021+
for (ui64 i : ArraysIdx) {
1022+
ArraysSize += NKikimr::NArrow::GetArrayDataSize(Data[i].make_array());
1023+
}
1024+
}
1025+
1026+
private:
1027+
TBlockSplitter& Self;
1028+
std::vector<arrow::Datum> Data;
1029+
std::vector<ui64> ArraysIdx;
1030+
ui64 NumberRows = 0;
1031+
ui64 ScalarsSize = 0;
1032+
ui64 ArraysSize = 0;
1033+
};
1034+
1035+
public:
1036+
TBlockSplitter(const TVector<const TBlockType*>& items, ui64 chunkSizeLimit, arrow::MemoryPool* pool)
1037+
: Items(items)
1038+
, Width(items.size())
1039+
, ChunkSizeLimit(chunkSizeLimit)
1040+
, ArrowPool(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
1041+
, ScalarSizes(Width)
1042+
, BlockTrimmers(Width)
1043+
{}
1044+
1045+
bool ShouldSplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) override {
1046+
MKQL_ENSURE(count == Width, "Invalid width");
1047+
1048+
ui64 itemSize = 0;
1049+
for (size_t i = 0; i < Width; ++i) {
1050+
itemSize += GetDatumMemorySize(i, ExtractDatum(values[i]));
1051+
}
1052+
return itemSize > ChunkSizeLimit;
1053+
}
1054+
1055+
std::vector<std::vector<arrow::Datum>> SplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) override {
1056+
MKQL_ENSURE(count == Width, "Invalid width");
1057+
1058+
SplitStack.clear();
1059+
SplitStack.emplace_back(*this, values);
1060+
std::vector<std::vector<arrow::Datum>> result;
1061+
1062+
const auto estimatedSize = SplitStack.back().GetSize() / std::max(ChunkSizeLimit, ui64(1));
1063+
result.reserve(estimatedSize);
1064+
SplitStack.reserve(estimatedSize);
1065+
while (!SplitStack.empty()) {
1066+
auto item = std::move(SplitStack.back());
1067+
SplitStack.pop_back();
1068+
1069+
while (item.GetSize() > ChunkSizeLimit) {
1070+
if (item.GetNumberRows() <= 1) {
1071+
throw yexception() << "Row size in block is " << item.GetSize() << ", that is larger than allowed limit " << ChunkSizeLimit;
1072+
}
1073+
SplitStack.emplace_back(item.PopBack(item.GetNumberRows() / 2));
1074+
}
1075+
result.emplace_back(item.ExtractData());
1076+
}
1077+
return result;
1078+
}
1079+
1080+
private:
1081+
static arrow::Datum ExtractDatum(const NUdf::TUnboxedValuePod& value) {
1082+
arrow::Datum datum = TArrowBlock::From(value).GetDatum();
1083+
MKQL_ENSURE(datum.is_array() || datum.is_scalar(), "Expecting array or scalar");
1084+
return datum;
1085+
}
1086+
1087+
ui64 GetDatumMemorySize(ui64 index, const arrow::Datum& datum) {
1088+
MKQL_ENSURE(index < Width, "Invalid index");
1089+
if (datum.is_array()) {
1090+
return NKikimr::NArrow::GetArrayMemorySize(datum.array());
1091+
}
1092+
1093+
if (!ScalarSizes[index]) {
1094+
const auto& array = ARROW_RESULT(arrow::MakeArrayFromScalar(*datum.scalar(), 1));
1095+
ScalarSizes[index] = NKikimr::NArrow::GetArrayMemorySize(array->data());
1096+
}
1097+
return *ScalarSizes[index];
1098+
}
1099+
1100+
IBlockTrimmer& GetColumnTrimmer(ui64 index) {
1101+
MKQL_ENSURE(index < Width, "Invalid index");
1102+
if (!BlockTrimmers[index]) {
1103+
BlockTrimmers[index] = MakeBlockTrimmer(TTypeInfoHelper(), Items[index]->GetItemType(), &ArrowPool);
1104+
}
1105+
return *BlockTrimmers[index];
1106+
}
1107+
1108+
private:
1109+
const TVector<const TBlockType*> Items;
1110+
const ui64 Width;
1111+
const ui64 ChunkSizeLimit;
1112+
arrow::MemoryPool& ArrowPool;
1113+
1114+
std::vector<std::optional<ui64>> ScalarSizes;
1115+
std::vector<IBlockTrimmer::TPtr> BlockTrimmers;
1116+
std::vector<TItem> SplitStack;
1117+
};
1118+
1119+
} // namespace
1120+
1121+
// Creates a block splitter for a wide block stream item type, or nullptr when
// the type is not splittable. The type must be a non-empty Multi type whose
// elements are all Block types and whose last element is a scalar uint64 block
// carrying the row count.
//
// @param type            item type of the stream (expected: Multi of Blocks)
// @param chunkSizeLimit  upper bound, in bytes, for each produced chunk
// @param pool            arrow memory pool; defaults to the YQL pool when null
// @return                splitter instance, or nullptr if `type` does not match
IBlockSplitter::TPtr CreateBlockSplitter(const TType* type, ui64 chunkSizeLimit, arrow::MemoryPool* pool) {
    if (!type->IsMulti()) {
        return nullptr;
    }

    const TMultiType* multiType = static_cast<const TMultiType*>(type);
    const ui32 width = multiType->GetElementsCount();
    if (!width) {
        return nullptr;
    }

    TVector<const TBlockType*> items;
    items.reserve(width);
    for (ui32 i = 0; i < width; i++) {
        // Renamed from `type`: the original shadowed the function parameter.
        const auto elementType = multiType->GetElementType(i);
        if (!elementType->IsBlock()) {
            return nullptr;
        }

        const TBlockType* blockType = static_cast<const TBlockType*>(elementType);
        if (i == width - 1) {
            // The trailing column must be the scalar uint64 row-count block —
            // TBlockSplitter reads the number of rows from it.
            if (blockType->GetShape() != TBlockType::EShape::Scalar) {
                return nullptr;
            }
            if (!blockType->GetItemType()->IsData()) {
                return nullptr;
            }
            if (static_cast<const TDataType*>(blockType->GetItemType())->GetDataSlot() != NUdf::EDataSlot::Uint64) {
                return nullptr;
            }
        }

        items.push_back(blockType);
    }

    return MakeIntrusive<TBlockSplitter>(items, chunkSizeLimit, pool);
}
1158+
9451159
} // namespace NArrow
9461160
} // namespace NYql
947-

ydb/library/yql/dq/runtime/dq_arrow_helpers.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,16 @@ std::shared_ptr<arrow::Array> DeserializeArray(const std::string& blob, std::sha
8484
*/
8585
void AppendElement(NYql::NUdf::TUnboxedValue value, arrow::ArrayBuilder* builder, const NKikimr::NMiniKQL::TType* type);
8686

87+
class IBlockSplitter : public TThrRefBase {
88+
public:
89+
using TPtr = TIntrusivePtr<IBlockSplitter>;
90+
91+
virtual bool ShouldSplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) = 0;
92+
93+
virtual std::vector<std::vector<arrow::Datum>> SplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) = 0;
94+
};
95+
96+
IBlockSplitter::TPtr CreateBlockSplitter(const NKikimr::NMiniKQL::TType* type, ui64 chunkSizeLimit, arrow::MemoryPool* pool = nullptr);
8797

8898
} // NArrow
8999
} // NYql

0 commit comments

Comments
 (0)