From d9f3792d77dd5f73938558025a79766a559971d3 Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Wed, 23 Oct 2024 15:28:09 +0000 Subject: [PATCH 1/4] Added graph invalidation support into purecalc --- .../yql/public/purecalc/common/worker.cpp | 16 +- .../yql/public/purecalc/common/worker.h | 1 + .../public/purecalc/ut/test_push_stream.cpp | 138 ++++++++++++++++++ ydb/library/yql/public/purecalc/ut/ya.make | 1 + 4 files changed, 154 insertions(+), 2 deletions(-) create mode 100644 ydb/library/yql/public/purecalc/ut/test_push_stream.cpp diff --git a/ydb/library/yql/public/purecalc/common/worker.cpp b/ydb/library/yql/public/purecalc/common/worker.cpp index f7d1ad1e723c..036dd2c1fcc0 100644 --- a/ydb/library/yql/public/purecalc/common/worker.cpp +++ b/ydb/library/yql/public/purecalc/common/worker.cpp @@ -526,6 +526,18 @@ void TPushStreamWorker::FeedToConsumer() { } } +NYql::NUdf::IBoxedValue* TPushStreamWorker::GetPushStream() { + auto& ctx = Graph_.ComputationGraph_->GetContext(); + NUdf::TUnboxedValue pushStream = SelfNode_->GetValue(ctx); + + if (Y_UNLIKELY(pushStream.IsInvalid())) { + SelfNode_->SetValue(ctx, Graph_.ComputationGraph_->GetHolderFactory().Create()); + pushStream = SelfNode_->GetValue(ctx); + } + + return pushStream.AsBoxed().Get(); +} + void TPushStreamWorker::SetConsumer(THolder> consumer) { auto guard = Guard(GetScopedAlloc()); const auto inputsCount = Graph_.SelfNodes_.size(); @@ -553,7 +565,7 @@ void TPushStreamWorker::Push(NKikimr::NUdf::TUnboxedValue&& value) { YQL_ENSURE(!Finished_, "OnFinish has already been sent to the consumer; no new values can be pushed"); if (Y_LIKELY(SelfNode_)) { - static_cast(SelfNode_->GetValue(Graph_.ComputationGraph_->GetContext()).AsBoxed().Get())->SetValue(std::move(value)); + static_cast(GetPushStream())->SetValue(std::move(value)); } FeedToConsumer(); @@ -564,7 +576,7 @@ void TPushStreamWorker::OnFinish() { YQL_ENSURE(!Finished_, "already finished"); if (Y_LIKELY(SelfNode_)) { - static_cast(SelfNode_->GetValue(Graph_.ComputationGraph_->GetContext()).AsBoxed().Get())->SetFinished(); + static_cast(GetPushStream())->SetFinished(); } FeedToConsumer(); diff --git a/ydb/library/yql/public/purecalc/common/worker.h b/ydb/library/yql/public/purecalc/common/worker.h index 3f700ad353f6..335586b75c6d 100644 --- a/ydb/library/yql/public/purecalc/common/worker.h +++ b/ydb/library/yql/public/purecalc/common/worker.h @@ -164,6 +164,7 @@ namespace NYql { private: void FeedToConsumer(); + NYql::NUdf::IBoxedValue* GetPushStream(); public: void SetConsumer(THolder>) override; diff --git a/ydb/library/yql/public/purecalc/ut/test_push_stream.cpp b/ydb/library/yql/public/purecalc/ut/test_push_stream.cpp new file mode 100644 index 000000000000..3d2e401064d9 --- /dev/null +++ b/ydb/library/yql/public/purecalc/ut/test_push_stream.cpp @@ -0,0 +1,138 @@ +#include + +#include +#include + +#include +#include + +using namespace NYql::NPureCalc; + +namespace { + class TStatelessInputSpec : public TInputSpecBase { + public: + TStatelessInputSpec() + : Schemas_({NYT::TNode::CreateList() + .Add("StructType") + .Add(NYT::TNode::CreateList() + .Add(NYT::TNode::CreateList() + .Add("InputValue") + .Add(NYT::TNode::CreateList() + .Add("DataType") + .Add("Utf8") + ) + ) + ) + }) + {}; + + const TVector& GetSchemas() const override { + return Schemas_; + } + + private: + const TVector Schemas_; + }; + + class TStatelessInputConsumer : public IConsumer { + public: + TStatelessInputConsumer(TWorkerHolder worker) + : Worker_(std::move(worker)) + {} + + void OnObject(const NYql::NUdf::TUnboxedValue& value) override { + with_lock (Worker_->GetScopedAlloc()) { + NYql::NUdf::TUnboxedValue* items = nullptr; + NYql::NUdf::TUnboxedValue result = Worker_->GetGraph().GetHolderFactory().CreateDirectArrayHolder(1, items); + + items[0] = value; + + Worker_->Push(std::move(result)); + + // Clear graph after each object because + // values allocated on another allocator and should be released + Worker_->GetGraph().Invalidate(); + } + } + + void OnFinish() override { + with_lock(Worker_->GetScopedAlloc()) { + Worker_->OnFinish(); + } + } + + private: + TWorkerHolder Worker_; + }; + + class TStatelessConsumer : public IConsumer { + const TString ExpectedData_; + const ui64 ExpectedRows_; + ui64 RowId_ = 0; + + public: + TStatelessConsumer(const TString& expectedData, ui64 expectedRows) + : ExpectedData_(expectedData) + , ExpectedRows_(expectedRows) + {} + + void OnObject(NPureCalcProto::TStringMessage* message) override { + UNIT_ASSERT_VALUES_EQUAL_C(ExpectedData_, message->GetX(), RowId_); + RowId_++; + } + + void OnFinish() override { + UNIT_ASSERT_VALUES_EQUAL(ExpectedRows_, RowId_); + } + }; +} + +template <> +struct TInputSpecTraits { + static constexpr bool IsPartial = false; + static constexpr bool SupportPushStreamMode = true; + + using TConsumerType = THolder>; + + static TConsumerType MakeConsumer(const TStatelessInputSpec&, TWorkerHolder worker) { + return MakeHolder(std::move(worker)); + } +}; + +Y_UNIT_TEST_SUITE(TestPushStream) { + Y_UNIT_TEST(TestGraphInvalidation) { + const auto targetString = "large string >= 14 bytes"; + const auto factory = MakeProgramFactory(); + const auto sql = TStringBuilder() << "SELECT InputValue AS X FROM Input WHERE InputValue = \"" << targetString << "\";"; + + const auto program = factory->MakePushStreamProgram( + TStatelessInputSpec(), + TProtobufOutputSpec(), + sql + ); + + const ui64 numberRows = 5; + const auto inputConsumer = program->Apply(MakeHolder(targetString, numberRows)); + const NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false); + + const auto pushString = [&](TString inputValue) { + NYql::NUdf::TUnboxedValue stringValue; + with_lock(alloc) { + stringValue = NKikimr::NMiniKQL::MakeString(inputValue); + } + + inputConsumer->OnObject(stringValue); + + with_lock(alloc) { + stringValue.Clear(); + } + }; + + for (ui64 i = 0; i < numberRows; ++i) { + pushString(targetString); + pushString("another large string >= 14 bytes"); + Cerr << "Computed string " << i << "\n"; + } + inputConsumer->OnFinish(); + } +} diff --git a/ydb/library/yql/public/purecalc/ut/ya.make b/ydb/library/yql/public/purecalc/ut/ya.make index 87f90a25ccf1..0403064572a1 100644 --- a/ydb/library/yql/public/purecalc/ut/ya.make +++ b/ydb/library/yql/public/purecalc/ut/ya.make @@ -12,6 +12,7 @@ SRCS( test_user_data.cpp test_eval.cpp test_pool.cpp + test_push_stream.cpp ) PEERDIR( From 5076b506d182783e8e2bd7499ef94773f8b427b3 Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Wed, 23 Oct 2024 15:32:07 +0000 Subject: [PATCH 2/4] Added const --- ydb/library/yql/public/purecalc/common/worker.cpp | 2 +- ydb/library/yql/public/purecalc/common/worker.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/library/yql/public/purecalc/common/worker.cpp b/ydb/library/yql/public/purecalc/common/worker.cpp index 036dd2c1fcc0..e04e2062ba84 100644 --- a/ydb/library/yql/public/purecalc/common/worker.cpp +++ b/ydb/library/yql/public/purecalc/common/worker.cpp @@ -526,7 +526,7 @@ void TPushStreamWorker::FeedToConsumer() { } } -NYql::NUdf::IBoxedValue* TPushStreamWorker::GetPushStream() { +NYql::NUdf::IBoxedValue* TPushStreamWorker::GetPushStream() const { auto& ctx = Graph_.ComputationGraph_->GetContext(); NUdf::TUnboxedValue pushStream = SelfNode_->GetValue(ctx); diff --git a/ydb/library/yql/public/purecalc/common/worker.h b/ydb/library/yql/public/purecalc/common/worker.h index 335586b75c6d..55eb440656b1 100644 --- a/ydb/library/yql/public/purecalc/common/worker.h +++ b/ydb/library/yql/public/purecalc/common/worker.h @@ -164,7 +164,7 @@ namespace NYql { private: void FeedToConsumer(); - NYql::NUdf::IBoxedValue* GetPushStream(); + NYql::NUdf::IBoxedValue* GetPushStream() const; public: void SetConsumer(THolder>) override; From 62d391233799b9236bb0a4d633cbe94bb28f053d Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Wed, 23 Oct 2024 15:36:18 +0000 Subject: [PATCH 3/4] Simplified include --- ydb/library/yql/public/purecalc/ut/test_push_stream.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/public/purecalc/ut/test_push_stream.cpp b/ydb/library/yql/public/purecalc/ut/test_push_stream.cpp index 3d2e401064d9..d0b8bce08854 100644 --- a/ydb/library/yql/public/purecalc/ut/test_push_stream.cpp +++ b/ydb/library/yql/public/purecalc/ut/test_push_stream.cpp @@ -1,6 +1,6 @@ #include -#include +#include #include #include From fda305eaf011d0dd04c9ccb98cd7e2fa0795df86 Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Wed, 23 Oct 2024 16:06:04 +0000 Subject: [PATCH 4/4] Fixed issues: added LockObject, removed Cerr, renamed tests --- ...test_push_stream.cpp => test_mixed_allocators.cpp} | 11 ++++++----- ydb/library/yql/public/purecalc/ut/ya.make | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) rename ydb/library/yql/public/purecalc/ut/{test_push_stream.cpp => test_mixed_allocators.cpp} (93%) diff --git a/ydb/library/yql/public/purecalc/ut/test_push_stream.cpp b/ydb/library/yql/public/purecalc/ut/test_mixed_allocators.cpp similarity index 93% rename from ydb/library/yql/public/purecalc/ut/test_push_stream.cpp rename to ydb/library/yql/public/purecalc/ut/test_mixed_allocators.cpp index d0b8bce08854..3038b5e4b2a4 100644 --- a/ydb/library/yql/public/purecalc/ut/test_push_stream.cpp +++ b/ydb/library/yql/public/purecalc/ut/test_mixed_allocators.cpp @@ -1,6 +1,6 @@ #include -#include +#include #include #include @@ -99,8 +99,8 @@ struct TInputSpecTraits { } }; -Y_UNIT_TEST_SUITE(TestPushStream) { - Y_UNIT_TEST(TestGraphInvalidation) { +Y_UNIT_TEST_SUITE(TestMixedAllocators) { + Y_UNIT_TEST(TestPushStream) { const auto targetString = "large string >= 14 bytes"; const auto factory = MakeProgramFactory(); const auto sql = TStringBuilder() << "SELECT InputValue AS X FROM Input WHERE InputValue = \"" << targetString << "\";"; @@ -113,17 +113,19 @@ Y_UNIT_TEST_SUITE(TestPushStream) { const ui64 numberRows = 5; const auto inputConsumer = program->Apply(MakeHolder(targetString, numberRows)); - const NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false); + NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false); const auto pushString = [&](TString inputValue) { NYql::NUdf::TUnboxedValue stringValue; with_lock(alloc) { stringValue = NKikimr::NMiniKQL::MakeString(inputValue); + alloc.Ref().LockObject(stringValue); } inputConsumer->OnObject(stringValue); with_lock(alloc) { + alloc.Ref().UnlockObject(stringValue); stringValue.Clear(); } }; @@ -131,7 +133,6 @@ Y_UNIT_TEST_SUITE(TestPushStream) { for (ui64 i = 0; i < numberRows; ++i) { pushString(targetString); pushString("another large string >= 14 bytes"); - Cerr << "Computed string " << i << "\n"; } inputConsumer->OnFinish(); } diff --git a/ydb/library/yql/public/purecalc/ut/ya.make b/ydb/library/yql/public/purecalc/ut/ya.make index 0403064572a1..6f23dcd7f4a3 100644 --- a/ydb/library/yql/public/purecalc/ut/ya.make +++ b/ydb/library/yql/public/purecalc/ut/ya.make @@ -12,7 +12,7 @@ SRCS( test_user_data.cpp test_eval.cpp test_pool.cpp - test_push_stream.cpp + test_mixed_allocators.cpp ) PEERDIR(