Skip to content

Commit f592ff5

Browse files
committed
Intermediate changes
commit_hash:9accadbf8068e1d496f6fc9ea6b878476f55f89b
1 parent 1bfd53d commit f592ff5

File tree

5 files changed

+187
-75
lines changed

5 files changed

+187
-75
lines changed

yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.cpp

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,37 @@ class TFmrCoordinator: public IFmrCoordinator {
290290
};
291291
}
292292

293+
std::vector<TTaskTableRef> TaskInputTablesFromOperationInputTables(const std::vector<TOperationTableRef>& operationTables) {
294+
std::vector<TTaskTableRef> taskInputTables;
295+
for (auto& elem: operationTables) {
296+
if (const TYtTableRef* ytTableRef = std::get_if<TYtTableRef>(&elem)) {
297+
taskInputTables.emplace_back(*ytTableRef);
298+
} else {
299+
TFmrTableRef fmrTableRef = std::get<TFmrTableRef>(elem);
300+
TString inputTableId = fmrTableRef.FmrTableId.Id;
301+
TFmrTableInputRef tableInput{
302+
.TableId = inputTableId,
303+
.TableRanges = {GetTableRangeFromId(inputTableId)}
304+
};
305+
taskInputTables.emplace_back(tableInput);
306+
}
307+
}
308+
return taskInputTables;
309+
}
310+
311+
std::vector<TFmrTableOutputRef> TaskOutputTablesFromOperationOutputTables(const std::vector<TFmrTableRef>& operationTables) {
312+
std::vector<TFmrTableOutputRef> taskOutputTables;
313+
for (auto& fmrTableRef: operationTables) {
314+
TString outputTableId = fmrTableRef.FmrTableId.Id;
315+
TFmrTableOutputRef tableOutput{
316+
.TableId = outputTableId,
317+
.PartId = GetTableRangeFromId(outputTableId).PartId
318+
};
319+
taskOutputTables.emplace_back(tableOutput);
320+
}
321+
return taskOutputTables;
322+
}
323+
293324
TTaskParams MakeDefaultTaskParamsFromOperation(const TOperationParams& operationParams) {
294325
if (const TUploadOperationParams* uploadOperationParams = std::get_if<TUploadOperationParams>(&operationParams)) {
295326
TUploadTaskParams uploadTaskParams{};
@@ -311,27 +342,20 @@ class TFmrCoordinator: public IFmrCoordinator {
311342
};
312343
downloadTaskParams.Output = fmrTableOutput;
313344
return downloadTaskParams;
314-
} else {
315-
TMergeOperationParams mergeOperationParams = std::get<TMergeOperationParams>(operationParams);
345+
} else if (const TMergeOperationParams* mergeOperationParams = std::get_if<TMergeOperationParams>(&operationParams)) {
316346
TMergeTaskParams mergeTaskParams;
317-
std::vector<TTaskTableRef> mergeInputTasks;
318-
for (auto& elem: mergeOperationParams.Input) {
319-
if (const TYtTableRef* ytTableRef = std::get_if<TYtTableRef>(&elem)) {
320-
mergeInputTasks.emplace_back(*ytTableRef);
321-
} else {
322-
TFmrTableRef fmrTableRef = std::get<TFmrTableRef>(elem);
323-
TString inputTableId = fmrTableRef.FmrTableId.Id;
324-
TFmrTableInputRef tableInput{
325-
.TableId = inputTableId,
326-
.TableRanges = {GetTableRangeFromId(inputTableId)}
327-
};
328-
mergeInputTasks.emplace_back(tableInput);
329-
}
330-
}
331-
mergeTaskParams.Input = mergeInputTasks;
347+
mergeTaskParams.Input = TaskInputTablesFromOperationInputTables(mergeOperationParams->Input);
332348
TFmrTableOutputRef outputTable;
333-
mergeTaskParams.Output = TFmrTableOutputRef{.TableId = mergeOperationParams.Output.FmrTableId.Id};
349+
mergeTaskParams.Output = TFmrTableOutputRef{.TableId = mergeOperationParams->Output.FmrTableId.Id};
334350
return mergeTaskParams;
351+
} else if (const TMapOperationParams* mapOperationParams = std::get_if<TMapOperationParams>(&operationParams)) {
352+
TMapTaskParams mapTaskParams;
353+
mapTaskParams.Input = TaskInputTablesFromOperationInputTables(mapOperationParams->Input);
354+
mapTaskParams.Output = TaskOutputTablesFromOperationOutputTables(mapOperationParams->Output);
355+
mapTaskParams.Executable = mapOperationParams->Executable;
356+
return mapTaskParams;
357+
} else {
358+
ythrow yexception() << "Unknown operation params";
335359
}
336360
}
337361

