Skip to content

Commit b3ab002

Browse files
authored
Always copy the wide consumed data by DqOutput (#12373)
1 parent a9cc770 commit b3ab002

File tree

2 files changed: +12 -8 lines changed

ydb/library/yql/dq/runtime/dq_output_consumer.cpp

Lines changed: 11 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -368,7 +368,7 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
368368
YQL_ENSURE(false, "Consume() called on wide block stream");
369369
}
370370

371-
void WideConsume(TUnboxedValue* values, ui32 count) final {
371+
void WideConsume(TUnboxedValue values[], ui32 count) final {
372372
YQL_ENSURE(!IsWaitingFlag_);
373373
YQL_ENSURE(count == OutputWidth_);
374374

@@ -379,7 +379,7 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
379379

380380
TVector<const arrow::Datum*> datums;
381381
datums.reserve(count - 1);
382-
for (ui32 i = 0; i + 1 < count; ++i) {
382+
for (ui32 i = 0; i < count - 1; ++i) {
383383
datums.push_back(&TArrowBlock::From(values[i]).GetDatum());
384384
}
385385

@@ -404,9 +404,10 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
404404
if (src->is_scalar()) {
405405
output.emplace_back(*src);
406406
} else {
407-
IArrayBuilder::TArrayDataItem dataItem;
408-
dataItem.Data = src->array().get();
409-
dataItem.StartOffset = 0;
407+
IArrayBuilder::TArrayDataItem dataItem {
408+
.Data = src->array().get(),
409+
.StartOffset = 0,
410+
};
410411
Builders_[j]->AddMany(&dataItem, 1, indexes, outputBlockLen);
411412
output.emplace_back(Builders_[j]->Build(true));
412413
}
@@ -418,11 +419,14 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
418419
}
419420

420421
void DoConsume(TVector<std::unique_ptr<TArgsDechunker>>&& outputData) const {
422+
Y_ENSURE(outputData.size() == Outputs_.size());
423+
421424
while (!outputData.empty()) {
422425
bool hasData = false;
423426
for (size_t i = 0; i < Outputs_.size(); ++i) {
424427
if (Outputs_[i]->IsFull()) {
425428
IsWaitingFlag_ = true;
429+
Y_ENSURE(OutputData_.empty());
426430
OutputData_ = std::move(outputData);
427431
return;
428432
}
@@ -474,7 +478,7 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
474478
return !IsWaitingFlag_;
475479
}
476480

477-
size_t GetHashPartitionIndex(const arrow::Datum** values, ui64 blockIndex) {
481+
size_t GetHashPartitionIndex(const arrow::Datum* values[], ui64 blockIndex) {
478482
ui64 hash = 0;
479483
for (size_t keyId = 0; keyId < KeyColumns_.size(); keyId++) {
480484
const ui32 columnIndex = KeyColumns_[keyId].Index;
@@ -512,7 +516,7 @@ class TDqOutputHashPartitionConsumerBlock : public IDqOutputConsumer {
512516
if (blockType->GetShape() == NMiniKQL::TBlockType::EShape::Many) {
513517
auto itemType = blockType->GetItemType();
514518
YQL_ENSURE(!itemType->IsPg(), "pg types are not supported yet");
515-
Builders_.emplace_back(MakeArrayBuilder(helper, itemType, *NYql::NUdf::GetYqlMemoryPool(), maxBlockLen, nullptr));
519+
Builders_.emplace_back(MakeArrayBuilder(helper, itemType, *NYql::NUdf::GetYqlMemoryPool(), maxBlockLen, nullptr, {.MinFillPercentage=100}));
516520
} else {
517521
Builders_.emplace_back();
518522
}

ydb/library/yql/dq/runtime/dq_output_consumer.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,7 @@ class IDqOutputConsumer : public TSimpleRefCount<IDqOutputConsumer>,
2828
}
2929
virtual bool IsFull() const = 0;
3030
virtual void Consume(NKikimr::NUdf::TUnboxedValue&& value) = 0;
31-
virtual void WideConsume(NKikimr::NUdf::TUnboxedValue* values, ui32 count) = 0;
31+
virtual void WideConsume(NKikimr::NUdf::TUnboxedValue values[], ui32 count) = 0;
3232
virtual void Consume(NDqProto::TCheckpoint&& checkpoint) = 0;
3333
virtual void Finish() = 0;
3434
bool IsFinishing() const {

0 commit comments

Comments
 (0)