Skip to content

Commit 9cf6da3

Browse files
authored
Fixes for BATCH operations for tables with index and empty result row (#19469)
2 parents 625df51 + 374e461 commit 9cf6da3

File tree

5 files changed

+263
-69
lines changed

5 files changed

+263
-69
lines changed

ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,24 +86,23 @@ class TKqpPartitionedExecuter : public TActorBootstrapped<TKqpPartitionedExecute
8686
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(PhysicalRequest.TxAlloc,
8787
TEvKqpExecuter::TEvTxResponse::EExecutionType::Data);
8888

89+
if (TableServiceConfig.HasBatchOperationSettings()) {
90+
BatchOperationSettings = SetBatchOperationSettings(TableServiceConfig.GetBatchOperationSettings());
91+
}
92+
93+
PE_LOG_I("Created " << ActorName << " with MaxBatchSize = " << BatchOperationSettings.MaxBatchSize
94+
<< ", PartitionExecutionLimit = " << BatchOperationSettings.PartitionExecutionLimit);
95+
8996
for (const auto& tx : PreparedQuery->GetTransactions()) {
9097
for (const auto& stage : tx->GetStages()) {
9198
for (const auto& sink : stage.GetSinks()) {
9299
FillTableMetaInfo(sink);
93-
94100
if (!KeyColumnInfo.empty()) {
95-
break;
101+
return;
96102
}
97103
}
98104
}
99105
}
100-
101-
if (TableServiceConfig.HasBatchOperationSettings()) {
102-
BatchOperationSettings = SetBatchOperationSettings(TableServiceConfig.GetBatchOperationSettings());
103-
}
104-
105-
PE_LOG_I("Created " << ActorName << " with MaxBatchSize = " << BatchOperationSettings.MaxBatchSize
106-
<< ", PartitionExecutionLimit = " << BatchOperationSettings.PartitionExecutionLimit);
107106
}
108107

109108
void Bootstrap() {
@@ -683,7 +682,7 @@ class TKqpPartitionedExecuter : public TActorBootstrapped<TKqpPartitionedExecute
683682
firstEmpty = std::min(firstEmpty, info.ParamIndex);
684683
}
685684

686-
FillRequestParameter(queryData, paramName, cellValue);
685+
FillRequestParameter(queryData, paramName, cellValue, /* setDefault */ !cellValue.HasValue());
687686
}
688687

689688
FillRequestParameter(queryData, prefixRangeName, firstEmpty);

ydb/core/kqp/opt/kqp_opt_kql.cpp

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -537,13 +537,13 @@ TExprBase BuildDeleteTableWithIndex(const TKiWriteTable& write, const TKikimrTab
537537
.Done();
538538
}
539539

