Skip to content

Commit 22d7f65

Browse files
committed
YT-21942: Automate selection of buffer row count
ee58dd90305b5dd3c29edfc95a3ddb7c2395d2bf
1 parent 6d6f164 commit 22d7f65

File tree

2 files changed

+57
-1
lines changed

2 files changed

+57
-1
lines changed

yt/yt/client/table_client/adapters.cpp

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "adapters.h"
2+
23
#include "row_batch.h"
34

45
#include <yt/yt/client/api/table_writer.h>
@@ -15,6 +16,10 @@ using namespace NCrypto;
1516

1617
////////////////////////////////////////////////////////////////////////////////
1718

19+
const NLogging::TLogger Logger("TableClientAdapters");
20+
21+
////////////////////////////////////////////////////////////////////////////////
22+
1823
class TApiFromSchemalessWriterAdapter
1924
: public NApi::ITableWriter
2025
{
@@ -178,7 +183,7 @@ void PipeReaderToWriterByBatches(
178183
const NFormats::ISchemalessFormatWriterPtr& writer,
179184
const TRowBatchReadOptions& options,
180185
TDuration pipeDelay)
181-
{
186+
try {
182187
TPeriodicYielder yielder(TDuration::Seconds(1));
183188

184189
while (auto batch = reader->Read(options)) {
@@ -202,6 +207,50 @@ void PipeReaderToWriterByBatches(
202207

203208
WaitFor(writer->Close())
204209
.ThrowOnError();
210+
} catch (const std::exception& ex) {
211+
YT_LOG_ERROR(ex, "PipeReaderToWriterByBatches failed");
212+
213+
THROW_ERROR_EXCEPTION(ex);
214+
}
215+
216+
void PipeReaderToAdaptiveWriterByBatches(
217+
const ITableReaderPtr& reader,
218+
const NFormats::ISchemalessFormatWriterPtr& writer,
219+
TRowBatchReadOptions options,
220+
TCallback<void(TRowBatchReadOptions* mutableOptions, TDuration timeForBatch)> optionsUpdater,
221+
TDuration pipeDelay)
222+
try {
223+
TPeriodicYielder yielder(TDuration::Seconds(1));
224+
225+
while (auto batch = reader->Read(options)) {
226+
yielder.TryYield();
227+
228+
if (batch->IsEmpty()) {
229+
WaitFor(reader->GetReadyEvent())
230+
.ThrowOnError();
231+
continue;
232+
}
233+
234+
if (!batch->IsEmpty() && pipeDelay != TDuration::Zero()) {
235+
TDelayedExecutor::WaitForDuration(pipeDelay);
236+
}
237+
238+
NProfiling::TWallTimer timer;
239+
240+
if (!writer->WriteBatch(batch)) {
241+
WaitFor(writer->GetReadyEvent())
242+
.ThrowOnError();
243+
}
244+
245+
optionsUpdater(&options, timer.GetElapsedTime());
246+
}
247+
248+
WaitFor(writer->Close())
249+
.ThrowOnError();
250+
} catch (const std::exception& ex) {
251+
YT_LOG_ERROR(ex, "PipeReaderToAdaptiveWriterByBatches failed");
252+
253+
THROW_ERROR_EXCEPTION(ex);
205254
}
206255

207256
void PipeInputToOutput(

yt/yt/client/table_client/adapters.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,13 @@ void PipeReaderToWriterByBatches(
4444
const TRowBatchReadOptions& options,
4545
TDuration pipeDelay = TDuration::Zero());
4646

47+
void PipeReaderToAdaptiveWriterByBatches(
48+
const NApi::ITableReaderPtr& reader,
49+
const NFormats::ISchemalessFormatWriterPtr& writer,
50+
TRowBatchReadOptions startingOptions,
51+
TCallback<void(TRowBatchReadOptions* mutableOptions, TDuration timeForBatch)> optionsUpdater,
52+
TDuration pipeDelay = TDuration::Zero());
53+
4754
void PipeInputToOutput(
4855
IInputStream* input,
4956
IOutputStream* output,

0 commit comments

Comments
 (0)