Skip to content

Commit 9888760

Browse files
authored
25-1: Fixes for BATCH UPDATE/DELETE operations (#18135)
2 parents 8571c01 + 87135b5 commit 9888760

37 files changed

+2339
-1288
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#include "batch_operation_settings.h"
2+
3+
namespace NKikimr::NKqp {
4+
5+
TBatchOperationSettings SetBatchOperationSettings(const NKikimrConfig::TTableServiceConfig::TBatchOperationSettings& settings) {
6+
TBatchOperationSettings res;
7+
8+
res.MaxBatchSize = settings.GetMaxBatchSize();
9+
res.MinBatchSize = settings.GetMinBatchSize();
10+
res.MaxRetryDelayMs = settings.GetMaxRetryDelayMs();
11+
res.StartRetryDelayMs = settings.GetStartRetryDelayMs();
12+
res.PartitionExecutionLimit = settings.GetPartitionExecutionLimit();
13+
14+
return res;
15+
}
16+
17+
} // namespace NKikimr::NKqp
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#pragma once
2+
3+
#include <ydb/core/protos/table_service_config.pb.h>
4+
#include <util/generic/fwd.h>
5+
6+
namespace NKikimr::NKqp {
7+
8+
struct TBatchOperationSettings {
9+
ui64 MaxBatchSize = 10000;
10+
ui64 MinBatchSize = 1;
11+
ui64 MaxRetryDelayMs = 30000;
12+
ui64 StartRetryDelayMs = 50;
13+
ui64 PartitionExecutionLimit = 10;
14+
};
15+
16+
TBatchOperationSettings SetBatchOperationSettings(const NKikimrConfig::TTableServiceConfig::TBatchOperationSettings& settings);
17+
18+
} // namespace NKikimr::NKqp

ydb/core/kqp/common/batch/params.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44

55
namespace NKikimr::NKqp::NBatchParams {
66

7-
const TString Header = "$_kqp_batch_";
8-
const TString IsFirstQuery = Header + "is_first_query";
9-
const TString IsLastQuery = Header + "is_last_query";
7+
const TString Header = "%kqp%batch_";
108
const TString IsInclusiveLeft = Header + "is_inclusive_left";
119
const TString IsInclusiveRight = Header + "is_inclusive_right";
1210
const TString Begin = Header + "begin_"; // begin_N
1311
const TString End = Header + "end_"; // end_N
12+
const TString BeginPrefixSize = Begin + "prefix_size";
13+
const TString EndPrefixSize = End + "prefix_size";
1414

15-
} // namespace NKikimr::NKqp::NPartitionedExecuter
15+
} // namespace NKikimr::NKqp::NBatchParams

ydb/core/kqp/common/batch/ya.make

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
batch_operation_settings.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/core/protos
9+
)
10+
11+
YQL_LAST_ABI_VERSION()
12+
13+
END()

ydb/core/kqp/common/kqp_resolve.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,20 @@
77
#include <ydb/core/tx/datashard/range_ops.h>
88
#endif
99

10+
#include <yql/essentials/minikql/mkql_node_builder.h>
11+
1012
namespace NKikimr {
1113
namespace NKqp {
1214

1315
using namespace NMiniKQL;
1416
using namespace NYql;
1517
using namespace NYql::NNodes;
1618

17-
NUdf::TUnboxedValue MakeDefaultValueByType(const NKikimr::NMiniKQL::TType* type) {
19+
NUdf::TUnboxedValue MakeDefaultValueByType(NKikimr::NMiniKQL::TType* type) {
20+
bool isOptional;
21+
type = UnpackOptional(type, isOptional);
22+
Y_ABORT_UNLESS(type->IsData(), "%s", type->GetKindAsStr());
23+
1824
auto dataType = static_cast<const NKikimr::NMiniKQL::TDataType*>(type);
1925
switch (dataType->GetSchemeType()) {
2026
case NUdf::TDataType<bool>::Id:

ydb/core/kqp/common/kqp_resolve.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ class TKqpTableKeys {
233233
THashMap<TTableId, TTable> TablesById;
234234
};
235235

236-
NUdf::TUnboxedValue MakeDefaultValueByType(const NKikimr::NMiniKQL::TType* type);
236+
NUdf::TUnboxedValue MakeDefaultValueByType(NKikimr::NMiniKQL::TType* type);
237237

238238
TVector<TCell> MakeKeyCells(const NKikimr::NUdf::TUnboxedValue& value, const TVector<NScheme::TTypeInfo>& keyColumnTypes,
239239
const TVector<ui32>& keyColumnIndices, const NMiniKQL::TTypeEnvironment& typeEnv, bool copyValues);

ydb/core/kqp/common/simple/kqp_event_ids.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ struct TKqpExecuterEvents {
6565
EvProgress,
6666
EvStreamDataAck,
6767
EvTableResolveStatus,
68-
EvShardsResolveStatus
68+
EvShardsResolveStatus,
69+
EvDelayedExecution
6970
};
7071
};
7172

ydb/core/kqp/common/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ PEERDIR(
2929
ydb/core/engine
3030
ydb/core/kqp/expr_nodes
3131
ydb/core/kqp/common/simple
32+
ydb/core/kqp/common/batch
3233
ydb/core/kqp/common/compilation
3334
ydb/core/kqp/common/events
3435
ydb/core/kqp/common/shutdown
@@ -58,6 +59,7 @@ GENERATE_ENUM_SERIALIZATION(kqp_yql.h)
5859
END()
5960

6061
RECURSE(
62+
batch
6163
compilation
6264
events
6365
simple

ydb/core/kqp/counters/kqp_counters.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,14 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
891891
QueryStatMemFinishBytes = KqpGroup->GetCounter("Query/Stat/MemFinishBytes", true);
892892
QueryStatMemConvertBytes = KqpGroup->GetCounter("Query/Stat/MemConvertBytes", true);
893893

894+
/* Statistics batch operations */
895+
BatchOperationUpdateRows = KqpGroup->GetCounter("BatchOperation/Update/Rows", true);
896+
BatchOperationUpdateBytes = KqpGroup->GetCounter("BatchOperation/Update/Bytes", true);
897+
898+
BatchOperationDeleteRows = KqpGroup->GetCounter("BatchOperation/Delete/Rows", true);
899+
BatchOperationDeleteBytes = KqpGroup->GetCounter("BatchOperation/Delete/Bytes", true);
900+
901+
BatchOperationRetries = KqpGroup->GetCounter("BatchOperation/Retries", true);
894902
}
895903

896904
::NMonitoring::TDynamicCounterPtr TKqpCounters::GetKqpCounters() const {

ydb/core/kqp/counters/kqp_counters.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,12 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
482482
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatMemFinishBytes;
483483
::NMonitoring::TDynamicCounters::TCounterPtr QueryStatMemConvertBytes;
484484

485+
// Statistics batch operations
486+
::NMonitoring::TDynamicCounters::TCounterPtr BatchOperationUpdateRows;
487+
::NMonitoring::TDynamicCounters::TCounterPtr BatchOperationUpdateBytes;
488+
::NMonitoring::TDynamicCounters::TCounterPtr BatchOperationDeleteRows;
489+
::NMonitoring::TDynamicCounters::TCounterPtr BatchOperationDeleteBytes;
490+
::NMonitoring::TDynamicCounters::TCounterPtr BatchOperationRetries;
485491
};
486492

487493
struct TKqpRequestCounters : public TThrRefBase {

0 commit comments

Comments
 (0)