diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 2c3a3d38f024..84ca3018b509 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -230,8 +230,9 @@ class TJsonParser::TImpl { public: TImpl( const TVector& columns, + const TVector& types, TCallback callback) - : Sql(GenerateSql(columns)) { + : Sql(GenerateSql(columns, types)) { auto options = NYql::NPureCalc::TProgramFactoryOptions(); auto factory = NYql::NPureCalc::MakeProgramFactory(options); @@ -240,7 +241,7 @@ class TJsonParser::TImpl { TParserInputSpec(), TParserOutputSpec(MakeOutputSchema(columns)), Sql, - NYql::NPureCalc::ETranslationMode::SQL + NYql::NPureCalc::ETranslationMode::SExpr ); LOG_ROW_DISPATCHER_DEBUG("Program created"); InputConsumer = Program->Apply(MakeHolder(callback)); @@ -257,19 +258,51 @@ class TJsonParser::TImpl { } private: - TString GenerateSql(const TVector& columns) { - TStringStream str; - str << "$json = SELECT CAST(data AS Json) as `Json`, " << OffsetFieldName << " FROM Input;"; - str << "\nSELECT " << OffsetFieldName << ", "; - for (auto it = columns.begin(); it != columns.end(); ++it) { - str << R"(CAST(Unwrap(JSON_VALUE(`Json`, "$.)" << *it << "\")) as String) as " - << *it << ((it != columns.end() - 1) ? "," : ""); + TString GenerateSql(const TVector& columnNames, const TVector& columnTypes) { + Y_ABORT_UNLESS(columnNames.size() == columnTypes.size(), "Unexpected column types size"); + + TStringStream udfOutputType; + TStringStream resultType; + for (size_t i = 0; i < columnNames.size(); ++i) { + const TString& lastSymbol = i + 1 == columnNames.size() ? "" : " "; + const TString& column = columnNames[i]; + const TString& type = SkipOptional(columnTypes[i]); + + udfOutputType << "'('" << column << " (DataType '" << type << "))" << lastSymbol; + resultType << "'('" << column << " (SafeCast (Member $parsed '" << column << ") $string_type))" << lastSymbol; } - str << " FROM $json;"; + + TStringStream str; + str << R"( + ( + (let $string_type (DataType 'String)) + + (let $input_type (TupleType $string_type (DataType 'Uint64))) + (let $output_type (TupleType (StructType )" << udfOutputType.Str() << R"() (DataType 'Uint64))) + (let $udf_argument_type (TupleType $input_type (StructType) $output_type)) + (let $udf_callable_type (CallableType '('1) '((StreamType $output_type)) '((StreamType $input_type)) '((OptionalType (DataType 'Utf8))))) + (let $udf (Udf 'ClickHouseClient.ParseFormat (Void) $udf_argument_type 'json_each_row $udf_callable_type (VoidType) '"" '())) + + (return (Map (Apply $udf (Map (Self '0) (lambda '($input) (block '( + (return '((Member $input 'data) (Member $input ')" << OffsetFieldName << R"())) + ))))) (lambda '($output) (block '( + (let $parsed (Nth $output '0)) + (return (AsStruct '(')" << OffsetFieldName << R"( (Nth $output '1)) )" << resultType.Str() << R"()) + ))))) + ) + )"; LOG_ROW_DISPATCHER_DEBUG("GenerateSql " << str.Str()); return str.Str(); } + static TString SkipOptional(TStringBuf type) { + if (type.StartsWith("Optional")) { + Y_ABORT_UNLESS(type.SkipPrefix("Optional<")); + Y_ABORT_UNLESS(type.ChopSuffix(">")); + } + return TString(type); + } + private: THolder> Program; THolder> InputConsumer; @@ -278,8 +311,9 @@ class TJsonParser::TImpl { TJsonParser::TJsonParser( const TVector& columns, + const TVector& types, TCallback callback) - : Impl(std::make_unique(columns, callback)) { + : Impl(std::make_unique(columns, types, callback)) { } TJsonParser::~TJsonParser() { @@ -295,8 +329,9 @@ TString TJsonParser::GetSql() { std::unique_ptr NewJsonParser( const TVector& columns, + const TVector& types, TCallback callback) { - return std::unique_ptr(new TJsonParser(columns, callback)); + return std::unique_ptr(new TJsonParser(columns, types, callback)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.h b/ydb/core/fq/libs/row_dispatcher/json_parser.h index d67761b401ce..cb5137105e6b 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.h +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.h @@ -13,6 +13,7 @@ class TJsonParser { public: TJsonParser( const TVector& columns, + const TVector& types, TCallback callback); ~TJsonParser(); void Push(ui64 offset, const TString& value); @@ -25,6 +26,7 @@ class TJsonParser { std::unique_ptr NewJsonParser( const TVector& columns, + const TVector& types, TJsonParser::TCallback callback); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 2294f097a5fc..9623806ee87a 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -676,6 +676,7 @@ void TTopicSession::InitParser(const NYql::NPq::NProto::TDqPqTopicSource& source NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem(); Parser = NewJsonParser( GetVector(sourceParams.GetColumns()), + GetVector(sourceParams.GetColumnTypes()), [actorSystem, selfId = SelfId()](ui64 offset, TList&& value){ actorSystem->Send(selfId, new NFq::TEvPrivate::TEvDataParsed(offset, std::move(value))); }); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index 58f19a4ee262..a9c389d3900f 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -6,6 +6,9 @@ #include #include #include + +#include + #include namespace { @@ -31,10 +34,19 @@ class TFixture : public NUnitTest::TBaseFixture { } } + void MakeParser(TVector columns, TVector types, NFq::TJsonParser::TCallback callback) { + try { + Parser = NFq::NewJsonParser( + columns, + types, + callback); + } catch (NYql::NPureCalc::TCompileError compileError) { + UNIT_ASSERT_C(false, TStringBuilder() << "Failed to create json parser: " << compileError.what() << "\nQuery text:\n" << compileError.GetYql() << "Reason:\n" << compileError.GetIssues()); + } + } + void MakeParser(TVector columns, NFq::TJsonParser::TCallback callback) { - Parser = NFq::NewJsonParser( - columns, - callback); + MakeParser(columns, TVector(columns.size(), "String"), callback); } TActorSystemStub actorSystemStub; @@ -46,11 +58,11 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(Simple1, TFixture) { TList result; ui64 resultOffset; - MakeParser({"a1", "a2"}, [&](ui64 offset, TList&& value){ + MakeParser({"a1", "a2"}, {"String", "Optional"}, [&](ui64 offset, TList&& value){ resultOffset = offset; result = std::move(value); }); - Parser->Push(5, R"({"a1": "hello1", "a2": "101", "event": "event1"})"); + Parser->Push(5, R"({"a1": "hello1", "a2": 101, "event": "event1"})"); UNIT_ASSERT_VALUES_EQUAL(5, resultOffset); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); UNIT_ASSERT_VALUES_EQUAL("hello1", result.front()); @@ -102,7 +114,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { MakeParser({"a2", "a1"}, [&](ui64, TList&&){ }); - UNIT_ASSERT_EXCEPTION_CONTAINS(Parser->Push(5, R"(ydb)"), yexception, " Failed to unwrap empty optional"); + UNIT_ASSERT_EXCEPTION_CONTAINS(Parser->Push(5, R"(ydb)"), yexception, "DB::ParsingException: Cannot parse input: expected '{' before: 'ydb': (at row 1)"); } } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 8e7036f057b5..ba24378e0a35 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -81,7 +81,7 @@ class TFixture : public NUnitTest::TBaseFixture { settings.SetDatabase(GetDefaultPqDatabase()); settings.AddColumns("dt"); settings.AddColumns("value"); - settings.AddColumnTypes("UInt64"); + settings.AddColumnTypes("Uint64"); settings.AddColumnTypes("String"); if (!emptyPredicate) { settings.SetPredicate("WHERE true"); @@ -263,7 +263,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { const std::vector data = { "not json", "noch einmal / nicht json" }; PQWrite(data, topicName); - ExpectSessionError(ReadActorId1, "Failed to unwrap empty optional"); + ExpectSessionError(ReadActorId1, "DB::ParsingException: Cannot parse input: expected '{' before: 'not json': (at row 1)"); StopSession(ReadActorId1, source); } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/ya.make b/ydb/core/fq/libs/row_dispatcher/ut/ya.make index bb66ec57798f..25242d092f28 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/ut/ya.make @@ -20,6 +20,7 @@ PEERDIR( ydb/library/yql/udfs/common/yson2 ydb/tests/fq/pq_async_io ydb/library/yql/sql/pg_dummy + ydb/library/yql/udfs/common/clickhouse/client ) SIZE(MEDIUM)