Skip to content

YQ-3704 RD use CH udf in json parsing #9878

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 47 additions & 12 deletions ydb/core/fq/libs/row_dispatcher/json_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,9 @@ class TJsonParser::TImpl {
public:
TImpl(
const TVector<TString>& columns,
const TVector<TString>& types,
TCallback callback)
: Sql(GenerateSql(columns)) {
: Sql(GenerateSql(columns, types)) {
auto options = NYql::NPureCalc::TProgramFactoryOptions();
auto factory = NYql::NPureCalc::MakeProgramFactory(options);

Expand All @@ -240,7 +241,7 @@ class TJsonParser::TImpl {
TParserInputSpec(),
TParserOutputSpec(MakeOutputSchema(columns)),
Sql,
NYql::NPureCalc::ETranslationMode::SQL
NYql::NPureCalc::ETranslationMode::SExpr
);
LOG_ROW_DISPATCHER_DEBUG("Program created");
InputConsumer = Program->Apply(MakeHolder<TParserOutputConsumer>(callback));
Expand All @@ -257,19 +258,51 @@ class TJsonParser::TImpl {
}

private:
TString GenerateSql(const TVector<TString>& columns) {
TStringStream str;
str << "$json = SELECT CAST(data AS Json) as `Json`, " << OffsetFieldName << " FROM Input;";
str << "\nSELECT " << OffsetFieldName << ", ";
for (auto it = columns.begin(); it != columns.end(); ++it) {
str << R"(CAST(Unwrap(JSON_VALUE(`Json`, "$.)" << *it << "\")) as String) as "
<< *it << ((it != columns.end() - 1) ? "," : "");
// Builds the text of an s-expression program that parses incoming rows with the
// 'ClickHouseClient.ParseFormat UDF (format argument 'json_each_row) and exposes
// every requested column as a String.
//
// columnNames / columnTypes are parallel vectors: columnTypes[i] is the type name of
// columnNames[i]. A size mismatch aborts the process.
//
// Returns the generated program text; the same text is written to the debug log.
//
// NOTE(review): column names and type names are inserted verbatim into the program
// text; presumably callers guarantee they are valid atoms — confirm.
TString GenerateSql(const TVector<TString>& columnNames, const TVector<TString>& columnTypes) {
Y_ABORT_UNLESS(columnNames.size() == columnTypes.size(), "Unexpected column types size");

// udfOutputType accumulates the members of the struct the UDF is declared to return
// (one "(name (DataType 'type))" entry per column).
TStringStream udfOutputType;
// resultType accumulates the members of the final output struct
// (one "(name (SafeCast (Member $parsed 'name) $string_type))" entry per column).
TStringStream resultType;
for (size_t i = 0; i < columnNames.size(); ++i) {
// Entries are separated by a single space; nothing is appended after the last one.
const TString& lastSymbol = i + 1 == columnNames.size() ? "" : " ";
const TString& column = columnNames[i];
// An "Optional<...>" wrapper is stripped so that the bare type name is used in DataType.
const TString& type = SkipOptional(columnTypes[i]);

udfOutputType << "'('" << column << " (DataType '" << type << "))" << lastSymbol;
resultType << "'('" << column << " (SafeCast (Member $parsed '" << column << ") $string_type))" << lastSymbol;
}
// NOTE(review): this statement uses `str` before its declaration below and emits SQL
// text rather than s-expression text — looks like a leftover of a previous SQL-based
// implementation; confirm and drop.
str << " FROM $json;";

// Program layout (everything below is a single raw-string template):
//   $input_type  - tuple (String, Uint64): the `data` member and the offset member of an input row;
//   $output_type - tuple (struct of the requested columns, Uint64);
//   $udf         - ClickHouseClient.ParseFormat applied to a stream of $input_type;
//   result       - for each UDF output, a struct holding the offset field plus every
//                  column passed through SafeCast to $string_type.
TStringStream str;
str << R"(
(
(let $string_type (DataType 'String))

(let $input_type (TupleType $string_type (DataType 'Uint64)))
(let $output_type (TupleType (StructType )" << udfOutputType.Str() << R"() (DataType 'Uint64)))
(let $udf_argument_type (TupleType $input_type (StructType) $output_type))
(let $udf_callable_type (CallableType '('1) '((StreamType $output_type)) '((StreamType $input_type)) '((OptionalType (DataType 'Utf8)))))
(let $udf (Udf 'ClickHouseClient.ParseFormat (Void) $udf_argument_type 'json_each_row $udf_callable_type (VoidType) '"" '()))

(return (Map (Apply $udf (Map (Self '0) (lambda '($input) (block '(
(return '((Member $input 'data) (Member $input ')" << OffsetFieldName << R"()))
))))) (lambda '($output) (block '(
(let $parsed (Nth $output '0))
(return (AsStruct '(')" << OffsetFieldName << R"( (Nth $output '1)) )" << resultType.Str() << R"())
)))))
)
)";
LOG_ROW_DISPATCHER_DEBUG("GenerateSql " << str.Str());
return str.Str();
}

// Returns the type name with one enclosing "Optional<...>" wrapper removed.
// Names that do not start with "Optional" are returned unchanged.
// A name that starts with "Optional" but is not of the form "Optional<...>"
// aborts the process.
static TString SkipOptional(TStringBuf type) {
    const bool hasOptionalPrefix = type.StartsWith("Optional");
    if (!hasOptionalPrefix) {
        return TString(type);
    }

    // Both calls modify `type` in place and report whether anything was removed.
    Y_ABORT_UNLESS(type.SkipPrefix("Optional<"));
    Y_ABORT_UNLESS(type.ChopSuffix(">"));
    return TString(type);
}

private:
THolder<NYql::NPureCalc::TPushStreamProgram<TParserInputSpec, TParserOutputSpec>> Program;
THolder<NYql::NPureCalc::IConsumer<TInputConsumerArg>> InputConsumer;
Expand All @@ -278,8 +311,9 @@ class TJsonParser::TImpl {

TJsonParser::TJsonParser(
const TVector<TString>& columns,
const TVector<TString>& types,
TCallback callback)
: Impl(std::make_unique<TJsonParser::TImpl>(columns, callback)) {
: Impl(std::make_unique<TJsonParser::TImpl>(columns, types, callback)) {
}

TJsonParser::~TJsonParser() {
Expand All @@ -295,8 +329,9 @@ TString TJsonParser::GetSql() {

std::unique_ptr<TJsonParser> NewJsonParser(
const TVector<TString>& columns,
const TVector<TString>& types,
TCallback callback) {
return std::unique_ptr<TJsonParser>(new TJsonParser(columns, callback));
return std::unique_ptr<TJsonParser>(new TJsonParser(columns, types, callback));
}

} // namespace NFq
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/json_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class TJsonParser {
public:
TJsonParser(
const TVector<TString>& columns,
const TVector<TString>& types,
TCallback callback);
~TJsonParser();
void Push(ui64 offset, const TString& value);
Expand All @@ -25,6 +26,7 @@ class TJsonParser {

std::unique_ptr<TJsonParser> NewJsonParser(
const TVector<TString>& columns,
const TVector<TString>& types,
TJsonParser::TCallback callback);

} // namespace NFq
1 change: 1 addition & 0 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ void TTopicSession::InitParser(const NYql::NPq::NProto::TDqPqTopicSource& source
NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem();
Parser = NewJsonParser(
GetVector(sourceParams.GetColumns()),
GetVector(sourceParams.GetColumnTypes()),
[actorSystem, selfId = SelfId()](ui64 offset, TList<TString>&& value){
actorSystem->Send(selfId, new NFq::TEvPrivate::TEvDataParsed(offset, std::move(value)));
});
Expand Down
24 changes: 18 additions & 6 deletions ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#include <ydb/core/testlib/actors/test_runtime.h>
#include <ydb/core/testlib/basics/helpers.h>
#include <ydb/core/testlib/actor_helpers.h>

#include <ydb/library/yql/public/purecalc/common/interface.h>

#include <library/cpp/testing/unittest/registar.h>

namespace {
Expand All @@ -31,10 +34,19 @@ class TFixture : public NUnitTest::TBaseFixture {
}
}

// Creates the parser under test for the given columns and their types and stores
// it in Parser. A purecalc compile error is turned into a test failure whose
// message carries the exception text, the query text and the reported issues.
void MakeParser(TVector<TString> columns, TVector<TString> types, NFq::TJsonParser::TCallback callback) {
    try {
        Parser = NFq::NewJsonParser(
            columns,
            types,
            callback);
    } catch (const NYql::NPureCalc::TCompileError& compileError) {
        // Caught by const reference: catching by value copies the exception and
        // would slice any type derived from TCompileError.
        UNIT_ASSERT_C(false, TStringBuilder() << "Failed to create json parser: " << compileError.what() << "\nQuery text:\n" << compileError.GetYql() << "Reason:\n" << compileError.GetIssues());
    }
}

void MakeParser(TVector<TString> columns, NFq::TJsonParser::TCallback callback) {
Parser = NFq::NewJsonParser(
columns,
callback);
MakeParser(columns, TVector<TString>(columns.size(), "String"), callback);
}

TActorSystemStub actorSystemStub;
Expand All @@ -46,11 +58,11 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) {
Y_UNIT_TEST_F(Simple1, TFixture) {
TList<TString> result;
ui64 resultOffset;
MakeParser({"a1", "a2"}, [&](ui64 offset, TList<TString>&& value){
MakeParser({"a1", "a2"}, {"String", "Optional<Uint64>"}, [&](ui64 offset, TList<TString>&& value){
resultOffset = offset;
result = std::move(value);
});
Parser->Push(5, R"({"a1": "hello1", "a2": "101", "event": "event1"})");
Parser->Push(5, R"({"a1": "hello1", "a2": 101, "event": "event1"})");
UNIT_ASSERT_VALUES_EQUAL(5, resultOffset);
UNIT_ASSERT_VALUES_EQUAL(2, result.size());
UNIT_ASSERT_VALUES_EQUAL("hello1", result.front());
Expand Down Expand Up @@ -102,7 +114,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) {
Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) {

MakeParser({"a2", "a1"}, [&](ui64, TList<TString>&&){ });
UNIT_ASSERT_EXCEPTION_CONTAINS(Parser->Push(5, R"(ydb)"), yexception, " Failed to unwrap empty optional");
UNIT_ASSERT_EXCEPTION_CONTAINS(Parser->Push(5, R"(ydb)"), yexception, "DB::ParsingException: Cannot parse input: expected '{' before: 'ydb': (at row 1)");
}
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class TFixture : public NUnitTest::TBaseFixture {
settings.SetDatabase(GetDefaultPqDatabase());
settings.AddColumns("dt");
settings.AddColumns("value");
settings.AddColumnTypes("UInt64");
settings.AddColumnTypes("Uint64");
settings.AddColumnTypes("String");
if (!emptyPredicate) {
settings.SetPredicate("WHERE true");
Expand Down Expand Up @@ -263,7 +263,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
const std::vector<TString> data = { "not json", "noch einmal / nicht json" };
PQWrite(data, topicName);

ExpectSessionError(ReadActorId1, "Failed to unwrap empty optional");
ExpectSessionError(ReadActorId1, "DB::ParsingException: Cannot parse input: expected '{' before: 'not json': (at row 1)");
StopSession(ReadActorId1, source);
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/row_dispatcher/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ PEERDIR(
ydb/library/yql/udfs/common/yson2
ydb/tests/fq/pq_async_io
ydb/library/yql/sql/pg_dummy
ydb/library/yql/udfs/common/clickhouse/client
)

SIZE(MEDIUM)
Expand Down
Loading