Skip to content

Commit f1765fb

Browse files
authored
Support PG literals in KQP (#11446)
1 parent d478158 commit f1765fb

File tree

10 files changed

+354
-63
lines changed

10 files changed

+354
-63
lines changed

ydb/core/kqp/opt/kqp_query_plan.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,14 +131,20 @@ struct TSerializerCtx {
131131
};
132132

133133
TString GetExprStr(const TExprBase& scalar, bool quoteStr = true) {
134+
TMaybe<TString> literal = Nothing();
134135
if (auto maybeData = scalar.Maybe<TCoDataCtor>()) {
135-
auto literal = TString(maybeData.Cast().Literal());
136-
CollapseText(literal, 32);
136+
literal = TString(maybeData.Cast().Literal());
137+
} else if (auto maybeData = scalar.Maybe<TCoPgConst>()) {
138+
literal = TString(maybeData.Cast().Value());
139+
}
140+
141+
if (literal) {
142+
CollapseText(*literal, 32);
137143

138144
if (quoteStr) {
139-
return TStringBuilder() << '"' << literal << '"';
145+
return TStringBuilder() << '"' << *literal << '"';
140146
} else {
141-
return literal;
147+
return *literal;
142148
}
143149
}
144150

@@ -1709,6 +1715,8 @@ class TxPlanSerializer {
17091715
res = NUuid::UuidBytesToString(literal.Cast().Literal().StringValue());
17101716
} else if (auto literal = key.Maybe<TCoDataCtor>()) {
17111717
res = literal.Cast().Literal().StringValue();
1718+
} else if (auto literal = key.Maybe<TCoPgConst>()) {
1719+
res = literal.Cast().Value().StringValue();
17121720
} else if (auto literal = key.Maybe<TCoNothing>()) {
17131721
res = TString("null");
17141722
}

ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/core/kqp/common/kqp_yql.h>
44
#include <ydb/core/kqp/opt/kqp_opt_impl.h>
55
#include <ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h>
6+
#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h>
67
#include <ydb/core/tx/schemeshard/schemeshard_utils.h>
78

89
#include <ydb/public/lib/scheme_types/scheme_type_id.h>
@@ -82,19 +83,31 @@ TMaybeNode<TDqPhyPrecompute> BuildLookupKeysPrecompute(const TExprBase& input, T
8283
bool IsLiteralNothing(TExprBase node) {
8384
if (node.Maybe<TCoNothing>()) {
8485
auto* type = node.Raw()->GetTypeAnn();
85-
if (type->GetKind() != ETypeAnnotationKind::Optional) {
86-
return false;
87-
}
88-
type = type->Cast<TOptionalExprType>()->GetItemType();
86+
switch (type->GetKind()) {
87+
case ETypeAnnotationKind::Optional: {
88+
type = type->Cast<TOptionalExprType>()->GetItemType();
8989

90-
if (type->GetKind() != ETypeAnnotationKind::Data) {
91-
return false;
92-
}
90+
if (type->GetKind() != ETypeAnnotationKind::Data) {
91+
return false;
92+
}
9393

94-
auto slot = type->Cast<TDataExprType>()->GetSlot();
95-
auto typeId = NKikimr::NUdf::GetDataTypeInfo(slot).TypeId;
94+
auto slot = type->Cast<TDataExprType>()->GetSlot();
95+
auto typeId = NKikimr::NUdf::GetDataTypeInfo(slot).TypeId;
9696

97-
return (NKikimr::NScheme::NTypeIds::IsYqlType(typeId) && NKikimr::NSchemeShard::IsAllowedKeyType(NKikimr::NScheme::TTypeInfo(typeId)));
97+
return (
98+
NKikimr::NScheme::NTypeIds::IsYqlType(typeId)
99+
&& NKikimr::NSchemeShard::IsAllowedKeyType(NKikimr::NScheme::TTypeInfo(typeId))
100+
);
101+
}
102+
case ETypeAnnotationKind::Pg: {
103+
auto pgTypeId = type->Cast<TPgExprType>()->GetId();
104+
return NKikimr::NSchemeShard::IsAllowedKeyType(
105+
NKikimr::NScheme::TTypeInfo(NKikimr::NPg::TypeDescFromPgTypeId(pgTypeId))
106+
);
107+
}
108+
default:
109+
return false;
110+
}
98111
} else {
99112
return false;
100113
}
@@ -119,7 +132,11 @@ TExprBase KqpBuildReadTableStage(TExprBase node, TExprContext& ctx, const TKqpOp
119132
return false;
120133
}
121134

122-
if (!value.Maybe<TCoDataCtor>() && !value.Maybe<TCoParameter>() && !IsLiteralNothing(value)) {
135+
if (!value.Maybe<TCoDataCtor>()
136+
&& !value.Maybe<TCoParameter>()
137+
&& !value.Maybe<TCoPgConst>()
138+
&& !IsLiteralNothing(value))
139+
{
123140
literalRange = false;
124141
}
125142

@@ -404,8 +421,7 @@ bool RequireLookupPrecomputeStage(const TKqlLookupTable& lookup) {
404421
if (tuple.Value().Maybe<TCoParameter>()) {
405422
// pass
406423
} else if (tuple.Value().Maybe<TCoDataCtor>()) {
407-
// TODO: support pg types
408-
Y_ENSURE(tuple.Value().Ref().GetTypeAnn()->GetKind() != NYql::ETypeAnnotationKind::Pg);
424+
Y_ENSURE(tuple.Value().Ref().GetTypeAnn()->GetKind() == NYql::ETypeAnnotationKind::Data);
409425
auto slot = tuple.Value().Ref().GetTypeAnn()->Cast<TDataExprType>()->GetSlot();
410426
auto typeId = NUdf::GetDataTypeInfo(slot).TypeId;
411427
auto typeInfo = NScheme::TTypeInfo(typeId);
@@ -414,6 +430,15 @@ bool RequireLookupPrecomputeStage(const TKqlLookupTable& lookup) {
414430
} else {
415431
return true;
416432
}
433+
} else if (tuple.Value().Maybe<TCoPgConst>()) {
434+
Y_ENSURE(tuple.Value().Ref().GetTypeAnn()->GetKind() == NYql::ETypeAnnotationKind::Pg);
435+
auto pgTypeId = tuple.Value().Ref().GetTypeAnn()->Cast<TPgExprType>()->GetId();
436+
auto typeInfo = NKikimr::NScheme::TTypeInfo(NKikimr::NPg::TypeDescFromPgTypeId(pgTypeId));
437+
if (NKikimr::NSchemeShard::IsAllowedKeyType(typeInfo)) {
438+
// pass
439+
} else {
440+
return true;
441+
}
417442
} else if (!tuple.Value().IsValid() || !IsLiteralNothing(tuple.Value().Cast())) {
418443
return true;
419444
}

ydb/core/kqp/provider/yql_kikimr_provider.cpp

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -566,8 +566,7 @@ template<typename TProto>
566566
void FillLiteralProtoImpl(const NNodes::TCoDataCtor& literal, TProto& proto) {
567567
auto type = literal.Ref().GetTypeAnn();
568568

569-
// TODO: support pg types
570-
YQL_ENSURE(type->GetKind() != ETypeAnnotationKind::Pg, "pg types are not supported");
569+
YQL_ENSURE(type->GetKind() == ETypeAnnotationKind::Data, "unexpected type: " << type->GetKind());
571570

572571
auto slot = type->Cast<TDataExprType>()->GetSlot();
573572
auto typeId = NKikimr::NUdf::GetDataTypeInfo(slot).TypeId;
@@ -655,6 +654,23 @@ void FillLiteralProto(const NNodes::TCoDataCtor& literal, NKqpProto::TKqpPhyLite
655654
FillLiteralProtoImpl(literal, proto);
656655
}
657656

657+
void FillLiteralProto(const NNodes::TCoPgConst& pgLiteral, NKqpProto::TKqpPhyLiteralValue& proto) {
658+
auto type = pgLiteral.Ref().GetTypeAnn();
659+
auto actualPgType = type->Cast<TPgExprType>();
660+
auto typeDesc = NKikimr::NPg::TypeDescFromPgTypeId(actualPgType->GetId());
661+
662+
auto& protoType = *proto.MutableType();
663+
auto& protoValue = *proto.MutableValue();
664+
665+
protoType.SetKind(NKikimrMiniKQL::ETypeKind::Pg);
666+
protoType.MutablePg()->Setoid(actualPgType->GetId());
667+
668+
TString content = TString(pgLiteral.Value().Value());
669+
auto parseResult = NKikimr::NPg::PgNativeBinaryFromNativeText(content, typeDesc);
670+
671+
protoValue.SetBytes(parseResult.Str.data(), parseResult.Str.size());
672+
}
673+
658674
bool IsPgNullExprNode(const NNodes::TExprBase& maybeLiteral) {
659675
return maybeLiteral.Ptr()->IsCallable() &&
660676
maybeLiteral.Ptr()->Content() == "PgCast" && maybeLiteral.Ptr()->ChildrenSize() >= 1 &&
@@ -711,8 +727,7 @@ void FillLiteralProto(const NNodes::TCoDataCtor& literal, Ydb::TypedValue& proto
711727
{
712728
auto type = literal.Ref().GetTypeAnn();
713729

714-
// TODO: support pg types
715-
YQL_ENSURE(type->GetKind() != ETypeAnnotationKind::Pg, "pg types are not supported");
730+
YQL_ENSURE(type->GetKind() == ETypeAnnotationKind::Data, "unexpected type: " << type->GetKind());
716731

717732
auto slot = type->Cast<TDataExprType>()->GetSlot();
718733
auto typeId = NKikimr::NUdf::GetDataTypeInfo(slot).TypeId;

ydb/core/kqp/provider/yql_kikimr_provider_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ std::optional<TString> FillLiteralProto(NNodes::TExprBase maybeLiteral, const TT
284284
void FillLiteralProto(const NNodes::TCoDataCtor& literal, Ydb::TypedValue& proto);
285285
// todo gvit switch to ydb typed value.
286286
void FillLiteralProto(const NNodes::TCoDataCtor& literal, NKqpProto::TKqpPhyLiteralValue& proto);
287+
void FillLiteralProto(const NNodes::TCoPgConst& literal, NKqpProto::TKqpPhyLiteralValue& proto);
287288

288289
// Optimizer rules
289290
TExprNode::TPtr KiBuildQuery(NNodes::TExprBase node, TExprContext& ctx, TStringBuf database, TIntrusivePtr<TKikimrTablesData> tablesData,

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -235,12 +235,8 @@ void FillColumns(const TContainer& columns, const TKikimrTableMetadata& tableMet
235235
}
236236
}
237237

238-
void FillNothing(TCoNothing expr, NKqpProto::TKqpPhyLiteralValue& value) {
239-
auto* typeann = expr.Raw()->GetTypeAnn();
240-
YQL_ENSURE(typeann->GetKind() == ETypeAnnotationKind::Optional);
241-
typeann = typeann->Cast<TOptionalExprType>()->GetItemType();
242-
YQL_ENSURE(typeann->GetKind() == ETypeAnnotationKind::Data);
243-
auto slot = typeann->Cast<TDataExprType>()->GetSlot();
238+
void FillNothingData(const TDataExprType& dataType, NKqpProto::TKqpPhyLiteralValue& value) {
239+
auto slot = dataType.GetSlot();
244240
auto typeId = NKikimr::NUdf::GetDataTypeInfo(slot).TypeId;
245241

246242
YQL_ENSURE(NKikimr::NScheme::NTypeIds::IsYqlType(typeId) &&
@@ -253,16 +249,43 @@ void FillNothing(TCoNothing expr, NKqpProto::TKqpPhyLiteralValue& value) {
253249
toFill->MutableData()->SetScheme(typeId);
254250

255251
if (slot == EDataSlot::Decimal) {
256-
const auto paramsDataType = typeann->Cast<TDataExprParamsType>();
257-
auto precision = FromString<ui8>(paramsDataType->GetParamOne());
258-
auto scale = FromString<ui8>(paramsDataType->GetParamTwo());
252+
const auto& paramsDataType = *dataType.Cast<TDataExprParamsType>();
253+
auto precision = FromString<ui8>(paramsDataType.GetParamOne());
254+
auto scale = FromString<ui8>(paramsDataType.GetParamTwo());
259255
toFill->MutableData()->MutableDecimalParams()->SetPrecision(precision);
260256
toFill->MutableData()->MutableDecimalParams()->SetScale(scale);
261257
}
262258

263259
value.MutableValue()->SetNullFlagValue(::google::protobuf::NullValue::NULL_VALUE);
264260
}
265261

262+
void FillNothingPg(const TPgExprType& pgType, NKqpProto::TKqpPhyLiteralValue& value) {
263+
value.MutableType()->SetKind(NKikimrMiniKQL::Pg);
264+
value.MutableType()->MutablePg()->Setoid(pgType.GetId());
265+
266+
value.MutableValue()->SetNullFlagValue(::google::protobuf::NullValue::NULL_VALUE);
267+
}
268+
269+
void FillNothing(TCoNothing expr, NKqpProto::TKqpPhyLiteralValue& value) {
270+
auto* typeann = expr.Raw()->GetTypeAnn();
271+
switch (typeann->GetKind()) {
272+
case ETypeAnnotationKind::Optional: {
273+
typeann = typeann->Cast<TOptionalExprType>()->GetItemType();
274+
YQL_ENSURE(
275+
typeann->GetKind() == ETypeAnnotationKind::Data,
276+
"Unexpected type in Nothing.Optional: " << typeann->GetKind());
277+
FillNothingData(*typeann->Cast<TDataExprType>(), value);
278+
return;
279+
}
280+
case ETypeAnnotationKind::Pg: {
281+
FillNothingPg(*typeann->Cast<TPgExprType>(), value);
282+
return;
283+
}
284+
default:
285+
YQL_ENSURE(false, "Unexpected type in Nothing: " << typeann->GetKind());
286+
}
287+
}
288+
266289
void FillKeyBound(const TVarArgCallable<TExprBase>& bound, NKqpProto::TKqpPhyKeyBound& boundProto) {
267290
if (bound.Maybe<TKqlKeyInc>()) {
268291
boundProto.SetIsInclusive(true);
@@ -286,6 +309,8 @@ void FillKeyBound(const TVarArgCallable<TExprBase>& bound, NKqpProto::TKqpPhyKey
286309
paramElementProto.SetElementIndex(FromString<ui32>(key.Cast<TCoNth>().Index().Value()));
287310
} else if (auto maybeLiteral = key.Maybe<TCoDataCtor>()) {
288311
FillLiteralProto(maybeLiteral.Cast(), *protoValue.MutableLiteralValue());
312+
} else if (auto maybePgLiteral = key.Maybe<TCoPgConst>()) {
313+
FillLiteralProto(maybePgLiteral.Cast(), *protoValue.MutableLiteralValue());
289314
} else if (auto maybeNull = key.Maybe<TCoNothing>()) {
290315
FillNothing(maybeNull.Cast(), *protoValue.MutableLiteralValue());
291316
} else {

ydb/core/kqp/ut/common/kqp_ut_common.cpp

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1202,7 +1202,7 @@ std::vector<NJson::TJsonValue> FindPlanStages(const NJson::TJsonValue& plan) {
12021202
return stages;
12031203
}
12041204

1205-
void CreateSampleTablesWithIndex(TSession& session, bool populateTables) {
1205+
void CreateSampleTablesWithIndex(TSession& session, bool populateTables, bool withPgTypes) {
12061206
auto res = session.ExecuteSchemeQuery(R"(
12071207
--!syntax_v1
12081208
CREATE TABLE `/Root/SecondaryKeys` (
@@ -1228,11 +1228,26 @@ void CreateSampleTablesWithIndex(TSession& session, bool populateTables) {
12281228
PRIMARY KEY (Key),
12291229
INDEX Index GLOBAL ON (Index2)
12301230
COVER (Value)
1231-
)
1231+
);
12321232
12331233
)").GetValueSync();
12341234
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
12351235

1236+
if (withPgTypes) {
1237+
auto res = session.ExecuteSchemeQuery(R"(
1238+
--!syntax_v1
1239+
CREATE TABLE `/Root/SecondaryPgTypeKeys` (
1240+
Key pgint4,
1241+
Fk pgint4,
1242+
Value String,
1243+
PRIMARY KEY (Key),
1244+
INDEX Index GLOBAL ON (Fk)
1245+
);
1246+
1247+
)").GetValueSync();
1248+
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
1249+
}
1250+
12361251
if (!populateTables)
12371252
return;
12381253

@@ -1272,6 +1287,19 @@ void CreateSampleTablesWithIndex(TSession& session, bool populateTables) {
12721287
12731288
)", TTxControl::BeginTx().CommitTx()).GetValueSync();
12741289

1290+
if (withPgTypes) {
1291+
auto result = session.ExecuteDataQuery(R"(
1292+
1293+
REPLACE INTO `/Root/SecondaryPgTypeKeys` (Key, Fk, Value) VALUES
1294+
(1pi, 1pi, "Payload1"),
1295+
(2pi, 2pi, "Payload2"),
1296+
(5pi, 5pi, "Payload5"),
1297+
(NULL, 6pi, "Payload6"),
1298+
(7pi, NULL, "Payload7");
1299+
1300+
)", TTxControl::BeginTx().CommitTx()).GetValueSync();
1301+
}
1302+
12751303
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
12761304
}
12771305

ydb/core/kqp/ut/common/kqp_ut_common.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ inline void AssertSuccessResult(const NYdb::TStatus& result) {
358358
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
359359
}
360360

361-
void CreateSampleTablesWithIndex(NYdb::NTable::TSession& session, bool populateTables = true);
361+
void CreateSampleTablesWithIndex(NYdb::NTable::TSession& session, bool populateTables = true, bool withPgTypes = false);
362362

363363
void InitRoot(Tests::TServer::TPtr server, TActorId sender);
364364

0 commit comments

Comments
 (0)