From 0c1e47ccafbb6e10164e36e7c5e475df05a9281b Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Sun, 29 Sep 2024 07:45:12 +0000 Subject: [PATCH 1/4] Parse json using ClickHouse parser --- .../fq/libs/row_dispatcher/json_parser.cpp | 27 ++++++++++++++----- .../libs/row_dispatcher/ut/json_parser_ut.cpp | 15 ++++++++--- ydb/core/fq/libs/row_dispatcher/ut/ya.make | 1 + 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 2c3a3d38f024..20dd62ba8b61 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -258,14 +258,29 @@ class TJsonParser::TImpl { private: TString GenerateSql(const TVector& columns) { - TStringStream str; - str << "$json = SELECT CAST(data AS Json) as `Json`, " << OffsetFieldName << " FROM Input;"; - str << "\nSELECT " << OffsetFieldName << ", "; + TStringStream structType; + structType << "Struct<"; for (auto it = columns.begin(); it != columns.end(); ++it) { - str << R"(CAST(Unwrap(JSON_VALUE(`Json`, "$.)" << *it << "\")) as String) as " - << *it << ((it != columns.end() - 1) ? "," : ""); + structType << *it << ": String" << ((it != columns.end() - 1) ? ", " : ">"); } - str << " FROM $json;"; + + TStringStream str; + str << R"( + $parse_json_each_row = YQL::Udf( + AsAtom("ClickHouseClient.ParseFormat"), + Void(), + TupleType( + TupleType(String, Uint64), + Struct<>, + TupleType()" << structType.Str() << R"(, Uint64) + ), + AsAtom("json_each_row") + ); + + $parsed_tuples = SELECT YQL::Collect($parse_json_each_row(YQL::ToStream([(data, )" << OffsetFieldName << R"()]))) AS parsed_value FROM Input; + $parsed_structs = SELECT AddMember(parsed_value.0, ")" << OffsetFieldName << R"(", parsed_value.1) FROM $parsed_tuples FLATTEN LIST BY parsed_value; + SELECT * FROM $parsed_structs FLATTEN COLUMNS; + )"; LOG_ROW_DISPATCHER_DEBUG("GenerateSql " << str.Str()); return str.Str(); } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index 58f19a4ee262..5b41de41607d 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -6,6 +6,9 @@ #include #include #include + +#include + #include namespace { @@ -32,9 +35,13 @@ class TFixture : public NUnitTest::TBaseFixture { } void MakeParser(TVector columns, NFq::TJsonParser::TCallback callback) { - Parser = NFq::NewJsonParser( - columns, - callback); + try { + Parser = NFq::NewJsonParser( + columns, + callback); + } catch (NYql::NPureCalc::TCompileError compileError) { + UNIT_ASSERT_C(false, TStringBuilder() << "Failed to create json parser: " << compileError.what() << "\nQuery text:\n" << compileError.GetYql() << "Reason:\n" << compileError.GetIssues()); + } } TActorSystemStub actorSystemStub; @@ -102,7 +109,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { MakeParser({"a2", "a1"}, [&](ui64, TList&&){ }); - UNIT_ASSERT_EXCEPTION_CONTAINS(Parser->Push(5, R"(ydb)"), yexception, " Failed to unwrap empty optional"); + UNIT_ASSERT_EXCEPTION_CONTAINS(Parser->Push(5, R"(ydb)"), yexception, " DB::ParsingException: Cannot parse input: expected '{' before: 'ydb': (at row 1)"); } } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/ya.make b/ydb/core/fq/libs/row_dispatcher/ut/ya.make index bb66ec57798f..25242d092f28 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/ut/ya.make @@ -20,6 +20,7 @@ PEERDIR( ydb/library/yql/udfs/common/yson2 ydb/tests/fq/pq_async_io ydb/library/yql/sql/pg_dummy + ydb/library/yql/udfs/common/clickhouse/client ) SIZE(MEDIUM) From 25f881ae2694ee5f62c2190f722dd46790d98a65 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 30 Sep 2024 09:03:23 +0000 Subject: [PATCH 2/4] Supported column types --- .../fq/libs/row_dispatcher/json_parser.cpp | 58 +++++++++++-------- ydb/core/fq/libs/row_dispatcher/json_parser.h | 2 + .../fq/libs/row_dispatcher/topic_session.cpp | 1 + .../libs/row_dispatcher/ut/json_parser_ut.cpp | 7 ++- 4 files changed, 44 insertions(+), 24 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 20dd62ba8b61..368078611274 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -230,8 +230,9 @@ class TJsonParser::TImpl { public: TImpl( const TVector& columns, + const TVector& types, TCallback callback) - : Sql(GenerateSql(columns)) { + : Sql(GenerateSql(columns, types)) { auto options = NYql::NPureCalc::TProgramFactoryOptions(); auto factory = NYql::NPureCalc::MakeProgramFactory(options); @@ -240,7 +241,7 @@ class TJsonParser::TImpl { TParserInputSpec(), TParserOutputSpec(MakeOutputSchema(columns)), Sql, - NYql::NPureCalc::ETranslationMode::SQL + NYql::NPureCalc::ETranslationMode::SExpr ); LOG_ROW_DISPATCHER_DEBUG("Program created"); InputConsumer = Program->Apply(MakeHolder(callback)); @@ -257,29 +258,38 @@ class TJsonParser::TImpl { } private: - TString GenerateSql(const TVector& columns) { - TStringStream structType; - structType << "Struct<"; - for (auto it = columns.begin(); it != columns.end(); ++it) { - structType << *it << ": String" << ((it != columns.end() - 1) ? ", " : ">"); + TString GenerateSql(const TVector& columnNames, const TVector& columnTypes) { + Y_ABORT_UNLESS(columnNames.size() == columnTypes.size(), "Unexpected column types size"); + + TStringStream udfOutputType; + TStringStream resultType; + for (size_t i = 0; i < columnNames.size(); ++i) { + const TString& lastSymbol = i + 1 == columnNames.size() ? "" : " "; + const TString& column = columnNames[i]; + const TString& type = columnTypes[i]; + + udfOutputType << "'('" << column << " (DataType '" << type << "))" << lastSymbol; + resultType << "'('" << column << " (SafeCast (Member $parsed '" << column << ") $string_type))" << lastSymbol; } TStringStream str; str << R"( - $parse_json_each_row = YQL::Udf( - AsAtom("ClickHouseClient.ParseFormat"), - Void(), - TupleType( - TupleType(String, Uint64), - Struct<>, - TupleType()" << structType.Str() << R"(, Uint64) - ), - AsAtom("json_each_row") - ); - - $parsed_tuples = SELECT YQL::Collect($parse_json_each_row(YQL::ToStream([(data, )" << OffsetFieldName << R"()]))) AS parsed_value FROM Input; - $parsed_structs = SELECT AddMember(parsed_value.0, ")" << OffsetFieldName << R"(", parsed_value.1) FROM $parsed_tuples FLATTEN LIST BY parsed_value; - SELECT * FROM $parsed_structs FLATTEN COLUMNS; + ( + (let $string_type (DataType 'String)) + + (let $input_type (TupleType (DataType 'String) (DataType 'Uint64))) + (let $output_type (TupleType (StructType )" << udfOutputType.Str() << R"() (DataType 'Uint64))) + (let $udf_argument_type (TupleType $input_type (StructType) $output_type)) + (let $udf_callable_type (CallableType '('1) '((StreamType $output_type)) '((StreamType $input_type)) '((OptionalType (DataType 'Utf8))))) + (let $udf (Udf 'ClickHouseClient.ParseFormat (Void) $udf_argument_type 'json_each_row $udf_callable_type (VoidType) '"" '())) + + (return (Map (Apply $udf (Map (Self '0) (lambda '($input) (block '( + (return '((Member $input 'data) (Member $input ')" << OffsetFieldName << R"())) + ))))) (lambda '($output) (block '( + (let $parsed (Nth $output '0)) + (return (AsStruct '(')" << OffsetFieldName << R"( (Nth $output '1)) )" << resultType.Str() << R"()) + ))))) + ) )"; LOG_ROW_DISPATCHER_DEBUG("GenerateSql " << str.Str()); return str.Str(); @@ -293,8 +303,9 @@ class TJsonParser::TImpl { TJsonParser::TJsonParser( const TVector& columns, + const TVector& types, TCallback callback) - : Impl(std::make_unique(columns, callback)) { + : Impl(std::make_unique(columns, types, callback)) { } TJsonParser::~TJsonParser() { @@ -310,8 +321,9 @@ TString TJsonParser::GetSql() { std::unique_ptr NewJsonParser( const TVector& columns, + const TVector& types, TCallback callback) { - return std::unique_ptr(new TJsonParser(columns, callback)); + return std::unique_ptr(new TJsonParser(columns, types, callback)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.h b/ydb/core/fq/libs/row_dispatcher/json_parser.h index d67761b401ce..cb5137105e6b 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.h +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.h @@ -13,6 +13,7 @@ class TJsonParser { public: TJsonParser( const TVector& columns, + const TVector& types, TCallback callback); ~TJsonParser(); void Push(ui64 offset, const TString& value); @@ -25,6 +26,7 @@ class TJsonParser { std::unique_ptr NewJsonParser( const TVector& columns, + const TVector& types, TJsonParser::TCallback callback); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 2294f097a5fc..9623806ee87a 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -676,6 +676,7 @@ void TTopicSession::InitParser(const NYql::NPq::NProto::TDqPqTopicSource& source NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem(); Parser = NewJsonParser( GetVector(sourceParams.GetColumns()), + GetVector(sourceParams.GetColumnTypes()), [actorSystem, selfId = SelfId()](ui64 offset, TList&& value){ actorSystem->Send(selfId, new NFq::TEvPrivate::TEvDataParsed(offset, std::move(value))); }); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index 5b41de41607d..1a84f7ccd2cd 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -34,16 +34,21 @@ class TFixture : public NUnitTest::TBaseFixture { } } - void MakeParser(TVector columns, NFq::TJsonParser::TCallback callback) { + void MakeParser(TVector columns, TVector types, NFq::TJsonParser::TCallback callback) { try { Parser = NFq::NewJsonParser( columns, + types, callback); } catch (NYql::NPureCalc::TCompileError compileError) { UNIT_ASSERT_C(false, TStringBuilder() << "Failed to create json parser: " << compileError.what() << "\nQuery text:\n" << compileError.GetYql() << "Reason:\n" << compileError.GetIssues()); } } + void MakeParser(TVector columns, NFq::TJsonParser::TCallback callback) { + MakeParser(columns, TVector(columns.size(), "String"), callback); + } + TActorSystemStub actorSystemStub; NActors::TTestActorRuntime Runtime; std::unique_ptr Parser; From cc26fb8e4df8d17c3ab38930078d524e6f331f04 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 30 Sep 2024 09:21:36 +0000 Subject: [PATCH 3/4] Fixed unit tests --- ydb/core/fq/libs/row_dispatcher/json_parser.cpp | 2 +- ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp | 6 +++--- ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 368078611274..5459d6b84acd 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -277,7 +277,7 @@ class TJsonParser::TImpl { ( (let $string_type (DataType 'String)) - (let $input_type (TupleType (DataType 'String) (DataType 'Uint64))) + (let $input_type (TupleType $string_type (DataType 'Uint64))) (let $output_type (TupleType (StructType )" << udfOutputType.Str() << R"() (DataType 'Uint64))) (let $udf_argument_type (TupleType $input_type (StructType) $output_type)) (let $udf_callable_type (CallableType '('1) '((StreamType $output_type)) '((StreamType $input_type)) '((OptionalType (DataType 'Utf8))))) diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index 1a84f7ccd2cd..686a4facdf59 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -58,11 +58,11 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(Simple1, TFixture) { TList result; ui64 resultOffset; - MakeParser({"a1", "a2"}, [&](ui64 offset, TList&& value){ + MakeParser({"a1", "a2"}, {"String", "Int32"}, [&](ui64 offset, TList&& value){ resultOffset = offset; result = std::move(value); }); - Parser->Push(5, R"({"a1": "hello1", "a2": "101", "event": "event1"})"); + Parser->Push(5, R"({"a1": "hello1", "a2": 101, "event": "event1"})"); UNIT_ASSERT_VALUES_EQUAL(5, resultOffset); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); UNIT_ASSERT_VALUES_EQUAL("hello1", result.front()); @@ -114,7 +114,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { MakeParser({"a2", "a1"}, [&](ui64, TList&&){ }); - UNIT_ASSERT_EXCEPTION_CONTAINS(Parser->Push(5, R"(ydb)"), yexception, " DB::ParsingException: Cannot parse input: expected '{' before: 'ydb': (at row 1)"); + UNIT_ASSERT_EXCEPTION_CONTAINS(Parser->Push(5, R"(ydb)"), yexception, "DB::ParsingException: Cannot parse input: expected '{' before: 'ydb': (at row 1)"); } } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 8e7036f057b5..ba24378e0a35 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -81,7 +81,7 @@ class TFixture : public NUnitTest::TBaseFixture { settings.SetDatabase(GetDefaultPqDatabase()); settings.AddColumns("dt"); settings.AddColumns("value"); - settings.AddColumnTypes("UInt64"); + settings.AddColumnTypes("Uint64"); settings.AddColumnTypes("String"); if (!emptyPredicate) { settings.SetPredicate("WHERE true"); @@ -263,7 +263,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { const std::vector data = { "not json", "noch einmal / nicht json" }; PQWrite(data, topicName); - ExpectSessionError(ReadActorId1, "Failed to unwrap empty optional"); + ExpectSessionError(ReadActorId1, "DB::ParsingException: Cannot parse input: expected '{' before: 'not json': (at row 1)"); StopSession(ReadActorId1, source); } From c532befc70c756dcef139ba5c557afada232d472 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 30 Sep 2024 09:46:28 +0000 Subject: [PATCH 4/4] Added SkipOptional --- ydb/core/fq/libs/row_dispatcher/json_parser.cpp | 10 +++++++++- ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 5459d6b84acd..84ca3018b509 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -266,7 +266,7 @@ class TJsonParser::TImpl { for (size_t i = 0; i < columnNames.size(); ++i) { const TString& lastSymbol = i + 1 == columnNames.size() ? "" : " "; const TString& column = columnNames[i]; - const TString& type = columnTypes[i]; + const TString& type = SkipOptional(columnTypes[i]); udfOutputType << "'('" << column << " (DataType '" << type << "))" << lastSymbol; resultType << "'('" << column << " (SafeCast (Member $parsed '" << column << ") $string_type))" << lastSymbol; @@ -295,6 +295,14 @@ class TJsonParser::TImpl { return str.Str(); } + static TString SkipOptional(TStringBuf type) { + if (type.StartsWith("Optional")) { + Y_ABORT_UNLESS(type.SkipPrefix("Optional<")); + Y_ABORT_UNLESS(type.ChopSuffix(">")); + } + return TString(type); + } + private: THolder> Program; THolder> InputConsumer; diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index 686a4facdf59..a9c389d3900f 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -58,7 +58,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(Simple1, TFixture) { TList result; ui64 resultOffset; - MakeParser({"a1", "a2"}, {"String", "Int32"}, [&](ui64 offset, TList&& value){ + MakeParser({"a1", "a2"}, {"String", "Optional"}, [&](ui64 offset, TList&& value){ resultOffset = offset; result = std::move(value); });