Skip to content

Commit 07f9e3c

Browse files
authored
YQ RD support strings parsing in Json columns (#11341)
1 parent 979a3d1 commit 07f9e3c

File tree

3 files changed

+36
-15
lines changed

3 files changed

+36
-15
lines changed

ydb/core/fq/libs/row_dispatcher/json_parser.cpp

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ class TColumnParser {
139139
}
140140

141141
static TParser GetJsonValueParser(NYql::NUdf::EDataSlot dataSlot, bool optional) {
142+
if (dataSlot == NYql::NUdf::EDataSlot::Json) {
143+
return GetJsonValueExtractor();
144+
}
145+
142146
const auto& typeInfo = NYql::NUdf::GetDataTypeInfo(dataSlot);
143147
return [dataSlot, optional, &typeInfo](simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) {
144148
switch (jsonValue.type()) {
@@ -199,16 +203,7 @@ class TColumnParser {
199203

200204
case simdjson::builtin::ondemand::json_type::array:
201205
case simdjson::builtin::ondemand::json_type::object: {
202-
const auto rawJson = jsonValue.raw_json().value();
203-
if (Y_UNLIKELY(dataSlot != NYql::NUdf::EDataSlot::Json)) {
204-
throw yexception() << "found unexpected nested value (raw: '" << TruncateString(rawJson) << "'), expected data type " <<typeInfo.Name << ", please use Json type for nested values";
205-
}
206-
if (Y_UNLIKELY(!NYql::NDom::IsValidJson(rawJson))) {
207-
throw yexception() << "found bad json value: '" << TruncateString(rawJson) << "'";
208-
}
209-
resultValue = NKikimr::NMiniKQL::MakeString(rawJson);
210-
LockObject(resultValue);
211-
break;
206+
throw yexception() << "found unexpected nested value (raw: '" << TruncateString(jsonValue.raw_json().value()) << "'), expected data type " <<typeInfo.Name << ", please use Json type for nested values";
212207
}
213208

214209
case simdjson::builtin::ondemand::json_type::boolean: {
@@ -230,6 +225,17 @@ class TColumnParser {
230225
};
231226
}
232227

228+
static TParser GetJsonValueExtractor() {
229+
return [](simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) {
230+
const auto rawJson = jsonValue.raw_json().value();
231+
if (Y_UNLIKELY(!NYql::NDom::IsValidJson(rawJson))) {
232+
throw yexception() << "found bad json value: '" << TruncateString(rawJson) << "'";
233+
}
234+
resultValue = NKikimr::NMiniKQL::MakeString(rawJson);
235+
LockObject(resultValue);
236+
};
237+
}
238+
233239
template <typename TResult, typename TJsonNumber>
234240
static NYql::NUdf::TUnboxedValuePod ParseJsonNumber(TJsonNumber number) {
235241
if (number < std::numeric_limits<TResult>::min() || std::numeric_limits<TResult>::max() < number) {
@@ -239,8 +245,13 @@ class TColumnParser {
239245
}
240246

241247
static void LockObject(NYql::NUdf::TUnboxedValue& value) {
248+
// All UnboxedValues with type Boxed or String should be locked
249+
// because after parsing they will be used under another MKQL allocator in purecalc filters
250+
242251
const i32 numberRefs = value.LockRef();
243-
Y_ENSURE(numberRefs == -1 || numberRefs == 1);
252+
253+
// -1 - value is embedded or empty, otherwise value should have exactly one ref
254+
Y_ENSURE(numberRefs == -1 || numberRefs == 1);
244255
}
245256

246257
static TString TruncateString(std::string_view rawString, size_t maxSize = 1_KB) {

ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,19 +185,27 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) {
185185

186186
Parser->AddMessages({
187187
GetMessage(42, R"({"a1": "hello1", "nested": {"key": "value"}})"),
188-
GetMessage(43, R"({"a1": "hello2", "nested": ["key1", "key2"]})")
188+
GetMessage(43, R"({"a1": "hello2", "nested": ["key1", "key2"]})"),
189+
GetMessage(43, R"({"a1": "hello3", "nested": "some string"})"),
190+
GetMessage(43, R"({"a1": "hello4", "nested": 123456})")
189191
});
190192

191193
const auto& result = Parser->Parse();
192194
ResultNumberValues = result.front().size();
193-
UNIT_ASSERT_VALUES_EQUAL(2, ResultNumberValues);
195+
UNIT_ASSERT_VALUES_EQUAL(4, ResultNumberValues);
194196

195197
UNIT_ASSERT_VALUES_EQUAL(2, result.size());
196198
UNIT_ASSERT_VALUES_EQUAL("{\"key\": \"value\"}", TString(result[0][0].AsStringRef()));
197199
UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef()));
198200

199201
UNIT_ASSERT_VALUES_EQUAL("[\"key1\", \"key2\"]", TString(result[0][1].AsStringRef()));
200202
UNIT_ASSERT_VALUES_EQUAL("hello2", TString(result[1][1].AsStringRef()));
203+
204+
UNIT_ASSERT_VALUES_EQUAL("\"some string\"", TString(result[0][2].AsStringRef()));
205+
UNIT_ASSERT_VALUES_EQUAL("hello3", TString(result[1][2].AsStringRef()));
206+
207+
UNIT_ASSERT_VALUES_EQUAL("123456", TString(result[0][3].AsStringRef()));
208+
UNIT_ASSERT_VALUES_EQUAL("hello4", TString(result[1][3].AsStringRef()));
201209
}
202210

203211
Y_UNIT_TEST_F(SimpleBooleans, TFixture) {

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ def test_nested_types(self, kikimr, client):
241241
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
242242
SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}`
243243
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data Json NOT NULL, event String NOT NULL))
244-
WHERE event = "event1" or event = "event2";'''
244+
WHERE event = "event1" or event = "event2" or event = "event4";'''
245245

246246
query_id = start_yds_query(kikimr, client, sql)
247247
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
@@ -251,12 +251,14 @@ def test_nested_types(self, kikimr, client):
251251
'{"time": 101, "data": {"key": "value", "second_key":"' + large_string + '"}, "event": "event1"}',
252252
'{"time": 102, "data": ["key1", "key2", "' + large_string + '"], "event": "event2"}',
253253
'{"time": 103, "data": ["' + large_string + '"], "event": "event3"}',
254+
'{"time": 104, "data": "' + large_string + '", "event": "event4"}',
254255
]
255256

256257
self.write_stream(data)
257258
expected = [
258259
'{"key": "value", "second_key":"' + large_string + '"}',
259-
'["key1", "key2", "' + large_string + '"]'
260+
'["key1", "key2", "' + large_string + '"]',
261+
'"' + large_string + '"'
260262
]
261263
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
262264

0 commit comments

Comments
 (0)