Skip to content

Commit b46ef3e

Browse files
authored
YQ-3535 RD supported optional columns as NULLs (#10452)
1 parent 1aab418 commit b46ef3e

File tree

3 files changed

+60
-12
lines changed

3 files changed

+60
-12
lines changed

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,32 @@ using TCallback = NFq::TJsonFilter::TCallback;
1616
const char* OffsetFieldName = "_offset";
1717
TString LogPrefix = "JsonFilter: ";
1818

19+
NYT::TNode CreateTypeNode(const TString& fieldType) {
20+
return NYT::TNode::CreateList()
21+
.Add("DataType")
22+
.Add(fieldType);
23+
}
24+
1925
void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
2026
node.Add(
2127
NYT::TNode::CreateList()
2228
.Add(fieldName)
23-
.Add(NYT::TNode::CreateList().Add("DataType").Add(fieldType))
29+
.Add(CreateTypeNode(fieldType))
30+
);
31+
}
32+
33+
void AddOptionalField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
34+
node.Add(NYT::TNode::CreateList()
35+
.Add(fieldName)
36+
.Add(NYT::TNode::CreateList().Add("OptionalType").Add(CreateTypeNode(fieldType)))
2437
);
2538
}
2639

2740
NYT::TNode MakeInputSchema(const TVector<TString>& columns) {
2841
auto structMembers = NYT::TNode::CreateList();
2942
AddField(structMembers, OffsetFieldName, "Uint64");
3043
for (const auto& col : columns) {
31-
AddField(structMembers, col, "String");
44+
AddOptionalField(structMembers, col, "String");
3245
}
3346
return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers));
3447
}
@@ -112,7 +125,9 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
112125

113126
size_t fieldId = 0;
114127
for (const auto& column : values.second) {
115-
items[FieldsPositions[fieldId++]] = NKikimr::NMiniKQL::MakeString(column[rowId]);
128+
items[FieldsPositions[fieldId++]] = column[rowId].data() // Check that std::string_view was initialized in json_parser
129+
? NKikimr::NMiniKQL::MakeString(column[rowId]).MakeOptional()
130+
: NKikimr::NUdf::TUnboxedValuePod();
116131
}
117132

118133
Worker->Push(std::move(result));
@@ -264,7 +279,13 @@ class TJsonFilter::TImpl {
264279
} else if (columnType == "Optional<Json>") {
265280
columnType = "Optional<String>";
266281
}
267-
str << "CAST(" << columnNames[i] << " as " << columnType << ") as " << columnNames[i] << ((i != columnNames.size() - 1) ? "," : "");
282+
283+
if (columnType.StartsWith("Optional")) {
284+
str << "IF(" << columnNames[i] << " IS NOT NULL, Unwrap(CAST(" << columnNames[i] << " as " << columnType << ")), NULL)";
285+
} else {
286+
str << "Unwrap(CAST(" << columnNames[i] << " as " << columnType << "))";
287+
}
288+
str << " as " << columnNames[i] << ((i != columnNames.size() - 1) ? "," : "");
268289
}
269290
str << " FROM Input;\n";
270291
str << "$filtered = SELECT * FROM $fields " << whereFilter << ";\n";

ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#include <ydb/core/base/backtrace.h>
2+
13
#include <ydb/core/fq/libs/ydb/ydb.h>
24
#include <ydb/core/fq/libs/events/events.h>
35

@@ -23,6 +25,8 @@ class TFixture : public NUnitTest::TBaseFixture {
2325
TAutoPtr<TAppPrepare> app = new TAppPrepare();
2426
Runtime.Initialize(app->Unwrap());
2527
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG);
28+
29+
NKikimr::EnableYDBBacktraceFormat();
2630
}
2731

2832
void TearDown(NUnitTest::TTestContext& /* context */) override {
@@ -75,6 +79,8 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) {
7579
Filter->Push({6}, {{"101"}, {"hello2"}});
7680
UNIT_ASSERT_VALUES_EQUAL(1, result.size());
7781
UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]);
82+
UNIT_ASSERT_EXCEPTION_CONTAINS(Filter->Push({7}, {{"102"}, {std::string_view()}}), yexception, "Failed to unwrap empty optional");
83+
UNIT_ASSERT_EXCEPTION_CONTAINS(Filter->Push({8}, {{"str"}, {"hello3"}}), yexception, "Failed to unwrap empty optional");
7884
}
7985

8086
Y_UNIT_TEST_F(ManyValues, TFixture) {
@@ -91,7 +97,22 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) {
9197
UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]);
9298
}
9399

