Skip to content

Commit e098cea

Browse files
authored
YDB-2568 Enable match_recognize in ydb / stable-24-2 (#7502)
1 parent 7af4fdf commit e098cea

File tree

18 files changed

+244
-50
lines changed

18 files changed

+244
-50
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
184184
Config->FeatureFlags = AppData(ctx)->FeatureFlags;
185185

186186
KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
187-
FederatedQuerySetup, UserToken, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState));
187+
FederatedQuerySetup, UserToken, QueryServiceConfig, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState));
188188

189189
IKqpHost::TPrepareSettings prepareSettings;
190190
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;

ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ydb/library/yql/core/services/mounts/yql_mounts.h>
99

1010
#include <library/cpp/protobuf/util/pb_io.h>
11+
#include <ydb/core/protos/config.pb.h>
1112

1213
namespace NKikimr {
1314
namespace NKqp {
@@ -28,7 +29,7 @@ NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr<IKqpGateway>
2829
IModuleResolver::TPtr moduleResolver;
2930
UNIT_ASSERT(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver));
3031

31-
auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, nullptr, false, false, nullptr, actorSystem);
32+
auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, NKikimrConfig::TQueryServiceConfig(),nullptr, false, false, nullptr, actorSystem);
3233
auto result = qp->SyncPrepareDataQuery(sql, IKqpHost::TPrepareSettings());
3334
result.Issues().PrintTo(Cerr);
3435
UNIT_ASSERT(result.Success());

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -922,6 +922,7 @@ class TKqpHost : public IKqpHost {
922922
TKqpHost(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, const TString& database,
923923
TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
924924
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
925+
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
925926
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
926927
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
927928
NActors::TActorSystem* actorSystem = nullptr)
@@ -938,6 +939,7 @@ class TKqpHost : public IKqpHost {
938939
, FakeWorld(ExprCtx->NewWorld(TPosition()))
939940
, ExecuteCtx(MakeIntrusive<TExecuteContext>())
940941
, ActorSystem(actorSystem ? actorSystem : NActors::TActivationContext::ActorSystem())
942+
, QueryServiceConfig(queryServiceConfig)
941943
{
942944
if (funcRegistry) {
943945
FuncRegistry = funcRegistry;
@@ -1559,10 +1561,15 @@ class TKqpHost : public IKqpHost {
15591561
|| settingName == "Warning"
15601562
|| settingName == "UseBlocks"
15611563
|| settingName == "BlockEngine"
1564+
|| settingName == "TimeOrderRecoverDelay"
1565+
|| settingName == "TimeOrderRecoverAhead"
1566+
|| settingName == "TimeOrderRecoverRowLimit"
1567+
|| settingName == "MatchRecognizeStream"
15621568
;
15631569
};
15641570
auto configProvider = CreateConfigProvider(*TypesCtx, gatewaysConfig, {}, allowSettings);
15651571
TypesCtx->AddDataSource(ConfigProviderName, configProvider);
1572+
TypesCtx->MatchRecognize = QueryServiceConfig.GetEnableMatchRecognize();
15661573

15671574
YQL_ENSURE(TypesCtx->Initialize(*ExprCtx));
15681575

@@ -1653,6 +1660,7 @@ class TKqpHost : public IKqpHost {
16531660

16541661
TKqpTempTablesState::TConstPtr TempTablesState;
16551662
NActors::TActorSystem* ActorSystem = nullptr;
1663+
NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
16561664
};
16571665

16581666
} // namespace
@@ -1673,10 +1681,11 @@ Ydb::Table::QueryStatsCollection::Mode GetStatsMode(NYql::EKikimrStatsMode stats
16731681
TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
16741682
const TString& cluster, const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
16751683
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
1684+
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
16761685
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall,
16771686
TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem)
16781687
{
1679-
return MakeIntrusive<TKqpHost>(gateway, cluster, database, config, moduleResolver, federatedQuerySetup, userToken, funcRegistry,
1688+
return MakeIntrusive<TKqpHost>(gateway, cluster, database, config, moduleResolver, federatedQuerySetup, userToken, queryServiceConfig, funcRegistry,
16801689
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem);
16811690
}
16821691

ydb/core/kqp/host/kqp_host.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ class IKqpHost : public TThrRefBase {
109109

110110
TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
111111
const TString& cluster, const TString& database, NYql::TKikimrConfiguration::TPtr config, NYql::IModuleResolver::TPtr moduleResolver,
112-
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
112+
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
113+
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
113114
bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
114115
NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/);
115116

ydb/core/kqp/opt/logical/kqp_opt_log.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h>
77
#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h>
88

9+
#include <ydb/library/yql/core/yql_opt_match_recognize.h>
910
#include <ydb/library/yql/core/yql_opt_utils.h>
1011
#include <ydb/library/yql/dq/opt/dq_opt_join.h>
1112
#include <ydb/library/yql/dq/opt/dq_opt_log.h>
@@ -53,6 +54,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
5354
AddHandler(0, &TKqlLookupTableBase::Match, HNDL(ApplyExtractMembersToLookupTable<false>));
5455
AddHandler(0, &TCoTop::Match, HNDL(TopSortOverExtend));
5556
AddHandler(0, &TCoTopSort::Match, HNDL(TopSortOverExtend));
57+
AddHandler(0, &TCoMatchRecognize::Match, HNDL(MatchRecognize));
5658

5759
AddHandler(1, &TCoFlatMap::Match, HNDL(LatePushExtractedPredicateToReadTable));
5860
AddHandler(1, &TCoTop::Match, HNDL(RewriteTopSortOverIndexRead));
@@ -234,6 +236,14 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
234236
return output;
235237
}
236238

239+
TMaybeNode<TExprBase> MatchRecognize(TExprBase node, TExprContext& ctx) {
240+
auto output = ExpandMatchRecognize(node.Ptr(), ctx, TypesCtx);
241+
if (output) {
242+
DumpAppliedRule("MatchRecognize", node.Ptr(), output, ctx);
243+
}
244+
return output;
245+
}
246+
237247
template <bool IsGlobal>
238248
TMaybeNode<TExprBase> ApplyExtractMembersToReadTable(TExprBase node, TExprContext& ctx,
239249
const TGetParents& getParents)

ydb/core/kqp/session_actor/kqp_worker_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
188188
Config->FeatureFlags = AppData(ctx)->FeatureFlags;
189189

190190
KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver,
191-
FederatedQuerySetup, QueryState->RequestEv->GetUserToken(), AppData(ctx)->FunctionRegistry, !Settings.LongSession, false);
191+
FederatedQuerySetup, QueryState->RequestEv->GetUserToken(), QueryServiceConfig, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false);
192192

193193
auto& queryRequest = QueryState->RequestEv;
194194
QueryState->ProxyRequestId = proxyRequestId;

ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> ga
5555

5656
auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}});
5757
return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver,
58-
federatedQuerySetup, nullptr, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem);
58+
federatedQuerySetup, nullptr, NKikimrConfig::TQueryServiceConfig(), funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem);
5959
}
6060

