From cd072fe1abd50e8f67d376ceda8eacac79d2ac17 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 15 Oct 2024 12:28:04 +0000 Subject: [PATCH 1/4] Supported optional columns as nulls --- .../fq/libs/row_dispatcher/json_filter.cpp | 21 ++++++++++++++++--- .../libs/row_dispatcher/ut/json_filter_ut.cpp | 18 ++++++++++++++++ ydb/tests/fq/yds/test_row_dispatcher.py | 15 +++++++++---- 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index 3ef0da86e6d5..30ee6e6b50bd 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -16,11 +16,24 @@ using TCallback = NFq::TJsonFilter::TCallback; const char* OffsetFieldName = "_offset"; TString LogPrefix = "JsonFilter: "; +NYT::TNode CreateTypeNode(const TString& fieldType) { + return NYT::TNode::CreateList() + .Add("DataType") + .Add(fieldType); +} + void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) { node.Add( NYT::TNode::CreateList() .Add(fieldName) - .Add(NYT::TNode::CreateList().Add("DataType").Add(fieldType)) + .Add(CreateTypeNode(fieldType)) + ); +} + +void AddOptionalField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) { + node.Add(NYT::TNode::CreateList() + .Add(fieldName) + .Add(NYT::TNode::CreateList().Add("OptionalType").Add(CreateTypeNode(fieldType))) ); } @@ -28,7 +41,7 @@ NYT::TNode MakeInputSchema(const TVector& columns) { auto structMembers = NYT::TNode::CreateList(); AddField(structMembers, OffsetFieldName, "Uint64"); for (const auto& col : columns) { - AddField(structMembers, col, "String"); + AddOptionalField(structMembers, col, "String"); } return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers)); } @@ -112,7 +125,9 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumerPush(std::move(result)); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp index 7bda6cd4c1fe..9c2305711715 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp @@ -1,3 +1,5 @@ +#include + #include #include @@ -23,6 +25,8 @@ class TFixture : public NUnitTest::TBaseFixture { TAutoPtr app = new TAppPrepare(); Runtime.Initialize(app->Unwrap()); Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG); + + NKikimr::EnableYDBBacktraceFormat(); } void TearDown(NUnitTest::TTestContext& /* context */) override { @@ -91,6 +95,20 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]); } + Y_UNIT_TEST_F(NullValues, TFixture) { + TMap result; + MakeFilter( + {"a1"}, + {"Optional"}, + "where a1 is null", + [&](ui64 offset, const TString& json) { + result[offset] = json; + }); + Filter->Push({5}, {{std::string_view()}}); + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(R"({"a1":null})", result[5]); + } + Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { MakeFilter( {"a1", "a2"}, diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 61b760f3d98b..ad40c3ed3ff5 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -155,7 +155,6 @@ def test_simple_not_null(self, kikimr, client): wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0) @yq_v1 - @pytest.mark.skip(reason="Is not implemented") def test_simple_optional(self, kikimr, client): client.create_yds_connection( YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True @@ -249,6 +248,9 @@ def test_nested_types(self, kikimr, client): wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1) stop_yds_query(client, query_id) + issues = str(client.describe_query(query_id).result.query.transient_issue) + assert "Row dispatcher will use the predicate:" in issues, "Incorrect Issues: " + issues + @yq_v1 def test_filter(self, kikimr, client): client.create_yds_connection( @@ -296,7 +298,7 @@ def test_filter_missing_fields(self, kikimr, client): INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}` WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String, event String NOT NULL)) - WHERE data = "";''' + WHERE data IS NULL;''' query_id = start_yds_query(kikimr, client, sql) wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) @@ -304,6 +306,8 @@ def test_filter_missing_fields(self, kikimr, client): data = [ '{"time": 101, "event": "event1"}', '{"time": 102, "data": null, "event": "event2"}', + '{"time": 103, "data": "", "event": "event2"}', + '{"time": 104, "data": "null", "event": "event2"}', ] self.write_stream(data) @@ -313,6 +317,9 @@ def test_filter_missing_fields(self, kikimr, client): wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1) stop_yds_query(client, query_id) + issues = str(client.describe_query(query_id).result.query.transient_issue) + assert "Row dispatcher will use the predicate:" in issues, "Incorrect Issues: " + issues + @yq_v1 def test_filter_use_unsupported_predicate(self, kikimr, client): client.create_yds_connection( @@ -525,9 +532,9 @@ def test_stop_start_with_filter(self, kikimr, client): client.create_yds_connection( YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True ) - self.init_topics("test_stop_start", create_output=False) + self.init_topics("test_stop_start_with_filter", create_output=False) - output_topic = "test_stop_start" + output_topic = "test_stop_start_with_filter" create_stream(output_topic, partitions_count=1) create_read_rule(output_topic, self.consumer_name) From ccdcf6bc110997aea83086940f1012cd66713d72 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 15 Oct 2024 13:02:31 +0000 Subject: [PATCH 2/4] Added parsing validations --- ydb/core/fq/libs/row_dispatcher/json_filter.cpp | 8 +++++++- .../fq/libs/row_dispatcher/ut/json_filter_ut.cpp | 14 ++++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index 30ee6e6b50bd..9b61ab17ea89 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -279,7 +279,13 @@ class TJsonFilter::TImpl { } else if (columnType == "Optional") { columnType = "Optional"; } - str << "CAST(" << columnNames[i] << " as " << columnType << ") as " << columnNames[i] << ((i != columnNames.size() - 1) ? "," : ""); + + if (columnType.StartsWith("Optional")) { + str << "IF(" << columnNames[i] << " IS NOT NULL, Unwrap(CAST(" << columnNames[i] << " as " << columnType << ")), NULL)"; + } else { + str << "Unwrap(CAST(" << columnNames[i] << " as " << columnType << "))"; + } + str << " as " << columnNames[i] << ((i != columnNames.size() - 1) ? "," : ""); } str << " FROM Input;\n"; str << "$filtered = SELECT * FROM $fields " << whereFilter << ";\n"; diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp index 9c2305711715..7682485b4644 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp @@ -79,6 +79,8 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { Filter->Push({6}, {{"101"}, {"hello2"}}); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]); + UNIT_ASSERT_EXCEPTION_CONTAINS(Filter->Push({7}, {{"102"}, {std::string_view()}}), yexception, "Failed to unwrap empty optional"); + UNIT_ASSERT_EXCEPTION_CONTAINS(Filter->Push({8}, {{"str"}, {"hello3"}}), yexception, "Failed to unwrap empty optional"); } Y_UNIT_TEST_F(ManyValues, TFixture) { @@ -98,18 +100,19 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { Y_UNIT_TEST_F(NullValues, TFixture) { TMap result; MakeFilter( - {"a1"}, - {"Optional"}, + {"a1", "a2"}, + {"Optional", "String"}, "where a1 is null", [&](ui64 offset, const TString& json) { result[offset] = json; }); - Filter->Push({5}, {{std::string_view()}}); + Filter->Push({5}, {{std::string_view()}, {"str"}}); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(R"({"a1":null})", result[5]); + UNIT_ASSERT_VALUES_EQUAL(R"({"a1":null,"a2":"str"})", result[5]); + UNIT_ASSERT_EXCEPTION_CONTAINS(Filter->Push({5}, {{"hello1"}, {"str"}}), yexception, "Failed to unwrap empty optional"); } - Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { + Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { MakeFilter( {"a1", "a2"}, {"String", "UInt64"}, @@ -120,4 +123,3 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { } } - From c77a5c6c63ac533ca621487001c0aae49ec31063 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 15 Oct 2024 14:39:48 +0000 Subject: [PATCH 3/4] Fixed style --- ydb/tests/fq/yds/test_row_dispatcher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index ad40c3ed3ff5..7d59d333d620 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -89,7 +89,7 @@ def test_read_raw_format_with_row_dispatcher(self, kikimr, client): ) connections = client.list_connections(fq.Acl.Visibility.PRIVATE).result.connection assert len(connections) == 1 - assert connections[0].content.setting.data_streams.shared_reading == True + assert connections[0].content.setting.data_streams.shared_reading is True self.init_topics("test_read_raw_format_without_row_dispatcher", create_output=False) output_topic = "pq_test_pq_read_write_output" @@ -106,7 +106,7 @@ def test_read_raw_format_with_row_dispatcher(self, kikimr, client): assert self.read_stream(len(data), topic_path=output_topic) == data wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) stop_yds_query(client, query_id) - + sql2 = Rf'''INSERT INTO {YDS_CONNECTION}.`{output_topic}` SELECT * FROM {YDS_CONNECTION}.`{self.input_topic}` WITH (format=raw, SCHEMA (data String NOT NULL)) WHERE data != "romashka";''' From 73c1a86b26e04070a8b8deccd56325cd1c3c1c2c Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 15 Oct 2024 15:14:42 +0000 Subject: [PATCH 4/4] Added comment --- ydb/core/fq/libs/row_dispatcher/json_filter.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index 9b61ab17ea89..04cf6771118b 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -125,7 +125,7 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer