Skip to content

Commit 72d9831

Browse files
committed
Fixes + new features for BATCH UPDATE/DELETE (#14994)
1 parent c222668 commit 72d9831

31 files changed

+2148
-1282
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#include "batch_operation_settings.h"
2+
3+
namespace NKikimr::NKqp {
4+
5+
TBatchOperationSettings SetBatchOperationSettings(const NKikimrConfig::TTableServiceConfig::TBatchOperationSettings& settings) {
6+
TBatchOperationSettings res;
7+
8+
res.MaxBatchSize = settings.GetMaxBatchSize();
9+
res.MinBatchSize = settings.GetMinBatchSize();
10+
res.MaxRetryDelayMs = settings.GetMaxRetryDelayMs();
11+
res.StartRetryDelayMs = settings.GetStartRetryDelayMs();
12+
res.PartitionExecutionLimit = settings.GetPartitionExecutionLimit();
13+
14+
return res;
15+
}
16+
17+
} // namespace NKikimr::NKqp
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#pragma once
2+
3+
#include <ydb/core/protos/table_service_config.pb.h>
4+
#include <util/generic/fwd.h>
5+
6+
namespace NKikimr::NKqp {
7+
8+
struct TBatchOperationSettings {
9+
ui64 MaxBatchSize = 10000;
10+
ui64 MinBatchSize = 1;
11+
ui64 MaxRetryDelayMs = 30000;
12+
ui64 StartRetryDelayMs = 50;
13+
ui64 PartitionExecutionLimit = 10;
14+
};
15+
16+
TBatchOperationSettings SetBatchOperationSettings(const NKikimrConfig::TTableServiceConfig::TBatchOperationSettings& settings);
17+
18+
} // namespace NKikimr::NKqp

ydb/core/kqp/common/batch/params.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44

55
namespace NKikimr::NKqp::NBatchParams {
66

7-
const TString Header = "$_kqp_batch_";
8-
const TString IsFirstQuery = Header + "is_first_query";
9-
const TString IsLastQuery = Header + "is_last_query";
7+
const TString Header = "%kqp%batch_";
108
const TString IsInclusiveLeft = Header + "is_inclusive_left";
119
const TString IsInclusiveRight = Header + "is_inclusive_right";
1210
const TString Begin = Header + "begin_"; // begin_N
1311
const TString End = Header + "end_"; // end_N
12+
const TString BeginPrefixSize = Begin + "prefix_size";
13+
const TString EndPrefixSize = End + "prefix_size";
1414

15-
} // namespace NKikimr::NKqp::NPartitionedExecuter
15+
} // namespace NKikimr::NKqp::NBatchParams

ydb/core/kqp/common/batch/ya.make

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
batch_operation_settings.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/core/protos
9+
)
10+
11+
YQL_LAST_ABI_VERSION()
12+
13+
END()

ydb/core/kqp/common/simple/kqp_event_ids.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ struct TKqpExecuterEvents {
6565
EvProgress,
6666
EvStreamDataAck,
6767
EvTableResolveStatus,
68-
EvShardsResolveStatus
68+
EvShardsResolveStatus,
69+
EvDelayedExecution
6970
};
7071
};
7172