94-
Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) {
100+
Y_UNIT_TEST_F(NullValues, TFixture) {
101+
TMap<ui64, TString> result;
102+
MakeFilter(
103+
{"a1", "a2"},
104+
{"Optional<UInt64>", "String"},
105+
"where a1 is null",
106+
[&](ui64 offset, const TString& json) {
107+
result[offset] = json;
108+
});
109+
Filter->Push({5}, {{std::string_view()}, {"str"}});
110+
UNIT_ASSERT_VALUES_EQUAL(1, result.size());
111+
UNIT_ASSERT_VALUES_EQUAL(R"({"a1":null,"a2":"str"})", result[5]);
112+
UNIT_ASSERT_EXCEPTION_CONTAINS(Filter->Push({5}, {{"hello1"}, {"str"}}), yexception, "Failed to unwrap empty optional");
113+
}
114+
115+
Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) {
95116
MakeFilter(
96117
{"a1", "a2"},
97118
{"String", "UInt64"},
@@ -102,4 +123,3 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) {
102123
}
103124

104125
}
105-

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def test_read_raw_format_with_row_dispatcher(self, kikimr, client):
8989
)
9090
connections = client.list_connections(fq.Acl.Visibility.PRIVATE).result.connection
9191
assert len(connections) == 1
92-
assert connections[0].content.setting.data_streams.shared_reading == True
92+
assert connections[0].content.setting.data_streams.shared_reading is True
9393

9494
self.init_topics("test_read_raw_format_without_row_dispatcher", create_output=False)
9595
output_topic = "pq_test_pq_read_write_output"
@@ -106,7 +106,7 @@ def test_read_raw_format_with_row_dispatcher(self, kikimr, client):
106106
assert self.read_stream(len(data), topic_path=output_topic) == data
107107
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
108108
stop_yds_query(client, query_id)
109-
109+
110110
sql2 = Rf'''INSERT INTO {YDS_CONNECTION}.`{output_topic}`
111111
SELECT * FROM {YDS_CONNECTION}.`{self.input_topic}` WITH (format=raw, SCHEMA (data String NOT NULL))
112112
WHERE data != "romashka";'''
@@ -155,7 +155,6 @@ def test_simple_not_null(self, kikimr, client):
155155
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
156156

157157
@yq_v1
158-
@pytest.mark.skip(reason="Is not implemented")
159158
def test_simple_optional(self, kikimr, client):
160159
client.create_yds_connection(
161160
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
@@ -249,6 +248,9 @@ def test_nested_types(self, kikimr, client):
249248
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
250249
stop_yds_query(client, query_id)
251250

251+
issues = str(client.describe_query(query_id).result.query.transient_issue)
252+
assert "Row dispatcher will use the predicate:" in issues, "Incorrect Issues: " + issues
253+
252254
@yq_v1
253255
def test_filter(self, kikimr, client):
254256
client.create_yds_connection(
@@ -296,14 +298,16 @@ def test_filter_missing_fields(self, kikimr, client):
296298
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
297299
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
298300
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String, event String NOT NULL))
299-
WHERE data = "";'''
301+
WHERE data IS NULL;'''
300302

301303
query_id = start_yds_query(kikimr, client, sql)
302304
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
303305

304306
data = [
305307
'{"time": 101, "event": "event1"}',
306308
'{"time": 102, "data": null, "event": "event2"}',
309+
'{"time": 103, "data": "", "event": "event2"}',
310+
'{"time": 104, "data": "null", "event": "event2"}',
307311
]
308312

309313
self.write_stream(data)
@@ -313,6 +317,9 @@ def test_filter_missing_fields(self, kikimr, client):
313317
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
314318
stop_yds_query(client, query_id)
315319

320+
issues = str(client.describe_query(query_id).result.query.transient_issue)
321+
assert "Row dispatcher will use the predicate:" in issues, "Incorrect Issues: " + issues
322+
316323
@yq_v1
317324
def test_filter_use_unsupported_predicate(self, kikimr, client):
318325
client.create_yds_connection(
@@ -525,9 +532,9 @@ def test_stop_start_with_filter(self, kikimr, client):
525532
client.create_yds_connection(
526533
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
527534
)
528-
self.init_topics("test_stop_start", create_output=False)
535+
self.init_topics("test_stop_start_with_filter", create_output=False)
529536

530-
output_topic = "test_stop_start"
537+
output_topic = "test_stop_start_with_filter"
531538
create_stream(output_topic, partitions_count=1)
532539
create_read_rule(output_topic, self.consumer_name)
533540

0 commit comments

Comments
 (0)