Skip to content

Commit cd072fe

Browse files
committed
Supported optional columns as nulls
1 parent eca5b6e commit cd072fe

File tree

3 files changed

+47
-7
lines changed

3 files changed

+47
-7
lines changed

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,32 @@ using TCallback = NFq::TJsonFilter::TCallback;
1616
const char* OffsetFieldName = "_offset";
1717
TString LogPrefix = "JsonFilter: ";
1818

19+
NYT::TNode CreateTypeNode(const TString& fieldType) {
20+
return NYT::TNode::CreateList()
21+
.Add("DataType")
22+
.Add(fieldType);
23+
}
24+
1925
void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
2026
node.Add(
2127
NYT::TNode::CreateList()
2228
.Add(fieldName)
23-
.Add(NYT::TNode::CreateList().Add("DataType").Add(fieldType))
29+
.Add(CreateTypeNode(fieldType))
30+
);
31+
}
32+
33+
void AddOptionalField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
34+
node.Add(NYT::TNode::CreateList()
35+
.Add(fieldName)
36+
.Add(NYT::TNode::CreateList().Add("OptionalType").Add(CreateTypeNode(fieldType)))
2437
);
2538
}
2639

2740
NYT::TNode MakeInputSchema(const TVector<TString>& columns) {
2841
auto structMembers = NYT::TNode::CreateList();
2942
AddField(structMembers, OffsetFieldName, "Uint64");
3043
for (const auto& col : columns) {
31-
AddField(structMembers, col, "String");
44+
AddOptionalField(structMembers, col, "String");
3245
}
3346
return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers));
3447
}
@@ -112,7 +125,9 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
112125

113126
size_t fieldId = 0;
114127
for (const auto& column : values.second) {
115-
items[FieldsPositions[fieldId++]] = NKikimr::NMiniKQL::MakeString(column[rowId]);
128+
items[FieldsPositions[fieldId++]] = column[rowId].data()
129+
? NKikimr::NMiniKQL::MakeString(column[rowId]).MakeOptional()
130+
: NKikimr::NUdf::TUnboxedValuePod();
116131
}
117132

118133
Worker->Push(std::move(result));

ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#include <ydb/core/base/backtrace.h>
2+
13
#include <ydb/core/fq/libs/ydb/ydb.h>
24
#include <ydb/core/fq/libs/events/events.h>
35

@@ -23,6 +25,8 @@ class TFixture : public NUnitTest::TBaseFixture {
2325
TAutoPtr<TAppPrepare> app = new TAppPrepare();
2426
Runtime.Initialize(app->Unwrap());
2527
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG);
28+
29+
NKikimr::EnableYDBBacktraceFormat();
2630
}
2731

2832
void TearDown(NUnitTest::TTestContext& /* context */) override {
@@ -91,6 +95,20 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) {
9195
UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]);
9296
}
9397

98+
Y_UNIT_TEST_F(NullValues, TFixture) {
99+
TMap<ui64, TString> result;
100+
MakeFilter(
101+
{"a1"},
102+
{"Optional<String>"},
103+
"where a1 is null",
104+
[&](ui64 offset, const TString& json) {
105+
result[offset] = json;
106+
});
107+
Filter->Push({5}, {{std::string_view()}});
108+
UNIT_ASSERT_VALUES_EQUAL(1, result.size());
109+
UNIT_ASSERT_VALUES_EQUAL(R"({"a1":null})", result[5]);
110+
}
111+
94112
Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) {
95113
MakeFilter(
96114
{"a1", "a2"},

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,6 @@ def test_simple_not_null(self, kikimr, client):
155155
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
156156

157157
@yq_v1
158-
@pytest.mark.skip(reason="Is not implemented")
159158
def test_simple_optional(self, kikimr, client):
160159
client.create_yds_connection(
161160
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
@@ -249,6 +248,9 @@ def test_nested_types(self, kikimr, client):
249248
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
250249
stop_yds_query(client, query_id)
251250

251+
issues = str(client.describe_query(query_id).result.query.transient_issue)
252+
assert "Row dispatcher will use the predicate:" in issues, "Incorrect Issues: " + issues
253+
252254
@yq_v1
253255
def test_filter(self, kikimr, client):
254256
client.create_yds_connection(
@@ -296,14 +298,16 @@ def test_filter_missing_fields(self, kikimr, client):
296298
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
297299
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
298300
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String, event String NOT NULL))
299-
WHERE data = "";'''
301+
WHERE data IS NULL;'''
300302

301303
query_id = start_yds_query(kikimr, client, sql)
302304
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
303305

304306
data = [
305307
'{"time": 101, "event": "event1"}',
306308
'{"time": 102, "data": null, "event": "event2"}',
309+
'{"time": 103, "data": "", "event": "event2"}',
310+
'{"time": 104, "data": "null", "event": "event2"}',
307311
]
308312

309313
self.write_stream(data)
@@ -313,6 +317,9 @@ def test_filter_missing_fields(self, kikimr, client):
313317
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
314318
stop_yds_query(client, query_id)
315319

320+
issues = str(client.describe_query(query_id).result.query.transient_issue)
321+
assert "Row dispatcher will use the predicate:" in issues, "Incorrect Issues: " + issues
322+
316323
@yq_v1
317324
def test_filter_use_unsupported_predicate(self, kikimr, client):
318325
client.create_yds_connection(
@@ -525,9 +532,9 @@ def test_stop_start_with_filter(self, kikimr, client):
525532
client.create_yds_connection(
526533
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
527534
)
528-
self.init_topics("test_stop_start", create_output=False)
535+
self.init_topics("test_stop_start_with_filter", create_output=False)
529536

530-
output_topic = "test_stop_start"
537+
output_topic = "test_stop_start_with_filter"
531538
create_stream(output_topic, partitions_count=1)
532539
create_read_rule(output_topic, self.consumer_name)
533540

0 commit comments

Comments
 (0)