Skip to content

Commit 838079f

Browse files
committed
Supported unboxed values passing from parser to filters
1 parent 6af3402 commit 838079f

File tree

10 files changed

+444
-254
lines changed

10 files changed

+444
-254
lines changed

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

Lines changed: 55 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <ydb/library/yql/providers/common/schema/parser/yql_type_parser.h>
12
#include <ydb/library/yql/public/udf/udf_version.h>
23
#include <ydb/library/yql/public/purecalc/purecalc.h>
34
#include <ydb/library/yql/public/purecalc/io_specs/mkql/spec.h>
@@ -23,6 +24,12 @@ NYT::TNode CreateTypeNode(const TString& fieldType) {
2324
.Add(fieldType);
2425
}
2526

27+
NYT::TNode CreateOptionalTypeNode(const TString& fieldType) {
28+
return NYT::TNode::CreateList()
29+
.Add("OptionalType")
30+
.Add(CreateTypeNode(fieldType));
31+
}
32+
2633
void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
2734
node.Add(
2835
NYT::TNode::CreateList()
@@ -31,18 +38,29 @@ void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldTy
3138
);
3239
}
3340

34-
void AddOptionalField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
35-
node.Add(NYT::TNode::CreateList()
36-
.Add(fieldName)
37-
.Add(NYT::TNode::CreateList().Add("OptionalType").Add(CreateTypeNode(fieldType)))
41+
void AddTypedField(NYT::TNode& node, const TString& fieldName, const TString& fieldTypeYson) {
42+
NYT::TNode parsedType;
43+
Y_ENSURE(NYql::NCommon::ParseYson(parsedType, fieldTypeYson, Cerr), "Invalid field type");
44+
45+
// TODO: remove this when the re-parsing is removed from pq read actor
46+
if (parsedType == CreateTypeNode("Json")) {
47+
parsedType = CreateTypeNode("String");
48+
} else if (parsedType == CreateOptionalTypeNode("Json")) {
49+
parsedType = CreateOptionalTypeNode("String");
50+
}
51+
52+
node.Add(
53+
NYT::TNode::CreateList()
54+
.Add(fieldName)
55+
.Add(parsedType)
3856
);
3957
}
4058

41-
NYT::TNode MakeInputSchema(const TVector<TString>& columns) {
59+
NYT::TNode MakeInputSchema(const TVector<TString>& columns, const TVector<TString>& types) {
4260
auto structMembers = NYT::TNode::CreateList();
4361
AddField(structMembers, OffsetFieldName, "Uint64");
44-
for (const auto& col : columns) {
45-
AddOptionalField(structMembers, col, "String");
62+
for (size_t i = 0; i < columns.size(); ++i) {
63+
AddTypedField(structMembers, columns[i], types[i]);
4664
}
4765
return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers));
4866
}
@@ -68,7 +86,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
6886
TVector<NYT::TNode> Schemas;
6987
};
7088

