Skip to content

Commit 0ba4810

Browse files
committed
Fixed issues
1 parent 596d386 commit 0ba4810

File tree

6 files changed

+15
-31
lines changed

6 files changed

+15
-31
lines changed

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ struct TParquetFileInfo {
274274
class TS3ReadCoroImpl : public TActorCoroImpl {
275275
friend class TS3StreamReadActor;
276276
277-
static constexpr ui64 BLOCK_BATCH_METADATA_SIZE = 1_MB;
277+
static constexpr double BLOCK_MIN_FILL_RATIO = 0.9; // Blocks will be shrinked to fit only if size <= (min fill ratio) * capacity
278278
static constexpr ui64 BLOCK_ROW_METADATA_SIZE = sizeof(ui64);
279279
280280
public:
@@ -639,10 +639,10 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
639639
void SendRecordBatchEvent(std::shared_ptr<arrow::RecordBatch> batch, std::vector<TColumnConverter>& columnConverters, ui64& decodedBytes, ui64& numRows) {
640640
auto convertedBatch = ConvertArrowColumns(batch, columnConverters);
641641
auto ev = std::make_unique<TEvS3Provider::TEvNextRecordBatch>(PathIndex, TakeIngressDelta(), TakeCpuTimeDelta());
642-
ArrowBlockSplitter.SplitRecordBatch(convertedBatch, numRows, ev->SplitedBatch);
642+
ArrowBlockSplitter.SplitRecordBatch(convertedBatch, numRows, ev->SplittedBatch);
643643
644644
ui64 size = 0;
645-
for (const auto& batch : ev->SplitedBatch) {
645+
for (const auto& batch : ev->SplittedBatch) {
646646
size += NKikimr::NArrow::GetBatchDataSize(batch);
647647
}
648648
decodedBytes += size;
@@ -1068,7 +1068,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
10681068
: TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
10691069
TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
10701070
PathIndex(pathIndex), Path(path), Url(url), RowsRemained(maxRows),
1071-
ArrowBlockSplitter(channelBufferSize, BLOCK_ROW_METADATA_SIZE, BLOCK_BATCH_METADATA_SIZE),
1071+
ArrowBlockSplitter(BLOCK_MIN_FILL_RATIO * channelBufferSize, BLOCK_ROW_METADATA_SIZE),
10721072
SourceContext(queueBufferCounter), DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize),
10731073
HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize), AsyncDecompressing(asyncDecompressing) {
10741074
}
@@ -1770,7 +1770,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
17701770
void HandleNextRecordBatch(TEvS3Provider::TEvNextRecordBatch::TPtr& next) {
17711771
YQL_ENSURE(ReadSpec->Arrow);
17721772
ui64 rows = 0;
1773-
for (const auto& batch : next->Get()->SplitedBatch) {
1773+
for (const auto& batch : next->Get()->SplittedBatch) {
17741774
rows += batch->num_rows();
17751775
IngressStats.Chunks++;
17761776
if (Counters) {

ydb/library/yql/providers/s3/common/util.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,9 @@ TString TUrlBuilder::Build() const {
9595
return std::move(result);
9696
}
9797

98-
TArrowBlockSplitter::TArrowBlockSplitter(ui32 chunkSizeLimit, ui32 rowMetaSize, ui32 batchMetaSize)
98+
TArrowBlockSplitter::TArrowBlockSplitter(ui32 chunkSizeLimit, ui32 rowMetaSize)
9999
: ChunkSizeLimit(chunkSizeLimit)
100100
, RowMetaSize(rowMetaSize)
101-
,BatchMetaSize(batchMetaSize)
102101
{}
103102

104103
void TArrowBlockSplitter::SplitRecordBatch(std::shared_ptr<arrow::RecordBatch> batch, ui64 firstRowId, std::vector<std::shared_ptr<arrow::RecordBatch>>& result) {
@@ -132,7 +131,7 @@ void TArrowBlockSplitter::SplitRecordBatch(std::shared_ptr<arrow::RecordBatch> b
132131

133132
bool TArrowBlockSplitter::CheckBatchSize(const std::shared_ptr<arrow::RecordBatch>& batch) const {
134133
const ui64 batchSize = NKikimr::NArrow::GetBatchDataSize(batch) + batch->num_rows() * RowMetaSize;
135-
return batchSize + BatchMetaSize <= ChunkSizeLimit;
134+
return batchSize <= ChunkSizeLimit;
136135
}
137136

138137
}

ydb/library/yql/providers/s3/common/util.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class TUrlBuilder {
3636

3737
class TArrowBlockSplitter {
3838
public:
39-
TArrowBlockSplitter(ui32 chunkSizeLimit, ui32 rowMetaSize, ui32 batchMetaSize);
39+
TArrowBlockSplitter(ui32 chunkSizeLimit, ui32 rowMetaSize);
4040

4141
void SplitRecordBatch(std::shared_ptr<arrow::RecordBatch> batch, ui64 firstRowId, std::vector<std::shared_ptr<arrow::RecordBatch>>& result);
4242

@@ -46,7 +46,6 @@ class TArrowBlockSplitter {
4646
private:
4747
const ui32 ChunkSizeLimit;
4848
const ui32 RowMetaSize;
49-
const ui32 BatchMetaSize;
5049
std::vector<std::shared_ptr<arrow::RecordBatch>> SplitStack;
5150
};
5251

ydb/library/yql/providers/s3/common/util_ut.cpp

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ Y_UNIT_TEST_SUITE(TestArrowBlockSplitter) {
9393
const ui64 totalSize = GetBatchDataSize(batch);
9494
constexpr ui64 numberParts = 8;
9595

96-
TArrowBlockSplitter splitter(totalSize / numberParts, 0, 0);
96+
TArrowBlockSplitter splitter(totalSize / numberParts, 0);
9797
std::vector<std::shared_ptr<arrow::RecordBatch>> splttedBatches;
9898
splitter.SplitRecordBatch(batch, 0, splttedBatches);
9999
ValidateSplit(batch, numberParts, splttedBatches);
@@ -107,35 +107,21 @@ Y_UNIT_TEST_SUITE(TestArrowBlockSplitter) {
107107
const ui64 totalSize = GetBatchDataSize(batch) + rowOverhead * batch->num_rows();
108108
constexpr ui64 numberParts = 8;
109109

110-
TArrowBlockSplitter splitter(totalSize / numberParts, rowOverhead, 0);
110+
TArrowBlockSplitter splitter(totalSize / numberParts, rowOverhead);
111111
std::vector<std::shared_ptr<arrow::RecordBatch>> splttedBatches;
112112
splitter.SplitRecordBatch(batch, 0, splttedBatches);
113113
ValidateSplit(batch, numberParts, splttedBatches);
114114
}
115115

116-
Y_UNIT_TEST(SplitByMetaSize) {
117-
NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TIntSeqFiller<arrow::Int64Type>>>("field");
118-
std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(2048);
119-
120-
const ui64 totalSize = GetBatchDataSize(batch);
121-
const ui64 batchOverhead = totalSize / 2;
122-
123-
TArrowBlockSplitter splitter(totalSize, 0, batchOverhead);
124-
std::vector<std::shared_ptr<arrow::RecordBatch>> splttedBatches;
125-
splitter.SplitRecordBatch(batch, 0, splttedBatches);
126-
ValidateSplit(batch, 2, splttedBatches);
127-
}
128-
129116
Y_UNIT_TEST(PassSmallBlock) {
130117
NConstruction::IArrayBuilder::TPtr column = std::make_shared<NConstruction::TSimpleArrayConstructor<NConstruction::TStringPoolFiller>>(
131118
"field", NConstruction::TStringPoolFiller(8, 512));
132119
std::shared_ptr<arrow::RecordBatch> batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(2048);
133120

134121
constexpr ui64 rowOverhead = sizeof(ui64);
135122
const ui64 totalSize = GetBatchDataSize(batch) + rowOverhead * batch->num_rows();
136-
const ui64 batchOverhead = 1_MB;
137123

138-
TArrowBlockSplitter splitter(totalSize + batchOverhead, rowOverhead, batchOverhead);
124+
TArrowBlockSplitter splitter(totalSize, rowOverhead);
139125
std::vector<std::shared_ptr<arrow::RecordBatch>> splttedBatches;
140126
splitter.SplitRecordBatch(batch, 0, splttedBatches);
141127
ValidateSplit(batch, 1, splttedBatches);
@@ -150,7 +136,7 @@ Y_UNIT_TEST_SUITE(TestArrowBlockSplitter) {
150136
const ui64 totalSize = GetBatchDataSize(batch);
151137

152138
constexpr ui64 rowId = 42;
153-
TArrowBlockSplitter splitter(strSize / 2, 0, 0);
139+
TArrowBlockSplitter splitter(strSize / 2, 0);
154140
std::vector<std::shared_ptr<arrow::RecordBatch>> splttedBatches;
155141
UNIT_ASSERT_EXCEPTION_CONTAINS(splitter.SplitRecordBatch(batch, rowId, splttedBatches), parquet::ParquetException, TStringBuilder() << "Row " << rowId + 1 << " size is " << totalSize << ", that is larger than allowed limit " << strSize / 2);
156142
}

ydb/library/yql/providers/s3/common/ya.make

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ SRCS(
1414
)
1515

1616
PEERDIR(
17+
contrib/libs/apache/arrow
1718
ydb/library/formats/arrow
1819
ydb/library/yql/providers/common/http_gateway
1920
ydb/library/yql/providers/s3/events
21+
yql/essentials/ast
2022
yql/essentials/public/issue
2123
yql/essentials/public/issue/protos
22-
yql/essentials/ast
23-
contrib/libs/apache/arrow
2424
)
2525

2626
IF (CLANG AND NOT WITH_VALGRIND)

ydb/library/yql/providers/s3/events/events.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ struct TEvS3Provider {
191191
TEvNextRecordBatch(size_t pathInd, ui64 ingressDelta, TDuration cpuTimeDelta)
192192
: PathIndex(pathInd), IngressDelta(ingressDelta), CpuTimeDelta(cpuTimeDelta) {
193193
}
194-
std::vector<std::shared_ptr<arrow::RecordBatch>> SplitedBatch;
194+
std::vector<std::shared_ptr<arrow::RecordBatch>> SplittedBatch;
195195
const size_t PathIndex;
196196
const ui64 IngressDelta;
197197
const TDuration CpuTimeDelta;

0 commit comments

Comments
 (0)