diff --git a/ydb/library/yql/public/purecalc/common/worker.cpp b/ydb/library/yql/public/purecalc/common/worker.cpp index f7d1ad1e723c..e04e2062ba84 100644 --- a/ydb/library/yql/public/purecalc/common/worker.cpp +++ b/ydb/library/yql/public/purecalc/common/worker.cpp @@ -526,6 +526,18 @@ void TPushStreamWorker::FeedToConsumer() { } } +NYql::NUdf::IBoxedValue* TPushStreamWorker::GetPushStream() const { + auto& ctx = Graph_.ComputationGraph_->GetContext(); + NUdf::TUnboxedValue pushStream = SelfNode_->GetValue(ctx); + + if (Y_UNLIKELY(pushStream.IsInvalid())) { + SelfNode_->SetValue(ctx, Graph_.ComputationGraph_->GetHolderFactory().Create()); + pushStream = SelfNode_->GetValue(ctx); + } + + return pushStream.AsBoxed().Get(); +} + void TPushStreamWorker::SetConsumer(THolder> consumer) { auto guard = Guard(GetScopedAlloc()); const auto inputsCount = Graph_.SelfNodes_.size(); @@ -553,7 +565,7 @@ void TPushStreamWorker::Push(NKikimr::NUdf::TUnboxedValue&& value) { YQL_ENSURE(!Finished_, "OnFinish has already been sent to the consumer; no new values can be pushed"); if (Y_LIKELY(SelfNode_)) { - static_cast(SelfNode_->GetValue(Graph_.ComputationGraph_->GetContext()).AsBoxed().Get())->SetValue(std::move(value)); + static_cast(GetPushStream())->SetValue(std::move(value)); } FeedToConsumer(); @@ -564,7 +576,7 @@ void TPushStreamWorker::OnFinish() { YQL_ENSURE(!Finished_, "already finished"); if (Y_LIKELY(SelfNode_)) { - static_cast(SelfNode_->GetValue(Graph_.ComputationGraph_->GetContext()).AsBoxed().Get())->SetFinished(); + static_cast(GetPushStream())->SetFinished(); } FeedToConsumer(); diff --git a/ydb/library/yql/public/purecalc/common/worker.h b/ydb/library/yql/public/purecalc/common/worker.h index 3f700ad353f6..55eb440656b1 100644 --- a/ydb/library/yql/public/purecalc/common/worker.h +++ b/ydb/library/yql/public/purecalc/common/worker.h @@ -164,6 +164,7 @@ namespace NYql { private: void FeedToConsumer(); + NYql::NUdf::IBoxedValue* GetPushStream() const; public: void SetConsumer(THolder>) override; diff --git a/ydb/library/yql/public/purecalc/ut/test_mixed_allocators.cpp b/ydb/library/yql/public/purecalc/ut/test_mixed_allocators.cpp new file mode 100644 index 000000000000..3038b5e4b2a4 --- /dev/null +++ b/ydb/library/yql/public/purecalc/ut/test_mixed_allocators.cpp @@ -0,0 +1,139 @@ +#include + +#include +#include + +#include +#include + +using namespace NYql::NPureCalc; + +namespace { + class TStatelessInputSpec : public TInputSpecBase { + public: + TStatelessInputSpec() + : Schemas_({NYT::TNode::CreateList() + .Add("StructType") + .Add(NYT::TNode::CreateList() + .Add(NYT::TNode::CreateList() + .Add("InputValue") + .Add(NYT::TNode::CreateList() + .Add("DataType") + .Add("Utf8") + ) + ) + ) + }) + {}; + + const TVector& GetSchemas() const override { + return Schemas_; + } + + private: + const TVector Schemas_; + }; + + class TStatelessInputConsumer : public IConsumer { + public: + TStatelessInputConsumer(TWorkerHolder worker) + : Worker_(std::move(worker)) + {} + + void OnObject(const NYql::NUdf::TUnboxedValue& value) override { + with_lock (Worker_->GetScopedAlloc()) { + NYql::NUdf::TUnboxedValue* items = nullptr; + NYql::NUdf::TUnboxedValue result = Worker_->GetGraph().GetHolderFactory().CreateDirectArrayHolder(1, items); + + items[0] = value; + + Worker_->Push(std::move(result)); + + // Clear graph after each object because + // values allocated on another allocator and should be released + Worker_->GetGraph().Invalidate(); + } + } + + void OnFinish() override { + with_lock(Worker_->GetScopedAlloc()) { + Worker_->OnFinish(); + } + } + + private: + TWorkerHolder Worker_; + }; + + class TStatelessConsumer : public IConsumer { + const TString ExpectedData_; + const ui64 ExpectedRows_; + ui64 RowId_ = 0; + + public: + TStatelessConsumer(const TString& expectedData, ui64 expectedRows) + : ExpectedData_(expectedData) + , ExpectedRows_(expectedRows) + {} + + void OnObject(NPureCalcProto::TStringMessage* message) override { + UNIT_ASSERT_VALUES_EQUAL_C(ExpectedData_, message->GetX(), RowId_); + RowId_++; + } + + void OnFinish() override { + UNIT_ASSERT_VALUES_EQUAL(ExpectedRows_, RowId_); + } + }; +} + +template <> +struct TInputSpecTraits { + static constexpr bool IsPartial = false; + static constexpr bool SupportPushStreamMode = true; + + using TConsumerType = THolder>; + + static TConsumerType MakeConsumer(const TStatelessInputSpec&, TWorkerHolder worker) { + return MakeHolder(std::move(worker)); + } +}; + +Y_UNIT_TEST_SUITE(TestMixedAllocators) { + Y_UNIT_TEST(TestPushStream) { + const auto targetString = "large string >= 14 bytes"; + const auto factory = MakeProgramFactory(); + const auto sql = TStringBuilder() << "SELECT InputValue AS X FROM Input WHERE InputValue = \"" << targetString << "\";"; + + const auto program = factory->MakePushStreamProgram( + TStatelessInputSpec(), + TProtobufOutputSpec(), + sql + ); + + const ui64 numberRows = 5; + const auto inputConsumer = program->Apply(MakeHolder(targetString, numberRows)); + NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false); + + const auto pushString = [&](TString inputValue) { + NYql::NUdf::TUnboxedValue stringValue; + with_lock(alloc) { + stringValue = NKikimr::NMiniKQL::MakeString(inputValue); + alloc.Ref().LockObject(stringValue); + } + + inputConsumer->OnObject(stringValue); + + with_lock(alloc) { + alloc.Ref().UnlockObject(stringValue); + stringValue.Clear(); + } + }; + + for (ui64 i = 0; i < numberRows; ++i) { + pushString(targetString); + pushString("another large string >= 14 bytes"); + } + inputConsumer->OnFinish(); + } +} diff --git a/ydb/library/yql/public/purecalc/ut/ya.make b/ydb/library/yql/public/purecalc/ut/ya.make index 87f90a25ccf1..6f23dcd7f4a3 100644 --- a/ydb/library/yql/public/purecalc/ut/ya.make +++ b/ydb/library/yql/public/purecalc/ut/ya.make @@ -12,6 +12,7 @@ SRCS( test_user_data.cpp test_eval.cpp test_pool.cpp + test_mixed_allocators.cpp ) PEERDIR(