6161
NYql::NNodes::TExprBase GetExpr(const TString& ast, NYql::TExprContext& ctx, NYql::IModuleResolver* moduleResolver) {

ydb/core/kqp/ut/yql/kqp_pragma_ut.cpp

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,93 @@ Y_UNIT_TEST_SUITE(KqpPragma) {
8484
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
8585
UNIT_ASSERT_C(result.GetIssues().Empty(), result.GetIssues().ToString());
8686
}
87+
88+
Y_UNIT_TEST(MatchRecognizeWithTimeOrderRecoverer) {
89+
TKikimrSettings settings;
90+
NKikimrConfig::TAppConfig appConfig;
91+
appConfig.MutableQueryServiceConfig()->SetEnableMatchRecognize(true);
92+
settings.SetAppConfig(appConfig);
93+
94+
TKikimrRunner kikimr(settings);
95+
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
96+
97+
auto result = client.ExecuteYqlScript(R"(
98+
PRAGMA FeatureR010="prototype";
99+
100+
CREATE TABLE `/Root/NewTable` (
101+
dt Uint64,
102+
value String,
103+
PRIMARY KEY (dt)
104+
);
105+
COMMIT;
106+
107+
INSERT INTO `/Root/NewTable` (dt, value) VALUES
108+
(1, 'value1'), (2, 'value2'), (3, 'value3'), (4, 'value4');
109+
COMMIT;
110+
111+
SELECT * FROM (SELECT dt, value FROM `/Root/NewTable`)
112+
MATCH_RECOGNIZE(
113+
ORDER BY CAST(dt as Timestamp)
114+
MEASURES
115+
LAST(V1.dt) as v1,
116+
LAST(V4.dt) as v4
117+
ONE ROW PER MATCH
118+
PATTERN (V1 V* V4)
119+
DEFINE
120+
V1 as V1.value = "value1",
121+
V as True,
122+
V4 as V4.value = "value4"
123+
);
124+
)").GetValueSync();
125+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
126+
CompareYson(R"([
127+
[[1u];[4u]];
128+
])", FormatResultSetYson(result.GetResultSet(0)));
129+
}
130+
131+
Y_UNIT_TEST(MatchRecognizeWithoutTimeOrderRecoverer) {
132+
TKikimrSettings settings;
133+
NKikimrConfig::TAppConfig appConfig;
134+
appConfig.MutableQueryServiceConfig()->SetEnableMatchRecognize(true);
135+
settings.SetAppConfig(appConfig);
136+
137+
TKikimrRunner kikimr(settings);
138+
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
139+
140+
auto result = client.ExecuteYqlScript(R"(
141+
PRAGMA FeatureR010="prototype";
142+
PRAGMA config.flags("MatchRecognizeStream", "disable");
143+
144+
CREATE TABLE `/Root/NewTable` (
145+
dt Uint64,
146+
value String,
147+
PRIMARY KEY (dt)
148+
);
149+
COMMIT;
150+
151+
INSERT INTO `/Root/NewTable` (dt, value) VALUES
152+
(1, 'value1'), (2, 'value2'), (3, 'value3'), (4, 'value4');
153+
COMMIT;
154+
155+
SELECT * FROM (SELECT dt, value FROM `/Root/NewTable`)
156+
MATCH_RECOGNIZE(
157+
ORDER BY CAST(dt as Timestamp)
158+
MEASURES
159+
LAST(V1.dt) as v1,
160+
LAST(V4.dt) as v4
161+
ONE ROW PER MATCH
162+
PATTERN (V1 V* V4)
163+
DEFINE
164+
V1 as V1.value = "value1",
165+
V as True,
166+
V4 as V4.value = "value4"
167+
);
168+
)").GetValueSync();
169+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
170+
CompareYson(R"([
171+
[[1u];[4u]];
172+
])", FormatResultSetYson(result.GetResultSet(0)));
173+
}
87174
}
88175

