Skip to content

Commit 1049202

Browse files
authored
[KQP] Add configuration for the custom partition pruning for DataExecuter and PartitionedExecuter (#19919)
1 parent 7fe228b commit 1049202

28 files changed

+642
-819
lines changed

ydb/core/kqp/common/batch/batch_operation_settings.cpp

Lines changed: 0 additions & 17 deletions
This file was deleted.

ydb/core/kqp/common/batch/batch_operation_settings.h

Lines changed: 0 additions & 18 deletions
This file was deleted.

ydb/core/kqp/common/batch/params.h

Lines changed: 0 additions & 15 deletions
This file was deleted.

ydb/core/kqp/common/batch/ya.make

Lines changed: 0 additions & 13 deletions
This file was deleted.
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#include "kqp_batch_operations.h"
2+
3+
namespace NKikimr::NKqp::NBatchOperations {
4+
5+
TSerializedTableRange MakePartitionRange(TMaybe<TKeyDesc::TPartitionRangeInfo> begin, TMaybe<TKeyDesc::TPartitionRangeInfo> end, size_t keySize) {
6+
TVector<TCell> tableBegin;
7+
TVector<TCell> tableEnd;
8+
9+
bool inclusiveTableBegin = (begin) ? begin->IsInclusive : false;
10+
bool inclusiveTableEnd = (end) ? end->IsInclusive : false;
11+
12+
if (!begin || !begin->EndKeyPrefix) {
13+
inclusiveTableBegin = true;
14+
tableBegin.resize(keySize, TCell());
15+
} else {
16+
const auto& cells = begin->EndKeyPrefix.GetCells();
17+
tableBegin.assign(cells.begin(), cells.end());
18+
}
19+
20+
if (!end || !end->EndKeyPrefix) {
21+
inclusiveTableEnd = true;
22+
} else {
23+
const auto& cells = end->EndKeyPrefix.GetCells();
24+
tableEnd.assign(cells.begin(), cells.end());
25+
}
26+
27+
return TSerializedTableRange{tableBegin, inclusiveTableBegin, tableEnd, inclusiveTableEnd};
28+
}
29+
30+
TSettings ImportSettingsFromProto(const NKikimrConfig::TTableServiceConfig::TBatchOperationSettings& settings) {
31+
TSettings res;
32+
33+
res.MaxBatchSize = settings.GetMaxBatchSize();
34+
res.MinBatchSize = settings.GetMinBatchSize();
35+
res.MaxRetryDelayMs = settings.GetMaxRetryDelayMs();
36+
res.StartRetryDelayMs = settings.GetStartRetryDelayMs();
37+
res.PartitionExecutionLimit = settings.GetPartitionExecutionLimit();
38+
39+
return res;
40+
}
41+
42+
} // namespace NKikimr::NKqp::NBatchOperations
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#pragma once
2+
3+
#include <ydb/core/protos/table_service_config.pb.h>
4+
#include <ydb/core/scheme/scheme_tabledefs.h>
5+
6+
#include <util/generic/fwd.h>
7+
8+
namespace NKikimr::NKqp::NBatchOperations {
9+
10+
TSerializedTableRange MakePartitionRange(TMaybe<TKeyDesc::TPartitionRangeInfo> begin, TMaybe<TKeyDesc::TPartitionRangeInfo> end, size_t keySize);
11+
12+
struct TSettings {
13+
ui64 MaxBatchSize = 10000;
14+
ui64 MinBatchSize = 1;
15+
ui64 MaxRetryDelayMs = 30000;
16+
ui64 StartRetryDelayMs = 50;
17+
ui64 PartitionExecutionLimit = 10;
18+
};
19+
20+
TSettings ImportSettingsFromProto(const NKikimrConfig::TTableServiceConfig::TBatchOperationSettings& settings);
21+
22+
} // namespace NKikimr::NKqp::NBatchOperations

ydb/core/kqp/common/ya.make

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ ENDIF()
66

77
SRCS(
88
control.cpp
9+
kqp_batch_operations.cpp
910
kqp_event_ids.h
1011
kqp_event_impl.cpp
1112
kqp_lwtrace_probes.cpp
@@ -31,9 +32,10 @@ SRCS(
3132
PEERDIR(
3233
ydb/core/base
3334
ydb/core/engine
35+
ydb/core/protos
36+
ydb/core/scheme
3437
ydb/core/kqp/expr_nodes
3538
ydb/core/kqp/common/simple
36-
ydb/core/kqp/common/batch
3739
ydb/core/kqp/common/compilation
3840
ydb/core/kqp/common/events
3941
ydb/core/kqp/common/shutdown
@@ -64,7 +66,6 @@ GENERATE_ENUM_SERIALIZATION(kqp_resolve.h)
6466
END()
6567

6668
RECURSE(
67-
batch
6869
compilation
6970
events
7071
simple

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#include "kqp_executer.h"
22
#include "kqp_executer_impl.h"
33
#include "kqp_locks_helper.h"
4-
#include "kqp_partition_helper.h"
54
#include "kqp_planner.h"
65
#include "kqp_table_resolver.h"
76
#include "kqp_tasks_validate.h"
@@ -102,12 +101,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
102101
const TActorId& creator, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
103102
ui32 statementResultIndex, const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup,
104103
const TGUCSettings::TPtr& GUCSettings,
104+
TPartitionPruner::TConfig partitionPrunerConfig,
105105
const TShardIdToTableInfoPtr& shardIdToTableInfo,
106106
const IKqpTransactionManagerPtr& txManager,
107107
const TActorId bufferActorId,
108-
TMaybe<TBatchOperationSettings> batchOperationSettings = Nothing())
109-
: TBase(std::move(request), std::move(asyncIoFactory), federatedQuerySetup, GUCSettings, database, userToken, counters, executerConfig,
110-
userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter,
108+
TMaybe<NBatchOperations::TSettings> batchOperationSettings = Nothing())
109+
: TBase(std::move(request), std::move(asyncIoFactory), federatedQuerySetup, GUCSettings, std::move(partitionPrunerConfig),
110+
database, userToken, counters, executerConfig, userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter,
111111
"DataExecuter", streamResult, bufferActorId, txManager, std::move(batchOperationSettings))
112112
, ShardIdToTableInfo(shardIdToTableInfo)
113113
, AllowOlapDataQuery(executerConfig.TableServiceConfig.GetAllowOlapDataQuery())
@@ -212,14 +212,14 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
212212
}
213213
}
214214

215-
if (info.HasBatchOperationMaxKey()) {
216-
if (ResponseEv->BatchOperationMaxKeys.empty()) {
217-
for (auto keyId : info.GetBatchOperationKeyIds()) {
218-
ResponseEv->BatchOperationKeyIds.push_back(keyId);
215+
if (!BatchOperationSettings.Empty() && info.HasSerializedEndRow()) {
216+
if (ResponseEv->EndRowColumnIds.empty()) {
217+
for (auto keyId : info.GetEndRowColumnIds()) {
218+
ResponseEv->EndRowColumnIds.push_back(keyId);
219219
}
220220
}
221221

222-
ResponseEv->BatchOperationMaxKeys.emplace_back(info.GetBatchOperationMaxKey());
222+
ResponseEv->SerializedEndRows.emplace_back(info.GetSerializedEndRow());
223223
}
224224
} else if (data.GetData().template Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) {
225225
NKikimrKqp::TEvKqpOutputActorResultInfo info;
@@ -1651,7 +1651,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
16511651
case NKqpProto::TKqpPhyTableOperation::kLookup: {
16521652
auto columns = BuildKqpColumns(op, tableInfo);
16531653
bool isFullScan = false;
1654-
auto partitions = PrunePartitions(op, stageInfo, HolderFactory(), TypeEnv(), isFullScan);
1654+
auto partitions = PartitionPruner.Prune(op, stageInfo, isFullScan);
16551655
auto readSettings = ExtractReadSettings(op, stageInfo, HolderFactory(), TypeEnv());
16561656

16571657
if (!readSettings.ItemsLimit && isFullScan) {
@@ -1673,7 +1673,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
16731673
case NKqpProto::TKqpPhyTableOperation::kDeleteRows: {
16741674
YQL_ENSURE(stage.InputsSize() <= 1, "Effect stage with multiple inputs: " << stage.GetProgramAst());
16751675

1676-
auto result = PruneEffectPartitions(op, stageInfo, HolderFactory(), TypeEnv());
1676+
auto result = PartitionPruner.PruneEffect(op, stageInfo);
16771677
for (auto& [shardId, shardInfo] : result) {
16781678
YQL_ENSURE(!shardInfo.KeyReadRanges);
16791679
YQL_ENSURE(shardInfo.KeyWriteRanges);
@@ -2251,7 +2251,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
22512251
if (stage.SourcesSize() > 0 && stage.GetSources(0).GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) {
22522252
const auto& source = stage.GetSources(0).GetReadRangesSource();
22532253
bool isFullScan;
2254-
SourceScanStageIdToParititions[stageInfo.Id] = PrunePartitions(source, stageInfo, HolderFactory(), TypeEnv(), isFullScan);
2254+
SourceScanStageIdToParititions[stageInfo.Id] = PartitionPruner.Prune(source, stageInfo, isFullScan);
22552255
if (isFullScan && !source.HasItemsLimit()) {
22562256
Counters->Counters->FullScansExecuted->Inc();
22572257
}
@@ -3058,12 +3058,13 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
30583058
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator,
30593059
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
30603060
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
3061-
const TShardIdToTableInfoPtr& shardIdToTableInfo, const IKqpTransactionManagerPtr& txManager, const TActorId bufferActorId,
3062-
TMaybe<TBatchOperationSettings> batchOperationSettings)
3061+
TPartitionPruner::TConfig partitionPrunerConfig, const TShardIdToTableInfoPtr& shardIdToTableInfo,
3062+
const IKqpTransactionManagerPtr& txManager, const TActorId bufferActorId,
3063+
TMaybe<NBatchOperations::TSettings> batchOperationSettings)
30633064
{
30643065
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerConfig,
30653066
std::move(asyncIoFactory), creator, userRequestContext, statementResultIndex, federatedQuerySetup, GUCSettings,
3066-
shardIdToTableInfo, txManager, bufferActorId, std::move(batchOperationSettings));
3067+
std::move(partitionPrunerConfig), shardIdToTableInfo, txManager, bufferActorId, std::move(batchOperationSettings));
30673068
}
30683069

30693070
} // namespace NKqp

ydb/core/kqp/executer_actor/kqp_executer.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
#pragma once
22

33
#include <library/cpp/lwtrace/shuttle.h>
4-
#include <ydb/core/kqp/common/batch/batch_operation_settings.h>
4+
#include <ydb/core/kqp/common/kqp_batch_operations.h>
55
#include <ydb/core/kqp/common/kqp_tx.h>
66
#include <ydb/core/kqp/common/kqp_event_ids.h>
77
#include <ydb/core/kqp/common/kqp_user_request_context.h>
8+
#include <ydb/core/kqp/executer_actor/kqp_partition_helper.h>
89
#include <ydb/core/kqp/executer_actor/shards_resolver/kqp_shards_resolver_events.h>
910
#include <ydb/core/kqp/query_data/kqp_query_data.h>
1011
#include <ydb/core/kqp/gateway/kqp_gateway.h>
@@ -37,8 +38,9 @@ struct TEvKqpExecuter {
3738

3839
THashSet<ui32> ParticipantNodes;
3940

40-
TVector<TSerializedCellVec> BatchOperationMaxKeys;
41-
TVector<ui32> BatchOperationKeyIds;
41+
// For BATCH operations only
42+
TVector<TSerializedCellVec> SerializedEndRows;
43+
TVector<ui32> EndRowColumnIds;
4244

4345
enum class EExecutionType {
4446
Data,
@@ -151,8 +153,9 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
151153
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator,
152154
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
153155
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
154-
const TShardIdToTableInfoPtr& shardIdToTableInfo, const IKqpTransactionManagerPtr& txManager, const TActorId bufferActorId,
155-
TMaybe<TBatchOperationSettings> batchOperationSettings = Nothing());
156+
TPartitionPruner::TConfig partitionPrunerConfig, const TShardIdToTableInfoPtr& shardIdToTableInfo,
157+
const IKqpTransactionManagerPtr& txManager, const TActorId bufferActorId,
158+
TMaybe<NBatchOperations::TSettings> batchOperationSettings = Nothing());
156159

157160
IActor* CreateKqpSchemeExecuter(
158161
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,

ydb/core/kqp/executer_actor/kqp_executer_impl.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,16 +82,17 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
8282
TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator,
8383
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
8484
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
85-
const TShardIdToTableInfoPtr& shardIdToTableInfo, const IKqpTransactionManagerPtr& txManager, const TActorId bufferActorId,
86-
TMaybe<TBatchOperationSettings> batchOperationSettings)
85+
TPartitionPruner::TConfig partitionPrunerConfig, const TShardIdToTableInfoPtr& shardIdToTableInfo,
86+
const IKqpTransactionManagerPtr& txManager, const TActorId bufferActorId,
87+
TMaybe<NBatchOperations::TSettings> batchOperationSettings)
8788
{
8889
if (request.Transactions.empty()) {
8990
// commit-only or rollback-only data transaction
9091
return CreateKqpDataExecuter(
9192
std::move(request), database, userToken, counters, false, executerConfig,
9293
std::move(asyncIoFactory), creator,
9394
userRequestContext, statementResultIndex,
94-
federatedQuerySetup, /*GUCSettings*/nullptr,
95+
federatedQuerySetup, /*GUCSettings*/nullptr, std::move(partitionPrunerConfig),
9596
shardIdToTableInfo, txManager, bufferActorId, std::move(batchOperationSettings)
9697
);
9798
}
@@ -115,7 +116,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
115116
std::move(request), database, userToken, counters, false, executerConfig,
116117
std::move(asyncIoFactory), creator,
117118
userRequestContext, statementResultIndex,
118-
federatedQuerySetup, /*GUCSettings*/nullptr,
119+
federatedQuerySetup, /*GUCSettings*/nullptr, std::move(partitionPrunerConfig),
119120
shardIdToTableInfo, txManager, bufferActorId, std::move(batchOperationSettings)
120121
);
121122

@@ -131,7 +132,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
131132
std::move(request), database, userToken, counters, true,
132133
executerConfig, std::move(asyncIoFactory), creator,
133134
userRequestContext, statementResultIndex,
134-
federatedQuerySetup, GUCSettings,
135+
federatedQuerySetup, GUCSettings, std::move(partitionPrunerConfig),
135136
shardIdToTableInfo, txManager, bufferActorId, std::move(batchOperationSettings)
136137
);
137138

0 commit comments

Comments
 (0)