@@ -27,20 +27,23 @@ std::optional<NArrow::NMerger::TCursor> TBaseMergeTask::DrainMergerLinearScan(co
27
27
return lastResultPosition;
28
28
}
29
29
30
- void TBaseMergeTask::PrepareResultBatch () {
30
+ TConclusionStatus TBaseMergeTask::PrepareResultBatch () {
31
31
if (!ResultBatch || ResultBatch->num_rows () == 0 ) {
32
32
AllocationGuard = nullptr ;
33
33
ResultBatch = nullptr ;
34
34
LastPK = nullptr ;
35
- return ;
35
+ return TConclusionStatus::Success () ;
36
36
}
37
37
const ui64 dataSizeBefore = NArrow::GetTableDataSize (ResultBatch);
38
38
const ui64 memorySizeBefore = NArrow::GetTableMemorySize (ResultBatch);
39
39
{
40
40
ResultBatch = NArrow::TColumnOperator ().VerifyIfAbsent ().Extract (ResultBatch, Context->GetProgramInputColumns ()->GetColumnNamesVector ());
41
41
AFL_VERIFY ((ui32)ResultBatch->num_columns () == Context->GetProgramInputColumns ()->GetColumnNamesVector ().size ());
42
42
auto accessors = std::make_shared<NArrow::NAccessor::TAccessorsCollection>(ResultBatch, *Context->GetCommonContext ()->GetResolver ());
43
- Context->GetReadMetadata ()->GetProgram ().ApplyProgram (accessors).Validate ();
43
+ auto conclusion = Context->GetReadMetadata ()->GetProgram ().ApplyProgram (accessors);
44
+ if (conclusion.IsFail ()) {
45
+ return conclusion;
46
+ }
44
47
if (accessors->GetRecordsCountOptional ().value_or (0 ) == 0 ) {
45
48
ResultBatch = nullptr ;
46
49
} else {
@@ -64,6 +67,7 @@ void TBaseMergeTask::PrepareResultBatch() {
64
67
ResultBatch = nullptr ;
65
68
LastPK = nullptr ;
66
69
}
70
+ return TConclusionStatus::Success ();
67
71
}
68
72
69
73
bool TBaseMergeTask::DoApply (IDataReader& indexedDataRead) const {
@@ -109,7 +113,10 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() {
109
113
if (Context->GetCommonContext ()->IsReverse ()) {
110
114
ResultBatch = NArrow::ReverseRecords (ResultBatch);
111
115
}
112
- PrepareResultBatch ();
116
+ auto conclusion = PrepareResultBatch ();
117
+ if (conclusion.IsFail ()) {
118
+ return conclusion;
119
+ }
113
120
}
114
121
Sources.clear ();
115
122
AFL_VERIFY (!!LastPK == (!!ResultBatch && ResultBatch->num_rows ()));
@@ -162,8 +169,7 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() {
162
169
LastPK = lastResultPosition->ExtractSortingPosition (MergingContext->GetFinish ().GetSortFields ());
163
170
}
164
171
AFL_VERIFY (!!LastPK == (!!ResultBatch && ResultBatch->num_rows ()));
165
- PrepareResultBatch ();
166
- return TConclusionStatus::Success ();
172
+ return PrepareResultBatch ();
167
173
}
168
174
169
175
TStartMergeTask::TStartMergeTask (const std::shared_ptr<TMergingContext>& mergingContext, const std::shared_ptr<TSpecialReadContext>& readContext,
@@ -187,8 +193,7 @@ TConclusionStatus TContinueMergeTask::DoExecuteImpl() {
187
193
LastPK = lastResultPosition->ExtractSortingPosition (MergingContext->GetFinish ().GetSortFields ());
188
194
}
189
195
AFL_VERIFY (!!LastPK == (!!ResultBatch && ResultBatch->num_rows ()));
190
- PrepareResultBatch ();
191
- return TConclusionStatus::Success ();
196
+ return PrepareResultBatch ();
192
197
}
193
198
194
199
} // namespace NKikimr::NOlap::NReader::NPlain
0 commit comments