Skip to content

Commit 949bebf

Browse files
committed
YQL-20059: Remove dechunking from fallback handler
commit_hash:20bcd0062beca6963b3c420835a33b9873f43ab2
1 parent caec1b0 commit 949bebf

File tree

1 file changed

+6
-41
lines changed

1 file changed

+6
-41
lines changed

yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp

Lines changed: 6 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -230,48 +230,15 @@ class TBlockBuilder {
230230
++RowsCnt_;
231231
}
232232

233-
std::vector<TResultBatch::TPtr> Build() {
233+
TResultBatch::TPtr Build() {
234234
std::vector<arrow::Datum> columns;
235235
columns.reserve(ColumnBuilders_.size());
236236
for (size_t i = 0; i < ColumnBuilders_.size(); ++i) {
237237
columns.emplace_back(std::move(ColumnBuilders_[i]->Build(false)));
238238
}
239-
std::vector<std::shared_ptr<TResultBatch>> blocks;
240-
int64_t offset = 0;
241-
std::vector<int64_t> currentChunk(columns.size()), inChunkOffset(columns.size());
242-
while (RowsCnt_) {
243-
int64_t max_curr_len = RowsCnt_;
244-
for (size_t i = 0; i < columns.size(); ++i) {
245-
if (arrow::Datum::Kind::CHUNKED_ARRAY == columns[i].kind()) {
246-
auto& c_arr = columns[i].chunked_array();
247-
while (currentChunk[i] < c_arr->num_chunks() && !c_arr->chunk(currentChunk[i])) {
248-
++currentChunk[i];
249-
}
250-
YQL_ENSURE(currentChunk[i] < c_arr->num_chunks());
251-
max_curr_len = std::min(max_curr_len, c_arr->chunk(currentChunk[i])->length() - inChunkOffset[i]);
252-
}
253-
}
254-
RowsCnt_ -= max_curr_len;
255-
decltype(columns) result_columns;
256-
result_columns.reserve(columns.size());
257-
offset += max_curr_len;
258-
for (size_t i = 0; i < columns.size(); ++i) {
259-
auto& e = columns[i];
260-
if (arrow::Datum::Kind::CHUNKED_ARRAY == e.kind()) {
261-
result_columns.emplace_back(e.chunked_array()->chunk(currentChunk[i])->Slice(inChunkOffset[i], max_curr_len));
262-
if (max_curr_len + inChunkOffset[i] == e.chunked_array()->chunk(currentChunk[i])->length()) {
263-
++currentChunk[i];
264-
inChunkOffset[i] = 0;
265-
} else {
266-
inChunkOffset[i] += max_curr_len;
267-
}
268-
} else {
269-
result_columns.emplace_back(e.array()->Slice(offset - max_curr_len, max_curr_len));
270-
}
271-
}
272-
blocks.emplace_back(std::make_shared<TResultBatch>(max_curr_len, std::move(result_columns)));
273-
}
274-
return blocks;
239+
auto res = std::make_shared<TResultBatch>(RowsCnt_, std::move(columns));
240+
RowsCnt_ = 0;
241+
return res;
275242
}
276243

277244
private:
@@ -480,9 +447,7 @@ class TSource : public TNonCopyable {
480447
}
481448
}
482449
if (payload) {
483-
for (auto &e: FallbackHandler(inputIdx, payload)) {
484-
Listener_->HandleFallback(std::move(e));
485-
}
450+
Listener_->HandleFallback(FallbackHandler(inputIdx, payload));
486451
InputDone(inputIdx);
487452
RunRead();
488453
}
@@ -494,7 +459,7 @@ class TSource : public TNonCopyable {
494459
}
495460
}
496461

497-
std::vector<TResultBatch::TPtr> FallbackHandler(size_t idx, NYT::TSharedRef payload) {
462+
TResultBatch::TPtr FallbackHandler(size_t idx, NYT::TSharedRef payload) {
498463
if (!payload.Size()) {
499464
return {};
500465
}

0 commit comments

Comments
 (0)