540-
TExprBase BuildRowsToDelete(const TKikimrTableDescription& tableData, bool withSystemColumns, const TCoLambda& filter,
541-
const TPositionHandle pos, TExprContext& ctx)
540+
TExprBase BuildRowsToDelete(const TKikimrTableDescription& tableData, bool withSystemColumns,
541+
const TCoLambda& filter, const TCoAtom& isBatch, const TPositionHandle pos, TExprContext& ctx)
542542
{
543543
const auto tableMeta = BuildTableMeta(tableData, pos, ctx);
544544
const auto tableColumns = BuildColumnsList(tableData, pos, ctx, withSystemColumns, true /*ignoreWriteOnlyColumns*/);
545545

546-
const auto allRows = BuildReadTable(tableColumns, pos, tableData, false, ctx);
546+
const auto allRows = BuildReadTable(tableColumns, pos, tableData, (isBatch == "true"), ctx);
547547

548548
return Build<TCoFilter>(ctx, pos)
549549
.Input(allRows)
@@ -554,7 +554,7 @@ TExprBase BuildRowsToDelete(const TKikimrTableDescription& tableData, bool withS
554554
TExprBase BuildDeleteTable(const TKiDeleteTable& del, const TKikimrTableDescription& tableData, bool withSystemColumns,
555555
TExprContext& ctx)
556556
{
557-
auto rowsToDelete = BuildRowsToDelete(tableData, withSystemColumns, del.Filter(), del.Pos(), ctx);
557+
auto rowsToDelete = BuildRowsToDelete(tableData, withSystemColumns, del.Filter(), del.IsBatch(), del.Pos(), ctx);
558558
auto keysToDelete = ProjectColumns(rowsToDelete, tableData.Metadata->KeyColumnNames, ctx);
559559

560560
return Build<TKqlDeleteRows>(ctx, del.Pos())
@@ -569,7 +569,7 @@ TExprBase BuildDeleteTable(const TKiDeleteTable& del, const TKikimrTableDescript
569569
TExprBase BuildDeleteTableWithIndex(const TKiDeleteTable& del, const TKikimrTableDescription& tableData,
570570
bool withSystemColumns, TExprContext& ctx)
571571
{
572-
auto rowsToDelete = BuildRowsToDelete(tableData, withSystemColumns, del.Filter(), del.Pos(), ctx);
572+
auto rowsToDelete = BuildRowsToDelete(tableData, withSystemColumns, del.Filter(), del.IsBatch(), del.Pos(), ctx);
573573

574574
auto indexes = BuildSecondaryIndexVector(tableData, del.Pos(), ctx, nullptr,
575575
[] (const TKikimrTableMetadata& meta, TPositionHandle pos, TExprContext& ctx) -> TExprBase {
@@ -620,9 +620,9 @@ TExprBase BuildDeleteTableWithIndex(const TKiDeleteTable& del, const TKikimrTabl
620620
}
621621

622622
TExprBase BuildRowsToUpdate(const TKikimrTableDescription& tableData, bool withSystemColumns, const TCoLambda& filter,
623-
const TPositionHandle pos, TExprContext& ctx)
623+
const TCoAtom& isBatch, const TPositionHandle pos, TExprContext& ctx)
624624
{
625-
auto kqlReadTable = BuildReadTable(BuildColumnsList(tableData, pos, ctx, withSystemColumns, true /*ignoreWriteOnlyColumns*/), pos, tableData, false, ctx);
625+
auto kqlReadTable = BuildReadTable(BuildColumnsList(tableData, pos, ctx, withSystemColumns, true /*ignoreWriteOnlyColumns*/), pos, tableData, (isBatch == "true"), ctx);
626626

627627
return Build<TCoFilter>(ctx, pos)
628628
.Input(kqlReadTable)
@@ -689,7 +689,7 @@ THashSet<TStringBuf> GetUpdateColumns(const TKikimrTableDescription& tableData,
689689
TExprBase BuildUpdateTable(const TKiUpdateTable& update, const TKikimrTableDescription& tableData,
690690
bool withSystemColumns, TExprContext& ctx)
691691
{
692-
auto rowsToUpdate = BuildRowsToUpdate(tableData, withSystemColumns, update.Filter(), update.Pos(), ctx);
692+
auto rowsToUpdate = BuildRowsToUpdate(tableData, withSystemColumns, update.Filter(), update.IsBatch(), update.Pos(), ctx);
693693

694694
auto updateColumns = GetUpdateColumns(tableData, update.Update());
695695
auto updatedRows = BuildUpdatedRows(rowsToUpdate, update.Update(), updateColumns, update.Pos(), ctx);
@@ -717,7 +717,7 @@ TExprBase BuildUpdateTable(const TKiUpdateTable& update, const TKikimrTableDescr
717717
TExprBase BuildUpdateTableWithIndex(const TKiUpdateTable& update, const TKikimrTableDescription& tableData,
718718
bool withSystemColumns, TExprContext& ctx)
719719
{
720-
auto rowsToUpdate = BuildRowsToUpdate(tableData, withSystemColumns, update.Filter(), update.Pos(), ctx);
720+
auto rowsToUpdate = BuildRowsToUpdate(tableData, withSystemColumns, update.Filter(), update.IsBatch(), update.Pos(), ctx);
721721

722722
TVector<TExprBase> effects;
723723

@@ -772,7 +772,7 @@ TExprBase BuildUpdateTableWithIndex(const TKiUpdateTable& update, const TKikimrT
772772
.Columns()
773773
.Add(updateColumnsList)
774774
.Build()
775-
.IsBatch(ctx.NewAtom(update.Pos(), "false"))
775+
.IsBatch(update.IsBatch())
776776
.Settings(IsConditionalUpdateSetting(ctx, update.Pos()))
777777
.ReturningColumns(update.ReturningColumns())
778778
.Done();
@@ -843,7 +843,7 @@ TExprBase BuildUpdateTableWithIndex(const TKiUpdateTable& update, const TKikimrT
843843
.Columns()
844844
.Add(indexColumnsList)
845845
.Build()
846-
.IsBatch(ctx.NewAtom(update.Pos(), "false"))
846+
.IsBatch(update.IsBatch())
847847
.Settings().Build()
848848
.Done();
849849

0 commit comments

Comments
 (0)