Skip to content

Commit 49b05ea

Browse files
Refactor TYqlUserJob, implement TFmrUserJob class for map operation
Refactor TYqlUserJob, implement TFmrUserJob class for map operation commit_hash:a916b2b857ce98b787484153ac5a00db0e2daab2
1 parent 315f35f commit 49b05ea

File tree

62 files changed

+2032
-415
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+2032
-415
lines changed

yt/yql/providers/yt/codec/yt_codec_job.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,10 @@ TJobMkqlReaderImpl::TJobMkqlReaderImpl(const TFile& in)
4646
{
4747
}
4848

49-
TJobMkqlWriterImpl::TJobMkqlWriterImpl(const TMkqlIOSpecs& specs, const TVector<TFile>& outHandles)
49+
TJobMkqlWriterImpl::TJobMkqlWriterImpl(const TVector<TFile>& outHandles)
5050
: NPrivate::TOutStreamsHolder(outHandles)
5151
, TMkqlWriterImpl(GetVectorOfStreams(), YQL_JOB_CODEC_BLOCK_COUNT, YQL_JOB_CODEC_BLOCK_SIZE)
5252
{
53-
SetSpecs(specs);
5453
}
5554

5655
void TJobMkqlWriterImpl::DoFinish(bool abort) {

yt/yql/providers/yt/codec/yt_codec_job.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class TJobMkqlReaderImpl: protected NPrivate::TInStreamHolder, public TMkqlReade
4242

4343
class TJobMkqlWriterImpl: protected NPrivate::TOutStreamsHolder, public TMkqlWriterImpl {
4444
public:
45-
TJobMkqlWriterImpl(const TMkqlIOSpecs& specs, const TVector<TFile>& outHandles);
45+
TJobMkqlWriterImpl(const TVector<TFile>& outHandles);
4646
~TJobMkqlWriterImpl() = default;
4747

4848
private:

yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -619,7 +619,7 @@ class TFmrCoordinator: public IFmrCoordinator {
619619
});
620620

621621
mapTaskParams.Output = fmrTableOutputRefs;
622-
mapTaskParams.Executable = mapOperationParams.Executable; // TODO - change Executable to mapper
622+
mapTaskParams.SerializedMapJobState = mapOperationParams.SerializedMapJobState;
623623
TaskParams.emplace_back(mapTaskParams);
624624
}
625625
}

yt/yql/providers/yt/fmr/job/impl/ut/ya.make

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,19 @@ UNITTEST()
22

33
SRCS(
44
yql_yt_job_ut.cpp
5+
yql_yt_map_job_ut.cpp
6+
yql_yt_raw_table_queue_ut.cpp
57
yql_yt_table_data_service_reader_ut.cpp
68
yql_yt_table_data_service_writer_ut.cpp
79
)
810

