Skip to content

Commit 2a34cb8

Browse files
left side control in stream (#16783)
1 parent 9901c01 commit 2a34cb8

File tree

5 files changed

+89
-37
lines changed

5 files changed

+89
-37
lines changed

ydb/core/formats/arrow/accessor/sparsed/accessor.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ class TSparsedArrayChunk {
5858
: TSparsedArrayChunk(original) {
5959
AFL_VERIFY(!original.GetNotDefaultRecordsCount());
6060
RecordsCount = recordsCount;
61+
AFL_VERIFY(RemapExternalToInternal.size() == 1);
62+
AFL_VERIFY(RemapExternalToInternal[0].GetStartExt() == 0);
63+
AFL_VERIFY(RemapExternalToInternal[0].GetStartInt() == 0);
64+
AFL_VERIFY(RemapExternalToInternal[0].GetIsDefault());
65+
RemapExternalToInternal[0] = TInternalChunkInfo(0, 0, recordsCount, true);
6166
}
6267

6368
public:

ydb/core/formats/arrow/program/collection.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@
99

1010
namespace NKikimr::NArrow::NAccessor {
1111

12+
void TAccessorsCollection::Upsert(const ui32 columnId, const std::shared_ptr<IChunkedArray>& data, const bool withFilter) {
13+
Remove(columnId, true);
14+
AddVerified(columnId, data, withFilter);
15+
}
16+
1217
void TAccessorsCollection::AddVerified(const ui32 columnId, const arrow::Datum& data, const bool withFilter) {
1318
AddVerified(columnId, TAccessorCollectedContainer(data), withFilter);
1419
}

ydb/core/formats/arrow/program/collection.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ class TAccessorsCollection {
149149
void AddVerified(const ui32 columnId, const arrow::Datum& data, const bool withFilter);
150150
void AddVerified(const ui32 columnId, const std::shared_ptr<IChunkedArray>& data, const bool withFilter);
151151
void AddVerified(const ui32 columnId, const TAccessorCollectedContainer& data, const bool withFilter);
152+
void Upsert(const ui32 columnId, const std::shared_ptr<IChunkedArray>& data, const bool withFilter);
152153

153154
void AddConstantVerified(const ui32 columnId, const std::shared_ptr<arrow::Scalar>& scalar) {
154155
AFL_VERIFY(columnId);

ydb/core/formats/arrow/program/stream_logic.cpp

Lines changed: 75 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -29,72 +29,59 @@ TConclusion<bool> TStreamLogicProcessor::OnInputReady(
2929
const ui32 inputId, const TProcessorContext& context, const TExecutionNodeContext& /*nodeContext*/) const {
3030
auto accInput = context.GetResources()->GetAccessorVerified(inputId);
3131

32-
std::shared_ptr<arrow::Scalar> monoValue;
3332
AFL_VERIFY(!context.GetResources()->HasMarker(FinishMarker));
3433
const auto accResult = context.GetResources()->GetAccessorOptional(GetOutputColumnIdOnce());
3534

36-
const auto isMonoValue = accInput->CheckOneValueAccessor(monoValue);
37-
if (isMonoValue && *isMonoValue) {
38-
const auto isFalseConclusion = ScalarIsFalse(monoValue);
39-
if (isFalseConclusion.IsFail()) {
40-
return isFalseConclusion;
41-
}
42-
const auto isTrueConclusion = ScalarIsTrue(monoValue);
43-
if (isTrueConclusion.IsFail()) {
44-
return isTrueConclusion;
45-
}
46-
AFL_VERIFY(*isFalseConclusion || *isTrueConclusion);
35+
TConclusion<std::optional<bool>> isMonoInput = GetMonoInput(accInput);
36+
if (isMonoInput.IsFail()) {
37+
return isMonoInput;
38+
}
39+
40+
if (isMonoInput.GetResult()) {
41+
const bool monoValue = *isMonoInput.GetResult();
4742
if (Operation == NKernels::EOperation::And) {
48-
if (*isTrueConclusion) {
43+
if (monoValue) {
4944
if (!accResult) {
5045
context.GetResources()->AddVerified(GetOutputColumnIdOnce(),
51-
std::make_shared<NAccessor::TSparsedArray>(
52-
std::make_shared<arrow::UInt8Scalar>(1), arrow::uint8(), context.GetResources()->GetRecordsCountActualVerified()),
53-
false);
46+
NAccessor::TSparsedArray::BuildTrueArrayUI8(context.GetResources()->GetRecordsCountActualVerified()), false);
5447
}
5548
return false;
5649
} else {
57-
if (accResult) {
58-
context.GetResources()->Remove(GetOutputColumnIdOnce(), true);
59-
}
60-
context.GetResources()->AddVerified(GetOutputColumnIdOnce(),
61-
std::make_shared<NAccessor::TSparsedArray>(
62-
std::make_shared<arrow::UInt8Scalar>(0), arrow::uint8(), context.GetResources()->GetRecordsCountActualVerified()),
63-
false);
50+
context.GetResources()->Upsert(GetOutputColumnIdOnce(),
51+
NAccessor::TSparsedArray::BuildFalseArrayUI8(context.GetResources()->GetRecordsCountActualVerified()), false);
6452
return true;
6553
}
6654
} else if (Operation == NKernels::EOperation::Or) {
67-
if (*isFalseConclusion) {
55+
if (!monoValue) {
6856
if (!accResult) {
6957
context.GetResources()->AddVerified(GetOutputColumnIdOnce(),
70-
std::make_shared<NAccessor::TSparsedArray>(
71-
std::make_shared<arrow::UInt8Scalar>(0), arrow::uint8(), context.GetResources()->GetRecordsCountActualVerified()),
72-
false);
58+
NAccessor::TSparsedArray::BuildFalseArrayUI8(context.GetResources()->GetRecordsCountActualVerified()), false);
7359
}
7460
return false;
7561
} else {
76-
if (accResult) {
77-
context.GetResources()->Remove(GetOutputColumnIdOnce(), true);
78-
}
79-
context.GetResources()->AddVerified(GetOutputColumnIdOnce(),
80-
std::make_shared<NAccessor::TSparsedArray>(
81-
std::make_shared<arrow::UInt8Scalar>(1), arrow::uint8(), context.GetResources()->GetRecordsCountActualVerified()),
82-
false);
62+
context.GetResources()->Upsert(GetOutputColumnIdOnce(),
63+
NAccessor::TSparsedArray::BuildTrueArrayUI8(context.GetResources()->GetRecordsCountActualVerified()), false);
8364
return true;
8465
}
8566
}
8667
}
8768

8869
if (!accResult) {
70+
AFL_VERIFY(accInput->GetDataType()->id() == arrow::uint8()->id())("type", accInput->GetDataType()->ToString());
8971
context.GetResources()->AddVerified(GetOutputColumnIdOnce(), accInput, false);
9072
} else {
9173
auto result = Function->Call(TColumnChainInfo::BuildVector({ GetOutputColumnIdOnce(), inputId }), context.GetResources());
9274
if (result.IsFail()) {
9375
return result;
9476
}
77+
auto datum = result.DetachResult();
9578
context.GetResources()->Remove(GetOutputColumnIdOnce());
96-
context.GetResources()->AddVerified(GetOutputColumnIdOnce(), std::move(*result), false);
79+
context.GetResources()->AddVerified(GetOutputColumnIdOnce(), datum, false);
80+
if (IsFinishDatum(datum)) {
81+
return true;
82+
}
9783
}
84+
9885
return false;
9986
}
10087

@@ -162,4 +149,57 @@ NJson::TJsonValue TStreamLogicProcessor::DoDebugJson() const {
162149
return result;
163150
}
164151

152+
bool TStreamLogicProcessor::IsFinishDatum(const arrow::Datum& datum) const {
153+
const auto arrChecker = [&](const arrow::Array& arr) {
154+
AFL_VERIFY(arr.type()->id() == arrow::uint8()->id());
155+
const arrow::UInt8Array& ui8Arr = static_cast<const arrow::UInt8Array&>(arr);
156+
const ui8* values = ui8Arr.raw_values();
157+
if (Operation == NKernels::EOperation::And) {
158+
for (ui32 i = 0; i < ui8Arr.length(); ++i) {
159+
if (values[i] != 0) {
160+
return false;
161+
}
162+
}
163+
} else if (Operation == NKernels::EOperation::Or) {
164+
for (ui32 i = 0; i < ui8Arr.length(); ++i) {
165+
if (values[i] == 0) {
166+
return false;
167+
}
168+
}
169+
} else {
170+
AFL_VERIFY(false)("op", Operation);
171+
}
172+
return true;
173+
};
174+
if (datum.is_array()) {
175+
auto arr = datum.make_array();
176+
return arrChecker(*arr);
177+
} else if (datum.is_arraylike()) {
178+
auto arr = datum.chunked_array();
179+
AFL_VERIFY(arr->type()->id() == arrow::uint8()->id());
180+
for (auto&& chunk : arr->chunks()) {
181+
if (!arrChecker(*chunk)) {
182+
return false;
183+
}
184+
}
185+
return true;
186+
} else {
187+
AFL_VERIFY(false)("kind", (ui32)datum.kind());
188+
return false;
189+
}
190+
}
191+
192+
TConclusion<std::optional<bool>> TStreamLogicProcessor::GetMonoInput(const std::shared_ptr<IChunkedArray>& inputArray) const {
193+
std::shared_ptr<arrow::Scalar> monoValue;
194+
const auto isMonoValue = inputArray->CheckOneValueAccessor(monoValue);
195+
if (!isMonoValue || !*isMonoValue) {
196+
return std::optional<bool>();
197+
}
198+
const auto isFalseConclusion = ScalarIsFalse(monoValue);
199+
if (isFalseConclusion.IsFail()) {
200+
return isFalseConclusion;
201+
}
202+
return !*isFalseConclusion;
203+
}
204+
165205
} // namespace NKikimr::NArrow::NSSA

ydb/core/formats/arrow/program/stream_logic.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,16 @@ class TStreamLogicProcessor: public IResourceProcessor {
2121
return false;
2222
}
2323

24+
TConclusion<std::optional<bool>> GetMonoInput(const std::shared_ptr<IChunkedArray>& inputArray) const;
25+
26+
bool IsFinishDatum(const arrow::Datum& datum) const;
2427
virtual ui64 DoGetWeight() const override;
2528

2629
public:
2730
NKernels::EOperation GetOperation() const {
2831
return Operation;
2932
}
30-
3133
TConclusion<bool> OnInputReady(const ui32 inputId, const TProcessorContext& context, const TExecutionNodeContext& nodeContext) const;
32-
3334
TStreamLogicProcessor(std::vector<TColumnChainInfo>&& input, const TColumnChainInfo& output, const NKernels::EOperation op);
3435
};
3536

0 commit comments

Comments
 (0)