Skip to content

Commit e441aea

Browse files
authored
YDB FQ: move ListSplits call from the request execution to the request optimization phase (#14497)
1 parent d222a17 commit e441aea

23 files changed

+606
-399
lines changed

ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,6 @@ namespace NKikimr::NKqp {
110110
clientMock->ExpectListSplits()
111111
.Select()
112112
.DataSourceInstance(dataSourceInstance)
113-
.What()
114-
.Column("col1", Ydb::Type::UINT16)
115-
.Done()
116113
.Done()
117114
.Result()
118115
.AddResponse(NewSuccess())
@@ -209,9 +206,6 @@ namespace NKikimr::NKqp {
209206
clientMock->ExpectListSplits()
210207
.Select()
211208
.DataSourceInstance(dataSourceInstance)
212-
.What()
213-
// Empty
214-
.Done()
215209
.Done()
216210
.Result()
217211
.AddResponse(NewSuccess())
@@ -302,9 +296,6 @@ namespace NKikimr::NKqp {
302296
clientMock->ExpectListSplits()
303297
.Select()
304298
.DataSourceInstance(dataSourceInstance)
305-
.What()
306-
// Empty
307-
.Done()
308299
.Done()
309300
.Result()
310301
.AddResponse(NewSuccess())
@@ -375,8 +366,12 @@ namespace NKikimr::NKqp {
375366
auto clientMock = std::make_shared<TConnectorClientMock>();
376367

377368
const NYql::TGenericDataSourceInstance dataSourceInstance = MakeDataSourceInstance(providerType);
369+
378370
// clang-format off
379-
const NApi::TSelect select = TConnectorClientMock::TSelectBuilder<>()
371+
const NApi::TSelect selectInListSplits = TConnectorClientMock::TSelectBuilder<>()
372+
.DataSourceInstance(dataSourceInstance).GetResult();
373+
374+
const NApi::TSelect selectInReadSplits = TConnectorClientMock::TSelectBuilder<>()
380375
.DataSourceInstance(dataSourceInstance)
381376
.What()
382377
.NullableColumn("data_column", Ydb::Type::STRING)
@@ -406,11 +401,11 @@ namespace NKikimr::NKqp {
406401
// step 2: ListSplits
407402
// clang-format off
408403
clientMock->ExpectListSplits()
409-
.Select(select)
404+
.Select(selectInListSplits)
410405
.Result()
411406
.AddResponse(NewSuccess())
412407
.Description("some binary description")
413-
.Select(select);
408+
.Select(selectInReadSplits);
414409
// clang-format on
415410

416411
// step 3: ReadSplits
@@ -424,7 +419,7 @@ namespace NKikimr::NKqp {
424419
.Filtering(NYql::NConnector::NApi::TReadSplitsRequest::FILTERING_OPTIONAL)
425420
.Split()
426421
.Description("some binary description")
427-
.Select(select)
422+
.Select(selectInReadSplits)
428423
.Done()
429424
.Result()
430425
.AddResponse(MakeRecordBatch(

ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
160160
;
161161
// clang-format on
162162

163-
NYql::Generic::TLookupSource lookupSourceSettings;
163+
NYql::NGeneric::TLookupSource lookupSourceSettings;
164164
*lookupSourceSettings.mutable_data_source_instance() = dsi;
165165
lookupSourceSettings.Settable("lookup_test");
166166
lookupSourceSettings.SetServiceAccountId("testsaid");
@@ -351,7 +351,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) {
351351
}
352352
// clang-format on
353353

354-
NYql::Generic::TLookupSource lookupSourceSettings;
354+
NYql::NGeneric::TLookupSource lookupSourceSettings;
355355
*lookupSourceSettings.mutable_data_source_instance() = dsi;
356356
lookupSourceSettings.Settable("lookup_test");
357357
lookupSourceSettings.SetServiceAccountId("testsaid");

ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
#include <ydb/library/yql/providers/generic/proto/source.pb.h>
1717
#include <ydb/library/yql/providers/generic/connector/libcpp/error.h>
1818
#include <ydb/library/yql/providers/generic/connector/libcpp/utils.h>
19-
#include <ydb/library/yql/providers/generic/proto/range.pb.h>
2019
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
2120
#include <yql/essentials/public/udf/arrow/util.h>
2221
#include <yql/essentials/utils/log/log.h>
@@ -69,7 +68,7 @@ namespace NYql::NDq {
6968
::NMonitoring::TDynamicCounterPtr taskCounters,
7069
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
7170
std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> keyTypeHelper,
72-
NYql::Generic::TLookupSource&& lookupSource,
71+
NYql::NGeneric::TLookupSource&& lookupSource,
7372
const NKikimr::NMiniKQL::TStructType* keyType,
7473
const NKikimr::NMiniKQL::TStructType* payloadType,
7574
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
@@ -500,7 +499,7 @@ namespace NYql::NDq {
500499
const NActors::TActorId ParentId;
501500
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
502501
std::shared_ptr<TKeyTypeHelper> KeyTypeHelper;
503-
const NYql::Generic::TLookupSource LookupSource;
502+
const NYql::NGeneric::TLookupSource LookupSource;
504503
const NKikimr::NMiniKQL::TStructType* const KeyType;
505504
const NKikimr::NMiniKQL::TStructType* const PayloadType;
506505
const NKikimr::NMiniKQL::TStructType* const SelectResultType; // columns from KeyType + PayloadType
@@ -529,7 +528,7 @@ namespace NYql::NDq {
529528
::NMonitoring::TDynamicCounterPtr taskCounters,
530529
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
531530
std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> keyTypeHelper,
532-
NYql::Generic::TLookupSource&& lookupSource,
531+
NYql::NGeneric::TLookupSource&& lookupSource,
533532
const NKikimr::NMiniKQL::TStructType* keyType,
534533
const NKikimr::NMiniKQL::TStructType* payloadType,
535534
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,

ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ namespace NYql::NDq {
1818
::NMonitoring::TDynamicCounterPtr taskCounters,
1919
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> alloc,
2020
std::shared_ptr<IDqAsyncLookupSource::TKeyTypeHelper> keyTypeHelper,
21-
NYql::Generic::TLookupSource&& lookupSource,
21+
NYql::NGeneric::TLookupSource&& lookupSource,
2222
const NKikimr::NMiniKQL::TStructType* keyType,
2323
const NKikimr::NMiniKQL::TStructType* payloadType,
2424
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,

ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,23 @@ namespace NYql::NDq {
1111
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
1212
NYql::NConnector::IClient::TPtr genericClient) {
1313
auto readActorFactory = [credentialsFactory, genericClient](
14-
Generic::TSource&& settings,
14+
NGeneric::TSource&& settings,
1515
IDqAsyncIoFactory::TSourceArguments&& args) {
16-
return CreateGenericReadActor(genericClient, std::move(settings), args.InputIndex, args.StatsLevel,
17-
args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, args.HolderFactory);
16+
return CreateGenericReadActor(
17+
genericClient,
18+
std::move(settings),
19+
args.InputIndex,
20+
args.StatsLevel,
21+
args.SecureParams,
22+
args.TaskId,
23+
args.TaskParams,
24+
args.ReadRanges,
25+
args.ComputeActorId,
26+
credentialsFactory,
27+
args.HolderFactory);
1828
};
1929

20-
auto lookupActorFactory = [credentialsFactory, genericClient](NYql::Generic::TLookupSource&& lookupSource, IDqAsyncIoFactory::TLookupSourceArguments&& args) {
30+
auto lookupActorFactory = [credentialsFactory, genericClient](NYql::NGeneric::TLookupSource&& lookupSource, IDqAsyncIoFactory::TLookupSourceArguments&& args) {
2131
return CreateGenericLookupActor(
2232
genericClient,
2333
credentialsFactory,
@@ -34,17 +44,16 @@ namespace NYql::NDq {
3444
};
3545

3646
for (auto& name : {
37-
"ClickHouseGeneric",
38-
"PostgreSqlGeneric",
39-
"YdbGeneric",
40-
"MySqlGeneric",
41-
"GreenplumGeneric",
42-
"MsSQLServerGeneric",
43-
"OracleGeneric",
44-
"LoggingGeneric"}
45-
) {
46-
factory.RegisterSource<Generic::TSource>(name, readActorFactory);
47-
factory.RegisterLookupSource<Generic::TLookupSource>(name, lookupActorFactory);
47+
"ClickHouseGeneric",
48+
"PostgreSqlGeneric",
49+
"YdbGeneric",
50+
"MySqlGeneric",
51+
"GreenplumGeneric",
52+
"MsSQLServerGeneric",
53+
"OracleGeneric",
54+
"LoggingGeneric"}) {
55+
factory.RegisterSource<NGeneric::TSource>(name, readActorFactory);
56+
factory.RegisterLookupSource<NGeneric::TLookupSource>(name, lookupActorFactory);
4857
}
4958
}
5059

0 commit comments

Comments
 (0)