911
PEERDIR(
1012
yt/cpp/mapreduce/common
1113
yt/yql/providers/yt/fmr/job/impl
14+
yt/yql/providers/yt/fmr/process
15+
yt/yql/providers/yt/fmr/table_data_service/helpers
1216
yt/yql/providers/yt/fmr/yt_job_service/mock
13-
yt/yql/providers/yt/fmr/table_data_service/local/impl
17+
yt/yql/providers/yt/fmr/utils
1418
yql/essentials/utils/log
1519
yql/essentials/parser/pg_wrapper
1620
yql/essentials/parser/pg_wrapper/interface
@@ -23,6 +27,8 @@ PEERDIR(
2327
yt/yql/providers/yt/codec/codegen/llvm16
2428
yql/essentials/minikql/codegen/llvm16
2529
yql/essentials/minikql/computation/llvm16
30+
yql/essentials/minikql/comp_nodes/llvm16
31+
yt/yql/providers/yt/comp_nodes/llvm16
2632
)
2733

2834
YQL_LAST_ABI_VERSION()

yt/yql/providers/yt/fmr/job/impl/ut/yql_yt_job_ut.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
#include <library/cpp/testing/unittest/registar.h>
2-
#include <yt/cpp/mapreduce/common/helpers.h>
1+
#include "yql_yt_job_ut.h"
32
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
43
#include <yt/yql/providers/yt/fmr/table_data_service/local/impl/yql_yt_table_data_service_local.h>
54
#include <yt/yql/providers/yt/fmr/utils/yql_yt_table_data_service_key.h>
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#pragma once
2+
3+
#include <library/cpp/testing/unittest/registar.h>
4+
#include <yt/cpp/mapreduce/common/helpers.h>
5+
6+
namespace NYql::NFmr {
7+
// helper functions for yson reformatting
8+
9+
TString GetBinaryYson(const TString& textYsonContent);
10+
11+
TString GetTextYson(const TString& binaryYsonContent);
12+
13+
} // namespace NYql::NFmr
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
#include "yql_yt_job_ut.h"
2+
#include <util/string/split.h>
3+
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
4+
#include <yql/essentials/minikql/mkql_node_printer.h>
5+
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.h>
6+
#include <yt/yql/providers/yt/fmr/process/yql_yt_job_fmr.h>
7+
#include <yt/yql/providers/yt/fmr/table_data_service/helpers/yql_yt_table_data_service_helpers.h>
8+
9+
using namespace NKikimr::NMiniKQL;
10+
11+
namespace NYql::NFmr {
12+
13+
TString inputYsonContent = "{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};\n"
14+
"{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};\n";
15+
16+
Y_UNIT_TEST_SUITE(MapTests) {
17+
Y_UNIT_TEST(RunMapJob) {
18+
TFmrUserJob mapJob;
19+
20+
TTempFileHandle tableDataServiceHostsFile;
21+
ui16 port = 2345;
22+
auto tableDataServiceServer = MakeTableDataServiceServer(port);
23+
std::vector<TTableDataServiceServerConnection> connections{{.Host = "localhost", .Port = port}};
24+
WriteHostsToFile(tableDataServiceHostsFile, 1, connections);
25+
26+
TTempFileHandle inputYsonContentFile{};
27+
TFileOutput fileWriter(inputYsonContentFile.Name());
28+
fileWriter.Write(inputYsonContent.data(), inputYsonContent.size());
29+
fileWriter.Flush();
30+
31+
TYtTableTaskRef fileTask{.FilePaths = {inputYsonContentFile.Name()}};
32+
TFmrTableOutputRef fmrOutputRef{.TableId = "table_id", .PartId = "part_id"};
33+
TTaskTableRef taskTableRef(fileTask);
34+
TMapTaskParams mapTaskParams{
35+
.Input = TTaskTableInputRef{.Inputs ={taskTableRef}},
36+
.Output = {fmrOutputRef}
37+
};
38+
FillMapFmrJob(mapJob, mapTaskParams, {}, tableDataServiceHostsFile.Name(), true);
39+
40+
{
41+
auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry());
42+
TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(),functionRegistry->SupportsSizedAllocators());
43+
alloc.Ref().ForcefullySetMemoryYellowZone(false);
44+
45+
TTypeEnvironment env(alloc);
46+
TProgramBuilder pgmBuilder(env, *functionRegistry);
47+
48+
const auto structType = pgmBuilder.NewStructType({
49+
{"key", pgmBuilder.NewDataType(NUdf::EDataSlot::String)},
50+
{"subkey", pgmBuilder.NewDataType(NUdf::EDataSlot::String)},
51+
{"value", pgmBuilder.NewDataType(NUdf::EDataSlot::String)}
52+
});
53+
54+
auto dataType = pgmBuilder.NewFlowType(structType);
55+
TCallableBuilder inputCallableBuilder(env, "FmrInputJob", dataType);
56+
auto inputNode = TRuntimeNode(inputCallableBuilder.Build(), false);
57+
58+
const auto prefix = pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>("prefix_");
59+
const auto map = pgmBuilder.Map(inputNode,
60+
[&](TRuntimeNode item) {
61+
TRuntimeNode prefixKey = pgmBuilder.Concat(prefix, pgmBuilder.Member(item, "key"));
62+
TRuntimeNode subkey = pgmBuilder.Member(item, "subkey");
63+
TRuntimeNode value = pgmBuilder.Member(item, "value");
64+
return pgmBuilder.NewStruct(structType, {{"key", prefixKey}, {"subkey", subkey}, {"value", value}});
65+
});
66+
67+
TCallableBuilder outputCallableBuilder(env, "FmrOutputJob", dataType);
68+
outputCallableBuilder.Add(map);
69+
auto outputNode = TRuntimeNode(outputCallableBuilder.Build(), false);
70+
auto pgmReturn = pgmBuilder.Discard(outputNode);
71+
72+
TString serializedMapLambda = SerializeRuntimeNode(pgmReturn, env);
73+
mapJob.SetLambdaCode(serializedMapLambda);
74+
75+
const TString spec = R"({
76+
tables = [{
77+
"_yql_row_spec" = {
78+
"Type" = [
79+
"StructType"; [
80+
["key"; ["DataType"; "String"]];
81+
["subkey"; ["DataType"; "String"]];
82+
["value"; ["DataType"; "String"]];
83+
]
84+
]
85+
}
86+
}]
87+
})";
88+
const TString type = R"(
89+
[
90+
"StructType";
91+
[
92+
["key"; ["DataType"; "String"]];
93+
["subkey"; ["DataType"; "String"]];
94+
["value"; ["DataType"; "String"]];
95+
]
96+
])";
97+
mapJob.SetInputSpec(spec);
98+
mapJob.SetOutSpec(spec);
99+
mapJob.SetInputType(type);
100+
}
101+
102+
mapJob.DoFmrJob();
103+
104+
// Checking correctness
105+
auto tableDataServiceClient = MakeTableDataServiceClient(port);
106+
TString key = "table_id_part_id:0";
107+
auto gottenBinaryTableContent = tableDataServiceClient->Get(key).GetValueSync();
108+
UNIT_ASSERT(gottenBinaryTableContent);
109+
110+
// Reformating data
111+
auto textTableContent = GetTextYson(*gottenBinaryTableContent);
112+
THashMap<TString, std::vector<TString>> expectedFormattedContent{
113+
{"key", {"prefix_075", "prefix_800"}},
114+
{"subkey", {"1", "2"}},
115+
{"value", {"abc", "ddd"}}
116+
};
117+
THashMap<TString, std::vector<TString>> gottenFormattedContent;
118+
119+
std::vector<TString> splittedTableContent;
120+
StringSplitter(textTableContent).SplitBySet("\n\";{}=").SkipEmpty().Collect(&splittedTableContent);
121+
122+
for (ui64 i = 0; i < splittedTableContent.size(); i += 2) {
123+
TString columnKey = splittedTableContent[i], columnValue = splittedTableContent[i + 1];
124+
gottenFormattedContent[columnKey].emplace_back(columnValue);
125+
}
126+
UNIT_ASSERT_VALUES_EQUAL(expectedFormattedContent, gottenFormattedContent);
127+
}
128+
}
129+
130+
} // namespace NYql::NFmr
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#include <library/cpp/testing/unittest/registar.h>
2+
#include <util/string/split.h>
3+
#include <util/thread/pool.h>
4+
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue_reader.h>
5+
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_raw_table_queue_writer.h>
6+
7+
namespace NYql::NFmr {
8+
9+
std::vector<TString> TableContentRows = {
10+
"{\"key\"=\"075\";\"subkey\"=\"1\";\"value\"=\"abc\"};",
11+
"{\"key\"=\"800\";\"subkey\"=\"2\";\"value\"=\"ddd\"};",
12+
"{\"key\"=\"020\";\"subkey\"=\"3\";\"value\"=\"q\"};",
13+
"{\"key\"=\"150\";\"subkey\"=\"4\";\"value\"=\"qzz\"};"
14+
};
15+
16+
17+
Y_UNIT_TEST_SUITE(FmrRawTableQueueTests) {
18+
Y_UNIT_TEST(ReadWriteManyThreads) {
19+
ui64 inputStreamsNum = 5, repeatsNum = 10;
20+
TFmrRawTableQueueSettings queueSettings{.MaxInflightBytes = 128};
21+
auto queue = MakeIntrusive<TFmrRawTableQueue>(inputStreamsNum, queueSettings);
22+
char rowEnd = '|'; // Adding end symbol to row for splitting simplicity
23+
24+
auto threadPool = CreateThreadPool(3);
25+
for (ui64 i = 0; i < inputStreamsNum; ++i) {
26+
// each thread writes repeatsNum copies of TableContent to queue
27+
threadPool->SafeAddFunc([queue, repeatsNum, i, rowEnd] {
28+
TFmrRawTableQueueWriterSettings writerSettings{.ChunkSize = 100};
29+
TFmrRawTableQueueWriter queueWriter(queue, writerSettings);
30+
for (ui64 j = 0; j < repeatsNum; ++j) {
31+
for (auto& row: TableContentRows) {
32+
queueWriter.Write(row + rowEnd);
33+
queueWriter.NotifyRowEnd();
34+
}
35+
}
36+
queueWriter.Flush();
37+
queue->NotifyInputFinished(i);
38+
});
39+
}
40+
TFmrRawTableQueueReader reader{queue};
41+
TString result = reader.ReadAll();
42+
std::vector<TString> splittedRows;
43+
StringSplitter(result).Split(rowEnd).AddTo(&splittedRows);
44+
std::unordered_map<TString, ui64> gottenRows;
45+
for (auto& row: splittedRows) {
46+
if (!row.empty()) {
47+
++gottenRows[row];
48+
}
49+
}
50+
UNIT_ASSERT_VALUES_EQUAL(gottenRows.size(), TableContentRows.size());
51+
for (auto& row: TableContentRows) {
52+
UNIT_ASSERT_VALUES_EQUAL(gottenRows[row], inputStreamsNum * repeatsNum);
53+
}
54+
}
55+
}
56+
57+
} // namespace NYql::NFmr

