Skip to content

Commit 7d04dad

Browse files
nepalmaximyurchuk
authored andcommitted
Merge PR #10587: Block input for YT map operations
initial commit_hash:61c8442fd8a0ebe277511b5d98b334cf6bc95337
1 parent db924b1 commit 7d04dad

File tree

6 files changed

+189
-0
lines changed

6 files changed

+189
-0
lines changed

yql/essentials/core/yql_opt_utils.cpp

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2216,5 +2216,95 @@ TVector<TString> GenNoClashColumns(const TStructExprType& source, TStringBuf pre
22162216
return result;
22172217
}
22182218

2219+
bool CheckSupportedTypes(const TTypeAnnotationNode::TListType& typesToCheck, const TSet<TString>& supportedTypes, const TSet<NUdf::EDataSlot>& supportedDataTypes, std::function<void(const TString&)> unsupportedTypeHandler) {
2220+
TSet<ETypeAnnotationKind> supported;
2221+
for (const auto &e: supportedTypes) {
2222+
if (e == "pg") {
2223+
supported.insert(ETypeAnnotationKind::Pg);
2224+
} else if (e == "tuple") {
2225+
supported.emplace(ETypeAnnotationKind::Tuple);
2226+
} else if (e == "struct") {
2227+
supported.emplace(ETypeAnnotationKind::Struct);
2228+
} else if (e == "dict") {
2229+
supported.emplace(ETypeAnnotationKind::Dict);
2230+
} else if (e == "list") {
2231+
supported.emplace(ETypeAnnotationKind::List);
2232+
} else if (e == "variant") {
2233+
supported.emplace(ETypeAnnotationKind::Variant);
2234+
} else {
2235+
// Unknown type
2236+
unsupportedTypeHandler(TStringBuilder() << "unknown type: " << e);
2237+
return false;
2238+
}
2239+
}
2240+
if (supportedDataTypes.size()) {
2241+
supported.emplace(ETypeAnnotationKind::Data);
2242+
}
2243+
auto checkType = [&] (const TTypeAnnotationNode* type) {
2244+
if (type->GetKind() == ETypeAnnotationKind::Data) {
2245+
if (!supported.contains(ETypeAnnotationKind::Data)) {
2246+
unsupportedTypeHandler(TStringBuilder() << "unsupported data types");
2247+
return false;
2248+
}
2249+
if (!supportedDataTypes.contains(type->Cast<TDataExprType>()->GetSlot())) {
2250+
unsupportedTypeHandler(TStringBuilder() << "unsupported data type: " << type->Cast<TDataExprType>()->GetSlot());
2251+
return false;
2252+
}
2253+
} else if (type->GetKind() == ETypeAnnotationKind::Pg) {
2254+
if (!supported.contains(ETypeAnnotationKind::Pg)) {
2255+
unsupportedTypeHandler(TStringBuilder() << "unsupported pg");
2256+
return false;
2257+
}
2258+
auto name = type->Cast<TPgExprType>()->GetName();
2259+
if (name == "float4" && !supportedDataTypes.contains(NUdf::EDataSlot::Float)) {
2260+
unsupportedTypeHandler(TStringBuilder() << "PgFloat4 unsupported yet since float is no supported");
2261+
return false;
2262+
}
2263+
} else {
2264+
unsupportedTypeHandler(TStringBuilder() << "unsupported annotation kind: " << type->GetKind());
2265+
return false;
2266+
}
2267+
return true;
2268+
};
2269+
2270+
TVector<const TTypeAnnotationNode*> stack(typesToCheck.begin(), typesToCheck.end());
2271+
while (!stack.empty()) {
2272+
auto el = stack.back();
2273+
stack.pop_back();
2274+
if (el->GetKind() == ETypeAnnotationKind::Optional) {
2275+
stack.push_back(el->Cast<TOptionalExprType>()->GetItemType());
2276+
continue;
2277+
}
2278+
if (!supported.contains(el->GetKind())) {
2279+
unsupportedTypeHandler(TStringBuilder() << "unsupported " << el->GetKind());
2280+
return false;
2281+
}
2282+
if (el->GetKind() == ETypeAnnotationKind::Tuple) {
2283+
for (auto e: el->Cast<TTupleExprType>()->GetItems()) {
2284+
stack.push_back(e);
2285+
}
2286+
continue;
2287+
} else if (el->GetKind() == ETypeAnnotationKind::Struct) {
2288+
for (auto e: el->Cast<TStructExprType>()->GetItems()) {
2289+
stack.push_back(e->GetItemType());
2290+
}
2291+
continue;
2292+
} else if (el->GetKind() == ETypeAnnotationKind::List) {
2293+
stack.push_back(el->Cast<TListExprType>()->GetItemType());
2294+
continue;
2295+
} else if (el->GetKind() == ETypeAnnotationKind::Dict) {
2296+
stack.push_back(el->Cast<TDictExprType>()->GetKeyType());
2297+
stack.push_back(el->Cast<TDictExprType>()->GetPayloadType());
2298+
continue;
2299+
} else if (el->GetKind() == ETypeAnnotationKind::Variant) {
2300+
stack.push_back(el->Cast<TVariantExprType>()->GetUnderlyingType());
2301+
continue;
2302+
}
2303+
if (!checkType(el)) {
2304+
return false;
2305+
}
2306+
}
2307+
return true;
2308+
}
22192309

