diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
index 46267030ba46..f5b6db6c58a1 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
@@ -86,6 +86,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
     std::atomic<ui64> MkqlLightProgramMemoryLimit = 0;
     std::atomic<ui64> MkqlHeavyProgramMemoryLimit = 0;
     std::atomic<ui64> MinChannelBufferSize = 0;
+    std::atomic<ui64> ChannelChunkSizeLimit = 48_MB;
     std::atomic<ui64> MinMemAllocSize = 8_MB;
     std::atomic<ui64> MinMemFreeSize = 32_MB;
@@ -106,6 +107,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
         MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
         MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
         MinChannelBufferSize.store(config.GetMinChannelBufferSize());
+        ChannelChunkSizeLimit.store(config.GetChannelChunkSizeLimit());
         MinMemAllocSize.store(config.GetMinMemAllocSize());
         MinMemFreeSize.store(config.GetMinMemFreeSize());
     }
@@ -142,6 +144,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
         memoryLimits.ChannelBufferSize = std::max(estimation.ChannelBufferMemoryLimit / std::max(1, inputChannelsCount), MinChannelBufferSize.load());
         memoryLimits.OutputChunkMaxSize = args.OutputChunkMaxSize;
+        memoryLimits.ChunkSizeLimit = ChannelChunkSizeLimit.load();
         AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "channel_info")
             ("ch_size", estimation.ChannelBufferMemoryLimit)
             ("ch_count", estimation.ChannelBuffersCount)
diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto
index 9fa705484174..a63328a7fc7b 100644
--- a/ydb/core/protos/table_service_config.proto
+++ b/ydb/core/protos/table_service_config.proto
@@ -22,6 +22,7 @@ message TTableServiceConfig {
     }
     optional uint32 ComputeActorsCount = 1 [default = 10000];
     optional uint64 ChannelBufferSize = 2 [default = 8388608]; // 8 MB
+    optional uint64 ChannelChunkSizeLimit = 30 [default = 50331648]; // 48 MB
     reserved 3;
     optional uint64 MkqlLightProgramMemoryLimit = 4 [default = 1048576]; // 1 MiB
     optional uint64 MkqlHeavyProgramMemoryLimit = 5 [default = 31457280]; // 30 MB
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index e9404a8cda62..ae2d3831854f 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -376,6 +376,7 @@ struct TComputeMemoryLimits {
     ui64 MinMemAllocSize = 30_MB;
     ui64 MinMemFreeSize = 30_MB;
     ui64 OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
+    ui64 ChunkSizeLimit = 48_MB;
     TMaybe ArrayBufferMinFillPercentage; // Used by DqOutputHashPartitionConsumer
 
     IMemoryQuotaManager::TPtr MemoryQuotaManager;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 1b97071e769e..205b87b00bbf 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1094,6 +1094,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
                 Channels->SetOutputChannelPeer(channelUpdate.GetId(), peer);
                 outputChannel->HasPeer = true;
+                if (outputChannel->Channel) {
+                    outputChannel->Channel->UpdateSettings({.IsLocalChannel = peer.NodeId() == this->SelfId().NodeId()});
+                }
                 continue;
             }
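The hunks above are pure plumbing: the new ChannelChunkSizeLimit knob is declared in table_service_config.proto (field 30, default 48 MB), cached in an atomic inside TKqpCaFactory, copied into TComputeMemoryLimits per compute actor, and re-applied once the channel peer resolves so same-node channels can opt out of the limit. A minimal sketch of that flow, with hypothetical names standing in for the real config and limits types:

    // Sketch only: ApplyConfig/CreateActorLimits and TSketchLimits are
    // hypothetical stand-ins; the real wiring is in the hunks above.
    #include <atomic>
    #include <cstdint>

    static std::atomic<uint64_t> ChannelChunkSizeLimit{48ull << 20}; // 48 MB default, as in the proto

    struct TSketchLimits { uint64_t ChunkSizeLimit = 0; };

    // Runs on each config update; writers and readers never take a lock.
    void ApplyConfig(uint64_t configValue) { ChannelChunkSizeLimit.store(configValue); }

    // Runs per compute actor: every actor snapshots the current limit.
    TSketchLimits CreateActorLimits() { return {ChannelChunkSizeLimit.load()}; }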
diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
index 7ccf74b29def..7dc44d6ccc4d 100644
--- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
+++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
@@ -214,10 +214,11 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived>
         limits.ChannelBufferSize = this->MemoryLimits.ChannelBufferSize;
         limits.OutputChunkMaxSize = this->MemoryLimits.OutputChunkMaxSize;
+        limits.ChunkSizeLimit = this->MemoryLimits.ChunkSizeLimit;
 
         if (!limits.OutputChunkMaxSize) {
             limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
-        } 
+        }
 
         if (this->Task.GetEnableSpilling()) {
             TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback()));
diff --git a/ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp b/ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp
index 7619c4e5f11e..b28ff8644062 100644
--- a/ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp
+++ b/ydb/library/yql/dq/runtime/dq_arrow_helpers.cpp
@@ -1,11 +1,16 @@
 #include "dq_arrow_helpers.h"
 
 #include
-#include
-#include
+#include
 #include
+#include
 #include
+#include
+#include
+#include
+#include
+#include
 
 #include
 #include
@@ -942,6 +947,214 @@ std::shared_ptr<arrow::Array> DeserializeArray(const std::string& blob, std::sha
     return (*batch)->column(0);
 }
 
+// Block splitter
+
+namespace {
+
+class TBlockSplitter : public IBlockSplitter {
+    class TItem {
+    public:
+        TItem(TBlockSplitter& self, const NUdf::TUnboxedValuePod* values)
+            : Self(self)
+        {
+            Data.reserve(Self.Width);
+            ArraysIdx.reserve(Self.Width);
+            for (ui64 i = 0; i < Self.Width; ++i) {
+                auto datum = TBlockSplitter::ExtractDatum(values[i]);
+                if (datum.is_scalar()) {
+                    ScalarsSize += Self.GetDatumMemorySize(i, datum);
+                } else {
+                    ArraysIdx.emplace_back(i);
+                }
+                Data.emplace_back(std::move(datum));
+            }
+
+            NumberRows = Data.back().scalar_as<arrow::UInt64Scalar>().value;
+            UpdateArraysSize();
+        }
+
+        TItem(TBlockSplitter& self, std::vector<arrow::Datum>&& data, const std::vector<ui64>& arraysIdx, ui64 numberRows, ui64 scalarsSize)
+            : Self(self)
+            , Data(std::move(data))
+            , ArraysIdx(arraysIdx)
+            , NumberRows(numberRows)
+            , ScalarsSize(scalarsSize)
+        {
+            UpdateArraysSize();
+        }
+
+        ui64 GetNumberRows() const {
+            return NumberRows;
+        }
+
+        ui64 GetSize() const {
+            return ScalarsSize + ArraysSize;
+        }
+
+        std::vector<arrow::Datum> ExtractData() {
+            std::vector<arrow::Datum> result(std::move(Data));
+            for (ui64 i : ArraysIdx) {
+                result[i] = Self.GetColumnTrimmer(i).Trim(result[i].array());
+            }
+            result.back() = arrow::Datum(std::make_shared<arrow::UInt64Scalar>(NumberRows));
+            return result;
+        }
+
+        TItem PopBack(ui64 length) {
+            MKQL_ENSURE(length <= NumberRows, "Cannot pop more than the number of rows");
+            std::vector<arrow::Datum> backData = Data;
+            for (ui64 i : ArraysIdx) {
+                auto array = Data[i].array();
+                Data[i] = NUdf::Chop(array, NumberRows - length);
+                backData[i] = array;
+            }
+
+            NumberRows -= length;
+            UpdateArraysSize();
+
+            return TItem(Self, std::move(backData), ArraysIdx, length, ScalarsSize);
+        }
+
+    private:
+        void UpdateArraysSize() {
+            ArraysSize = 0;
+            for (ui64 i : ArraysIdx) {
+                ArraysSize += NKikimr::NArrow::GetArrayDataSize(Data[i].make_array());
+            }
+        }
+
+    private:
+        TBlockSplitter& Self;
+        std::vector<arrow::Datum> Data;
+        std::vector<ui64> ArraysIdx;
+        ui64 NumberRows = 0;
+        ui64 ScalarsSize = 0;
+        ui64 ArraysSize = 0;
+    };
+
+public:
+    TBlockSplitter(const TVector<const TBlockType*>& items, ui64 chunkSizeLimit, arrow::MemoryPool* pool)
+        : Items(items)
+        , Width(items.size())
+        , ChunkSizeLimit(chunkSizeLimit)
+        , ArrowPool(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
+        , ScalarSizes(Width)
+        , BlockTrimmers(Width)
+    {}
+
+    bool ShouldSplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) override {
+        MKQL_ENSURE(count == Width, "Invalid width");
+
+        ui64 itemSize = 0;
+        for (size_t i = 0; i < Width; ++i) {
+            itemSize += GetDatumMemorySize(i, ExtractDatum(values[i]));
+        }
+        return itemSize > ChunkSizeLimit;
+    }
+
+    std::vector<std::vector<arrow::Datum>> SplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) override {
+        MKQL_ENSURE(count == Width, "Invalid width");
+
+        SplitStack.clear();
+        SplitStack.emplace_back(*this, values);
+        std::vector<std::vector<arrow::Datum>> result;
+
+        const auto estimatedSize = SplitStack.back().GetSize() / std::max(ChunkSizeLimit, ui64(1));
+        result.reserve(estimatedSize);
+        SplitStack.reserve(estimatedSize);
+        while (!SplitStack.empty()) {
+            auto item = std::move(SplitStack.back());
+            SplitStack.pop_back();
+
+            while (item.GetSize() > ChunkSizeLimit) {
+                if (item.GetNumberRows() <= 1) {
+                    throw yexception() << "Row size in block is " << item.GetSize() << ", which is larger than allowed limit " << ChunkSizeLimit;
+                }
+                SplitStack.emplace_back(item.PopBack(item.GetNumberRows() / 2));
+            }
+            result.emplace_back(item.ExtractData());
+        }
+        return result;
+    }
+
+private:
+    static arrow::Datum ExtractDatum(const NUdf::TUnboxedValuePod& value) {
+        arrow::Datum datum = TArrowBlock::From(value).GetDatum();
+        MKQL_ENSURE(datum.is_array() || datum.is_scalar(), "Expecting array or scalar");
+        return datum;
+    }
+
+    ui64 GetDatumMemorySize(ui64 index, const arrow::Datum& datum) {
+        MKQL_ENSURE(index < Width, "Invalid index");
+        if (datum.is_array()) {
+            return NKikimr::NArrow::GetArrayMemorySize(datum.array());
+        }
+
+        if (!ScalarSizes[index]) {
+            const auto& array = ARROW_RESULT(arrow::MakeArrayFromScalar(*datum.scalar(), 1));
+            ScalarSizes[index] = NKikimr::NArrow::GetArrayMemorySize(array->data());
+        }
+        return *ScalarSizes[index];
+    }
+
+    IBlockTrimmer& GetColumnTrimmer(ui64 index) {
+        MKQL_ENSURE(index < Width, "Invalid index");
+        if (!BlockTrimmers[index]) {
+            BlockTrimmers[index] = MakeBlockTrimmer(TTypeInfoHelper(), Items[index]->GetItemType(), &ArrowPool);
+        }
+        return *BlockTrimmers[index];
+    }
+
+private:
+    const TVector<const TBlockType*> Items;
+    const ui64 Width;
+    const ui64 ChunkSizeLimit;
+    arrow::MemoryPool& ArrowPool;
+
+    std::vector<std::optional<ui64>> ScalarSizes;
+    std::vector<IBlockTrimmer::TPtr> BlockTrimmers;
+    std::vector<TItem> SplitStack;
+};
+
+} // namespace
+
+IBlockSplitter::TPtr CreateBlockSplitter(const TType* type, ui64 chunkSizeLimit, arrow::MemoryPool* pool) {
+    if (!type->IsMulti()) {
+        return nullptr;
+    }
+
+    const TMultiType* multiType = static_cast<const TMultiType*>(type);
+    const ui32 width = multiType->GetElementsCount();
+    if (!width) {
+        return nullptr;
+    }
+
+    TVector<const TBlockType*> items;
+    items.reserve(width);
+    for (ui32 i = 0; i < width; i++) {
+        const auto type = multiType->GetElementType(i);
+        if (!type->IsBlock()) {
+            return nullptr;
+        }
+
+        const TBlockType* blockType = static_cast<const TBlockType*>(type);
+        if (i == width - 1) {
+            if (blockType->GetShape() != TBlockType::EShape::Scalar) {
+                return nullptr;
+            }
+            if (!blockType->GetItemType()->IsData()) {
+                return nullptr;
+            }
+            if (static_cast<const TDataType*>(blockType->GetItemType())->GetDataSlot() != NUdf::EDataSlot::Uint64) {
+                return nullptr;
+            }
+        }
+
+        items.push_back(blockType);
+    }
+
+    return MakeIntrusive<TBlockSplitter>(items, chunkSizeLimit, pool);
+}
+
 } // namespace NArrow
 } // namespace NYql
-
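SplitItem keeps a stack of pending back-halves: the current item is repeatedly halved from the back until its front fits under the limit, the front is emitted, and the stack then yields the next-nearest back part, so parts come out front-to-back in row order. Scalar columns are counted once per part (they are replicated, not sliced), and a single row that still exceeds the limit is a hard error. The same control flow, reduced to plain row counts with no Arrow dependency (names here are illustrative, not from the library):

    #include <cstdint>
    #include <stdexcept>
    #include <vector>

    struct TSpan {
        uint64_t Rows;
        uint64_t BytesPerRow;
        uint64_t Size() const { return Rows * BytesPerRow; }
    };

    std::vector<TSpan> SplitBySize(TSpan input, uint64_t limit) {
        std::vector<TSpan> stack{input}, result;
        while (!stack.empty()) {
            TSpan item = stack.back();
            stack.pop_back();
            while (item.Size() > limit) {
                if (item.Rows <= 1) {
                    throw std::runtime_error("single row exceeds the chunk limit");
                }
                // Push the back half; it is processed after the front half,
                // so the result keeps the original row order.
                uint64_t back = item.Rows / 2;
                stack.push_back({back, item.BytesPerRow});
                item.Rows -= back;
            }
            result.push_back(item);
        }
        return result;
    }

With a uniform BytesPerRow the loop terminates after O(log n) halvings per emitted part; in the real splitter the per-part size is recomputed from the actual Arrow buffers after each Chop.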
diff --git a/ydb/library/yql/dq/runtime/dq_arrow_helpers.h b/ydb/library/yql/dq/runtime/dq_arrow_helpers.h
index 293bb8dee398..f11598889d2b 100644
--- a/ydb/library/yql/dq/runtime/dq_arrow_helpers.h
+++ b/ydb/library/yql/dq/runtime/dq_arrow_helpers.h
@@ -84,6 +84,16 @@ std::shared_ptr<arrow::Array> DeserializeArray(const std::string& blob, std::sha
  */
 void AppendElement(NYql::NUdf::TUnboxedValue value, arrow::ArrayBuilder* builder, const NKikimr::NMiniKQL::TType* type);
 
+class IBlockSplitter : public TThrRefBase {
+public:
+    using TPtr = TIntrusivePtr<IBlockSplitter>;
+
+    virtual bool ShouldSplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) = 0;
+
+    virtual std::vector<std::vector<arrow::Datum>> SplitItem(const NUdf::TUnboxedValuePod* values, ui32 count) = 0;
+};
+
+IBlockSplitter::TPtr CreateBlockSplitter(const NKikimr::NMiniKQL::TType* type, ui64 chunkSizeLimit, arrow::MemoryPool* pool = nullptr);
 
 } // NArrow
 } // NYql
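CreateBlockSplitter deliberately returns nullptr unless the type is a wide (multi) type whose members are all block columns and whose trailing column is a Uint64 scalar carrying the row count; callers treat a null splitter as "never split". A hypothetical call-site sketch of the interface (PushPart is an assumed sink, not part of this API, and the 48_MB literal assumes the usual size-literal helpers are in scope):

    // Assumes the declarations above are visible; PushPart is hypothetical.
    void PushPart(std::vector<arrow::Datum>&& part);

    void PushMaybeSplit(const NKikimr::NMiniKQL::TType* type,
                        const NYql::NUdf::TUnboxedValuePod* values, ui32 width) {
        auto splitter = NYql::NArrow::CreateBlockSplitter(type, /* chunkSizeLimit */ 48_MB);
        if (splitter && splitter->ShouldSplitItem(values, width)) {
            for (auto& part : splitter->SplitItem(values, width)) {
                PushPart(std::move(part)); // each part is below the limit
            }
        }
        // else: not a block-only wide type, or small enough to ship as-is
    }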
diff --git a/ydb/library/yql/dq/runtime/dq_arrow_helpers_ut.cpp b/ydb/library/yql/dq/runtime/dq_arrow_helpers_ut.cpp
index d8df96d4a86d..efd0f778599f 100644
--- a/ydb/library/yql/dq/runtime/dq_arrow_helpers_ut.cpp
+++ b/ydb/library/yql/dq/runtime/dq_arrow_helpers_ut.cpp
@@ -1,16 +1,6 @@
 #include "dq_arrow_helpers.h"
 
-#include
-
 #include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
 
 #include
 #include
@@ -18,16 +8,34 @@
 #include
 #include
 
+#include
+
 #include
 #include
-#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
 
-using namespace NKikimr;
 using namespace NKikimr::NMiniKQL;
+using namespace NKikimr::NArrow;
 using namespace NYql;
 
 namespace {
+
 NUdf::TUnboxedValue GetValueOfBasicType(TType* type, ui64 value) {
     Y_ABORT_UNLESS(type->GetKind() == TType::EKind::Data);
     auto dataType = static_cast<TDataType*>(type);
@@ -60,6 +68,31 @@ NUdf::TUnboxedValue GetValueOfBasicType(TType* type, ui64 value) {
     }
 }
 
+ui64 GetDatumSize(const arrow::Datum& datum) {
+    if (datum.is_scalar()) {
+        return GetArrayMemorySize(ARROW_RESULT(arrow::MakeArrayFromScalar(*datum.scalar(), 1))->data());
+    }
+    if (datum.is_array()) {
+        return GetArrayDataSize(datum.make_array());
+    }
+
+    UNIT_FAIL("Expected scalar or array");
+    return 0;
+}
+
+struct TBlockColumn {
+    arrow::Datum Datum;
+    TBlockType* Type;
+    ui64 Size = 0;
+};
+
+struct TBlockValue {
+    TUnboxedValueVector Values;
+    const TMultiType* Type;
+    ui64 ArraysSize = 0;
+    ui64 ScalarsSize = 0;
+};
+
 struct TTestContext {
     TScopedAlloc Alloc;
     TTypeEnvironment TypeEnv;
@@ -342,6 +375,60 @@ struct TTestContext {
         }
         return values;
     }
+
+    template <typename TValue, typename TScalar, typename TFiller>
+    TBlockColumn CreateBlockColumn(std::optional<ui64> numberRows, TFiller& arrayFiller, std::function<TScalar()> scalarFiller) {
+        TBlockType* type = TBlockType::Create(TDataType::Create(NUdf::TDataType<TValue>::Id, TypeEnv), numberRows ? TBlockType::EShape::Many : TBlockType::EShape::Scalar, TypeEnv);
+        arrow::Datum datum;
+        if (numberRows) {
+            datum = std::make_shared<NConstruction::TSimpleArrayConstructor<TFiller>>("field", arrayFiller)->BuildArray(*numberRows);
+        } else {
+            datum = arrow::Datum(std::make_shared<TScalar>(scalarFiller()));
+        }
+        return {.Datum = datum, .Type = type, .Size = GetDatumSize(datum)};
+    }
+
+    TBlockColumn CreateStringBlockColumn(std::optional<ui64> numberRows) {
+        NConstruction::TStringPoolFiller stringGenerator(8, 512);
+        return CreateBlockColumn<char*, arrow::StringScalar>(numberRows, stringGenerator, [&]() {
+            return arrow::StringScalar(stringGenerator.GetValue(0).to_string());
+        });
+    }
+
+    TBlockColumn CreateIntBlockColumn(std::optional<ui64> numberRows) {
+        NConstruction::TIntSeqFiller<arrow::Int32Type> intGenerator;
+        return CreateBlockColumn<i32, arrow::Int32Scalar>(numberRows, intGenerator, [&]() {
+            return arrow::Int32Scalar(intGenerator.GetValue(0));
+        });
+    }
+
+    TBlockValue ComposeBlockColumns(std::vector<TBlockColumn> columns, ui64 numberRows) {
+        ui64 arraysSize = 0;
+        ui64 scalarsSize = 0;
+        TUnboxedValueVector columnValues;
+        std::vector<TType*> columnTypes;
+        columnValues.reserve(columns.size() + 1);
+        columnTypes.reserve(columns.size() + 1);
+        for (auto& column : columns) {
+            if (column.Datum.is_scalar()) {
+                scalarsSize += column.Size;
+            } else {
+                arraysSize += column.Size;
+            }
+
+            columnValues.emplace_back(HolderFactory.CreateArrowBlock(std::move(column.Datum)));
+            columnTypes.emplace_back(column.Type);
+        }
+
+        auto lengthDatum = arrow::Datum(std::make_shared<arrow::UInt64Scalar>(numberRows));
+        scalarsSize += GetDatumSize(lengthDatum);
+        columnValues.emplace_back(HolderFactory.CreateArrowBlock(std::move(lengthDatum)));
+        columnTypes.emplace_back(TBlockType::Create(TDataType::Create(NUdf::TDataType<ui64>::Id, TypeEnv), TBlockType::EShape::Scalar, TypeEnv));
+
+        const TMultiType* multiType = TMultiType::Create(columnTypes.size(), columnTypes.data(), TypeEnv);
+
+        return {.Values = std::move(columnValues), .Type = multiType, .ArraysSize = arraysSize, .ScalarsSize = scalarsSize};
+    }
 };
 
 // Note this equality check is not fully valid. But it is sufficient for UnboxedValues used in tests.
@@ -471,7 +558,8 @@ void AssertUnboxedValuesAreEqual(NUdf::TUnboxedValue& left, NUdf::TUnboxedValue&
             THROW yexception() << "Unsupported type: " << type->GetKindAsStr();
     }
 }
-}
+
+} // namespace
 
 Y_UNIT_TEST_SUITE(DqUnboxedValueToNativeArrowConversion) {
 
@@ -967,3 +1055,120 @@ Y_UNIT_TEST_SUITE(ConvertUnboxedValueToArrowAndBack){
     }
 }
 
+Y_UNIT_TEST_SUITE(TestArrowBlockSplitter) {
+    void ValidateSplit(const TBlockValue& initialItem, ui64 numberParts, ui64 sizeLimit, const std::vector<std::vector<arrow::Datum>>& splittedItems) {
+        UNIT_ASSERT_VALUES_EQUAL(splittedItems.size(), numberParts);
+        const ui64 numberRows = TArrowBlock::From(initialItem.Values.back()).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
+        const ui64 expectedSplittedSize = numberRows / numberParts;
+        const ui64 width = initialItem.Type->GetElementsCount();
+
+        ui64 rowsCount = 0;
+        for (const auto& splittedBatch : splittedItems) {
+            const auto batchSuffix = TStringBuilder() << "rows count: " << rowsCount;
+
+            UNIT_ASSERT_VALUES_EQUAL_C(width, splittedBatch.size(), batchSuffix);
+            UNIT_ASSERT_C(splittedBatch.back().is_scalar(), batchSuffix);
+
+            const auto splittedSize = splittedBatch.back().scalar_as<arrow::UInt64Scalar>().value;
+            UNIT_ASSERT_VALUES_EQUAL_C(splittedSize, expectedSplittedSize, batchSuffix);
+
+            ui64 itemSize = 0;
+            for (ui64 i = 0; i < width - 1; ++i) {
+                const auto columnSuffix = TStringBuilder() << batchSuffix << ", column: " << i;
+
+                const auto initialDatum = TArrowBlock::From(initialItem.Values[i]).GetDatum();
+                const auto splittedDatum = splittedBatch[i];
+                if (initialDatum.is_scalar()) {
+                    UNIT_ASSERT_C(splittedDatum.is_scalar(), columnSuffix);
+                    UNIT_ASSERT_C(initialDatum.Equals(splittedDatum), columnSuffix);
+                    itemSize += GetArrayMemorySize(ARROW_RESULT(arrow::MakeArrayFromScalar(*splittedDatum.scalar(), 1))->data());
+                } else {
+                    UNIT_ASSERT_C(splittedDatum.is_array(), columnSuffix);
+
+                    const auto splittedArray = splittedDatum.make_array();
+                    UNIT_ASSERT_VALUES_EQUAL_C(splittedSize, splittedArray->length(), columnSuffix);
+                    UNIT_ASSERT_VALUES_EQUAL_C(0, splittedArray->offset(), columnSuffix);
+                    UNIT_ASSERT_C(splittedArray->Equals(initialDatum.make_array()->Slice(rowsCount, splittedSize)), columnSuffix);
+                    itemSize += GetArrayMemorySize(splittedArray->data());
+                }
+            }
+            UNIT_ASSERT_LE_C(itemSize, sizeLimit, batchSuffix);
+
+            rowsCount += splittedSize;
+        }
+        UNIT_ASSERT_VALUES_EQUAL(numberRows, rowsCount);
+    }
+
+    Y_UNIT_TEST(SplitLargeBlock) {
+        TTestContext context;
+
+        constexpr ui64 numberRows = 2048;
+        const auto& item = context.ComposeBlockColumns({
+            context.CreateStringBlockColumn(numberRows),
+            context.CreateIntBlockColumn(numberRows)
+        }, numberRows);
+
+        constexpr ui64 numberParts = 8;
+        const ui64 sizeLimit = item.ScalarsSize + item.ArraysSize / numberParts;
+        const auto splitter = NArrow::CreateBlockSplitter(item.Type, sizeLimit);
+
+        UNIT_ASSERT(splitter->ShouldSplitItem(item.Values.data(), item.Values.size()));
+        ValidateSplit(item, numberParts, sizeLimit, splitter->SplitItem(item.Values.data(), item.Values.size()));
+    }
+
+    Y_UNIT_TEST(SplitWithScalars) {
+        TTestContext context;
+
+        constexpr ui64 numberRows = 2048;
+        const auto& item = context.ComposeBlockColumns({
+            context.CreateStringBlockColumn(std::nullopt),
+            context.CreateIntBlockColumn(numberRows)
+        }, numberRows);
+
+        constexpr ui64 numberParts = 8;
+        const ui64 sizeLimit = item.ScalarsSize + item.ArraysSize / numberParts;
+        const auto splitter = NArrow::CreateBlockSplitter(item.Type, sizeLimit);
+
+        UNIT_ASSERT(splitter->ShouldSplitItem(item.Values.data(), item.Values.size()));
+        ValidateSplit(item, numberParts, sizeLimit, splitter->SplitItem(item.Values.data(), item.Values.size()));
+    }
+
+    Y_UNIT_TEST(PassSmallBlock) {
+        TTestContext context;
+
+        constexpr ui64 numberRows = 2048;
+        const auto& item = context.ComposeBlockColumns({
+            context.CreateStringBlockColumn(std::nullopt),
+            context.CreateStringBlockColumn(numberRows),
+            context.CreateIntBlockColumn(std::nullopt),
+            context.CreateIntBlockColumn(numberRows)
+        }, numberRows);
+
+        const auto splitter = NArrow::CreateBlockSplitter(item.Type, 2 * (item.ArraysSize + item.ScalarsSize));
+        UNIT_ASSERT(!splitter->ShouldSplitItem(item.Values.data(), item.Values.size()));
+    }
+
+    Y_UNIT_TEST(CheckLargeRows) {
+        TTestContext context;
+
+        constexpr ui64 numberRows = 2048;
+        const auto& item = context.ComposeBlockColumns({context.CreateStringBlockColumn(numberRows)}, numberRows);
+
+        const ui64 sizeLimit = item.ArraysSize / numberRows;
+        const auto splitter = NArrow::CreateBlockSplitter(item.Type, sizeLimit);
+        UNIT_ASSERT(splitter->ShouldSplitItem(item.Values.data(), item.Values.size()));
+        UNIT_ASSERT_EXCEPTION_CONTAINS(splitter->SplitItem(item.Values.data(), item.Values.size()), yexception, TStringBuilder() << "Row size in block is " << item.ArraysSize / numberRows + item.ScalarsSize << ", which is larger than allowed limit " << sizeLimit);
+    }
+
+    Y_UNIT_TEST(CheckLargeScalarRows) {
+        TTestContext context;
+
+        constexpr ui64 numberRows = 2048;
+        const auto& item = context.ComposeBlockColumns({context.CreateStringBlockColumn(std::nullopt)}, numberRows);
+
+        const ui64 sizeLimit = item.ScalarsSize / 2;
+        const auto splitter = NArrow::CreateBlockSplitter(item.Type, sizeLimit);
+        UNIT_ASSERT(splitter->ShouldSplitItem(item.Values.data(), item.Values.size()));
+        UNIT_ASSERT_EXCEPTION_CONTAINS(splitter->SplitItem(item.Values.data(), item.Values.size()), yexception, TStringBuilder() << "Row size in block is " << item.ScalarsSize << ", which is larger than allowed limit " << sizeLimit);
+    }
+}
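The size limits in these tests fall out of a simple identity: array bytes divide across parts, scalar bytes do not, because every part carries a full copy of each scalar column. With illustrative numbers (the actual sizes depend on the Arrow buffer layout):

    sizeLimit = ScalarsSize + ArraysSize / numberParts
              = 1 KB + 1 MB / 8                       (assumed sizes)
              = ~129 KB  ->  8 parts of 2048 / 8 = 256 rows each

Conversely, CheckLargeRows sets sizeLimit = ArraysSize / numberRows, which is below the size of even a single-row slice, so SplitItem halves down to one row and must throw; CheckLargeScalarRows does the same with a scalar column, which cannot shrink at all.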
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index ddf5e63fca83..c9e1d7b4a4fd 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -1,13 +1,12 @@
 #include "dq_output_channel.h"
-#include "dq_transport.h"
-
-#include
-#include
+#include "dq_arrow_helpers.h"
 
 #include
 #include
 #include
+#include
+#include
 
 namespace NYql::NDq {
 
@@ -45,6 +44,8 @@ class TDqOutputChannel : public IDqOutputChannel {
         , MaxStoredBytes(settings.MaxStoredBytes)
         , MaxChunkBytes(settings.MaxChunkBytes)
         , ChunkSizeLimit(settings.ChunkSizeLimit)
+        , MutableSettings(settings.MutableSettings)
+        , BlockSplitter(NArrow::CreateBlockSplitter(OutputType, ChunkSizeLimit * 0.75))
         , LogFunc(logFunc)
     {
         PopStats.Level = settings.Level;
@@ -79,16 +80,17 @@ class TDqOutputChannel : public IDqOutputChannel {
 
     virtual void Push(NUdf::TUnboxedValue&& value) override {
         YQL_ENSURE(!OutputType->IsMulti());
-        DoPush(&value, 1);
+        DoPushSafe(&value, 1);
     }
 
     virtual void WidePush(NUdf::TUnboxedValue* values, ui32 width) override {
         YQL_ENSURE(OutputType->IsMulti());
         YQL_ENSURE(Width == width);
-        DoPush(values, width);
+        DoPushSafe(values, width);
     }
 
-    void DoPush(NUdf::TUnboxedValue* values, ui32 width) {
+    // Try to split the data before pushing, so that each chunk fits into ChunkSizeLimit
+    void DoPushSafe(NUdf::TUnboxedValue* values, ui32 width) {
         YQL_ENSURE(!IsFull());
 
         if (Finished) {
@@ -105,6 +107,23 @@ class TDqOutputChannel : public IDqOutputChannel {
             PushStats.Resume();
         }
 
+        PackerCurrentRowCount += rows;
+
+        if (!MutableSettings.IsLocalChannel && BlockSplitter && BlockSplitter->ShouldSplitItem(values, width)) {
+            if (Packer.PackedSizeEstimate()) {
+                TryPack(/* force */ true);
+            }
+
+            auto splittedItem = BlockSplitter->SplitItem(values, width);
+            for (auto it = splittedItem.rbegin(); it != splittedItem.rend(); ++it) {
+                DoPushBlock(std::move(*it));
+            }
+        } else {
+            DoPush(values, width);
+        }
+    }
+
+    // Push the data as a single chunk
+    void DoPush(NUdf::TUnboxedValue* values, ui32 width) {
         if (OutputType->IsMulti()) {
             Packer.AddWideItem(values, width);
         } else {
@@ -115,10 +134,25 @@ class TDqOutputChannel : public IDqOutputChannel {
         }
 
         PackerCurrentChunkCount++;
-        PackerCurrentRowCount += rows;
 
+        TryPack(/* force */ false);
+    }
+
+    void DoPushBlock(std::vector<arrow::Datum>&& data) {
+        NKikimr::NMiniKQL::TUnboxedValueVector outputValues;
+        outputValues.reserve(data.size());
+        for (auto& datum : data) {
+            outputValues.emplace_back(HolderFactory.CreateArrowBlock(std::move(datum)));
+        }
+
+        Packer.AddWideItem(outputValues.data(), outputValues.size());
+
+        PackerCurrentChunkCount++;
+        TryPack(/* force */ false);
+    }
+
+    // Pack and spill the buffered batch if enough data is accumulated (>= MaxChunkBytes) or force = true
+    void TryPack(bool force) {
         size_t packerSize = Packer.PackedSizeEstimate();
-        if (packerSize >= MaxChunkBytes) {
+        if (packerSize >= MaxChunkBytes || (force && packerSize)) {
             Data.emplace_back();
             Data.back().Buffer = FinishPackAndCheckSize();
             if (PushStats.CollectBasic()) {
@@ -342,7 +376,7 @@ class TDqOutputChannel : public IDqOutputChannel {
 
     TChunkedBuffer FinishPackAndCheckSize() {
         TChunkedBuffer result = Packer.Finish();
-        if (result.Size() > ChunkSizeLimit) {
+        if (!MutableSettings.IsLocalChannel && result.Size() > ChunkSizeLimit) {
             // TODO: may relax requirement if OOB transport is enabled
             ythrow TDqOutputChannelChunkSizeLimitExceeded() << "Row data size is too big: " << result.Size()
                 << " bytes, exceeds limit of " << ChunkSizeLimit << " bytes";
@@ -379,6 +413,10 @@ class TDqOutputChannel : public IDqOutputChannel {
     void Terminate() override {
     }
 
+    void UpdateSettings(const TDqOutputChannelSettings::TMutable& settings) override {
+        MutableSettings = settings;
+    }
+
 private:
     NKikimr::NMiniKQL::TType* OutputType;
     NKikimr::NMiniKQL::TValuePackerTransport Packer;
@@ -389,6 +427,8 @@ class TDqOutputChannel : public IDqOutputChannel {
     const ui64 MaxStoredBytes;
     const ui64 MaxChunkBytes;
     const ui64 ChunkSizeLimit;
+    TDqOutputChannelSettings::TMutable MutableSettings;
+    const NArrow::IBlockSplitter::TPtr BlockSplitter;
     TLogFunc LogFunc;
 
     struct TSerializedBatch {
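The interesting ordering rule lives in DoPushSafe: before an oversized block is split, any rows already sitting in the packer are force-flushed, so split parts never share a serialized chunk with earlier rows; each part is then packed and flushed on the usual watermark. The flush predicate in isolation, with a hypothetical packer type standing in for TValuePackerTransport:

    #include <cstddef>

    // Sketch only: TPackerSketch is a stand-in exposing just a size estimate.
    struct TPackerSketch { size_t Estimate = 0; };

    bool ShouldFlush(const TPackerSketch& packer, size_t maxChunkBytes, bool force) {
        // Regular pushes flush on the watermark; force flushes any non-empty
        // buffer, which is what DoPushSafe does before emitting split parts.
        return packer.Estimate >= maxChunkBytes || (force && packer.Estimate > 0);
    }

Note the splitter is built with ChunkSizeLimit * 0.75, leaving headroom for packing overhead, and both the splitting path and the final size check in FinishPackAndCheckSize are skipped entirely when MutableSettings.IsLocalChannel is set, since local delivery never serializes across the interconnect.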
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.h b/ydb/library/yql/dq/runtime/dq_output_channel.h
index e475caa3ccc9..53b935a8689e 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.h
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.h
@@ -25,6 +25,20 @@ struct TDqOutputChannelStats : public TDqOutputStats {
     ui64 SpilledBlobs = 0;
 };
 
+struct TDqOutputChannelSettings {
+    struct TMutable {
+        bool IsLocalChannel = false;
+    };
+
+    ui64 MaxStoredBytes = 8_MB;
+    ui64 MaxChunkBytes = 2_MB;
+    ui64 ChunkSizeLimit = 48_MB;
+    NDqProto::EDataTransportVersion TransportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0;
+    IDqChannelStorage::TPtr ChannelStorage;
+    TCollectStatsLevel Level = TCollectStatsLevel::None;
+    TMutable MutableSettings;
+};
+
 class IDqOutputChannel : public IDqOutput {
 public:
     using TPtr = TIntrusivePtr<IDqOutputChannel>;
@@ -55,15 +69,8 @@ class IDqOutputChannel : public IDqOutput {
     virtual ui64 Drop() = 0;
 
     virtual void Terminate() = 0;
-};
 
-struct TDqOutputChannelSettings {
-    ui64 MaxStoredBytes = 8_MB;
-    ui64 MaxChunkBytes = 2_MB;
-    ui64 ChunkSizeLimit = 48_MB;
-    NDqProto::EDataTransportVersion TransportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0;
-    IDqChannelStorage::TPtr ChannelStorage;
-    TCollectStatsLevel Level = TCollectStatsLevel::None;
+    virtual void UpdateSettings(const TDqOutputChannelSettings::TMutable& settings) = 0;
 };
 
 struct TDqOutputChannelChunkSizeLimitExceeded : public yexception {
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 059de16683bf..802239bc44a1 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -672,6 +672,12 @@ class TDqTaskRunner : public IDqTaskRunner {
             settings.ChannelStorage = execCtx.CreateChannelStorage(channelId, outputChannelDesc.GetEnableSpilling());
         }
 
+        if (outputChannelDesc.GetSrcEndpoint().HasActorId() && outputChannelDesc.GetDstEndpoint().HasActorId()) {
+            const auto srcNodeId = NActors::ActorIdFromProto(outputChannelDesc.GetSrcEndpoint().GetActorId()).NodeId();
+            const auto dstNodeId = NActors::ActorIdFromProto(outputChannelDesc.GetDstEndpoint().GetActorId()).NodeId();
+            settings.MutableSettings.IsLocalChannel = srcNodeId == dstNodeId;
+        }
+
         auto outputChannel = CreateDqOutputChannel(channelId, outputChannelDesc.GetDstStageId(), *taskOutputType, holderFactory, settings, LogFunc);
 
         auto ret = AllocatedHolder->OutputChannels.emplace(channelId, outputChannel);
diff --git a/ydb/library/yql/dq/runtime/ya.make b/ydb/library/yql/dq/runtime/ya.make
index a9191f3a9836..5decd2ab6db3 100644
--- a/ydb/library/yql/dq/runtime/ya.make
+++ b/ydb/library/yql/dq/runtime/ya.make
@@ -2,21 +2,23 @@ LIBRARY()
 
 PEERDIR(
     contrib/libs/apache/arrow
-    ydb/library/yverify_stream
+    ydb/library/actors/util
+    ydb/library/formats/arrow/hash
+    ydb/library/formats/arrow
     ydb/library/mkql_proto
-    yql/essentials/minikql/comp_nodes
-    yql/essentials/parser/pg_wrapper/interface
-    yql/essentials/public/udf
     ydb/library/yql/dq/actors/protos
     ydb/library/yql/dq/common
     ydb/library/yql/dq/expr_nodes
     ydb/library/yql/dq/type_ann
+    ydb/library/yverify_stream
+    yql/essentials/minikql
+    yql/essentials/minikql/comp_nodes
+    yql/essentials/minikql/computation
     yql/essentials/parser/pg_wrapper/interface
     yql/essentials/providers/common/comp_nodes
     yql/essentials/providers/common/schema/mkql
     yql/essentials/public/udf
-    ydb/library/actors/util
-    ydb/library/formats/arrow/hash
+    yql/essentials/public/udf/arrow
 )
 
 SRCS(
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index e8f71d108166..f1942a2a4572 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -1133,6 +1133,10 @@ class TDqOutputChannel: public IDqOutputChannel {
         }
     }
 
+    void UpdateSettings(const TDqOutputChannelSettings::TMutable& settings) override {
+        Y_UNUSED(settings);
+    }
+
     template <typename T>
     void FromProto(const T& f) {
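Taken together, the channel now carries two kinds of settings: the immutable ones fixed at creation (buffer sizes, chunk limits, transport version) and a TMutable part that may change once the peer is known; the pipe-based task runner accepts the call but ignores it, since its channel never serializes across the interconnect in-process. A hedged usage sketch, with names from the diff but hypothetical surrounding variables (channelId, dstStageId, srcNodeId, dstNodeId, peerNodeId, selfNodeId):

    // Sketch of a call site mirroring dq_tasks_runner.cpp (at creation) and
    // dq_compute_actor_impl.h (after peer resolution); setup vars are assumed.
    TDqOutputChannelSettings settings;
    settings.MaxChunkBytes = 2_MB;
    settings.ChunkSizeLimit = 48_MB;
    settings.MutableSettings.IsLocalChannel = srcNodeId == dstNodeId; // endpoints known up front

    auto channel = CreateDqOutputChannel(channelId, dstStageId, *outputType, holderFactory, settings, logFunc);

    // Later, when the peer actor id arrives in a channels-update event:
    channel->UpdateSettings({.IsLocalChannel = peerNodeId == selfNodeId});

Keeping the mutable part as a nested struct means new late-bound knobs can be added without widening the UpdateSettings signature on every IDqOutputChannel implementation.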