ydb/core/kqp/common/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ PEERDIR(
2929
ydb/core/engine
3030
ydb/core/kqp/expr_nodes
3131
ydb/core/kqp/common/simple
32+
ydb/core/kqp/common/batch
3233
ydb/core/kqp/common/compilation
3334
ydb/core/kqp/common/events
3435
ydb/core/kqp/common/shutdown
@@ -58,6 +59,7 @@ GENERATE_ENUM_SERIALIZATION(kqp_yql.h)
5859
END()
5960

6061
RECURSE(
62+
batch
6163
compilation
6264
events
6365
simple

ydb/core/kqp/counters/kqp_counters.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -888,6 +888,14 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
888888
QueryStatMemFinishBytes = KqpGroup->GetCounter("Query/Stat/MemFinishBytes", true);
889889
QueryStatMemConvertBytes = KqpGroup->GetCounter("Query/Stat/MemConvertBytes", true);
890890

891+
/* Statistics batch operations */
892+
BatchOperationUpdateRows = KqpGroup->GetCounter("BatchOperation/Update/Rows", true);
893+
BatchOperationUpdateBytes = KqpGroup->GetCounter("BatchOperation/Update/Bytes", true);
894+
895+
BatchOperationDeleteRows = KqpGroup->GetCounter("BatchOperation/Delete/Rows", true);
896+
BatchOperationDeleteBytes = KqpGroup->GetCounter("BatchOperation/Delete/Bytes", true);
897+
898+
BatchOperationRetries = KqpGroup->GetCounter("BatchOperation/Retries", true);
891899
}
892900

893901
::NMonitoring::TDynamicCounterPtr TKqpCounters::GetKqpCounters() const {

ydb/core/kqp/counters/kqp_counters.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,12 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
479479
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatMemFinishBytes;
480480
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatMemConvertBytes;
481481

482+
// Statistics batch operations
483+
::NMonitoring::TDynamicCounters::TCounterPtr BatchOperationUpdateRows;
484+
::NMonitoring::TDynamicCounters::TCounterPtr BatchOperationUpdateBytes;
485+
::NMonitoring::TDynamicCounters::TCounterPtr BatchOperationDeleteRows;
486+
::NMonitoring::TDynamicCounters::TCounterPtr BatchOperationDeleteBytes;
487+
::NMonitoring::TDynamicCounters::TCounterPtr BatchOperationRetries;
482488
};
483489

484490
struct TKqpRequestCounters : public TThrRefBase {

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
104104
const TGUCSettings::TPtr& GUCSettings,
105105
const TShardIdToTableInfoPtr& shardIdToTableInfo,
106106
const IKqpTransactionManagerPtr& txManager,
107-
const TActorId bufferActorId)
107+
const TActorId bufferActorId,
108+
TMaybe<TBatchOperationSettings> batchOperationSettings = Nothing())
108109
: TBase(std::move(request), std::move(asyncIoFactory), federatedQuerySetup, GUCSettings, database, userToken, counters, tableServiceConfig,
109110
userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter,
110-
"DataExecuter", streamResult, bufferActorId, txManager)
111+
"DataExecuter", streamResult, bufferActorId, txManager, std::move(batchOperationSettings))
111112
, ShardIdToTableInfo(shardIdToTableInfo)
112113
, AllowOlapDataQuery(tableServiceConfig.GetAllowOlapDataQuery())
113114
, WaitCAStatsTimeout(TDuration::MilliSeconds(tableServiceConfig.GetQueryLimits().GetWaitCAStatsTimeoutMs()))
@@ -213,6 +214,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
213214
TxManager->AddLock(lock.GetDataShard(), lock);
214215
}
215216
}
217+
218+
if (info.HasBatchOperationMaxKey()) {
219+
if (ResponseEv->BatchOperationMaxKeys.empty()) {
220+
for (auto keyId : info.GetBatchOperationKeyIds()) {
221+
ResponseEv->BatchOperationKeyIds.push_back(keyId);
222+
}
223+
}
224+
225+
ResponseEv->BatchOperationMaxKeys.emplace_back(info.GetBatchOperationMaxKey());
226+
}
216227
} else if (data.GetData().template Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) {
217228
NKikimrKqp::TEvKqpOutputActorResultInfo info;
218229
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
@@ -3055,11 +3066,12 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
30553066
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator,
30563067
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
30573068
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
3058-
const TShardIdToTableInfoPtr& shardIdToTableInfo, const IKqpTransactionManagerPtr& txManager, const TActorId bufferActorId)
3069+
const TShardIdToTableInfoPtr& shardIdToTableInfo, const IKqpTransactionManagerPtr& txManager, const TActorId bufferActorId,
3070+
TMaybe<TBatchOperationSettings> batchOperationSettings)
30593071
{
30603072
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, tableServiceConfig,
30613073
std::move(asyncIoFactory), creator, userRequestContext, statementResultIndex, federatedQuerySetup, GUCSettings,
3062-
shardIdToTableInfo, txManager, bufferActorId);
3074+
shardIdToTableInfo, txManager, bufferActorId, std::move(batchOperationSettings));
30633075
}
30643076

30653077
} // namespace NKqp

ydb/core/kqp/executer_actor/kqp_executer.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <library/cpp/lwtrace/shuttle.h>
4+
#include <ydb/core/kqp/common/batch/batch_operation_settings.h>
45
#include <ydb/core/kqp/common/kqp_tx.h>
56
#include <ydb/core/kqp/common/kqp_event_ids.h>
67
#include <ydb/core/kqp/common/kqp_user_request_context.h>
@@ -36,6 +37,9 @@ struct TEvKqpExecuter {
3637

3738
THashSet<ui32> ParticipantNodes;
3839

40+
TVector<TSerializedCellVec> BatchOperationMaxKeys;
41+
TVector<ui32> BatchOperationKeyIds;
42+
3943
enum class EExecutionType {
4044
Data,
4145
Scan,
@@ -105,6 +109,16 @@ struct TEvKqpExecuter {
105109
NYql::TIssues Issues;
106110
TDuration CpuTime;
107111
};
112+
113+
struct TEvTxDelayedExecution : public TEventLocal<TEvTxDelayedExecution,
114+
TKqpExecuterEvents::EvDelayedExecution>
115+
{
116+
TEvTxDelayedExecution(size_t partitionIdx)
117+
: PartitionIdx(partitionIdx)
118+
{}
119+
120+
size_t PartitionIdx;
121+
};
108122
};
109123

110124
struct TKqpFederatedQuerySetup;
@@ -115,7 +129,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
115129
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator,
116130
const TIntrusivePtr<TUserRequestContext>& userRequestContext, ui32 statementResultIndex,
117131
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings,
118-
const TShardIdToTableInfoPtr& shardIdToTableInfo, const IKqpTransactionManagerPtr& txManager, const TActorId bufferActorId);
132+
const TShardIdToTableInfoPtr& shardIdToTableInfo, const IKqpTransactionManagerPtr& txManager, const TActorId bufferActorId,
133+
TMaybe<TBatchOperationSettings> batchOperationSettings = Nothing());
119134

120135
IActor* CreateKqpSchemeExecuter(
121136
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,

0 commit comments

Comments
 (0)