89176
} // namspace NKqp

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,6 +1050,7 @@ message TQueryServiceConfig {
10501050
optional NYql.TGenericGatewayConfig Generic = 11;
10511051
optional TFinalizeScriptServiceConfig FinalizeScriptServiceConfig = 12;
10521052
optional uint64 ProgressStatsPeriodMs = 14 [default = 0]; // 0 = disabled
1053+
optional bool EnableMatchRecognize = 20 [default = false];
10531054
}
10541055

10551056
// Config describes immediate controls and allows

ydb/library/yql/core/yql_opt_match_recognize.cpp

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext&
146146
ExtractSortKeyAndOrder(pos, sortTraits, sortKey, sortOrder, ctx);
147147
TExprNode::TPtr result;
148148
if (isStreaming) {
149-
YQL_ENSURE(sortOrder->ChildrenSize() == 1, "Expect ORDER BY timestamp for MATCH_RECOGNIZE on streams");
149+
YQL_ENSURE(sortOrder->ChildrenSize() == 1, "Expect ORDER BY timestamp for MATCH_RECOGNIZE");
150150
const auto reordered = ctx.Builder(pos)
151151
.Lambda()
152152
.Param("partition")
@@ -216,37 +216,15 @@ TExprNode::TPtr ExpandMatchRecognize(const TExprNode::TPtr& node, TExprContext&
216216
.Seal()
217217
.Build();
218218
} else { //non-streaming
219-
if (partitionColumns->ChildrenSize() != 0) {
220-
result = ctx.Builder(pos)
221-
.Callable("PartitionsByKeys")
222-
.Add(0, input)
223-
.Add(1, partitionKeySelector)
224-
.Add(2, sortOrder)
225-
.Add(3, sortKey)
226-
.Add(4, matchRecognize)
227-
.Seal()
228-
.Build();
229-
} else {
230-
if (sortOrder->IsCallable("Void")) {
231-
result = ctx.Builder(pos)
232-
.Apply(matchRecognize)
233-
.With(0, input)
234-
.Seal()
235-
.Build();;
236-
} else {
237-
result = ctx.Builder(pos)
238-
.Apply(matchRecognize)
239-
.With(0)
240-
.Callable("Sort")
241-
.Add(0, input)
242-
.Add(1, sortOrder)
243-
.Add(2, sortKey)
244-
.Seal()
245-
.Done()
246-
.Seal()
247-
.Build();
248-
}
249-
}
219+
result = ctx.Builder(pos)
220+
.Callable("PartitionsByKeys")
221+
.Add(0, input)
222+
.Add(1, partitionKeySelector)
223+
.Add(2, sortOrder)
224+
.Add(3, sortKey)
225+
.Add(4, matchRecognize)
226+
.Seal()
227+
.Build();
250228
}
251229
YQL_CLOG(INFO, Core) << "Expanded MatchRecognize";
252230
return result;

0 commit comments

Comments
 (0)