22202310
}

yql/essentials/core/yql_opt_utils.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,4 +168,6 @@ TPartOfConstraintBase::TSetType GetPathsToKeys(const TExprNode& body, const TExp
168168
// prefix should start with "_yql"
169169
TVector<TString> GenNoClashColumns(const TStructExprType& source, TStringBuf prefix, size_t count);
170170

171+
bool CheckSupportedTypes(const TTypeAnnotationNode::TListType& typesToCheck, const TSet<TString>& typesSupported, const TSet<NUdf::EDataSlot>& dataSlotsSupported, std::function<void(const TString&)> unsupportedTypeHandler);
172+
171173
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
LIBRARY()
2+
3+
PEERDIR(
4+
contrib/libs/apache/arrow
5+
)
6+
7+
SRCS(
8+
yql_codec_buf_input_stream.cpp
9+
)
10+
11+
END()
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#include "yql_codec_buf_input_stream.h"
2+
3+
#include <yql/essentials/public/udf/arrow/defs.h>
4+
5+
#include <arrow/buffer.h>
6+
#include <arrow/buffer.h>
7+
8+
namespace NYql {
9+
namespace NCommon {
10+
11+
arrow::Result<int64_t> TInputBufArrowInputStream::Read(int64_t bytesToRead, void* outBuffer) {
12+
auto outBufferPtr = static_cast<char*>(outBuffer);
13+
14+
YQL_ENSURE(bytesToRead > 0);
15+
if (!Buffer_.TryRead(*outBufferPtr)) {
16+
EOSReached_ = true;
17+
return 0;
18+
}
19+
20+
Buffer_.ReadMany(outBufferPtr + 1, bytesToRead - 1);
21+
BytesRead_ += bytesToRead;
22+
return bytesToRead;
23+
}
24+
25+
arrow::Result<std::shared_ptr<arrow::Buffer>> TInputBufArrowInputStream::Read(int64_t nbytes) {
26+
auto outBuffer = ARROW_RESULT(AllocateResizableBuffer(nbytes, Pool_));
27+
auto bytesRead = ARROW_RESULT(Read(nbytes, outBuffer->mutable_data()));
28+
if (bytesRead == 0) {
29+
return std::make_shared<arrow::Buffer>(nullptr, 0);
30+
}
31+
32+
YQL_ENSURE(bytesRead == nbytes);
33+
return outBuffer;
34+
}
35+
36+
} // NCommon
37+
} // NYql
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#include <yql/essentials/providers/common/codec/yql_codec_buf.h>
2+
3+
#include <arrow/io/interfaces.h>
4+
#include <arrow/result.h>
5+
6+
namespace NYql {
7+
namespace NCommon {
8+
9+
class TInputBufArrowInputStream : public arrow::io::InputStream {
10+
public:
11+
explicit TInputBufArrowInputStream(TInputBuf& buffer, arrow::MemoryPool* pool)
12+
: Buffer_(buffer)
13+
, Pool_(pool)
14+
{
15+
}
16+
17+
arrow::Result<int64_t> Read(int64_t bytesToRead, void* outBuffer) override;
18+
arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
19+
20+
arrow::Status Close() override {
21+
return arrow::Status::OK();
22+
}
23+
24+
arrow::Result<int64_t> Tell() const override {
25+
return BytesRead_;
26+
}
27+
28+
bool closed() const override {
29+
return false;
30+
}
31+
32+
bool EOSReached() const {
33+
return EOSReached_;
34+
}
35+
36+
private:
37+
TInputBuf& Buffer_;
38+
int64_t BytesRead_ = 0;
39+
bool EOSReached_ = false;
40+
41+
arrow::MemoryPool* Pool_;
42+
};
43+
44+
} // NCommon
45+
} // NYql

yql/essentials/providers/common/codec/ya.make

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ GENERATE_ENUM_SERIALIZATION(yql_codec_type_flags.h)
2828

2929
END()
3030

31+
RECURSE(
32+
arrow
33+
)
34+
3135
RECURSE_FOR_TESTS(
3236
ut
3337
)

0 commit comments

Comments
 (0)