yt/yql/providers/yt/fmr/job/impl/yql_yt_job_impl.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,16 @@ class TFmrJob: public IFmrJob {
107107
}
108108
}
109109

110+
virtual std::variant<TError, TStatistics> Map(
111+
const TMapTaskParams& /* params */,
112+
const std::unordered_map<TFmrTableId, TClusterConnection>& /* clusterConnections */,
113+
std::shared_ptr<std::atomic<bool>> /* cancelFlag */
114+
) override {
115+
Cerr << "MAP NOT IMPLEMENTED" << Endl;
116+
YQL_CLOG(ERROR, FastMapReduce) << "MAP NOT IMPLEMENTED";
117+
ythrow yexception() << "Not implemented";
118+
}
119+
110120
private:
111121
NYT::TRawTableReaderPtr GetTableInputStream(const TTaskTableRef& tableRef, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections) const {
112122
auto ytTable = std::get_if<TYtTableRef>(&tableRef);
@@ -154,8 +164,10 @@ TJobResult RunJob(
154164
return job->Download(taskParams, task->ClusterConnections, cancelFlag);
155165
} else if constexpr (std::is_same_v<T, TMergeTaskParams>) {
156166
return job->Merge(taskParams, task->ClusterConnections, cancelFlag);
167+
} else if constexpr (std::is_same_v<T, TMapTaskParams>) {
168+
return job->Map(taskParams, task->ClusterConnections, cancelFlag);;
157169
} else {
158-
throw std::runtime_error{"Unsupported task type"};
170+
ythrow yexception() << "Unsupported task type";
159171
}
160172
};
161173

yt/yql/providers/yt/fmr/job/interface/yql_yt_job.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ class IFmrJob: public TThrRefBase {
1515
virtual std::variant<TError, TStatistics> Upload(const TUploadTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}, std::shared_ptr<std::atomic<bool>> cancelFlag = nullptr) = 0;
1616

1717
virtual std::variant<TError, TStatistics> Merge(const TMergeTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}, std::shared_ptr<std::atomic<bool>> cancelFlag = nullptr) = 0;
18+
19+
virtual std::variant<TError, TStatistics> Map(const TMapTaskParams& params, const std::unordered_map<TFmrTableId, TClusterConnection>& clusterConnections = {}, std::shared_ptr<std::atomic<bool>> cancelFlag = nullptr) = 0;
1820
};
1921

2022
} // namespace NYql

yt/yql/providers/yt/fmr/request_options/yql_yt_request_options.h

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ enum class ETaskType {
3131
Unknown,
3232
Download,
3333
Upload,
34-
Merge
34+
Merge,
35+
Map
3536
};
3637

3738
enum class EFmrComponent {
@@ -183,9 +184,21 @@ struct TMergeTaskParams {
183184
TFmrTableOutputRef Output;
184185
};
185186

186-
using TOperationParams = std::variant<TUploadOperationParams, TDownloadOperationParams, TMergeOperationParams>;
187+
struct TMapOperationParams {
188+
std::vector<TOperationTableRef> Input;
189+
std::vector<TFmrTableRef> Output;
190+
TString Executable;
191+
};
192+
193+
struct TMapTaskParams {
194+
std::vector<TTaskTableRef> Input;
195+
std::vector<TFmrTableOutputRef> Output;
196+
TString Executable;
197+
};
198+
199+
using TOperationParams = std::variant<TUploadOperationParams, TDownloadOperationParams, TMergeOperationParams, TMapOperationParams>;
187200

188-
using TTaskParams = std::variant<TUploadTaskParams, TDownloadTaskParams, TMergeTaskParams>;
201+
using TTaskParams = std::variant<TUploadTaskParams, TDownloadTaskParams, TMergeTaskParams, TMapTaskParams>;
189202

190203
struct TClusterConnection {
191204
TString TransactionId;

0 commit comments

Comments
 (0)