Skip to content

Commit 183b869

Browse files
author
hiddenpath
committed
YT-23616: Make BatchTransform implementation being common
commit_hash:4191c9aa7cde449475eddf88d8c04e1ebf0b8ad9
1 parent abccd55 commit 183b869

File tree

5 files changed

+16
-13
lines changed

5 files changed

+16
-13
lines changed

yt/cpp/mapreduce/client/operation.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -225,11 +225,10 @@ TStructuredJobTableList ApplyProtobufColumnFilters(
225225
}
226226

227227
auto isDynamic = NRawClient::BatchTransform(
228-
CreateDefaultRequestRetryPolicy(preparer.GetContext().Config),
229-
preparer.GetContext(),
228+
preparer.GetClient()->GetRawClient(),
230229
tableList,
231-
[&] (IRawBatchRequest& batch, const auto& table) {
232-
return batch.Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions());
230+
[&] (IRawBatchRequestPtr batch, const auto& table) {
231+
return batch->Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions());
233232
});
234233

235234
auto newTableList = tableList;
@@ -642,7 +641,7 @@ TNode BuildAutoMergeSpec(const TAutoMergeSpec& options)
642641
return result;
643642
}
644643

645-
TNode BuildJobProfilerSpec(const TJobProfilerSpec& profilerSpec)
644+
[[maybe_unused]] TNode BuildJobProfilerSpec(const TJobProfilerSpec& profilerSpec)
646645
{
647646
TNode result;
648647
if (profilerSpec.ProfilingBinary_) {

yt/cpp/mapreduce/client/skiff.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) {
279279
}
280280

281281
NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
282+
const IRawClientPtr& rawClient,
282283
const TClientContext& context,
283284
const IClientRetryPolicyPtr& clientRetryPolicy,
284285
const TTransactionId& transactionId,
@@ -304,18 +305,17 @@ NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
304305
}
305306

306307
auto nodes = NRawClient::BatchTransform(
307-
clientRetryPolicy->CreatePolicyForGenericRequest(),
308-
context,
308+
rawClient,
309309
NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths),
310-
[&] (IRawBatchRequest& batch, const TRichYPath& path) {
310+
[&] (IRawBatchRequestPtr batch, const TRichYPath& path) {
311311
auto getOptions = TGetOptions()
312312
.AttributeFilter(
313313
TAttributeFilter()
314314
.AddAttribute("schema")
315315
.AddAttribute("dynamic")
316316
.AddAttribute("type")
317317
);
318-
return batch.Get(transactionId, path.Path_, getOptions);
318+
return batch->Get(transactionId, path.Path_, getOptions);
319319
});
320320

321321
TVector<NSkiff::TSkiffSchemaPtr> schemas;

yt/cpp/mapreduce/client/skiff.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ void Deserialize(NSkiff::TSkiffSchemaPtr& schema, const TNode& node);
5959
TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema);
6060

6161
NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
62+
const IRawClientPtr& rawClient,
6263
const TClientContext& context,
6364
const IClientRetryPolicyPtr& clientRetryPolicy,
6465
const TTransactionId& transactionId,

yt/cpp/mapreduce/client/structured_table_formats.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ namespace NDetail {
121121
////////////////////////////////////////////////////////////////////////////////
122122

123123
NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema(
124+
const IRawClientPtr& rawClient,
124125
const TClientContext& context,
125126
const IClientRetryPolicyPtr& clientRetryPolicy,
126127
const TTransactionId& transactionId,
@@ -135,6 +136,7 @@ NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema(
135136
return nullptr;
136137
}
137138
return CreateSkiffSchemaIfNecessary(
139+
rawClient,
138140
context,
139141
clientRetryPolicy,
140142
transactionId,
@@ -434,6 +436,7 @@ std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateNodeFormat(
434436
tableList.emplace_back(*table.RichYPath);
435437
}
436438
skiffSchema = TryCreateSkiffSchema(
439+
RawClient_,
437440
Context_,
438441
ClientRetryPolicy_,
439442
TransactionId_,

yt/cpp/mapreduce/raw_client/raw_requests.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <yt/cpp/mapreduce/interface/client.h>
1010
#include <yt/cpp/mapreduce/interface/client_method_options.h>
1111
#include <yt/cpp/mapreduce/interface/operation.h>
12+
#include <yt/cpp/mapreduce/interface/raw_client.h>
1213

1314
namespace NYT {
1415

@@ -53,19 +54,18 @@ TAuthorizationInfo WhoAmI(const TClientContext& context);
5354

5455
template<typename TSrc, typename TBatchAdder>
5556
auto BatchTransform(
56-
const IRequestRetryPolicyPtr& retryPolicy,
57-
const TClientContext& context,
57+
const IRawClientPtr& rawClient,
5858
const TSrc& src,
5959
TBatchAdder batchAdder,
6060
const TExecuteBatchOptions& executeBatchOptions = {})
6161
{
62-
THttpRawBatchRequest batch(context, retryPolicy);
62+
auto batch = rawClient->CreateRawBatchRequest();
6363
using TFuture = decltype(batchAdder(batch, *std::begin(src)));
6464
TVector<TFuture> futures;
6565
for (const auto& el : src) {
6666
futures.push_back(batchAdder(batch, el));
6767
}
68-
batch.ExecuteBatch(executeBatchOptions);
68+
batch->ExecuteBatch(executeBatchOptions);
6969
using TDst = decltype(futures[0].ExtractValueSync());
7070
TVector<TDst> result;
7171
result.reserve(std::size(src));

0 commit comments

Comments
 (0)