yt/yql/providers/yt/fmr/job/impl/ya.make

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ LIBRARY()
22

33
SRCS(
44
yql_yt_job_impl.cpp
5+
yql_yt_raw_table_queue.cpp
6+
yql_yt_raw_table_queue_reader.cpp
7+
yql_yt_raw_table_queue_writer.cpp
58
yql_yt_table_data_service_reader.cpp
69
yql_yt_table_data_service_writer.cpp
710
)
@@ -12,6 +15,7 @@ PEERDIR(
1215
yt/cpp/mapreduce/interface
1316
yt/yql/providers/yt/fmr/job/interface
1417
yt/yql/providers/yt/fmr/utils
18+
yt/yql/providers/yt/fmr/process
1519
yt/yql/providers/yt/fmr/table_data_service/interface
1620
yql/essentials/utils
1721
yql/essentials/utils/log

yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp

Lines changed: 17 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <yt/yql/providers/yt/fmr/job/impl/yql_yt_table_data_service_writer.h>
1010
#include <yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h>
1111
#include <yt/yql/providers/yt/fmr/utils/yql_yt_parse_records.h>
12+
#include <yt/yql/providers/yt/fmr/utils/yql_yt_table_input_streams.h>
1213
#include <yt/yql/providers/yt/fmr/yt_job_service/interface/yql_yt_job_service.h>
1314

1415
#include <yql/essentials/utils/log/log.h>
@@ -36,7 +37,7 @@ class TFmrJob: public IFmrJob {
3637

3738
YQL_ENSURE(clusterConnections.size() == 1);
3839

39-
std::vector<NYT::TRawTableReaderPtr> ytTableReaders = GetYtTableReaders(ytTableTaskRef, clusterConnections);
40+
std::vector<NYT::TRawTableReaderPtr> ytTableReaders = GetYtTableReaders(YtJobService_, ytTableTaskRef, clusterConnections);
4041
auto tableDataServiceWriter = MakeIntrusive<TFmrTableDataServiceWriter>(tableId, partId, TableDataService_, Settings_.FmrWriterSettings);
4142

4243
for (auto& ytTableReader: ytTableReaders) {
@@ -95,7 +96,7 @@ class TFmrJob: public IFmrJob {
9596
for (const auto& inputTableRef : taskTableInputRef.Inputs) {
9697
threadPool->SafeAddFunc([&, tableDataServiceWriter] {
9798
try {
98-
auto inputTableReaders = GetTableInputStreams(inputTableRef, clusterConnections);
99+
auto inputTableReaders = GetTableInputStreams(YtJobService_, TableDataService_, inputTableRef, clusterConnections);
99100
for (auto& tableReader: inputTableReaders) {
100101
ParseRecords(tableReader, tableDataServiceWriter, parseRecordSettings.MergeReadBlockCount, parseRecordSettings.MergeReadBlockSize, cancelFlag, mutex);
101102
}
@@ -124,42 +125,6 @@ class TFmrJob: public IFmrJob {
124125
ythrow yexception() << "Not implemented";
125126
}
126127

127-
private:
128-
std::vector<NYT::TRawTableReaderPtr> GetTableInputStreams(const TTaskTableRef& tableRef, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) const {
129-
auto ytTableTaskRef = std::get_if<TYtTableTaskRef>(&tableRef);
130-
auto fmrTable = std::get_if<TFmrTableInputRef>(&tableRef);
131-
if (ytTableTaskRef) {
132-
return GetYtTableReaders(*ytTableTaskRef, clusterConnections);
133-
} else if (fmrTable) {
134-
return {MakeIntrusive<TFmrTableDataServiceReader>(fmrTable->TableId, fmrTable->TableRanges, TableDataService_, Settings_.FmrReaderSettings)};
135-
} else {
136-
ythrow yexception() << "Unsupported table type";
137-
}
138-
}
139-
140-
std::vector<NYT::TRawTableReaderPtr> GetYtTableReaders(const TYtTableTaskRef& ytTableTaskRef, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) const {
141-
std::vector<NYT::TRawTableReaderPtr> ytTableReaders;
142-
if (!ytTableTaskRef.FilePaths.empty()) {
143-
// underlying gateway is file, so create readers from filepaths.
144-
for (auto& filePath: ytTableTaskRef.FilePaths) {
145-
ytTableReaders.emplace_back(YtJobService_->MakeReader(filePath));
146-
YQL_CLOG(DEBUG, FastMapReduce) << "Creating reader for file path " << filePath;
147-
}
148-
} else {
149-
for (auto& richPath: ytTableTaskRef.RichPaths) {
150-
YQL_ENSURE(richPath.Cluster_);
151-
152-
// TODO - вместо этого написать нормальные хелперы из RichPath в структуры и назад
153-
TStringBuf choppedPath;
154-
YQL_ENSURE(TStringBuf(richPath.Path_).AfterPrefix("//", choppedPath));
155-
auto fmrTableId = TFmrTableId(*richPath.Cluster_, TString(choppedPath));
156-
auto clusterConnection = clusterConnections.at(fmrTableId);
157-
ytTableReaders.emplace_back(YtJobService_->MakeReader(richPath, clusterConnection, Settings_.YtReaderSettings));
158-
}
159-
}
160-
return ytTableReaders;
161-
}
162-
163128
private:
164129
ITableDataService::TPtr TableDataService_;
165130
IYtJobService::TPtr YtJobService_;
@@ -209,6 +174,20 @@ TJobResult RunJob(
209174
return {ETaskStatus::Completed, *statistics};
210175
};
211176

177+
void FillMapFmrJob(
178+
TFmrUserJob& mapJob,
179+
const TMapTaskParams& mapTaskParams,
180+
const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections,
181+
const TString& tableDataServiceDiscoveryFilePath,
182+
bool useFileGateway
183+
) {
184+
mapJob.SetTableDataService(tableDataServiceDiscoveryFilePath);
185+
mapJob.SetTaskInputTables(mapTaskParams.Input);
186+
mapJob.SetTaskFmrOutputTables(mapTaskParams.Output);
187+
mapJob.SetClusterConnections(clusterConnections);
188+
mapJob.SetYtJobService(useFileGateway);
189+
}
190+
212191
TFmrJobSettings GetJobSettingsFromTask(TTask::TPtr task) {
213192
if (!task->JobSettings) {
214193
return TFmrJobSettings();

0 commit comments

Comments
 (0)