71-
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>> {
89+
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>> {
7290
public:
7391
TFilterInputConsumer(
7492
const TFilterInputSpec& spec,
@@ -106,15 +124,15 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
106124
}
107125
}
108126

109-
void OnObject(std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&> values) override {
127+
void OnObject(std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&> values) override {
110128
Y_ENSURE(FieldsPositions.size() == values.second.size());
111129

112130
NKikimr::NMiniKQL::TThrowingBindTerminator bind;
113131
with_lock (Worker->GetScopedAlloc()) {
114132
auto& holderFactory = Worker->GetGraph().GetHolderFactory();
115133

116134
// TODO: use blocks here
117-
for (size_t rowId = 0; rowId < values.second.front().size(); ++rowId) {
135+
for (size_t rowId = 0; rowId < values.second.front()->size(); ++rowId) {
118136
NYql::NUdf::TUnboxedValue* items = nullptr;
119137

120138
NYql::NUdf::TUnboxedValue result = Cache.NewArray(
@@ -126,13 +144,15 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
126144

127145
size_t fieldId = 0;
128146
for (const auto& column : values.second) {
129-
items[FieldsPositions[fieldId++]] = column[rowId].data() // Check that std::string_view was initialized in json_parser
130-
? NKikimr::NMiniKQL::MakeString(column[rowId]).MakeOptional()
131-
: NKikimr::NUdf::TUnboxedValuePod();
147+
items[FieldsPositions[fieldId++]] = column->at(rowId);
132148
}
133149

134150
Worker->Push(std::move(result));
135151
}
152+
153+
// Clear the cache after each object because
154+
// the values are allocated on another allocator and should be released
155+
Cache.Clear();
136156
}
137157
}
138158

@@ -216,7 +236,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
216236
static constexpr bool IsPartial = false;
217237
static constexpr bool SupportPushStreamMode = true;
218238

219-
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>>;
239+
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>>;
220240

221241
static TConsumerType MakeConsumer(
222242
const TFilterInputSpec& spec,
@@ -243,13 +263,19 @@ class TJsonFilter::TImpl {
243263
TImpl(const TVector<TString>& columns,
244264
const TVector<TString>& types,
245265
const TString& whereFilter,
246-
TCallback callback)
247-
: Sql(GenerateSql(columns, types, whereFilter)) {
248-
auto factory = NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions());
266+
TCallback callback,
267+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc)
268+
: Sql(GenerateSql(whereFilter)) {
269+
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
270+
auto factory = NYql::NPureCalc::MakeProgramFactory(
271+
NYql::NPureCalc::TProgramFactoryOptions().SetScopedAlloc(std::move(alloc))
272+
);
249273

274+
// Program should be stateless because input values
275+
// are allocated on another allocator and should be released
250276
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
251277
Program = factory->MakePushStreamProgram(
252-
TFilterInputSpec(MakeInputSchema(columns)),
278+
TFilterInputSpec(MakeInputSchema(columns, types)),
253279
TFilterOutputSpec(MakeOutputSchema()),
254280
Sql,
255281
NYql::NPureCalc::ETranslationMode::SQL
@@ -258,7 +284,7 @@ class TJsonFilter::TImpl {
258284
LOG_ROW_DISPATCHER_DEBUG("Program created");
259285
}
260286

261-
void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
287+
void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
262288
Y_ENSURE(values, "Expected non empty schema");
263289
InputConsumer->OnObject(std::make_pair(offsets, values));
264290
}
@@ -268,29 +294,9 @@ class TJsonFilter::TImpl {
268294
}
269295

270296
private:
271-
TString GenerateSql(const TVector<TString>& columnNames, const TVector<TString>& columnTypes, const TString& whereFilter) {
297+
TString GenerateSql(const TString& whereFilter) {
272298
TStringStream str;
273-
str << "$fields = SELECT ";
274-
Y_ABORT_UNLESS(columnNames.size() == columnTypes.size());
275-
str << OffsetFieldName << ", ";
276-
for (size_t i = 0; i < columnNames.size(); ++i) {
277-
TString columnType = columnTypes[i];
278-
TString columnName = NFq::EncloseAndEscapeString(columnNames[i], '`');
279-
if (columnType == "Json") {
280-
columnType = "String";
281-
} else if (columnType == "Optional<Json>") {
282-
columnType = "Optional<String>";
283-
}
284-
285-
if (columnType.StartsWith("Optional")) {
286-
str << "IF(" << columnName << " IS NOT NULL, Unwrap(CAST(" << columnName << " as " << columnType << ")), NULL)";
287-
} else {
288-
str << "Unwrap(CAST(" << columnName << " as " << columnType << "))";
289-
}
290-
str << " as " << columnName << ((i != columnNames.size() - 1) ? "," : "");
291-
}
292-
str << " FROM Input;\n";
293-
str << "$filtered = SELECT * FROM $fields " << whereFilter << ";\n";
299+
str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n";
294300

295301
str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName;
296302
str << "\"])))) as data FROM $filtered";
@@ -300,22 +306,23 @@ class TJsonFilter::TImpl {
300306

301307
private:
302308
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
303-
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>> InputConsumer;
309+
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>> InputConsumer;
304310
const TString Sql;
305311
};
306312

307313
TJsonFilter::TJsonFilter(
308314
const TVector<TString>& columns,
309315
const TVector<TString>& types,
310316
const TString& whereFilter,
311-
TCallback callback)
312-
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback)) {
317+
TCallback callback,
318+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc)
319+
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback, std::move(alloc))) {
313320
}
314321

315322
TJsonFilter::~TJsonFilter() {
316323
}
317324

318-
void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
325+
void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
319326
Impl->Push(offsets, values);
320327
}
321328

@@ -327,8 +334,9 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
327334
const TVector<TString>& columns,
328335
const TVector<TString>& types,
329336
const TString& whereFilter,
330-
TCallback callback) {
331-
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback));
337+
TCallback callback,
338+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc) {
339+
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback, std::move(alloc)));
332340
}
333341

334342
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/json_filter.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#pragma once
22

3-
#include <ydb/library/yql/public/udf/udf_data_type.h>
4-
#include <ydb/library/yql/public/udf/udf_value.h>
3+
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
54

65
namespace NFq {
76

@@ -14,11 +13,12 @@ class TJsonFilter {
1413
const TVector<TString>& columns,
1514
const TVector<TString>& types,
1615
const TString& whereFilter,
17-
TCallback callback);
16+
TCallback callback,
17+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc);
1818

1919
~TJsonFilter();
2020

21-
void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values);
21+
void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values);
2222
TString GetSql();
2323

2424
private:
@@ -30,6 +30,7 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
3030
const TVector<TString>& columns,
3131
const TVector<TString>& types,
3232
const TString& whereFilter,
33-
TJsonFilter::TCallback callback);
33+
TJsonFilter::TCallback callback,
34+
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc);
3435

3536
} // namespace NFq

0 commit comments

Comments
 (0)