diff --git a/library/cpp/tld/tlds-alpha-by-domain.txt b/library/cpp/tld/tlds-alpha-by-domain.txt index 6459c3ccd849..736fc8a20cec 100644 --- a/library/cpp/tld/tlds-alpha-by-domain.txt +++ b/library/cpp/tld/tlds-alpha-by-domain.txt @@ -1,4 +1,4 @@ -# Version 2025070400, Last Updated Fri Jul 4 07:07:01 2025 UTC +# Version 2025070700, Last Updated Mon Jul 7 07:07:01 2025 UTC AAA AARP ABB diff --git a/ydb/ci/rightlib.txt b/ydb/ci/rightlib.txt index 25e285f602a8..66aa39f2eaca 100644 --- a/ydb/ci/rightlib.txt +++ b/ydb/ci/rightlib.txt @@ -1 +1 @@ -66c697edffda3020e43c59bd99a7194bd82e7c01 +4985bee7f78371eae4f89d2d010143ab71912a8e diff --git a/yql/essentials/core/cbo/cbo_hints.cpp b/yql/essentials/core/cbo/cbo_hints.cpp index e7dc201c638c..fb3164f2d60f 100644 --- a/yql/essentials/core/cbo/cbo_hints.cpp +++ b/yql/essentials/core/cbo/cbo_hints.cpp @@ -29,13 +29,15 @@ class TOptimizerHintsParser { private: void Start() { while (Pos_ < Size_) { - auto hintType = Keyword({"JoinOrder", "Leading", "JoinType", "Rows"}); + auto hintType = Keyword({"JoinOrder", "Leading", "JoinType", "Rows", "Bytes"}); if (hintType == "JoinOrder" || hintType == "Leading") { JoinOrder(hintType == "Leading"); } else if (hintType == "JoinType") { JoinType(); - } else if (hintType == "Rows"){ - Rows(); + } else if (hintType == "Rows") { + CardinalityOrBytes(true); + } else if (hintType == "Bytes") { + CardinalityOrBytes(false); } else { ParseError(Sprintf("Undefined hints type: %s", hintType.c_str()), Pos_ - hintType.size()); } @@ -113,7 +115,7 @@ class TOptimizerHintsParser { Y_UNREACHABLE(); } - void Rows() { + void CardinalityOrBytes(bool isRows) { i32 beginPos = Pos_ + 1; Keyword({"("}); @@ -134,7 +136,11 @@ class TOptimizerHintsParser { default: {ParseError(Sprintf("Unknown operation: '%c'", sign), Pos_ - 1); Y_UNREACHABLE();} } - Hints_.CardinalityHints->PushBack(std::move(labels), op, value, "Rows" + Text_.substr(beginPos, Pos_ - beginPos + 1)); + if (isRows) { + Hints_.CardinalityHints->PushBack(std::move(labels), op, value, "Rows" + Text_.substr(beginPos, Pos_ - beginPos + 1)); + } else { + Hints_.BytesHints->PushBack(std::move(labels), op, value, "Bytes" + Text_.substr(beginPos, Pos_ - beginPos + 1)); + } } private: diff --git a/yql/essentials/core/cbo/cbo_optimizer_new.cpp b/yql/essentials/core/cbo/cbo_optimizer_new.cpp index beded3cba7b7..6fe7772bce7f 100644 --- a/yql/essentials/core/cbo/cbo_optimizer_new.cpp +++ b/yql/essentials/core/cbo/cbo_optimizer_new.cpp @@ -194,6 +194,28 @@ TOptimizerStatistics TBaseProviderContext::ComputeJoinStatsV1( return stats; } +TOptimizerStatistics TBaseProviderContext::ComputeJoinStatsV2( + const TOptimizerStatistics& leftStats, + const TOptimizerStatistics& rightStats, + const TVector& leftJoinKeys, + const TVector& rightJoinKeys, + EJoinAlgoType joinAlgo, + EJoinKind joinKind, + TCardinalityHints::TCardinalityHint* maybeHint, + bool shuffleLeftSide, + bool shuffleRightSide, + TCardinalityHints::TCardinalityHint* maybeBytesHint +) const { + + auto stats = ComputeJoinStatsV1(leftStats, rightStats, leftJoinKeys, rightJoinKeys, joinAlgo, joinKind, maybeHint, shuffleLeftSide, shuffleRightSide); + + if (maybeBytesHint) { + stats.ByteSize = maybeBytesHint->ApplyHint(stats.ByteSize); + } + + return stats; +} + /** * Compute the cost and output cardinality of a join * @@ -338,6 +360,12 @@ TVector TOptimizerHints::GetUnappliedString() { } } + for (const auto& hint: BytesHints->Hints) { + if (!hint.Applied) { + res.push_back(hint.StringRepr); + } + } + return res; } diff --git a/yql/essentials/core/cbo/cbo_optimizer_new.h b/yql/essentials/core/cbo/cbo_optimizer_new.h index 18f51639963a..9fe8a6bc1bb9 100644 --- a/yql/essentials/core/cbo/cbo_optimizer_new.h +++ b/yql/essentials/core/cbo/cbo_optimizer_new.h @@ -175,6 +175,7 @@ struct TJoinOrderHints { }; struct TOptimizerHints { + std::shared_ptr BytesHints = std::make_shared(); std::shared_ptr CardinalityHints = std::make_shared(); std::shared_ptr JoinAlgoHints = std::make_shared(); std::shared_ptr JoinOrderHints = std::make_shared(); @@ -182,10 +183,11 @@ struct TOptimizerHints { TVector GetUnappliedString(); /* - * The function accepts string with three type of expressions: array of (JoinAlgo | Card | JoinOrder): + * The function accepts string with four type of expressions: array of (JoinAlgo | Rows | Bytes | JoinOrder): * 1) JoinAlgo(t1 t2 ... tn Map | Grace | Lookup) to change join algo for join, where these labels take part - * 2) Card(t1 t2 ... tn (*|/|+|-) Number) to change cardinality for join, where these labels take part or labels only - * 3) JoinOrder( (t1 t2) (t3 (t4 ...)) ) - fixate this join subtree in the general join tree + * 2) Rows(t1 t2 ... tn (*|/|+|-|#) Number) to change cardinality for join, where these labels take part or labels only + * 3) Bytes(t1 t2 ... tn (*|/|+|-|#) Number) to change byte size for join, where these labels take part or labels only + * 4) JoinOrder( (t1 t2) (t3 (t4 ...)) ) - fixate this join subtree in the general join tree */ static TOptimizerHints Parse(const TString&); }; @@ -228,6 +230,19 @@ struct IProviderContext { bool shuffleRightSide ) const = 0; + virtual TOptimizerStatistics ComputeJoinStatsV2( + const TOptimizerStatistics& leftStats, + const TOptimizerStatistics& rightStats, + const TVector& leftJoinKeys, + const TVector& rightJoinKeys, + EJoinAlgoType joinAlgo, + EJoinKind joinKind, + TCardinalityHints::TCardinalityHint* maybeHint, + bool shuffleLeftSide, + bool shuffleRightSide, + TCardinalityHints::TCardinalityHint* maybeBytesHint + ) const = 0; + virtual bool IsJoinApplicable(const std::shared_ptr& left, const std::shared_ptr& right, const TVector& leftJoinKeys, @@ -283,6 +298,19 @@ struct TBaseProviderContext : public IProviderContext { bool shuffleRightSide ) const override; + TOptimizerStatistics ComputeJoinStatsV2( + const TOptimizerStatistics& leftStats, + const TOptimizerStatistics& rightStats, + const TVector& leftJoinKeys, + const TVector& rightJoinKeys, + EJoinAlgoType joinAlgo, + EJoinKind joinKind, + TCardinalityHints::TCardinalityHint* maybeHint, + bool shuffleLeftSide, + bool shuffleRightSide, + TCardinalityHints::TCardinalityHint* maybeBytesHint + ) const override; + static const TBaseProviderContext& Instance(); }; diff --git a/yql/essentials/minikql/datetime/datetime.h b/yql/essentials/minikql/datetime/datetime.h index 67801abcc8e7..293f9e38bb00 100644 --- a/yql/essentials/minikql/datetime/datetime.h +++ b/yql/essentials/minikql/datetime/datetime.h @@ -170,8 +170,12 @@ bool DoAddMonths(TStorage& storage, i64 months, const NUdf::IDateBuilder& builde storage.Year--; newMonth += 12; } - if (storage.Year == 0) { - storage.Year += months > 0 ? 1 : -1; + // The minimal year value for TTMStorage is 1970, but the + // check below makes coverity happy. + if constexpr (!std::is_same_v) { + if (storage.Year == 0) { + storage.Year += months > 0 ? 1 : -1; + } } storage.Month = newMonth; bool isLeap = NKikimr::NMiniKQL::IsLeapYear(storage.Year); @@ -183,8 +187,12 @@ bool DoAddMonths(TStorage& storage, i64 months, const NUdf::IDateBuilder& builde template bool DoAddYears(TStorage& storage, i64 years, const NUdf::IDateBuilder& builder) { storage.Year += years; - if (storage.Year == 0) { - storage.Year += years > 0 ? 1 : -1; + // The minimal year value for TTMStorage is 1970, but the + // check below makes coverity happy. + if constexpr (!std::is_same_v) { + if (storage.Year == 0) { + storage.Year += years > 0 ? 1 : -1; + } } if (storage.Month == 2 && storage.Day == 29) { bool isLeap = NKikimr::NMiniKQL::IsLeapYear(storage.Year); diff --git a/yt/yql/providers/yt/codec/yt_codec_job.cpp b/yt/yql/providers/yt/codec/yt_codec_job.cpp index 47d1feb734d5..5c9cfab2f768 100644 --- a/yt/yql/providers/yt/codec/yt_codec_job.cpp +++ b/yt/yql/providers/yt/codec/yt_codec_job.cpp @@ -46,11 +46,10 @@ TJobMkqlReaderImpl::TJobMkqlReaderImpl(const TFile& in) { } -TJobMkqlWriterImpl::TJobMkqlWriterImpl(const TMkqlIOSpecs& specs, const TVector& outHandles) +TJobMkqlWriterImpl::TJobMkqlWriterImpl(const TVector& outHandles) : NPrivate::TOutStreamsHolder(outHandles) , TMkqlWriterImpl(GetVectorOfStreams(), YQL_JOB_CODEC_BLOCK_COUNT, YQL_JOB_CODEC_BLOCK_SIZE) { - SetSpecs(specs); } void TJobMkqlWriterImpl::DoFinish(bool abort) { diff --git a/yt/yql/providers/yt/codec/yt_codec_job.h b/yt/yql/providers/yt/codec/yt_codec_job.h index e2a8232d2ce6..c255d290f5dd 100644 --- a/yt/yql/providers/yt/codec/yt_codec_job.h +++ b/yt/yql/providers/yt/codec/yt_codec_job.h @@ -42,7 +42,7 @@ class TJobMkqlReaderImpl: protected NPrivate::TInStreamHolder, public TMkqlReade class TJobMkqlWriterImpl: protected NPrivate::TOutStreamsHolder, public TMkqlWriterImpl { public: - TJobMkqlWriterImpl(const TMkqlIOSpecs& specs, const TVector& outHandles); + TJobMkqlWriterImpl(const TVector& outHandles); ~TJobMkqlWriterImpl() = default; private: diff --git a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp index f611abef437c..a94e72495576 100644 --- a/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp +++ b/yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp @@ -619,7 +619,7 @@ class TFmrCoordinator: public IFmrCoordinator { }); mapTaskParams.Output = fmrTableOutputRefs; - mapTaskParams.Executable = mapOperationParams.Executable; // TODO - change Executable to mapper + mapTaskParams.SerializedMapJobState = mapOperationParams.SerializedMapJobState; TaskParams.emplace_back(mapTaskParams); } } diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make index b9120a7dc8cf..4513ffa013d7 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ut/ya.make +++ b/yt/yql/providers/yt/fmr/job/impl/ut/ya.make @@ -2,6 +2,8 @@ UNITTEST() SRCS( yql_yt_job_ut.cpp + yql_yt_map_job_ut.cpp + yql_yt_raw_table_queue_ut.cpp yql_yt_table_data_service_reader_ut.cpp yql_yt_table_data_service_writer_ut.cpp ) @@ -9,8 +11,10 @@ SRCS( PEERDIR( yt/cpp/mapreduce/common yt/yql/providers/yt/fmr/job/impl + yt/yql/providers/yt/fmr/process + yt/yql/providers/yt/fmr/table_data_service/helpers yt/yql/providers/yt/fmr/yt_job_service/mock - yt/yql/providers/yt/fmr/table_data_service/local/impl + yt/yql/providers/yt/fmr/utils yql/essentials/utils/log yql/essentials/parser/pg_wrapper yql/essentials/parser/pg_wrapper/interface @@ -23,6 +27,8 @@ PEERDIR( yt/yql/providers/yt/codec/codegen/llvm16 yql/essentials/minikql/codegen/llvm16 yql/essentials/minikql/computation/llvm16 + yql/essentials/minikql/comp_nodes/llvm16 + yt/yql/providers/yt/comp_nodes/llvm16 ) YQL_LAST_ABI_VERSION() diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp index 58cfe7e3b815..f3f4aea529a5 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp @@ -1,5 +1,4 @@ -#include -#include +#include "yql_yt_job_ut.h" #include #include #include diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.h b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.h new file mode 100644 index 000000000000..75f27d39e6e1 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.h @@ -0,0 +1,13 @@ +#pragma once + +#include +#include + +namespace NYql::NFmr { + // helper functions for yson reformatting + + TString GetBinaryYson(const TString& textYsonContent); + + TString GetTextYson(const TString& binaryYsonContent); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_map_job_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_map_job_ut.cpp new file mode 100644 index 000000000000..d8faa99ff1a1 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_map_job_ut.cpp @@ -0,0 +1,130 @@ +#include "yql_yt_job_ut.h" +#include +#include +#include +#include +#include +#include + +using namespace NKikimr::NMiniKQL; + +namespace NYql::NFmr { + +TString inputYsonContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};\n" + "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};\n"; + +Y_UNIT_TEST_SUITE(MapTests) { + Y_UNIT_TEST(RunMapJob) { + TFmrUserJob mapJob; + + TTempFileHandle tableDataServiceHostsFile; + ui16 port = 2345; + auto tableDataServiceServer = MakeTableDataServiceServer(port); + std::vector connections{{.Host = "localhost", .Port = port}}; + WriteHostsToFile(tableDataServiceHostsFile, 1, connections); + + TTempFileHandle inputYsonContentFile{}; + TFileOutput fileWriter(inputYsonContentFile.Name()); + fileWriter.Write(inputYsonContent.data(), inputYsonContent.size()); + fileWriter.Flush(); + + TYtTableTaskRef fileTask{.FilePaths = {inputYsonContentFile.Name()}}; + TFmrTableOutputRef fmrOutputRef{.TableId = "table_id", .PartId = "part_id"}; + TTaskTableRef taskTableRef(fileTask); + TMapTaskParams mapTaskParams{ + .Input = TTaskTableInputRef{.Inputs ={taskTableRef}}, + .Output = {fmrOutputRef} + }; + FillMapFmrJob(mapJob, mapTaskParams, {}, tableDataServiceHostsFile.Name(), true); + + { + auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry()); + TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(),functionRegistry->SupportsSizedAllocators()); + alloc.Ref().ForcefullySetMemoryYellowZone(false); + + TTypeEnvironment env(alloc); + TProgramBuilder pgmBuilder(env, *functionRegistry); + + const auto structType = pgmBuilder.NewStructType({ + {"key", pgmBuilder.NewDataType(NUdf::EDataSlot::String)}, + {"subkey", pgmBuilder.NewDataType(NUdf::EDataSlot::String)}, + {"value", pgmBuilder.NewDataType(NUdf::EDataSlot::String)} + }); + + auto dataType = pgmBuilder.NewFlowType(structType); + TCallableBuilder inputCallableBuilder(env, "FmrInputJob", dataType); + auto inputNode = TRuntimeNode(inputCallableBuilder.Build(), false); + + const auto prefix = pgmBuilder.NewDataLiteral("prefix_"); + const auto map = pgmBuilder.Map(inputNode, + [&](TRuntimeNode item) { + TRuntimeNode prefixKey = pgmBuilder.Concat(prefix, pgmBuilder.Member(item, "key")); + TRuntimeNode subkey = pgmBuilder.Member(item, "subkey"); + TRuntimeNode value = pgmBuilder.Member(item, "value"); + return pgmBuilder.NewStruct(structType, {{"key", prefixKey}, {"subkey", subkey}, {"value", value}}); + }); + + TCallableBuilder outputCallableBuilder(env, "FmrOutputJob", dataType); + outputCallableBuilder.Add(map); + auto outputNode = TRuntimeNode(outputCallableBuilder.Build(), false); + auto pgmReturn = pgmBuilder.Discard(outputNode); + + TString serializedMapLambda = SerializeRuntimeNode(pgmReturn, env); + mapJob.SetLambdaCode(serializedMapLambda); + + const TString spec = R"({ + tables = [{ + "_yql_row_spec" = { + "Type" = [ + "StructType"; [ + ["key"; ["DataType"; "String"]]; + ["subkey"; ["DataType"; "String"]]; + ["value"; ["DataType"; "String"]]; + ] + ] + } + }] + })"; + const TString type = R"( + [ + "StructType"; + [ + ["key"; ["DataType"; "String"]]; + ["subkey"; ["DataType"; "String"]]; + ["value"; ["DataType"; "String"]]; + ] + ])"; + mapJob.SetInputSpec(spec); + mapJob.SetOutSpec(spec); + mapJob.SetInputType(type); + } + + mapJob.DoFmrJob(); + + // Checking correctness + auto tableDataServiceClient = MakeTableDataServiceClient(port); + TString key = "table_id_part_id:0"; + auto gottenBinaryTableContent = tableDataServiceClient->Get(key).GetValueSync(); + UNIT_ASSERT(gottenBinaryTableContent); + + // Reformating data + auto textTableContent = GetTextYson(*gottenBinaryTableContent); + THashMap> expectedFormattedContent{ + {"key", {"prefix_075", "prefix_800"}}, + {"subkey", {"1", "2"}}, + {"value", {"abc", "ddd"}} + }; + THashMap> gottenFormattedContent; + + std::vector splittedTableContent; + StringSplitter(textTableContent).SplitBySet("\n\";{}=").SkipEmpty().Collect(&splittedTableContent); + + for (ui64 i = 0; i < splittedTableContent.size(); i += 2) { + TString columnKey = splittedTableContent[i], columnValue = splittedTableContent[i + 1]; + gottenFormattedContent[columnKey].emplace_back(columnValue); + } + UNIT_ASSERT_VALUES_EQUAL(expectedFormattedContent, gottenFormattedContent); + } +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_queue_ut.cpp b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_queue_ut.cpp new file mode 100644 index 000000000000..07b0163271f0 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_raw_table_queue_ut.cpp @@ -0,0 +1,57 @@ +#include +#include +#include +#include +#include + +namespace NYql::NFmr { + +std::vector TableContentRows = { + "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};", + "{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};", + "{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};", + "{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};" +}; + + +Y_UNIT_TEST_SUITE(FmrRawTableQueueTests) { + Y_UNIT_TEST(ReadWriteManyThreads) { + ui64 inputStreamsNum = 5, repeatsNum = 10; + TFmrRawTableQueueSettings queueSettings{.MaxInflightBytes = 128}; + auto queue = MakeIntrusive(inputStreamsNum, queueSettings); + char rowEnd = '|'; // Adding end symbol to row for splitting simplicity + + auto threadPool = CreateThreadPool(3); + for (ui64 i = 0; i < inputStreamsNum; ++i) { + // each thread writes repeatsNum copies of TableContent to queue + threadPool->SafeAddFunc([queue, repeatsNum, i, rowEnd] { + TFmrRawTableQueueWriterSettings writerSettings{.ChunkSize = 100}; + TFmrRawTableQueueWriter queueWriter(queue, writerSettings); + for (ui64 j = 0; j < repeatsNum; ++j) { + for (auto& row: TableContentRows) { + queueWriter.Write(row + rowEnd); + queueWriter.NotifyRowEnd(); + } + } + queueWriter.Flush(); + queue->NotifyInputFinished(i); + }); + } + TFmrRawTableQueueReader reader{queue}; + TString result = reader.ReadAll(); + std::vector splittedRows; + StringSplitter(result).Split(rowEnd).AddTo(&splittedRows); + std::unordered_map gottenRows; + for (auto& row: splittedRows) { + if (!row.empty()) { + ++gottenRows[row]; + } + } + UNIT_ASSERT_VALUES_EQUAL(gottenRows.size(), TableContentRows.size()); + for (auto& row: TableContentRows) { + UNIT_ASSERT_VALUES_EQUAL(gottenRows[row], inputStreamsNum * repeatsNum); + } + } +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/ya.make b/yt/yql/providers/yt/fmr/job/impl/ya.make index dc29cebd2651..e6d928b8d7ca 100644 --- a/yt/yql/providers/yt/fmr/job/impl/ya.make +++ b/yt/yql/providers/yt/fmr/job/impl/ya.make @@ -2,6 +2,9 @@ LIBRARY() SRCS( yql_yt_job_impl.cpp + yql_yt_raw_table_queue.cpp + yql_yt_raw_table_queue_reader.cpp + yql_yt_raw_table_queue_writer.cpp yql_yt_table_data_service_reader.cpp yql_yt_table_data_service_writer.cpp ) @@ -12,6 +15,7 @@ PEERDIR( yt/cpp/mapreduce/interface yt/yql/providers/yt/fmr/job/interface yt/yql/providers/yt/fmr/utils + yt/yql/providers/yt/fmr/process yt/yql/providers/yt/fmr/table_data_service/interface yql/essentials/utils yql/essentials/utils/log diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp index 3037b1f4958e..350a610f4583 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -36,7 +37,7 @@ class TFmrJob: public IFmrJob { YQL_ENSURE(clusterConnections.size() == 1); - std::vector ytTableReaders = GetYtTableReaders(ytTableTaskRef, clusterConnections); + std::vector ytTableReaders = GetYtTableReaders(YtJobService_, ytTableTaskRef, clusterConnections); auto tableDataServiceWriter = MakeIntrusive(tableId, partId, TableDataService_, Settings_.FmrWriterSettings); for (auto& ytTableReader: ytTableReaders) { @@ -95,7 +96,7 @@ class TFmrJob: public IFmrJob { for (const auto& inputTableRef : taskTableInputRef.Inputs) { threadPool->SafeAddFunc([&, tableDataServiceWriter] { try { - auto inputTableReaders = GetTableInputStreams(inputTableRef, clusterConnections); + auto inputTableReaders = GetTableInputStreams(YtJobService_, TableDataService_, inputTableRef, clusterConnections); for (auto& tableReader: inputTableReaders) { ParseRecords(tableReader, tableDataServiceWriter, parseRecordSettings.MergeReadBlockCount, parseRecordSettings.MergeReadBlockSize, cancelFlag, mutex); } @@ -124,42 +125,6 @@ class TFmrJob: public IFmrJob { ythrow yexception() << "Not implemented"; } -private: - std::vector GetTableInputStreams(const TTaskTableRef& tableRef, const std::unordered_map& clusterConnections) const { - auto ytTableTaskRef = std::get_if(&tableRef); - auto fmrTable = std::get_if(&tableRef); - if (ytTableTaskRef) { - return GetYtTableReaders(*ytTableTaskRef, clusterConnections); - } else if (fmrTable) { - return {MakeIntrusive(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, Settings_.FmrReaderSettings)}; - } else { - ythrow yexception() << "Unsupported table type"; - } - } - - std::vector GetYtTableReaders(const TYtTableTaskRef& ytTableTaskRef, const std::unordered_map& clusterConnections) const { - std::vector ytTableReaders; - if (!ytTableTaskRef.FilePaths.empty()) { - // underlying gateway is file, so create readers from filepaths. - for (auto& filePath: ytTableTaskRef.FilePaths) { - ytTableReaders.emplace_back(YtJobService_->MakeReader(filePath)); - YQL_CLOG(DEBUG, FastMapReduce) << "Creating reader for file path " << filePath; - } - } else { - for (auto& richPath: ytTableTaskRef.RichPaths) { - YQL_ENSURE(richPath.Cluster_); - - // TODO - вместо этого написать нормальные хелперы из RichPath в структуры и назад - TStringBuf choppedPath; - YQL_ENSURE(TStringBuf(richPath.Path_).AfterPrefix("//", choppedPath)); - auto fmrTableId = TFmrTableId(*richPath.Cluster_, TString(choppedPath)); - auto clusterConnection = clusterConnections.at(fmrTableId); - ytTableReaders.emplace_back(YtJobService_->MakeReader(richPath, clusterConnection, Settings_.YtReaderSettings)); - } - } - return ytTableReaders; - } - private: ITableDataService::TPtr TableDataService_; IYtJobService::TPtr YtJobService_; @@ -209,6 +174,20 @@ TJobResult RunJob( return {ETaskStatus::Completed, *statistics}; }; +void FillMapFmrJob( + TFmrUserJob& mapJob, + const TMapTaskParams& mapTaskParams, + const std::unordered_map& clusterConnections, + const TString& tableDataServiceDiscoveryFilePath, + bool useFileGateway +) { + mapJob.SetTableDataService(tableDataServiceDiscoveryFilePath); + mapJob.SetTaskInputTables(mapTaskParams.Input); + mapJob.SetTaskFmrOutputTables(mapTaskParams.Output); + mapJob.SetClusterConnections(clusterConnections); + mapJob.SetYtJobService(useFileGateway); +} + TFmrJobSettings GetJobSettingsFromTask(TTask::TPtr task) { if (!task->JobSettings) { return TFmrJobSettings(); diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h index 93affcaf4edb..11d149bb6eee 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -34,4 +35,12 @@ TJobResult RunJob(TTask::TPtr task, ITableDataService::TPtr tableDataService, IY TFmrJobSettings GetJobSettingsFromTask(TTask::TPtr task); +void FillMapFmrJob( + TFmrUserJob& mapJob, + const TMapTaskParams& mapTaskParams, + const std::unordered_map& clusterConnections, + const TString& tableDataServiceDiscoveryFilePath, + bool useFileGateway +); + } // namespace NYql diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue.cpp new file mode 100644 index 000000000000..850e02cdcf63 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue.cpp @@ -0,0 +1,68 @@ +#include "yql_yt_raw_table_queue.h" +#include + +namespace NYql::NFmr { + +TFmrRawTableQueue::TFmrRawTableQueue(ui64 numberOfInputs, const TFmrRawTableQueueSettings& settings) + : InputStreamsNum_(numberOfInputs), Settings_(settings) +{ +} + +void TFmrRawTableQueue::AddRow(TBuffer&& rowContent) { + YQL_ENSURE(rowContent.size() <= Settings_.MaxInflightBytes, "Row size is too large"); + with_lock(QueueMutex_) { + QueueInflightBytesCondVar_.Wait(QueueMutex_, [&] { + return CurInflightBytes_ + rowContent.size() < Settings_.MaxInflightBytes || ExceptionMessage_; + }); + + if (ExceptionMessage_) { + return; + } + CurInflightBytes_ += rowContent.size(); + RowsQueue_.push(std::move(rowContent)); + QueueSizeCondVar_.BroadCast(); + } +} + +TMaybe TFmrRawTableQueue::PopRow() { + with_lock(QueueMutex_) { + QueueSizeCondVar_.Wait(QueueMutex_, [&] { + return !RowsQueue_.empty() || IsFinished() || ExceptionMessage_; + }); + if (ExceptionMessage_) { + ythrow yexception() << ExceptionMessage_; + } + if (IsFinished()) { + return Nothing(); + } + auto row = RowsQueue_.front(); + RowsQueue_.pop(); + CurInflightBytes_ -= row.size(); + QueueInflightBytesCondVar_.BroadCast(); + return row; + } +} + +void TFmrRawTableQueue::NotifyInputFinished(ui64 input_id) { + with_lock(QueueMutex_) { + if (FinishedInputs_.contains(input_id)) { + ExceptionMessage_ = TStringBuilder() << "Input stream with id" << input_id << " is marked as already finished"; + } + FinishedInputs_.insert(input_id); + QueueSizeCondVar_.BroadCast(); + } +} + +bool TFmrRawTableQueue::IsFinished() const { + return (RowsQueue_.empty() && FinishedInputs_.size() == InputStreamsNum_); +} + +void TFmrRawTableQueue::SetException(const TString& exceptionMessage) { + with_lock(QueueMutex_) { + ExceptionMessage_ = exceptionMessage; + } + QueueInflightBytesCondVar_.BroadCast(); + QueueSizeCondVar_.BroadCast(); +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue.h new file mode 100644 index 000000000000..fbab670f0dce --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include +#include + +// TODO - проверить весь код, писал не я + +namespace NYql::NFmr { + +struct TFmrRawTableQueueSettings { + ui64 MaxInflightBytes = 128 * 1024 * 1024; +}; + +class TFmrRawTableQueue: public TThrRefBase { +public: + using TPtr = TIntrusivePtr; + + virtual ~TFmrRawTableQueue() = default; + + TFmrRawTableQueue(ui64 inputStreamsNum, const TFmrRawTableQueueSettings& settings = TFmrRawTableQueueSettings()); + + void AddRow(TBuffer&& rowContent); + + TMaybe PopRow(); + + void NotifyInputFinished(ui64 input_id); + + void SetException(const TString& exceptionMessage); + +private: + bool IsFinished() const; + + TQueue RowsQueue_; + ui64 CurInflightBytes_ = 0; + const ui64 InputStreamsNum_; + TMutex QueueMutex_ = TMutex(); + TCondVar QueueInflightBytesCondVar_; + TCondVar QueueSizeCondVar_; + + TMaybe ExceptionMessage_ = Nothing(); + std::unordered_set FinishedInputs_; + TFmrRawTableQueueSettings Settings_; +}; + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue_reader.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue_reader.cpp new file mode 100644 index 000000000000..d03b333a13a1 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue_reader.cpp @@ -0,0 +1,55 @@ +#include "yql_yt_raw_table_queue_reader.h" + +namespace NYql::NFmr { + +TFmrRawTableQueueReader::TFmrRawTableQueueReader(TFmrRawTableQueue::TPtr rawTableQueue) + : RawTableQueue_(rawTableQueue) +{ +} + +size_t TFmrRawTableQueueReader::DoRead(void* buf, size_t len) { + ui64 totalRead = 0; + char* output = static_cast(buf); + + while (len > 0) { + if (BlockContentPos_ == BlockContent_.size()) { + auto row = RawTableQueue_->PopRow(); + if (!row) { + return totalRead; + } + BlockContent_.Clear(); + BlockContent_.Append(row->Data(), row->size()); + BlockContentPos_ = 0; + } + if (!BlockContent_.Empty()) { + ui64 available = BlockContent_.size() - BlockContentPos_; + + if (available > 0) { + ui64 toCopy = std::min(available, len); + std::copy( + BlockContent_.Begin() + BlockContentPos_, + BlockContent_.Begin() + BlockContentPos_ + toCopy, + output + ); + output += toCopy; + len -= toCopy; + BlockContentPos_ += toCopy; + totalRead += toCopy; + } + } + } + return totalRead; +} + +bool TFmrRawTableQueueReader::Retry(const TMaybe&, const TMaybe&, const std::exception_ptr&) { + return false; +} + +void TFmrRawTableQueueReader::ResetRetries() { +} + +bool TFmrRawTableQueueReader::HasRangeIndices() const { + return false; +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue_reader.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue_reader.h new file mode 100644 index 000000000000..188cc1d6fc86 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue_reader.h @@ -0,0 +1,26 @@ +#include "yql_yt_raw_table_queue.h" + +namespace NYql::NFmr { + +class TFmrRawTableQueueReader: public NYT::TRawTableReader { +public: + using TPtr = TIntrusivePtr; + + TFmrRawTableQueueReader(TFmrRawTableQueue::TPtr rawTableQueue); + + bool Retry(const TMaybe&, const TMaybe&, const std::exception_ptr&) override; + + void ResetRetries() override; + + bool HasRangeIndices() const override; + +private: + size_t DoRead(void* buf, size_t len) override; + +private: + TFmrRawTableQueue::TPtr RawTableQueue_; + TBuffer BlockContent_; // Represents row currently read from queue + ui64 BlockContentPos_ = 0; +}; + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue_writer.cpp b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue_writer.cpp new file mode 100644 index 000000000000..73ea389fc3a7 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue_writer.cpp @@ -0,0 +1,29 @@ +#include "yql_yt_raw_table_queue_writer.h" + +namespace NYql::NFmr { + +TFmrRawTableQueueWriter::TFmrRawTableQueueWriter(TFmrRawTableQueue::TPtr rawTableQueue,const TFmrRawTableQueueWriterSettings& settings) + : RawTableQueue_(rawTableQueue), Settings_(settings) +{ +} + +void TFmrRawTableQueueWriter::DoWrite(const void* buf, size_t len) +{ + BlockContent_.Append(static_cast(buf), len); +} + +void TFmrRawTableQueueWriter::NotifyRowEnd() { + if (BlockContent_.size() >= Settings_.ChunkSize) { + DoFlush(); + } +} + +void TFmrRawTableQueueWriter::DoFlush() { + if (BlockContent_.Empty()) { + return; + } + RawTableQueue_->AddRow(std::move(BlockContent_)); + BlockContent_.Clear(); +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue_writer.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue_writer.h new file mode 100644 index 000000000000..e2a9af699186 --- /dev/null +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue_writer.h @@ -0,0 +1,31 @@ +#include "yql_yt_raw_table_queue.h" + +namespace NYql::NFmr { + +struct TFmrRawTableQueueWriterSettings { + ui64 ChunkSize = 1024 * 1024; +}; + +class TFmrRawTableQueueWriter: public NYT::TRawTableWriter { +public: + TFmrRawTableQueueWriter( + TFmrRawTableQueue::TPtr rawTableQueue, + const TFmrRawTableQueueWriterSettings& settings = TFmrRawTableQueueWriterSettings() + ); + + void NotifyRowEnd() override; + +protected: + void DoWrite(const void* buf, size_t len) override; + + void DoFlush() override; + +private: + TFmrRawTableQueue::TPtr RawTableQueue_; + + TBuffer BlockContent_; + + TFmrRawTableQueueWriterSettings Settings_ = {}; +}; + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h index ca42498b276e..5e29833cb138 100644 --- a/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h +++ b/yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h @@ -18,6 +18,8 @@ struct TFmrWriterSettings { class TFmrTableDataServiceWriter: public NYT::TRawTableWriter { public: + using TPtr = TIntrusivePtr; + TFmrTableDataServiceWriter( const TString& tableId, const TString& partId, diff --git a/yt/yql/providers/yt/fmr/process/ya.make b/yt/yql/providers/yt/fmr/process/ya.make new file mode 100644 index 000000000000..9474697574c3 --- /dev/null +++ b/yt/yql/providers/yt/fmr/process/ya.make @@ -0,0 +1,18 @@ +LIBRARY() + +SRCS( + yql_yt_job_fmr.cpp +) + +PEERDIR( + yt/yql/providers/yt/fmr/table_data_service/client/impl + yt/yql/providers/yt/fmr/table_data_service/discovery/file + yt/yql/providers/yt/fmr/yt_job_service/file + yt/yql/providers/yt/fmr/yt_job_service/impl + yt/yql/providers/yt/fmr/utils + yt/yql/providers/yt/job +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.cpp b/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.cpp new file mode 100644 index 000000000000..5f3e391cfb1e --- /dev/null +++ b/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.cpp @@ -0,0 +1,89 @@ +#include "yql_yt_job_fmr.h" +#include +#include +#include +#include +#include + +namespace NYql { + +void TFmrUserJob::Save(IOutputStream& s) const { + TYqlUserJobBase::Save(s); + ::SaveMany(&s, + InputTables_, + OutputTables_, + ClusterConnections_, + UseFileGateway_, + TableDataServiceDiscoveryFilePath_ + ); +} + +void TFmrUserJob::Load(IInputStream& s) { + TYqlUserJobBase::Load(s); + ::LoadMany(&s, + InputTables_, + OutputTables_, + ClusterConnections_, + UseFileGateway_, + TableDataServiceDiscoveryFilePath_ + ); +} + +TString TFmrUserJob::GetJobFactoryPrefix() const { + return "Fmr"; +} + +TIntrusivePtr TFmrUserJob::MakeMkqlJobReader() { + return MakeIntrusive(*QueueReader_, YQL_JOB_CODEC_BLOCK_COUNT, YQL_JOB_CODEC_BLOCK_SIZE); +} + +TIntrusivePtr TFmrUserJob::MakeMkqlJobWriter() { + TVector outputStreams; + for (auto& writer: TableDataServiceWriters_) { + outputStreams.emplace_back(writer.Get()); + } + return MakeIntrusive(outputStreams, YQL_JOB_CODEC_BLOCK_COUNT, YQL_JOB_CODEC_BLOCK_SIZE); +} + +void TFmrUserJob::FillQueueFromInputTables() { + ui64 inputTablesNum = InputTables_.Inputs.size(); + for (ui64 curTableNum = 0; curTableNum < inputTablesNum; ++curTableNum) { + ThreadPool_->SafeAddFunc([&, curTableNum] () mutable { + try { + auto inputTableRef = InputTables_.Inputs[curTableNum]; + auto queueTableWriter = MakeIntrusive(UnionInputTablesQueue_); + auto inputTableReaders = GetTableInputStreams(YtJobService_, TableDataService_, inputTableRef, ClusterConnections_); + for (auto tableReader: inputTableReaders) { + ParseRecords(tableReader, queueTableWriter, 1, 1000000, CancelFlag_); // TODO - settings + UnionInputTablesQueue_->NotifyInputFinished(curTableNum); + } + queueTableWriter->Flush(); + } catch (...) { + UnionInputTablesQueue_->SetException(CurrentExceptionMessage()); + } + }); + } +} + +void TFmrUserJob::InitializeFmrUserJob() { + ui64 inputTablesSize = InputTables_.Inputs.size(); + UnionInputTablesQueue_ = MakeIntrusive(inputTablesSize); + QueueReader_ = MakeIntrusive(UnionInputTablesQueue_); + + YtJobService_ = UseFileGateway_ ? MakeFileYtJobSerivce() : MakeYtJobSerivce(); + + auto tableDataServiceDiscovery = MakeFileTableDataServiceDiscovery({.Path = TableDataServiceDiscoveryFilePath_}); + TableDataService_ = MakeTableDataServiceClient(tableDataServiceDiscovery); + + for (auto& fmrTable: OutputTables_) { + TableDataServiceWriters_.emplace_back(MakeIntrusive(fmrTable.TableId, fmrTable.PartId, TableDataService_)); // TODO - settings + } +} + +void TFmrUserJob::DoFmrJob() { + InitializeFmrUserJob(); + FillQueueFromInputTables(); + TYqlUserJobBase::Do(); +} + +} // namespace NYql diff --git a/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h b/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h new file mode 100644 index 000000000000..dc4d3ade4812 --- /dev/null +++ b/yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h @@ -0,0 +1,86 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace NYql { + +using namespace NYql::NFmr; + +class TFmrUserJob: public TYqlUserJobBase { +public: + TFmrUserJob() + : TYqlUserJobBase() + { + } + + virtual ~TFmrUserJob() { + CancelFlag_->store(true); + ThreadPool_->Stop(); + } + + void SetTaskInputTables(const TTaskTableInputRef& taskInputTables) { + InputTables_ = taskInputTables; + } + + void SetTaskFmrOutputTables(const std::vector& outputTables) { + OutputTables_ = outputTables; + } + + void SetClusterConnections(const std::unordered_map& clusterConnections) { + ClusterConnections_ = clusterConnections; + } + + void SetYtJobService(bool useFileGateway) { + UseFileGateway_ = useFileGateway; + } + + void SetTableDataService(const TString& tableDataServiceDiscoveryFilePath) { + TableDataServiceDiscoveryFilePath_ = tableDataServiceDiscoveryFilePath; + } + + void Save(IOutputStream& s) const override; + void Load(IInputStream& s) override; + + void DoFmrJob(); + +protected: + TIntrusivePtr MakeMkqlJobWriter() override; + + TIntrusivePtr MakeMkqlJobReader() override; + + TString GetJobFactoryPrefix() const override; + +private: + void FillQueueFromInputTables(); + + void InitializeFmrUserJob(); + + // Serializable part (don't forget to add new members to Save/Load) + TTaskTableInputRef InputTables_; + std::vector OutputTables_; + std::unordered_map ClusterConnections_; + bool UseFileGateway_; + TString TableDataServiceDiscoveryFilePath_; + // End of serializable part + + TFmrRawTableQueue::TPtr UnionInputTablesQueue_; // Queue which represents union of all input streams + TFmrRawTableQueueReader::TPtr QueueReader_; + TVector TableDataServiceWriters_; + ITableDataService::TPtr TableDataService_; + IYtJobService::TPtr YtJobService_; + THolder ThreadPool_ = CreateThreadPool(3); + std::shared_ptr> CancelFlag_ = std::make_shared>(false); + // TODO - pass settings for various classes here. +}; + +} // namespace NYql diff --git a/yt/yql/providers/yt/fmr/proto/request_options.proto b/yt/yql/providers/yt/fmr/proto/request_options.proto index 92db8d0b0e7d..4a72087e333d 100644 --- a/yt/yql/providers/yt/fmr/proto/request_options.proto +++ b/yt/yql/providers/yt/fmr/proto/request_options.proto @@ -161,13 +161,13 @@ message TMergeTaskParams { message TMapOperationParams { repeated TOperationTableRef Input = 1; repeated TFmrTableRef Output = 2; - string Executable = 3; + string SerializedMapJobState = 3; } message TMapTaskParams { TTaskTableInputRef Input = 1; repeated TFmrTableOutputRef Output = 2; - string Executable = 3; + string SerializedMapJobState = 3; } message TOperationParams { diff --git a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp index 34f01f4884d6..22139984134c 100644 --- a/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp +++ b/yt/yql/providers/yt/fmr/request_options/proto_helpers/yql_yt_request_proto_helpers.cpp @@ -415,7 +415,7 @@ NProto::TMapOperationParams MapOperationParamsToProto(const TMapOperationParams& auto protoFmrTableRef = FmrTableRefToProto(fmrTableRef); protoMapOperationParams.AddOutput()->Swap(&protoFmrTableRef); } - protoMapOperationParams.SetExecutable(mapOperationParams.Executable); + protoMapOperationParams.SetSerializedMapJobState(mapOperationParams.SerializedMapJobState); return protoMapOperationParams; } @@ -428,7 +428,7 @@ TMapOperationParams MapOperationParamsFromProto(const NProto::TMapOperationParam for (auto& protoFmrTableRef: protoMapOperationParams.GetOutput()) { outputTables.emplace_back(FmrTableRefFromProto(protoFmrTableRef)); } - return TMapOperationParams{.Input = inputTables, .Output = outputTables, .Executable = protoMapOperationParams.GetExecutable()}; + return TMapOperationParams{.Input = inputTables, .Output = outputTables, .SerializedMapJobState = protoMapOperationParams.GetSerializedMapJobState()}; } NProto::TMapTaskParams MapTaskParamsToProto(const TMapTaskParams& mapTaskParams) { @@ -439,7 +439,7 @@ NProto::TMapTaskParams MapTaskParamsToProto(const TMapTaskParams& mapTaskParams) auto protoFmrTableOutputRef = FmrTableOutputRefToProto(fmrTableOutputRef); protoMapTaskParams.AddOutput()->Swap(&protoFmrTableOutputRef); } - protoMapTaskParams.SetExecutable(mapTaskParams.Executable); + protoMapTaskParams.SetSerializedMapJobState(mapTaskParams.SerializedMapJobState); return protoMapTaskParams; } @@ -451,7 +451,7 @@ TMapTaskParams MapTaskParamsFromProto(const NProto::TMapTaskParams& protoMapTask outputTables.emplace_back(FmrTableOutputRefFromProto(protoFmrTableOutputRef)); } mapTaskParams.Output = outputTables; - mapTaskParams.Executable = protoMapTaskParams.GetExecutable(); + mapTaskParams.SerializedMapJobState = protoMapTaskParams.GetSerializedMapJobState(); return mapTaskParams; } diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp index a013f67329aa..e36fc8026d41 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.cpp @@ -1,5 +1,6 @@ #include "yql_yt_request_options.h" #include +#include namespace NYql::NFmr { @@ -19,8 +20,134 @@ TTaskState::TPtr MakeTaskState(ETaskStatus taskStatus, const TString& taskId, co return MakeIntrusive(taskStatus, taskId, taskErrorMessage, stats); } +////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// Helper serialization functions + +void SaveRichPath(IOutputStream* buffer, const NYT::TRichYPath& path) { + TString serializedPath = NYT::NodeToYsonString(NYT::PathToNode(path)); + ::Save(buffer, serializedPath); +} +void LoadRichPath(IInputStream* buffer, NYT::TRichYPath& path) { + TString serializedPath; + ::Load(buffer, serializedPath); + auto node = NYT::NodeFromYsonString(serializedPath); + NYT::Deserialize(path, node); +} + +void TYtTableTaskRef::Save(IOutputStream* buffer) const { + ::Save(buffer, RichPaths.size()); + for (auto& path: RichPaths) { + SaveRichPath(buffer, path); + } + ::Save(buffer, FilePaths); +} + +void TYtTableTaskRef::Load(IInputStream* buffer) { + ui64 richPathsSize; + ::Load(buffer, richPathsSize); + std::vector richPaths; + + for (ui64 i = 0; i < richPathsSize; ++i) { + NYT::TRichYPath path; + LoadRichPath(buffer, path); + richPaths.emplace_back(path); + } + RichPaths = richPaths; + ::Load(buffer, FilePaths); +} + +void TTableRange::Save(IOutputStream* buffer) const { + ::SaveMany( + buffer, + PartId, + MinChunk, + MaxChunk + ); +} + +void TTableRange::Load(IInputStream* buffer) { + ::LoadMany( + buffer, + PartId, + MinChunk, + MaxChunk + ); +} + +void TFmrTableInputRef::Save(IOutputStream* buffer) const { + ::SaveMany( + buffer, + TableId, + TableRanges + ); +} + +void TFmrTableInputRef::Load(IInputStream* buffer) { + ::LoadMany( + buffer, + TableId, + TableRanges + ); +} + +void TTaskTableInputRef::Save(IOutputStream* buffer) const { + ::Save(buffer, Inputs); +} + +void TTaskTableInputRef::Load(IInputStream* buffer) { + ::Load(buffer, Inputs); +} + +void TFmrTableOutputRef::Save(IOutputStream* buffer) const { + ::SaveMany( + buffer, + TableId, + PartId + ); +} + +void TFmrTableOutputRef::Load(IInputStream* buffer) { + ::LoadMany( + buffer, + TableId, + PartId + ); +} + +void TClusterConnection::Save(IOutputStream* buffer) const { + ::SaveMany( + buffer, + TransactionId, + YtServerName, + Token + ); +} + +void TClusterConnection::Load(IInputStream* buffer) { + ::LoadMany( + buffer, + TransactionId, + YtServerName, + Token + ); +} + +void TFmrTableId::Save(IOutputStream* buffer) const { + ::Save(buffer, Id); +} + +void TFmrTableId::Load(IInputStream* buffer) { + ::Load(buffer, Id); +} + } // namespace NYql::NFmr +////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// Helper output operators for structs + + template<> void Out(IOutputStream& out, const NYql::NFmr::TFmrTableId& tableId) { out << tableId.Id; diff --git a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h index 53bf1d3d4b5e..732f6e168284 100644 --- a/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h +++ b/yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h @@ -74,10 +74,16 @@ struct TYtTableRef { bool operator == (const TYtTableRef&) const = default; }; +void SaveRichPath(IOutputStream* buffer, const NYT::TRichYPath& path); +void LoadRichPath(IInputStream* buffer, NYT::TRichYPath& path); + struct TYtTableTaskRef { std::vector RichPaths; std::vector FilePaths; + void Save(IOutputStream* buffer) const; + void Load(IInputStream* buffer); + bool operator == (const TYtTableTaskRef&) const = default; }; // corresponds to a partition of several yt input tables. @@ -90,6 +96,9 @@ struct TFmrTableId { TFmrTableId(const TString& cluster, const TString& path); + void Save(IOutputStream* buffer) const; + void Load(IInputStream* buffer); + bool operator == (const TFmrTableId&) const = default; }; @@ -102,12 +111,20 @@ struct TTableRange { TString PartId; ui64 MinChunk = 0; ui64 MaxChunk = 1; + + void Save(IOutputStream* buffer) const; + void Load(IInputStream* buffer); + bool operator == (const TTableRange&) const = default; }; // Corresnponds to range [MinChunk, MaxChunk) struct TFmrTableInputRef { TString TableId; std::vector TableRanges; + + void Save(IOutputStream* buffer) const; + void Load(IInputStream* buffer); + bool operator == (const TFmrTableInputRef&) const = default; }; // Corresponds to part of table with fixed TableId but several PartIds, Empty TablesRanges means that this table is not present in task. @@ -115,6 +132,9 @@ struct TFmrTableOutputRef { TString TableId; TString PartId; + void Save(IOutputStream* buffer) const; + void Load(IInputStream* buffer); + bool operator == (const TFmrTableOutputRef&) const = default; }; @@ -173,6 +193,8 @@ using TOperationTableRef = std::variant; using TTaskTableRef = std::variant; +// TODO - TYtTableTaskRef может быть из нескольких входных таблиц, но TFmrTableInputRef - часть одной таблицы, подумать как лучше + struct TUploadOperationParams { TFmrTableRef Input; TYtTableRef Output; @@ -200,6 +222,9 @@ struct TMergeOperationParams { struct TTaskTableInputRef { std::vector Inputs; + + void Save(IOutputStream* buffer) const; + void Load(IInputStream* buffer); }; // Corresponds to task input tables, which can consist parts of either fmr or yt input tables. struct TMergeTaskParams { @@ -210,13 +235,13 @@ struct TMergeTaskParams { struct TMapOperationParams { std::vector Input; std::vector Output; - TString Executable; + TString SerializedMapJobState; }; struct TMapTaskParams { TTaskTableInputRef Input; std::vector Output; - TString Executable; + TString SerializedMapJobState; }; using TOperationParams = std::variant; @@ -227,6 +252,9 @@ struct TClusterConnection { TString TransactionId; TString YtServerName; TMaybe Token; + + void Save(IOutputStream* buffer) const; + void Load(IInputStream* buffer); }; struct TTask: public TThrRefBase { diff --git a/yt/yql/providers/yt/fmr/table_data_service/client/impl/ya.make b/yt/yql/providers/yt/fmr/table_data_service/client/impl/ya.make new file mode 100644 index 000000000000..9429c16c0af1 --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/client/impl/ya.make @@ -0,0 +1,22 @@ +LIBRARY() + +SRCS( + yql_yt_table_data_service_client_impl.cpp +) + +PEERDIR( + library/cpp/threading/future + library/cpp/http/simple + library/cpp/retry + library/cpp/yson/node + yt/yql/providers/yt/fmr/table_data_service/client/proto_helpers + yt/yql/providers/yt/fmr/table_data_service/discovery/file + yt/yql/providers/yt/fmr/table_data_service/interface + yt/yql/providers/yt/fmr/utils + yql/essentials/utils + yql/essentials/utils/log +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/table_data_service/client/impl/yql_yt_table_data_service_client_impl.cpp b/yt/yql/providers/yt/fmr/table_data_service/client/impl/yql_yt_table_data_service_client_impl.cpp new file mode 100644 index 000000000000..ec98fd7658da --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/client/impl/yql_yt_table_data_service_client_impl.cpp @@ -0,0 +1,141 @@ +#include "yql_yt_table_data_service_client_impl.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace NYql::NFmr { + +namespace { + +class TFmrTableDataServiceClient: public ITableDataService { +public: + TFmrTableDataServiceClient(ITableDataServiceDiscovery::TPtr discovery): TableDataServiceDiscovery_(discovery) {} + + NThreading::TFuture Put(const TString& key, const TString& value) override { + TString putRequestUrl = "/put_data?key=" + key; + ui64 workersNum = TableDataServiceDiscovery_->GetHostCount(); + auto tableDataServiceWorkerNum = std::hash()(key) % workersNum; + auto workerConnection = TableDataServiceDiscovery_->GetHosts()[tableDataServiceWorkerNum]; + auto httpClient = TKeepAliveHttpClient(workerConnection.Host, workerConnection.Port); + YQL_CLOG(TRACE, FastMapReduce) << "Sending put request with url: " << putRequestUrl << + " To table data service worker with host: " << workerConnection.Host << " and port: " << ToString(workerConnection.Port); + + auto putTableDataServiceFunc = [&]() { + try { + httpClient.DoPost(putRequestUrl, value, nullptr, GetHeadersWithLogContext(Headers_)); + return NThreading::MakeFuture(); + } catch (...) { + return NThreading::MakeErrorFuture(std::current_exception()); + } + }; + return *DoWithRetry, yexception>(putTableDataServiceFunc, RetryPolicy_, true, OnFail_); + } + + NThreading::TFuture> Get(const TString& key) const override { + TString getRequestUrl = "/get_data?key=" + key; + ui64 workersNum = TableDataServiceDiscovery_->GetHostCount(); + auto tableDataServiceWorkerNum = std::hash()(key) % workersNum; + auto workerConnection = TableDataServiceDiscovery_->GetHosts()[tableDataServiceWorkerNum]; + auto httpClient = TKeepAliveHttpClient(workerConnection.Host, workerConnection.Port); + TStringStream outputStream; + YQL_CLOG(TRACE, FastMapReduce) << "Sending get request with url: " << getRequestUrl << + " To table data service worker with host: " << workerConnection.Host << " and port: " << ToString(workerConnection.Port); + + auto getTableDataServiceFunc = [&]() { + try { + httpClient.DoGet(getRequestUrl,&outputStream, GetHeadersWithLogContext(Headers_)); + TString value = outputStream.ReadAll(); + TMaybe result; + if (value) { + result = value; + } + return NThreading::MakeFuture(result); + } catch (...) { + return NThreading::MakeErrorFuture>(std::current_exception()); + } + }; + return *DoWithRetry>, yexception>(getTableDataServiceFunc, RetryPolicy_, true, OnFail_); + } + + + NThreading::TFuture Delete(const TString& key) override { + TString deleteRequestUrl = "/delete_data?key=" + key; + ui64 workersNum = TableDataServiceDiscovery_->GetHostCount(); + auto tableDataServiceWorkerNum = std::hash()(key) % workersNum; + auto workerConnection = TableDataServiceDiscovery_->GetHosts()[tableDataServiceWorkerNum]; + auto httpClient = TKeepAliveHttpClient(workerConnection.Host, workerConnection.Port); + YQL_CLOG(TRACE, FastMapReduce) << "Sending delete request with url: " << deleteRequestUrl << + " To table data service worker with host: " << workerConnection.Host << " and port: " << ToString(workerConnection.Port); + + auto deleteTableDataServiceFunc = [&]() { + try { + httpClient.DoRequest("DELETE", deleteRequestUrl, "", nullptr, GetHeadersWithLogContext(Headers_)); + return NThreading::MakeFuture(); + } catch (...) { + return NThreading::MakeErrorFuture(std::current_exception()); + } + }; + return *DoWithRetry, yexception>(deleteTableDataServiceFunc, RetryPolicy_, true, OnFail_); + } + + NThreading::TFuture RegisterDeletion(const std::vector& groups) override { + NProto::TTableDataServiceGroupDeletionRequest protoDeletionRequest = TTableDataServiceGroupDeletionRequestToProto(groups); + TString serializedProtoDeletionRequest = protoDeletionRequest.SerializeAsStringOrThrow(); + + TString deleteGroupsRequestUrl = "/delete_groups"; + ui64 totalWorkersNum = TableDataServiceDiscovery_->GetHostCount(); + std::vector> allNodesDeletions; + for (ui64 workerNum = 0; workerNum < totalWorkersNum; ++workerNum) { + auto workerConnection = TableDataServiceDiscovery_->GetHosts()[workerNum]; + auto httpClient = TKeepAliveHttpClient(workerConnection.Host, workerConnection.Port); + YQL_CLOG(TRACE, FastMapReduce) << "Sending delete groups request with url: " << deleteGroupsRequestUrl << + " To table data service worker with host: " << workerConnection.Host << " and port: " << ToString(workerConnection.Port); + auto deletionRequestFunc = [&]() { + try { + auto protobufHeaders = TKeepAliveHttpClient::THeaders{{"Content-Type", "application/x-protobuf"}}; + httpClient.DoPost(deleteGroupsRequestUrl, serializedProtoDeletionRequest, nullptr, GetHeadersWithLogContext(protobufHeaders)); + return NThreading::MakeFuture(); + } catch (...) { + return NThreading::MakeErrorFuture(std::current_exception()); + } + }; + allNodesDeletions.emplace_back(*DoWithRetry, yexception>(deletionRequestFunc, RetryPolicy_, true, OnFail_)); + } + return WaitExceptionOrAll(allNodesDeletions); + } + +private: + ITableDataServiceDiscovery::TPtr TableDataServiceDiscovery_; + TKeepAliveHttpClient::THeaders Headers_{}; + + std::shared_ptr> RetryPolicy_ = IRetryPolicy::GetExponentialBackoffPolicy( + /*retryClassFunction*/ [] (const yexception&) { + return ERetryErrorClass::LongRetry; + }, + /*minDelay*/ TDuration::MilliSeconds(10), + /*minLongRetryDelay*/ TDuration::Seconds(1), + /* maxDelay */ TDuration::Seconds(30), + /*maxRetries*/ 3 + ); + + std::function OnFail_ = [](const yexception& exc) { + YQL_CLOG(DEBUG, FastMapReduce) << "Got exception, retrying: " << exc.what(); + }; +}; + +} // namespace + +ITableDataService::TPtr MakeTableDataServiceClient(ITableDataServiceDiscovery::TPtr discovery) { + return MakeIntrusive(discovery); +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/table_data_service/client/impl/yql_yt_table_data_service_client_impl.h b/yt/yql/providers/yt/fmr/table_data_service/client/impl/yql_yt_table_data_service_client_impl.h new file mode 100644 index 000000000000..9d103718beba --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/client/impl/yql_yt_table_data_service_client_impl.h @@ -0,0 +1,10 @@ +#pragma once + +#include +#include + +namespace NYql::NFmr { + +ITableDataService::TPtr MakeTableDataServiceClient(ITableDataServiceDiscovery::TPtr discovery); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/table_data_service/client/proto_helpers/ya.make b/yt/yql/providers/yt/fmr/table_data_service/client/proto_helpers/ya.make new file mode 100644 index 000000000000..b8a49df24778 --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/client/proto_helpers/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + yql_yt_table_data_service_proto_helpers.cpp +) + +PEERDIR( + yt/yql/providers/yt/fmr/proto +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/table_data_service/client/proto_helpers/yql_yt_table_data_service_proto_helpers.cpp b/yt/yql/providers/yt/fmr/table_data_service/client/proto_helpers/yql_yt_table_data_service_proto_helpers.cpp new file mode 100644 index 000000000000..60e628e93681 --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/client/proto_helpers/yql_yt_table_data_service_proto_helpers.cpp @@ -0,0 +1,21 @@ +#include "yql_yt_table_data_service_proto_helpers.h" + +namespace NYql::NFmr { + +NProto::TTableDataServiceGroupDeletionRequest TTableDataServiceGroupDeletionRequestToProto(const std::vector& groups) { + NProto::TTableDataServiceGroupDeletionRequest protoGroups; + for (auto& group: groups) { + protoGroups.AddGroups(group); + } + return protoGroups; +} + +std::vector TTableDataServiceGroupDeletionRequestFromProto(const NProto::TTableDataServiceGroupDeletionRequest& protoGroups) { + std::vector groups; + for (auto& protoGroup: protoGroups.GetGroups()) { + groups.emplace_back(protoGroup); + } + return groups; +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/table_data_service/client/proto_helpers/yql_yt_table_data_service_proto_helpers.h b/yt/yql/providers/yt/fmr/table_data_service/client/proto_helpers/yql_yt_table_data_service_proto_helpers.h new file mode 100644 index 000000000000..ec6490d4a88f --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/client/proto_helpers/yql_yt_table_data_service_proto_helpers.h @@ -0,0 +1,11 @@ +#include + +#include + +namespace NYql::NFmr { + +NProto::TTableDataServiceGroupDeletionRequest TTableDataServiceGroupDeletionRequestToProto(const std::vector& groups); + +std::vector TTableDataServiceGroupDeletionRequestFromProto(const NProto::TTableDataServiceGroupDeletionRequest& protoRequest); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/table_data_service/discovery/file/ya.make b/yt/yql/providers/yt/fmr/table_data_service/discovery/file/ya.make new file mode 100644 index 000000000000..9c03f53b52b8 --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/discovery/file/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + yql_yt_file_service_discovery.cpp +) + +PEERDIR( + yt/yql/providers/yt/fmr/table_data_service/discovery/interface + yql/essentials/utils +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/table_data_service/discovery/file/yql_yt_file_service_discovery.cpp b/yt/yql/providers/yt/fmr/table_data_service/discovery/file/yql_yt_file_service_discovery.cpp new file mode 100644 index 000000000000..089903fb9514 --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/discovery/file/yql_yt_file_service_discovery.cpp @@ -0,0 +1,65 @@ +#include "yql_yt_file_service_discovery.h" +#include +#include +#include + +namespace NYql::NFmr { + +namespace { + +class TFileTableDataServiceDiscovery: public ITableDataServiceDiscovery { +public: + TFileTableDataServiceDiscovery(const TFileTableDataServiceDiscoverySettings& settings): WorkersPath_(settings.Path) { + Start(); + } + + void Start() override { + TFileInput readHosts(WorkersPath_); + TString currentRow; + std::vector connection; + std::vector workerConnections; + while (readHosts.ReadLine(currentRow)) { + StringSplitter(currentRow).Split(':').Collect(&connection); + YQL_ENSURE(connection.size() == 2); + TString host = connection[0]; + ui16 port; + if (!TryFromString(connection[1], port)) { + ythrow yexception() << " Failed to convert port " << connection[1] << " to ui16\n"; + } + workerConnections.emplace_back(host, port); + } + WorkerConnections_ = workerConnections; + HasStarted_ = true; + } + + void Stop() override {} + + ui64 GetHostCount() const override { + CheckHasStarted(); + return WorkerConnections_.size(); + } + + const std::vector& GetHosts() const override { + CheckHasStarted(); + return WorkerConnections_; + } + +private: + const TString WorkersPath_; + std::vector WorkerConnections_; + bool HasStarted_ = false; + + void CheckHasStarted() const { + if (!HasStarted_) { + ythrow yexception() << "File service discovery has not started yet"; + } + } +}; + +} + +ITableDataServiceDiscovery::TPtr MakeFileTableDataServiceDiscovery(const TFileTableDataServiceDiscoverySettings& settings) { + return MakeIntrusive(settings); +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/table_data_service/discovery/file/yql_yt_file_service_discovery.h b/yt/yql/providers/yt/fmr/table_data_service/discovery/file/yql_yt_file_service_discovery.h new file mode 100644 index 000000000000..9eebfd3566e6 --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/discovery/file/yql_yt_file_service_discovery.h @@ -0,0 +1,13 @@ +#pragma once + +#include + +namespace NYql::NFmr { + +struct TFileTableDataServiceDiscoverySettings { + TString Path; +}; + +ITableDataServiceDiscovery::TPtr MakeFileTableDataServiceDiscovery(const TFileTableDataServiceDiscoverySettings& settings); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/table_data_service/discovery/interface/ya.make b/yt/yql/providers/yt/fmr/table_data_service/discovery/interface/ya.make new file mode 100644 index 000000000000..46447dcead9e --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/discovery/interface/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + yql_yt_service_discovery.cpp +) + +PEERDIR( + yql/essentials/utils +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/table_data_service/discovery/interface/yql_yt_service_discovery.cpp b/yt/yql/providers/yt/fmr/table_data_service/discovery/interface/yql_yt_service_discovery.cpp new file mode 100644 index 000000000000..5a5b93fc8fde --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/discovery/interface/yql_yt_service_discovery.cpp @@ -0,0 +1 @@ +#include "yql_yt_service_discovery.h" diff --git a/yt/yql/providers/yt/fmr/table_data_service/discovery/interface/yql_yt_service_discovery.h b/yt/yql/providers/yt/fmr/table_data_service/discovery/interface/yql_yt_service_discovery.h new file mode 100644 index 000000000000..8d5c317978a4 --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/discovery/interface/yql_yt_service_discovery.h @@ -0,0 +1,25 @@ +#pragma once + +#include +#include +#include + +namespace NYql::NFmr { + +struct TTableDataServiceServerConnection { + TString Host; + ui16 Port; +}; + +class ITableDataServiceDiscovery: public IRunnable { +public: + using TPtr = TIntrusivePtr; + + virtual ~ITableDataServiceDiscovery() = default; + + virtual ui64 GetHostCount() const = 0; + + virtual const std::vector& GetHosts() const = 0; +}; + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/table_data_service/helpers/ya.make b/yt/yql/providers/yt/fmr/table_data_service/helpers/ya.make new file mode 100644 index 000000000000..07106e43d628 --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/helpers/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + yql_yt_table_data_service_helers.cpp +) + +PEERDIR( + yt/yql/providers/yt/fmr/table_data_service/client/impl + yt/yql/providers/yt/fmr/table_data_service/discovery/file + yt/yql/providers/yt/fmr/table_data_service/local/impl + yt/yql/providers/yt/fmr/table_data_service/server +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yt/yql/providers/yt/fmr/table_data_service/helpers/yql_yt_table_data_service_helers.cpp b/yt/yql/providers/yt/fmr/table_data_service/helpers/yql_yt_table_data_service_helers.cpp new file mode 100644 index 000000000000..6d2fbb092b10 --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/helpers/yql_yt_table_data_service_helers.cpp @@ -0,0 +1,31 @@ +#include "yql_yt_table_data_service_helpers.h" + +namespace NYql::NFmr { + +TString WriteHostsToFile(TTempFileHandle& file, ui64 WorkersNum, const std::vector& connections) { + TString tempFileName = file.Name(); + TFileOutput writeHosts(file.Name()); + for (size_t i = 0; i < WorkersNum; ++i) { + writeHosts.Write(TStringBuilder() << connections[i].Host << ":" << connections[i].Port << "\n"); + } + return tempFileName; +} + +IFmrServer::TPtr MakeTableDataServiceServer(ui16 port) { + TTableDataServiceServerSettings tableDataServiceWorkerSettings{.WorkerId = 0, .WorkersNum = 1, .Port = port}; + auto tableDataServiceServer = MakeTableDataServiceServer(MakeLocalTableDataService(), tableDataServiceWorkerSettings); + tableDataServiceServer->Start(); + return tableDataServiceServer; +} + +ITableDataService::TPtr MakeTableDataServiceClient(ui16 port) { + TTempFileHandle hostsFile{}; + std::vector connections{{.Host = "localhost", .Port = port}}; + ui64 workersNum = 1; + auto path = WriteHostsToFile(hostsFile, workersNum, connections); + + auto tableDataServiceDiscovery = MakeFileTableDataServiceDiscovery({.Path=path}); + return MakeTableDataServiceClient(tableDataServiceDiscovery); +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/table_data_service/helpers/yql_yt_table_data_service_helpers.h b/yt/yql/providers/yt/fmr/table_data_service/helpers/yql_yt_table_data_service_helpers.h new file mode 100644 index 000000000000..acaaa1a86fa2 --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/helpers/yql_yt_table_data_service_helpers.h @@ -0,0 +1,17 @@ +#include +#include +#include +#include +#include + +namespace NYql::NFmr { + +// Helper functions, mostly used for testing purposes. + +TString WriteHostsToFile(TTempFileHandle& file, ui64 WorkersNum, const std::vector& connections); + +IFmrServer::TPtr MakeTableDataServiceServer(ui16 port); + +ITableDataService::TPtr MakeTableDataServiceClient(ui16 port); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/table_data_service/server/ya.make b/yt/yql/providers/yt/fmr/table_data_service/server/ya.make new file mode 100644 index 000000000000..fa2435f4cd13 --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/server/ya.make @@ -0,0 +1,20 @@ +LIBRARY() + +SRCS( + yql_yt_table_data_service_server.cpp +) + +PEERDIR( + library/cpp/http/server + library/cpp/yson/node + yql/essentials/utils + yt/yql/providers/yt/fmr/table_data_service/local/interface + yt/yql/providers/yt/fmr/table_data_service/client/proto_helpers + yt/yql/providers/yt/fmr/utils +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE_FOR_TESTS(ut) diff --git a/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.cpp b/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.cpp new file mode 100644 index 000000000000..9b2b7738f27b --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.cpp @@ -0,0 +1,183 @@ +#include "yql_yt_table_data_service_server.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace NYql::NFmr { + +namespace { + +using THandler = std::function; + +enum class ETableDataServiceRequestHandler { + Put, + Get, + Delete, + DeleteGroups +}; + +class TReplier: public TRequestReplier { +public: + TReplier(std::unordered_map& handlers) + : Handlers_(handlers) + { + } + + bool DoReply(const TReplyParams& params) override { + TParsedHttpFull httpRequest(params.Input.FirstLine()); + auto handlerName = GetHandlerName(httpRequest); + if (!handlerName) { + params.Output << THttpResponse(HTTP_NOT_FOUND); + } else { + YQL_ENSURE(Handlers_.contains(*handlerName)); + auto callbackFunc = Handlers_[*handlerName]; + params.Output << callbackFunc(params.Input); + } + return true; + } + +private: + std::unordered_map Handlers_; + + TMaybe GetHandlerName(TParsedHttpFull httpRequest) { + TStringBuf queryPath; + httpRequest.Path.SkipPrefix("/"); + queryPath = httpRequest.Path.NextTok('/'); + if (queryPath == "put_data") { + YQL_ENSURE(httpRequest.Method == "POST"); + return ETableDataServiceRequestHandler::Put; + } else if (queryPath == "delete_groups") { + YQL_ENSURE(httpRequest.Method == "POST"); + return ETableDataServiceRequestHandler::DeleteGroups; + } else if (queryPath == "delete_data") { + YQL_ENSURE(httpRequest.Method == "DELETE"); + return ETableDataServiceRequestHandler::Delete; + } else if (queryPath == "get_data") { + YQL_ENSURE(httpRequest.Method == "GET"); + return ETableDataServiceRequestHandler::Get; + } + return Nothing(); + } +}; + +class TTableDataServiceServer: public THttpServer::ICallBack, public IRunnable { +public: + TTableDataServiceServer(ILocalTableDataService::TPtr tableDataService, const TTableDataServiceServerSettings& settings) + : TableDataService_(tableDataService), + Host_(settings.Host), + Port_(settings.Port), + WorkerId_(settings.WorkerId), + WorkersNum_(settings.WorkersNum) + { + YQL_ENSURE(WorkerId_ >= 0 && WorkerId_ < WorkersNum_); + THttpServer::TOptions opts; + opts.AddBindAddress(Host_, Port_); + HttpServer_ = MakeHolder(this, opts.EnableKeepAlive(true).EnableCompression(true)); + + THandler putTableDataServiceHandler = std::bind(&TTableDataServiceServer::PutTableDataServiceHandler, this, std::placeholders::_1); + THandler getTableDataServiceHandler = std::bind(&TTableDataServiceServer::GetTableDataServiceHandler, this, std::placeholders::_1); + THandler deleteTableDataServiceHandler = std::bind(&TTableDataServiceServer::DeleteTableDataServiceHandler, this, std::placeholders::_1); + THandler deleteGroupsTableDataServiceHandler = std::bind(&TTableDataServiceServer::DeleteGroupsTableDataServiceHandler, this, std::placeholders::_1); + + Handlers_ = std::unordered_map{ + {ETableDataServiceRequestHandler::Put, putTableDataServiceHandler}, + {ETableDataServiceRequestHandler::Get, getTableDataServiceHandler}, + {ETableDataServiceRequestHandler::Delete, deleteTableDataServiceHandler}, + {ETableDataServiceRequestHandler::DeleteGroups, deleteGroupsTableDataServiceHandler} + }; + } + + void Start() override { + HttpServer_->Start(); + Cerr << "Table data service server with id " << WorkerId_ << " is listnening on url " << "http://" + Host_ + ":" + ToString(Port_) << "\n"; + } + + void Stop() override { + HttpServer_->Stop(); + } + + ~TTableDataServiceServer() override { + Stop(); + } + + TClientRequest* CreateClient() override { + return new TReplier(Handlers_); + } + +private: + std::unordered_map Handlers_; + THolder HttpServer_; + ILocalTableDataService::TPtr TableDataService_; + const TString Host_; + const ui16 Port_; + const ui64 WorkerId_; + const ui64 WorkersNum_; + + TString GetTableDataServiceKey(THttpInput& input) { + TParsedHttpFull httpRequest(input.FirstLine()); + TStringBuf url = httpRequest.Request; + std::vector splittedUrl; + TString delim = "?key="; + StringSplitter(url).SplitByString(delim).AddTo(&splittedUrl); + YQL_ENSURE(splittedUrl.size() == 2); + return splittedUrl[1]; + } + + THttpResponse PutTableDataServiceHandler(THttpInput& input) { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetLogContext(input)); + TString ysonTableContent = input.ReadAll(); + auto tableDataServiceKey = GetTableDataServiceKey(input); + TableDataService_->Put(tableDataServiceKey, ysonTableContent).GetValueSync(); + YQL_CLOG(TRACE, FastMapReduce) << "Putting content in table data service with key " << tableDataServiceKey; + return THttpResponse(HTTP_OK); + } + + THttpResponse GetTableDataServiceHandler(THttpInput& input) { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetLogContext(input)); + auto tableDataServiceId = GetTableDataServiceKey(input); + TString ysonTableContent; + if (auto value = TableDataService_->Get(tableDataServiceId).GetValueSync()) { + ysonTableContent = *value; + } + THttpResponse httpResponse(HTTP_OK); + httpResponse.SetContent(ysonTableContent); + YQL_CLOG(TRACE, FastMapReduce) << "Getting content in table data service with key " << tableDataServiceId; + return httpResponse; + } + + THttpResponse DeleteTableDataServiceHandler(THttpInput& input) { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetLogContext(input)); + auto tableDataServiceKey = GetTableDataServiceKey(input); + TableDataService_->Delete(tableDataServiceKey).GetValueSync(); + YQL_CLOG(TRACE, FastMapReduce) << "Deleting content in table data service with key " << tableDataServiceKey; + return THttpResponse(HTTP_OK); + } + + THttpResponse DeleteGroupsTableDataServiceHandler(THttpInput& input) { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetLogContext(input)); + TString serializedProtoGroupDeletionRequest = input.ReadAll(); + NProto::TTableDataServiceGroupDeletionRequest protoGroupDeletionRequest; + protoGroupDeletionRequest.ParseFromStringOrThrow(serializedProtoGroupDeletionRequest); + auto deletionRequest = TTableDataServiceGroupDeletionRequestFromProto(protoGroupDeletionRequest); + TableDataService_->RegisterDeletion(deletionRequest).GetValueSync(); + YQL_CLOG(TRACE, FastMapReduce) << "Deleting groups in table data service" << JoinRange(' ', deletionRequest.begin(), deletionRequest.end()); return THttpResponse(HTTP_OK); + } +}; + +} // namespace + +IFmrServer::TPtr MakeTableDataServiceServer(ILocalTableDataService::TPtr tableDataService, const TTableDataServiceServerSettings& settings) { + return MakeHolder(tableDataService, settings); +} + +} // NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.h b/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.h new file mode 100644 index 000000000000..51ffe88a24d1 --- /dev/null +++ b/yt/yql/providers/yt/fmr/table_data_service/server/yql_yt_table_data_service_server.h @@ -0,0 +1,20 @@ +#include +#include + +namespace NYql::NFmr { + +using IFmrServer = IRunnable; + +struct TTableDataServiceServerSettings { + ui64 WorkerId; + ui64 WorkersNum; + TString Host = "localhost"; + ui16 Port = 7000; +}; + +IFmrServer::TPtr MakeTableDataServiceServer( + ILocalTableDataService::TPtr tableDataSerivce, + const TTableDataServiceServerSettings& settings +); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/utils/ya.make b/yt/yql/providers/yt/fmr/utils/ya.make index 8bd317b26d55..2b7e42a50397 100644 --- a/yt/yql/providers/yt/fmr/utils/ya.make +++ b/yt/yql/providers/yt/fmr/utils/ya.make @@ -5,6 +5,7 @@ SRCS( yql_yt_log_context.cpp yql_yt_parse_records.cpp yql_yt_table_data_service_key.cpp + yql_yt_table_input_streams.cpp ) PEERDIR( @@ -12,6 +13,8 @@ PEERDIR( yt/cpp/mapreduce/client yt/cpp/mapreduce/interface yt/yql/providers/yt/fmr/request_options + yt/yql/providers/yt/fmr/yt_job_service/interface + yt/yql/providers/yt/fmr/table_data_service/interface yt/yql/providers/yt/codec yql/essentials/utils ) diff --git a/yt/yql/providers/yt/fmr/utils/yql_yt_table_input_streams.cpp b/yt/yql/providers/yt/fmr/utils/yql_yt_table_input_streams.cpp new file mode 100644 index 000000000000..92e0ae212450 --- /dev/null +++ b/yt/yql/providers/yt/fmr/utils/yql_yt_table_input_streams.cpp @@ -0,0 +1,49 @@ +#include "yql_yt_table_input_streams.h" +#include + +namespace NYql::NFmr { + +std::vector GetYtTableReaders( + IYtJobService::TPtr jobService, + const TYtTableTaskRef& ytTableTaskRef, + const std::unordered_map& clusterConnections +) { + std::vector ytTableReaders; + if (!ytTableTaskRef.FilePaths.empty()) { + // underlying gateway is file, so create readers from filepaths. + for (auto& filePath: ytTableTaskRef.FilePaths) { + ytTableReaders.emplace_back(jobService->MakeReader(filePath)); + } + } else { + for (auto& richPath: ytTableTaskRef.RichPaths) { + YQL_ENSURE(richPath.Cluster_); + + // TODO - вместо этого написать нормальные хелперы из RichPath в структуры и назад + TStringBuf choppedPath; + YQL_ENSURE(TStringBuf(richPath.Path_).AfterPrefix("//", choppedPath)); + auto fmrTableId = TFmrTableId(*richPath.Cluster_, TString(choppedPath)); + auto clusterConnection = clusterConnections.at(fmrTableId); + ytTableReaders.emplace_back(jobService->MakeReader(richPath, clusterConnection)); // TODO - reader Settings + } + } + return ytTableReaders; +} + +std::vector GetTableInputStreams( + IYtJobService::TPtr jobService, + ITableDataService::TPtr tableDataService, + const TTaskTableRef& tableRef, + const std::unordered_map& clusterConnections +) { + auto ytTableTaskRef = std::get_if(&tableRef); + auto fmrTable = std::get_if(&tableRef); + if (ytTableTaskRef) { + return GetYtTableReaders(jobService, *ytTableTaskRef, clusterConnections); + } else if (fmrTable) { + return {MakeIntrusive(fmrTable->TableId, fmrTable->TableRanges, tableDataService)}; // TODO - fmr reader settings + } else { + ythrow yexception() << "Unsupported table type"; + } +} + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/utils/yql_yt_table_input_streams.h b/yt/yql/providers/yt/fmr/utils/yql_yt_table_input_streams.h new file mode 100644 index 000000000000..286f436a218f --- /dev/null +++ b/yt/yql/providers/yt/fmr/utils/yql_yt_table_input_streams.h @@ -0,0 +1,22 @@ +#pragma once + +#include +#include +#include + +namespace NYql::NFmr { + +std::vector GetYtTableReaders( + IYtJobService::TPtr jobService, + const TYtTableTaskRef& ytTableTaskRef, + const std::unordered_map& clusterConnections +); + +std::vector GetTableInputStreams( + IYtJobService::TPtr jobService, + ITableDataService::TPtr tableDataService, + const TTaskTableRef& tableRef, + const std::unordered_map& clusterConnections +); + +} // namespace NYql::NFmr diff --git a/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.h b/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.h index a4f7c2426cec..970b60bb9b18 100644 --- a/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.h +++ b/yt/yql/providers/yt/fmr/yt_job_service/file/yql_yt_file_yt_job_service.h @@ -1,3 +1,5 @@ +#pragma once + #include namespace NYql::NFmr { diff --git a/yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.h b/yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.h index bac4abf6ab7f..b8ad928883b0 100644 --- a/yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.h +++ b/yt/yql/providers/yt/fmr/yt_job_service/impl/yql_yt_job_service_impl.h @@ -1,3 +1,5 @@ +#pragma once + #include namespace NYql::NFmr { diff --git a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp index a1d8d7409b68..7085e6f0a637 100644 --- a/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp +++ b/yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.cpp @@ -475,8 +475,7 @@ class TFmrYtGateway final: public TYtForwardingGatewayBase { auto [mapInputTables, clusterConnections] = GetInputTablesAndConnections(inputTables, std::move(options)); - TString executable = ""; //??? TODO how to extract executable bytecode from options - TMapOperationParams mapOperationParams{.Input = mapInputTables,.Output = fmrOutputTables, .Executable = executable}; + TMapOperationParams mapOperationParams{.Input = mapInputTables,.Output = fmrOutputTables, .SerializedMapJobState = ""}; // TODO - fill TStartOperationRequest mapOperationRequest{ .TaskType = ETaskType::Map, .OperationParams = mapOperationParams, diff --git a/yt/yql/providers/yt/job/ya.make b/yt/yql/providers/yt/job/ya.make index f67ac0d8c2ab..dfa145186274 100644 --- a/yt/yql/providers/yt/job/ya.make +++ b/yt/yql/providers/yt/job/ya.make @@ -8,6 +8,7 @@ SRCS( yql_job_registry.h yql_job_stats_writer.cpp yql_job_user.cpp + yql_job_user_base.cpp ) PEERDIR( diff --git a/yt/yql/providers/yt/job/yql_job_base.cpp b/yt/yql/providers/yt/job/yql_job_base.cpp index 0874eb452441..3d6b78e3fa12 100644 --- a/yt/yql/providers/yt/job/yql_job_base.cpp +++ b/yt/yql/providers/yt/job/yql_job_base.cpp @@ -279,6 +279,12 @@ void TYqlJobBase::Init() { }, RuntimeLogLevel); } +void TYqlJobBase::Finish() { + if (JobStats) { + JobStats->SetStat(Job_ThreadsCount, GetRunnigThreadsCount()); + } +} + void TYqlJobBase::Save(IOutputStream& s) const { ::SaveMany(&s, UdfModules, @@ -303,13 +309,6 @@ void TYqlJobBase::Load(IInputStream& s) { ); } -void TYqlJobBase::Do(const NYT::TRawJobContext& jobContext) { - DoImpl(jobContext.GetInputFile(), jobContext.GetOutputFileList()); - if (JobStats) { - JobStats->SetStat(Job_ThreadsCount, GetRunnigThreadsCount()); - } -} - TCallableVisitFuncProvider TYqlJobBase::MakeTransformProvider(THashMap* extraArgs) const { return TJobTransformProvider(extraArgs); } diff --git a/yt/yql/providers/yt/job/yql_job_base.h b/yt/yql/providers/yt/job/yql_job_base.h index eb9e02587dc8..fae4a6eaec20 100644 --- a/yt/yql/providers/yt/job/yql_job_base.h +++ b/yt/yql/providers/yt/job/yql_job_base.h @@ -46,7 +46,7 @@ struct TJobCountersProvider : public NKikimr::NUdf::ICountersProvider, public NK /////////////////////////////////////////////////////////////////////////////////////////////////////// -class TYqlJobBase: public NYT::IRawJob { +class TYqlJobBase { protected: TYqlJobBase() = default; virtual ~TYqlJobBase(); @@ -79,16 +79,15 @@ class TYqlJobBase: public NYT::IRawJob { LangVer = langver; } - void Do(const NYT::TRawJobContext& jobContext) override; - void Save(IOutputStream& stream) const override; - void Load(IInputStream& stream) override; + virtual void Save(IOutputStream& stream) const; + virtual void Load(IInputStream& stream); protected: NKikimr::NMiniKQL::TCallableVisitFuncProvider MakeTransformProvider(THashMap* extraArgs = nullptr) const; void Init(); - virtual void DoImpl(const TFile& inHandle, const TVector& outHandles) = 0; + void Finish(); protected: // Serializable part (don't forget to add new members to Save/Load) diff --git a/yt/yql/providers/yt/job/yql_job_calc.cpp b/yt/yql/providers/yt/job/yql_job_calc.cpp index 4cb3adf6dba3..05d88f69ea68 100644 --- a/yt/yql/providers/yt/job/yql_job_calc.cpp +++ b/yt/yql/providers/yt/job/yql_job_calc.cpp @@ -38,6 +38,11 @@ void TYqlCalcJob::Load(IInputStream& stream) { ::Load(&stream, UseResultYson_); } +void TYqlCalcJob::Do(const NYT::TRawJobContext& jobContext) { + DoImpl(jobContext.GetInputFile(), jobContext.GetOutputFileList()); + Finish(); +} + void TYqlCalcJob::DoImpl(const TFile& inHandle, const TVector& outHandles) { NYT::TTableReader reader(MakeIntrusive(MakeIntrusive(inHandle))); NYT::TTableWriter writer(MakeIntrusive(MakeHolder(outHandles))); diff --git a/yt/yql/providers/yt/job/yql_job_calc.h b/yt/yql/providers/yt/job/yql_job_calc.h index 4e5f2398a220..49ba11c48a73 100644 --- a/yt/yql/providers/yt/job/yql_job_calc.h +++ b/yt/yql/providers/yt/job/yql_job_calc.h @@ -12,7 +12,7 @@ namespace NYql { -class TYqlCalcJob : public TYqlJobBase { +class TYqlCalcJob : public TYqlJobBase, public NYT::IRawJob { public: TYqlCalcJob() = default; ~TYqlCalcJob() = default; @@ -28,10 +28,12 @@ class TYqlCalcJob : public TYqlJobBase { void Save(IOutputStream& stream) const override; void Load(IInputStream& stream) override; -protected: - void DoImpl(const TFile& inHandle, const TVector& outHandles) override; +public: + void Do(const NYT::TRawJobContext& jobContext) override; private: + void DoImpl(const TFile& inHandle, const TVector& outHandles); + TMaybe> Columns_; bool UseResultYson_ = false; }; diff --git a/yt/yql/providers/yt/job/yql_job_factory.cpp b/yt/yql/providers/yt/job/yql_job_factory.cpp index 6b7cd4fd692a..3d707aaf4aae 100644 --- a/yt/yql/providers/yt/job/yql_job_factory.cpp +++ b/yt/yql/providers/yt/job/yql_job_factory.cpp @@ -16,12 +16,12 @@ using namespace NKikimr; using namespace NKikimr::NMiniKQL; TComputationNodeFactory GetJobFactory(NYql::NCommon::TCodecContext& codecCtx, const TString& optLLVM, - const TMkqlIOSpecs* specs, NYT::IReaderImplBase* reader, TJobMkqlWriterImpl* writer) + const TMkqlIOSpecs* specs, NYT::IReaderImplBase* reader, TMkqlWriterImpl* writer, const TString& prefix) { TMaybe exprContextObject; - return [&codecCtx, optLLVM, specs, reader, writer, exprContextObject](NMiniKQL::TCallable& callable, const TComputationNodeFactoryContext& ctx) mutable -> IComputationNode* { + return [&codecCtx, optLLVM, specs, reader, writer, exprContextObject, prefix](NMiniKQL::TCallable& callable, const TComputationNodeFactoryContext& ctx) mutable -> IComputationNode* { TStringBuf name = callable.GetType()->GetName(); - if (name.SkipPrefix("Yt") && name.ChopSuffix("Job")) { + if (name.SkipPrefix(prefix) && name.ChopSuffix("Job")) { if (name == "TableContent") { return WrapYtTableContent(codecCtx, ctx.Mutables, callable, optLLVM, {} /*empty pathPrefix inside job*/); } diff --git a/yt/yql/providers/yt/job/yql_job_factory.h b/yt/yql/providers/yt/job/yql_job_factory.h index aa5edfa06078..fe0ed2953327 100644 --- a/yt/yql/providers/yt/job/yql_job_factory.h +++ b/yt/yql/providers/yt/job/yql_job_factory.h @@ -12,6 +12,6 @@ namespace NYql { NKikimr::NMiniKQL::TComputationNodeFactory GetJobFactory(NYql::NCommon::TCodecContext& codecCtx, - const TString& optLLVM, const TMkqlIOSpecs* specs, NYT::IReaderImplBase* reader, TJobMkqlWriterImpl* writer); + const TString& optLLVM, const TMkqlIOSpecs* specs, NYT::IReaderImplBase* reader, TMkqlWriterImpl* writer, const TString& prefix = "Yt"); } // NYql diff --git a/yt/yql/providers/yt/job/yql_job_infer_schema.cpp b/yt/yql/providers/yt/job/yql_job_infer_schema.cpp index 3e7c3eb117ba..bc2b420304ef 100644 --- a/yt/yql/providers/yt/job/yql_job_infer_schema.cpp +++ b/yt/yql/providers/yt/job/yql_job_infer_schema.cpp @@ -8,6 +8,19 @@ namespace NYql { +void TYqlInferSchemaJob::Save(IOutputStream& stream) const { + TYqlJobBase::Save(stream); +} + +void TYqlInferSchemaJob::Load(IInputStream& stream) { + TYqlJobBase::Load(stream); +} + +void TYqlInferSchemaJob::Do(const NYT::TRawJobContext& jobContext) { + DoImpl(jobContext.GetInputFile(), jobContext.GetOutputFileList()); + Finish(); +} + void TYqlInferSchemaJob::DoImpl(const TFile& inHandle, const TVector& outHandles) { NYT::TTableReader reader(MakeIntrusive(MakeIntrusive(inHandle))); NYT::TTableWriter writer(MakeIntrusive(MakeHolder(outHandles))); diff --git a/yt/yql/providers/yt/job/yql_job_infer_schema.h b/yt/yql/providers/yt/job/yql_job_infer_schema.h index 8124eb7f3430..2311b9279fbb 100644 --- a/yt/yql/providers/yt/job/yql_job_infer_schema.h +++ b/yt/yql/providers/yt/job/yql_job_infer_schema.h @@ -8,13 +8,19 @@ namespace NYql { -class TYqlInferSchemaJob : public TYqlJobBase { +class TYqlInferSchemaJob : public TYqlJobBase, public NYT::IRawJob { public: TYqlInferSchemaJob() = default; ~TYqlInferSchemaJob() = default; -protected: - void DoImpl(const TFile& inHandle, const TVector& outHandles) override; +public: + void Do(const NYT::TRawJobContext& jobContext) override; + + void Save(IOutputStream& stream) const override; + void Load(IInputStream& stream) override; + +private: + void DoImpl(const TFile& inHandle, const TVector& outHandles); }; } // NYql diff --git a/yt/yql/providers/yt/job/yql_job_user.cpp b/yt/yql/providers/yt/job/yql_job_user.cpp index edbcce665099..a29a53c6f939 100644 --- a/yt/yql/providers/yt/job/yql_job_user.cpp +++ b/yt/yql/providers/yt/job/yql_job_user.cpp @@ -1,256 +1,35 @@ #include "yql_job_user.h" -#include "yql_job_factory.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include #include -#include - -#include -#include -#include -#include -#include -#include -#include - namespace NYql { -using namespace NKikimr; -using namespace NKikimr::NMiniKQL; - -namespace { - - const static TStatKey Mkql_TotalRuntimeNodes("Mkql_TotalRuntimeNodes", false); - const static TStatKey Mkql_BuildGraphRssDelta("Mkql_BuildGraphRssDelta", false); - const static TStatKey Job_InitTime("Job_InitTime", false); - const static TStatKey Job_CalcTime("Job_CalcTime", false); - - NYT::TFormat MakeTableYaMRFormat(const TString& inputSpec) { - NYT::TNode inAttrs; - TStringStream err; - if (!NCommon::ParseYson(inAttrs, inputSpec, err)) { - ythrow yexception() << "Invalid input attrs: " << err.Str(); - } - YQL_ENSURE(inAttrs.IsMap(), "Expect Map type of output meta attrs, but got type " << inAttrs.GetType()); - YQL_ENSURE(inAttrs.HasKey(YqlIOSpecTables), "Expect " << TString{YqlIOSpecTables}.Quote() << " key"); - - auto& inputSpecs = inAttrs[YqlIOSpecTables].AsList(); - YQL_ENSURE(!inputSpecs.empty(), "Expect list with at least one element in input attrs: " << inputSpec); - - TVector> formats; - THashMap specRegistry; - for (auto& attrs: inputSpecs) { - NYT::TNode spec; - if (attrs.IsString()) { - auto refName = attrs.AsString(); - if (auto p = specRegistry.FindPtr(refName)) { - spec = *p; - } else { - YQL_ENSURE(inAttrs.HasKey(YqlIOSpecRegistry) && inAttrs[YqlIOSpecRegistry].HasKey(refName), "Bad input registry reference: " << refName); - NYT::TNode& r = specRegistry[refName]; - r = inAttrs[YqlIOSpecRegistry][refName]; - spec = r; - } - } else { - spec = attrs; - } - formats.push_back(spec.HasKey(FORMAT_ATTR_NAME) ? MakeMaybe(spec[FORMAT_ATTR_NAME]) : Nothing()); - } - - NYT::TNode format = NYT::GetCommonTableFormat(formats).GetOrElse(NYT::TNode("yamred_dsv")); - format.Attributes()["lenval"] = true; - format.Attributes()["has_subkey"] = true; - format.Attributes()["enable_table_index"] = true; - return NYT::TFormat(format); - } -} - - -std::pair TYqlUserJob::GetIOFormats(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry) const { - TScopedAlloc alloc(__LOCATION__); - TMkqlIOSpecs specs; - if (UseBlockInput) { - specs.SetUseBlockInput(); - } - if (UseBlockOutput) { - specs.SetUseBlockOutput(); - } - - if (!UseSkiff) { - return std::make_pair(YamrInput ? MakeTableYaMRFormat(InputSpec) : specs.MakeInputFormat(AuxColumns), specs.MakeOutputFormat()); - } - - TTypeEnvironment env(alloc); - NCommon::TCodecContext codecCtx(env, *functionRegistry); - - TType* itemType = nullptr; - if (InputType) { - TStringStream err; - TProgramBuilder pgmBuilder(env, *functionRegistry); - itemType = NCommon::ParseTypeFromYson(TStringBuf{InputType}, pgmBuilder, err); - YQL_ENSURE(itemType, << err.Str()); - } - - specs.SetUseSkiff(OptLLVM, SkiffSysFields); - specs.Init(codecCtx, InputSpec, InputGroups, TableNames, itemType, AuxColumns, OutSpec); - - return std::make_pair(YamrInput ? MakeTableYaMRFormat(InputSpec) : specs.MakeInputFormat(AuxColumns), specs.MakeOutputFormat()); +void TYqlUserJob::Save(IOutputStream& stream) const { + TYqlUserJobBase::Save(stream); } -void TYqlUserJob::Save(IOutputStream& s) const { - TYqlJobBase::Save(s); - ::SaveMany(&s, - UseSkiff, - UseBlockInput, - UseBlockOutput, - SkiffSysFields, - YamrInput, - LambdaCode, - InputSpec, - OutSpec, - InputGroups, - AuxColumns, - InputType, - RowOffsets - ); +void TYqlUserJob::Load(IInputStream& stream) { + TYqlUserJobBase::Load(stream); } -void TYqlUserJob::Load(IInputStream& s) { - TYqlJobBase::Load(s); - ::LoadMany(&s, - UseSkiff, - UseBlockInput, - UseBlockOutput, - SkiffSysFields, - YamrInput, - LambdaCode, - InputSpec, - OutSpec, - InputGroups, - AuxColumns, - InputType, - RowOffsets - ); +TString TYqlUserJob::GetJobFactoryPrefix() const { + return "Yt"; } -void TYqlUserJob::DoImpl(const TFile& inHandle, const TVector& outHandles) { - TYqlJobBase::Init(); - - TLambdaBuilder builder(FunctionRegistry.Get(), *Alloc, - Env.Get(), RandomProvider.Get(), TimeProvider.Get(), JobStats.Get(), &JobCountersProvider, - SecureParamsProvider.Get(), LogProvider.Get(), LangVer); - - TType* itemType = nullptr; - if (InputType) { - TStringStream err; - TProgramBuilder pgmBuilder(*Env, *FunctionRegistry); - itemType = NCommon::ParseTypeFromYson(TStringBuf{InputType}, pgmBuilder, err); - YQL_ENSURE(itemType, << err.Str()); - } - - YQL_ENSURE(LambdaCode); - TRuntimeNode rootNode = DeserializeRuntimeNode(LambdaCode, *Env); - THashMap extraArgs; - rootNode = builder.TransformAndOptimizeProgram(rootNode, MakeTransformProvider(&extraArgs)); - - MkqlIOSpecs.Reset(new TMkqlIOSpecs()); - if (UseSkiff) { - MkqlIOSpecs->SetUseSkiff(OptLLVM, SkiffSysFields); - } - if (UseBlockInput) { - MkqlIOSpecs->SetUseBlockInput(); - MkqlIOSpecs->SetInputBlockRepresentation(TMkqlIOSpecs::EBlockRepresentation::WideBlock); - } - if (UseBlockOutput) { - MkqlIOSpecs->SetUseBlockOutput(); - } - MkqlIOSpecs->Init(*CodecCtx, InputSpec, InputGroups, TableNames, itemType, AuxColumns, OutSpec, JobStats.Get()); - if (!RowOffsets.empty()) { - MkqlIOSpecs->SetTableOffsets(RowOffsets); - } - - TIntrusivePtr mkqlWriter = MakeIntrusive(*MkqlIOSpecs, outHandles); - TIntrusivePtr reader; - - if (itemType) { - if (YamrInput) { - reader = MakeIntrusive(MakeIntrusive(inHandle)); - } - else { - reader = MakeIntrusive(inHandle); - } - } - - std::vector entryPoints(1, rootNode.GetNode()); - for (auto& item: extraArgs) { - entryPoints.push_back(item.second.GetNode()); +TIntrusivePtr TYqlUserJob::MakeMkqlJobReader() { + if (YamrInput) { + return MakeIntrusive(MakeIntrusive(InputFile_)); } - auto maxRss = TRusage::Get().MaxRss; - CompGraph = builder.BuildGraph( - GetJobFactory(*CodecCtx, OptLLVM, MkqlIOSpecs.Get(), reader.Get(), mkqlWriter.Get()), - UdfValidateMode, - NUdf::EValidatePolicy::Fail, OptLLVM, - EGraphPerProcess::Single, - Explorer, - rootNode, - std::move(entryPoints) - ); - - MKQL_SET_STAT(JobStats, Mkql_BuildGraphRssDelta, TRusage::Get().MaxRss - maxRss); - MKQL_SET_STAT(JobStats, Mkql_TotalRuntimeNodes, Explorer.GetNodes().size()); - MKQL_SET_STAT(JobStats, Job_InitTime, (ThreadCPUTime() - StartTime) / 1000); - - auto beginCalcTime = ThreadCPUTime(); - - if (CompGraph) { - for (size_t i: xrange(extraArgs.size())) { - if (auto entry = CompGraph->GetEntryPoint(i + 1, false)) { - entry->SetValue(CompGraph->GetContext(), NUdf::TUnboxedValue::Zero()); - } - } - - CodecCtx->HolderFactory = &CompGraph->GetHolderFactory(); - CompGraph->Prepare(); - BindTerminator.Reset(new TBindTerminator(CompGraph->GetTerminator())); - - if (auto mkqlReader = dynamic_cast(reader.Get())) { - mkqlReader->SetSpecs(*MkqlIOSpecs, CompGraph->GetHolderFactory()); - mkqlReader->Next(); // Prefetch first record to unify behavior with TYaMRTableReader - } - } - - NUdf::TUnboxedValue value = CompGraph->GetValue(); - if (rootNode.GetStaticType()->IsStream()) { - NUdf::TUnboxedValue item; - const auto status = value.Fetch(item); - YQL_ENSURE(status == NUdf::EFetchStatus::Finish); - } else { - YQL_ENSURE(value.IsFinish()); - } - - MKQL_SET_STAT(JobStats, Job_CalcTime, (ThreadCPUTime() - beginCalcTime) / 1000); + return MakeIntrusive(InputFile_); +} - if (auto mkqlReader = dynamic_cast(reader.Get())) { - mkqlReader->Finish(); - } - reader.Drop(); - mkqlWriter->Finish(); - mkqlWriter.Drop(); +TIntrusivePtr TYqlUserJob::MakeMkqlJobWriter() { + return MakeIntrusive(OutputFileList_); +} - MkqlIOSpecs->Clear(); - MkqlIOSpecs.Destroy(); +void TYqlUserJob::Do(const NYT::TRawJobContext& jobContext) { + InputFile_ = jobContext.GetInputFile(); + OutputFileList_ = jobContext.GetOutputFileList(); + TYqlUserJobBase::Do(); } -} // NYql +} // namespace NYql diff --git a/yt/yql/providers/yt/job/yql_job_user.h b/yt/yql/providers/yt/job/yql_job_user.h index 654cf1cbd11e..30265dff222b 100644 --- a/yt/yql/providers/yt/job/yql_job_user.h +++ b/yt/yql/providers/yt/job/yql_job_user.h @@ -1,117 +1,33 @@ #pragma once -#include "yql_job_base.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include -#include -#include -#include - -#include - -namespace NKikimr { - namespace NMiniKQL { - class IFunctionRegistry; - } -} +#include "yql_job_user_base.h" namespace NYql { -class TYqlUserJob: public TYqlJobBase { +class TYqlUserJob: public TYqlUserJobBase, public NYT::IRawJob { public: TYqlUserJob() - : TYqlJobBase() + : TYqlUserJobBase() { } virtual ~TYqlUserJob() = default; - void SetUseSkiff(bool useSkiff, TMkqlIOSpecs::TSystemFields sysFields) { - UseSkiff = useSkiff; - SkiffSysFields = sysFields; - } - - void SetUseBlockInput(bool useBlockInput) { - UseBlockInput = useBlockInput; - } - - void SetUseBlockOutput(bool useBlockOutput) { - UseBlockOutput = useBlockOutput; - } - - void SetYamrInput(bool yamrInput) { - YamrInput = yamrInput; - } - - void SetLambdaCode(const TString& code) { - LambdaCode = code; - } - - void SetInputSpec(const TString& spec) { - InputSpec = spec; - } - - void SetInputGroups(const TVector& inputGroups) { - InputGroups = inputGroups; - } - - void SetOutSpec(const TString& spec) { - OutSpec = spec; - } - - void SetAuxColumns(const THashSet& auxColumns) { - AuxColumns = auxColumns; - } - - void SetInputType(const TString& type) { - InputType = type; - } - - void SetRowOffsets(const TVector& rowOffsets) { - RowOffsets = rowOffsets; - } - - std::pair GetIOFormats(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry) const; +public: + void Do(const NYT::TRawJobContext& jobContext) override; - void Save(IOutputStream& s) const override; - void Load(IInputStream& s) override; + void Save(IOutputStream& stream) const override; + void Load(IInputStream& stream) override; protected: - void DoImpl(const TFile& inHandle, const TVector& outHandles) final; + TIntrusivePtr MakeMkqlJobReader() override; -protected: - // Serializable part (don't forget to add new members to Save/Load) - bool UseSkiff = false; - bool UseBlockInput = false; - bool UseBlockOutput = false; - TMkqlIOSpecs::TSystemFields SkiffSysFields; - bool YamrInput = false; - TString LambdaCode; - TString InputSpec; - TString OutSpec; - TVector InputGroups; - THashSet AuxColumns; - TString InputType; - TVector RowOffsets; - // End of serializable part + TIntrusivePtr MakeMkqlJobWriter() override; - NKikimr::NMiniKQL::TExploringNodeVisitor Explorer; - THolder CompGraph; - THolder BindTerminator; + TString GetJobFactoryPrefix() const override; - THolder MkqlIOSpecs; +private: + TFile InputFile_; + TVector OutputFileList_; }; -} // NYql +} // namespace NYql diff --git a/yt/yql/providers/yt/job/yql_job_user_base.cpp b/yt/yql/providers/yt/job/yql_job_user_base.cpp new file mode 100644 index 000000000000..f1ecdea2c549 --- /dev/null +++ b/yt/yql/providers/yt/job/yql_job_user_base.cpp @@ -0,0 +1,258 @@ +#include "yql_job_user_base.h" +#include "yql_job_factory.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + + +namespace NYql { + +using namespace NKikimr; +using namespace NKikimr::NMiniKQL; + +namespace { + + const static TStatKey Mkql_TotalRuntimeNodes("Mkql_TotalRuntimeNodes", false); + const static TStatKey Mkql_BuildGraphRssDelta("Mkql_BuildGraphRssDelta", false); + const static TStatKey Job_InitTime("Job_InitTime", false); + const static TStatKey Job_CalcTime("Job_CalcTime", false); + + NYT::TFormat MakeTableYaMRFormat(const TString& inputSpec) { + NYT::TNode inAttrs; + TStringStream err; + if (!NCommon::ParseYson(inAttrs, inputSpec, err)) { + ythrow yexception() << "Invalid input attrs: " << err.Str(); + } + YQL_ENSURE(inAttrs.IsMap(), "Expect Map type of output meta attrs, but got type " << inAttrs.GetType()); + YQL_ENSURE(inAttrs.HasKey(YqlIOSpecTables), "Expect " << TString{YqlIOSpecTables}.Quote() << " key"); + + auto& inputSpecs = inAttrs[YqlIOSpecTables].AsList(); + YQL_ENSURE(!inputSpecs.empty(), "Expect list with at least one element in input attrs: " << inputSpec); + + TVector> formats; + THashMap specRegistry; + for (auto& attrs: inputSpecs) { + NYT::TNode spec; + if (attrs.IsString()) { + auto refName = attrs.AsString(); + if (auto p = specRegistry.FindPtr(refName)) { + spec = *p; + } else { + YQL_ENSURE(inAttrs.HasKey(YqlIOSpecRegistry) && inAttrs[YqlIOSpecRegistry].HasKey(refName), "Bad input registry reference: " << refName); + NYT::TNode& r = specRegistry[refName]; + r = inAttrs[YqlIOSpecRegistry][refName]; + spec = r; + } + } else { + spec = attrs; + } + formats.push_back(spec.HasKey(FORMAT_ATTR_NAME) ? MakeMaybe(spec[FORMAT_ATTR_NAME]) : Nothing()); + } + + NYT::TNode format = NYT::GetCommonTableFormat(formats).GetOrElse(NYT::TNode("yamred_dsv")); + format.Attributes()["lenval"] = true; + format.Attributes()["has_subkey"] = true; + format.Attributes()["enable_table_index"] = true; + return NYT::TFormat(format); + } +} + + +std::pair TYqlUserJobBase::GetIOFormats(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry) const { + TScopedAlloc alloc(__LOCATION__); + TMkqlIOSpecs specs; + if (UseBlockInput) { + specs.SetUseBlockInput(); + } + if (UseBlockOutput) { + specs.SetUseBlockOutput(); + } + + if (!UseSkiff) { + return std::make_pair(YamrInput ? MakeTableYaMRFormat(InputSpec) : specs.MakeInputFormat(AuxColumns), specs.MakeOutputFormat()); + } + + TTypeEnvironment env(alloc); + NCommon::TCodecContext codecCtx(env, *functionRegistry); + + TType* itemType = nullptr; + if (InputType) { + TStringStream err; + TProgramBuilder pgmBuilder(env, *functionRegistry); + itemType = NCommon::ParseTypeFromYson(TStringBuf{InputType}, pgmBuilder, err); + YQL_ENSURE(itemType, << err.Str()); + } + + specs.SetUseSkiff(OptLLVM, SkiffSysFields); + specs.Init(codecCtx, InputSpec, InputGroups, TableNames, itemType, AuxColumns, OutSpec); + + return std::make_pair(YamrInput ? MakeTableYaMRFormat(InputSpec) : specs.MakeInputFormat(AuxColumns), specs.MakeOutputFormat()); +} + +void TYqlUserJobBase::Save(IOutputStream& s) const { + TYqlJobBase::Save(s); + ::SaveMany(&s, + UseSkiff, + UseBlockInput, + UseBlockOutput, + SkiffSysFields, + YamrInput, + LambdaCode, + InputSpec, + OutSpec, + InputGroups, + AuxColumns, + InputType, + RowOffsets + ); +} + +void TYqlUserJobBase::Load(IInputStream& s) { + TYqlJobBase::Load(s); + ::LoadMany(&s, + UseSkiff, + UseBlockInput, + UseBlockOutput, + SkiffSysFields, + YamrInput, + LambdaCode, + InputSpec, + OutSpec, + InputGroups, + AuxColumns, + InputType, + RowOffsets + ); +} + +void TYqlUserJobBase::DoImpl() { + TYqlJobBase::Init(); + + TLambdaBuilder builder(FunctionRegistry.Get(), *Alloc, + Env.Get(), RandomProvider.Get(), TimeProvider.Get(), JobStats.Get(), &JobCountersProvider, + SecureParamsProvider.Get(), LogProvider.Get(), LangVer); + + TType* itemType = nullptr; + if (InputType) { + TStringStream err; + TProgramBuilder pgmBuilder(*Env, *FunctionRegistry); + itemType = NCommon::ParseTypeFromYson(TStringBuf{InputType}, pgmBuilder, err); + YQL_ENSURE(itemType, << err.Str()); + } + + YQL_ENSURE(LambdaCode); + TRuntimeNode rootNode = DeserializeRuntimeNode(LambdaCode, *Env); + THashMap extraArgs; + rootNode = builder.TransformAndOptimizeProgram(rootNode, MakeTransformProvider(&extraArgs)); + + MkqlIOSpecs.Reset(new TMkqlIOSpecs()); + if (UseSkiff) { + MkqlIOSpecs->SetUseSkiff(OptLLVM, SkiffSysFields); + } + if (UseBlockInput) { + MkqlIOSpecs->SetUseBlockInput(); + MkqlIOSpecs->SetInputBlockRepresentation(TMkqlIOSpecs::EBlockRepresentation::WideBlock); + } + if (UseBlockOutput) { + MkqlIOSpecs->SetUseBlockOutput(); + } + MkqlIOSpecs->Init(*CodecCtx, InputSpec, InputGroups, TableNames, itemType, AuxColumns, OutSpec, JobStats.Get()); + if (!RowOffsets.empty()) { + MkqlIOSpecs->SetTableOffsets(RowOffsets); + } + + TIntrusivePtr mkqlWriter = MakeMkqlJobWriter(); + mkqlWriter->SetSpecs(*MkqlIOSpecs); + + TIntrusivePtr reader; + + if (itemType) { + reader = MakeMkqlJobReader(); + } + + std::vector entryPoints(1, rootNode.GetNode()); + for (auto& item: extraArgs) { + entryPoints.push_back(item.second.GetNode()); + } + auto maxRss = TRusage::Get().MaxRss; + CompGraph = builder.BuildGraph( + GetJobFactory(*CodecCtx, OptLLVM, MkqlIOSpecs.Get(), reader.Get(), mkqlWriter.Get(), GetJobFactoryPrefix()), + UdfValidateMode, + NUdf::EValidatePolicy::Fail, OptLLVM, + EGraphPerProcess::Single, + Explorer, + rootNode, + std::move(entryPoints) + ); + + MKQL_SET_STAT(JobStats, Mkql_BuildGraphRssDelta, TRusage::Get().MaxRss - maxRss); + MKQL_SET_STAT(JobStats, Mkql_TotalRuntimeNodes, Explorer.GetNodes().size()); + MKQL_SET_STAT(JobStats, Job_InitTime, (ThreadCPUTime() - StartTime) / 1000); + + auto beginCalcTime = ThreadCPUTime(); + + if (CompGraph) { + for (size_t i: xrange(extraArgs.size())) { + if (auto entry = CompGraph->GetEntryPoint(i + 1, false)) { + entry->SetValue(CompGraph->GetContext(), NUdf::TUnboxedValue::Zero()); + } + } + + CodecCtx->HolderFactory = &CompGraph->GetHolderFactory(); + CompGraph->Prepare(); + BindTerminator.Reset(new TBindTerminator(CompGraph->GetTerminator())); + + if (auto mkqlReader = dynamic_cast(reader.Get())) { + mkqlReader->SetSpecs(*MkqlIOSpecs, CompGraph->GetHolderFactory()); + mkqlReader->Next(); // Prefetch first record to unify behavior with TYaMRTableReader + } + } + + NUdf::TUnboxedValue value = CompGraph->GetValue(); + if (rootNode.GetStaticType()->IsStream()) { + NUdf::TUnboxedValue item; + const auto status = value.Fetch(item); + YQL_ENSURE(status == NUdf::EFetchStatus::Finish); + } else { + YQL_ENSURE(value.IsFinish()); + } + + MKQL_SET_STAT(JobStats, Job_CalcTime, (ThreadCPUTime() - beginCalcTime) / 1000); + + if (auto mkqlReader = dynamic_cast(reader.Get())) { + mkqlReader->Finish(); + } + reader.Drop(); + mkqlWriter->Finish(); + mkqlWriter.Drop(); + + MkqlIOSpecs->Clear(); + MkqlIOSpecs.Destroy(); +} + +void TYqlUserJobBase::Do() { + DoImpl(); + Finish(); +} + +} // NYql diff --git a/yt/yql/providers/yt/job/yql_job_user_base.h b/yt/yql/providers/yt/job/yql_job_user_base.h new file mode 100644 index 000000000000..b66fad6ad77d --- /dev/null +++ b/yt/yql/providers/yt/job/yql_job_user_base.h @@ -0,0 +1,125 @@ +#pragma once + +#include "yql_job_base.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +#include + +namespace NKikimr { + namespace NMiniKQL { + class IFunctionRegistry; + } +} + +namespace NYql { + +class TYqlUserJobBase: public TYqlJobBase { +public: + TYqlUserJobBase() + : TYqlJobBase() + { + } + virtual ~TYqlUserJobBase() = default; + + void SetUseSkiff(bool useSkiff, TMkqlIOSpecs::TSystemFields sysFields) { + UseSkiff = useSkiff; + SkiffSysFields = sysFields; + } + + void SetUseBlockInput(bool useBlockInput) { + UseBlockInput = useBlockInput; + } + + void SetUseBlockOutput(bool useBlockOutput) { + UseBlockOutput = useBlockOutput; + } + + void SetYamrInput(bool yamrInput) { + YamrInput = yamrInput; + } + + void SetLambdaCode(const TString& code) { + LambdaCode = code; + } + + void SetInputSpec(const TString& spec) { + InputSpec = spec; + } + + void SetInputGroups(const TVector& inputGroups) { + InputGroups = inputGroups; + } + + void SetOutSpec(const TString& spec) { + OutSpec = spec; + } + + void SetAuxColumns(const THashSet& auxColumns) { + AuxColumns = auxColumns; + } + + void SetInputType(const TString& type) { + InputType = type; + } + + void SetRowOffsets(const TVector& rowOffsets) { + RowOffsets = rowOffsets; + } + + std::pair GetIOFormats(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry) const; + + void Save(IOutputStream& s) const override; + void Load(IInputStream& s) override; + +protected: + virtual TIntrusivePtr MakeMkqlJobWriter() = 0; + + virtual TIntrusivePtr MakeMkqlJobReader() = 0; + + virtual TString GetJobFactoryPrefix() const = 0; + + void Do(); + +protected: + // Serializable part (don't forget to add new members to Save/Load) + bool UseSkiff = false; + bool UseBlockInput = false; + bool UseBlockOutput = false; + TMkqlIOSpecs::TSystemFields SkiffSysFields; + bool YamrInput = false; + TString LambdaCode; + TString InputSpec; + TString OutSpec; + TVector InputGroups; + THashSet AuxColumns; + TString InputType; + TVector RowOffsets; + // End of serializable part + + NKikimr::NMiniKQL::TExploringNodeVisitor Explorer; + THolder CompGraph; + THolder BindTerminator; + + THolder MkqlIOSpecs; +private: + void DoImpl(); +}; + +} // NYql diff --git a/yt/yt/client/unittests/replication_card_ut.cpp b/yt/yt/client/unittests/replication_card_ut.cpp index deb50b042f58..63cef390b5ed 100644 --- a/yt/yt/client/unittests/replication_card_ut.cpp +++ b/yt/yt/client/unittests/replication_card_ut.cpp @@ -94,8 +94,7 @@ INSTANTIATE_TEST_SUITE_P( .IncludeHistory = true, .IncludeReplicatedTableOptions = false, }, - false) -)); + false))); class TReplicationCardFetchOptionsOrTest @@ -181,8 +180,7 @@ INSTANTIATE_TEST_SUITE_P( .IncludeProgress = true, .IncludeHistory = true, .IncludeReplicatedTableOptions = true, - } -))); + }))); //////////////////////////////////////////////////////////////////////////////// @@ -279,8 +277,7 @@ INSTANTIATE_TEST_SUITE_P( ETableReplicaMode::Async, ETableReplicaState::Enabled, std::vector(), - false) -)); + false))); //////////////////////////////////////////////////////////////////////////////// @@ -411,8 +408,7 @@ INSTANTIATE_TEST_SUITE_P( TReplicationCardComputeReplicasLagTest, ::testing::Values( TReplicationCardComputeReplicasLagTest::CreateTestDataNormal1(), - TReplicationCardComputeReplicasLagTest::CreateTestDataLaggingSyncReplica() -)); + TReplicationCardComputeReplicasLagTest::CreateTestDataLaggingSyncReplica())); ////////////////////////////////////////////////////////////////////////////////