Skip to content

Commit 3dc6d9b

Browse files
author
ermolovd
committed
introduce batching for concatenate
commit_hash:9db24e2e676d80f2972897ed8f5a9b84c4f8628c
1 parent 042929f commit 3dc6d9b

File tree

2 files changed

+46
-12
lines changed

2 files changed

+46
-12
lines changed

yt/cpp/mapreduce/client/client.cpp

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -256,21 +256,49 @@ void TClientBase::Concatenate(
256256
const TRichYPath& destinationPath,
257257
const TConcatenateOptions& options)
258258
{
259-
RequestWithRetry<void>(
260-
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
261-
[this, &sourcePaths, &destinationPath, &options] (TMutationId /*mutationId*/) {
262-
auto transaction = StartTransaction(TStartTransactionOptions());
259+
Y_ABORT_IF(options.MaxBatchSize_ <= 0);
260+
261+
ITransactionPtr outerTransaction;
262+
IClientBase* outerClient;
263+
if (std::ssize(sourcePaths) > options.MaxBatchSize_) {
264+
outerTransaction = StartTransaction(TStartTransactionOptions());
265+
outerClient = outerTransaction.Get();
266+
} else {
267+
outerClient = this;
268+
}
263269

264-
if (!options.Append_ && !sourcePaths.empty() && !transaction->Exists(destinationPath.Path_)) {
265-
auto typeNode = transaction->Get(CanonizeYPath(sourcePaths.front()).Path_ + "/@type");
266-
auto type = FromString<ENodeType>(typeNode.AsString());
267-
transaction->Create(destinationPath.Path_, type, TCreateOptions().IgnoreExisting(true));
268-
}
270+
TVector<TRichYPath> batch;
271+
for (ssize_t i = 0; i < std::ssize(sourcePaths); i += options.MaxBatchSize_) {
272+
auto begin = sourcePaths.begin() + i;
273+
auto end = sourcePaths.begin() + std::min(i + options.MaxBatchSize_, std::ssize(sourcePaths));
274+
batch.assign(begin, end);
269275

270-
RawClient_->Concatenate(transaction->GetId(), sourcePaths, destinationPath, options);
276+
bool firstBatch = i == 0;
277+
RequestWithRetry<void>(
278+
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
279+
[this, &batch, &destinationPath, &options, outerClient, firstBatch] (TMutationId /*mutationId*/) {
280+
auto transaction = outerClient->StartTransaction(TStartTransactionOptions());
271281

272-
transaction->Commit();
273-
});
282+
if (firstBatch && !options.Append_ && !batch.empty() && !transaction->Exists(destinationPath.Path_)) {
283+
auto typeNode = transaction->Get(transaction->CanonizeYPath(batch.front()).Path_ + "/@type");
284+
auto type = FromString<ENodeType>(typeNode.AsString());
285+
transaction->Create(destinationPath.Path_, type, TCreateOptions().IgnoreExisting(true));
286+
}
287+
288+
TConcatenateOptions currentOptions = options;
289+
if (!firstBatch) {
290+
currentOptions.Append_ = true;
291+
}
292+
293+
RawClient_->Concatenate(transaction->GetId(), batch, destinationPath, currentOptions);
294+
295+
transaction->Commit();
296+
});
297+
}
298+
299+
if (outerTransaction) {
300+
outerTransaction->Commit();
301+
}
274302
}
275303

276304
TRichYPath TClientBase::CanonizeYPath(const TRichYPath& path)

yt/cpp/mapreduce/interface/client_method_options.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,12 @@ struct TConcatenateOptions
264264

265265
/// Whether we should append to destination or rewrite it.
266266
FLUENT_FIELD_OPTION(bool, Append);
267+
268+
// Maximum number of items to process in a single concatenate request.
269+
//
270+
// If the number of items provided is greater than this parameter,
271+
// the client might split the concatenation into several requests.
272+
FLUENT_FIELD_DEFAULT(int, MaxBatchSize, 20);
267273
};
268274

269275
///

0 commit comments

Comments
 (0)