Skip to content

Commit c73f760

Browse files
authored
Fix wrong columns order in sinks (#10338)
1 parent 8ad3adc commit c73f760

File tree

6 files changed

+92
-34
lines changed

6 files changed

+92
-34
lines changed

ydb/core/kqp/expr_nodes/kqp_expr_nodes.json

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -546,10 +546,9 @@
546546
"Match": {"Type": "Callable", "Name": "KqpTableSinkSettings"},
547547
"Children": [
548548
{"Index": 0, "Name": "Table", "Type": "TKqpTable"},
549-
{"Index": 1, "Name": "Columns", "Type": "TCoAtomList"},
550-
{"Index": 2, "Name": "InconsistentWrite", "Type": "TCoAtom"},
551-
{"Index": 3, "Name": "Mode", "Type": "TCoAtom"},
552-
{"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
549+
{"Index": 1, "Name": "InconsistentWrite", "Type": "TCoAtom"},
550+
{"Index": 2, "Name": "Mode", "Type": "TCoAtom"},
551+
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
553552
]
554553
},
555554
{

ydb/core/kqp/opt/kqp_opt_effects.cpp

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ TCoAtomList BuildKeyColumnsList(const TKikimrTableDescription& table, TPositionH
231231
.Done();
232232
}
233233

234-
TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const TCoAtomList& columns,
234+
TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table,
235235
const bool allowInconsistentWrites, const TStringBuf mode, TExprContext& ctx) {
236236
Y_DEBUG_ABORT_UNLESS(IsDqPureExpr(expr));
237237

@@ -253,7 +253,6 @@ TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const
253253
.Index().Value("0").Build()
254254
.Settings<TKqpTableSinkSettings>()
255255
.Table(table)
256-
.Columns(columns)
257256
.InconsistentWrite(allowInconsistentWrites
258257
? ctx.NewAtom(expr.Pos(), "true")
259258
: ctx.NewAtom(expr.Pos(), "false"))
@@ -311,7 +310,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
311310
if (IsDqPureExpr(node.Input())) {
312311
if (sinkEffect) {
313312
stageInput = RebuildPureStageWithSink(
314-
node.Input(), node.Table(), node.Columns(),
313+
node.Input(), node.Table(),
315314
settings.AllowInconsistentWrites, settings.Mode, ctx);
316315
effect = Build<TKqpSinkEffect>(ctx, node.Pos())
317316
.Stage(stageInput.Cast().Ptr())
@@ -349,7 +348,6 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
349348
.Index().Value("0").Build()
350349
.Settings<TKqpTableSinkSettings>()
351350
.Table(node.Table())
352-
.Columns(node.Columns())
353351
.InconsistentWrite(settings.AllowInconsistentWrites
354352
? ctx.NewAtom(node.Pos(), "true")
355353
: ctx.NewAtom(node.Pos(), "false"))
@@ -459,7 +457,7 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const
459457
if (IsDqPureExpr(node.Input())) {
460458
if (sinkEffect) {
461459
const auto keyColumns = BuildKeyColumnsList(table, node.Pos(), ctx);
462-
stageInput = RebuildPureStageWithSink(node.Input(), node.Table(), keyColumns, false, "delete", ctx);
460+
stageInput = RebuildPureStageWithSink(node.Input(), node.Table(), false, "delete", ctx);
463461
effect = Build<TKqpSinkEffect>(ctx, node.Pos())
464462
.Stage(stageInput.Cast().Ptr())
465463
.SinkIndex().Build("0")
@@ -486,7 +484,6 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const
486484
auto input = dqUnion.Output().Stage().Program().Body();
487485

488486
if (sinkEffect) {
489-
const auto keyColumns = BuildKeyColumnsList(table, node.Pos(), ctx);
490487
auto sink = Build<TDqSink>(ctx, node.Pos())
491488
.DataSink<TKqpTableSink>()
492489
.Category(ctx.NewAtom(node.Pos(), NYql::KqpTableSinkName))
@@ -495,7 +492,6 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const
495492
.Index().Value("0").Build()
496493
.Settings<TKqpTableSinkSettings>()
497494
.Table(node.Table())
498-
.Columns(keyColumns)
499495
.InconsistentWrite(ctx.NewAtom(node.Pos(), "false"))
500496
.Mode(ctx.NewAtom(node.Pos(), "delete"))
501497
.Settings()

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,16 @@ void FillTablesMap(const TKqpTable& table, const TCoAtomList& columns,
149149
}
150150
}
151151

152+
void FillTablesMap(const TKqpTable& table, const TVector<TStringBuf>& columns,
153+
THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap)
154+
{
155+
FillTablesMap(table, tablesMap);
156+
157+
for (const auto& column : columns) {
158+
tablesMap[table.Path()].emplace(column);
159+
}
160+
}
161+
152162
void FillTable(const TKikimrTableMetadata& tableMeta, THashSet<TStringBuf>&& columns,
153163
NKqpProto::TKqpPhyTable& tableProto)
154164
{
@@ -808,7 +818,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
808818
YQL_ENSURE(maybeSinkNode);
809819
auto sinkNode = maybeSinkNode.Cast();
810820
auto* sinkProto = stageProto.AddSinks();
811-
FillSink(sinkNode, sinkProto, tablesMap, ctx);
821+
FillSink(sinkNode, sinkProto, tablesMap, stage, ctx);
812822
sinkProto->SetOutputIndex(FromString(TStringBuf(sinkNode.Index())));
813823

814824
if (IsTableSink(sinkNode.DataSink().Cast<TCoDataSink>().Category())) {
@@ -1074,19 +1084,34 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
10741084
}
10751085
}
10761086

1077-
void FillKqpSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap) {
1087+
void FillKqpSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap, const TDqPhyStage& stage) {
10781088
if (auto settings = sink.Settings().Maybe<TKqpTableSinkSettings>()) {
10791089
NKqpProto::TKqpInternalSink& internalSinkProto = *protoSink->MutableInternalSink();
10801090
internalSinkProto.SetType(TString(NYql::KqpTableSinkName));
10811091
NKikimrKqp::TKqpTableSinkSettings settingsProto;
1082-
FillTablesMap(settings.Table().Cast(), settings.Columns().Cast(), tablesMap);
1092+
1093+
const auto& tupleType = stage.Ref().GetTypeAnn()->Cast<TTupleExprType>();
1094+
YQL_ENSURE(tupleType);
1095+
YQL_ENSURE(tupleType->GetSize() == 1);
1096+
const auto& listType = tupleType->GetItems()[0]->Cast<TListExprType>();
1097+
YQL_ENSURE(listType);
1098+
const auto& structType = listType->GetItemType()->Cast<TStructExprType>();
1099+
YQL_ENSURE(structType);
1100+
1101+
TVector<TStringBuf> columns;
1102+
columns.reserve(structType->GetSize());
1103+
for (const auto& item : structType->GetItems()) {
1104+
columns.emplace_back(item->GetName());
1105+
}
1106+
1107+
FillTablesMap(settings.Table().Cast(), columns, tablesMap);
10831108
FillTableId(settings.Table().Cast(), *settingsProto.MutableTable());
10841109

10851110
const auto tableMeta = TablesData->ExistingTable(Cluster, settings.Table().Cast().Path()).Metadata;
10861111

1087-
auto fillColumnProto = [] (TString columnName, const NYql::TKikimrColumnMetadata* column, NKikimrKqp::TKqpColumnMetadataProto* columnProto ) {
1112+
auto fillColumnProto = [] (TStringBuf columnName, const NYql::TKikimrColumnMetadata* column, NKikimrKqp::TKqpColumnMetadataProto* columnProto ) {
10881113
columnProto->SetId(column->Id);
1089-
columnProto->SetName(columnName);
1114+
columnProto->SetName(TString(columnName));
10901115
columnProto->SetTypeId(column->TypeInfo.GetTypeId());
10911116

10921117
if(NScheme::NTypeIds::IsParametrizedType(column->TypeInfo.GetTypeId())) {
@@ -1096,16 +1121,15 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
10961121

10971122
for (const auto& columnName : tableMeta->KeyColumnNames) {
10981123
const auto columnMeta = tableMeta->Columns.FindPtr(columnName);
1099-
YQL_ENSURE(columnMeta != nullptr, "Unknown column in sink: \"" + columnName + "\"");
1124+
YQL_ENSURE(columnMeta != nullptr, "Unknown column in sink: \"" + TString(columnName) + "\"");
11001125

11011126
auto keyColumnProto = settingsProto.AddKeyColumns();
11021127
fillColumnProto(columnName, columnMeta, keyColumnProto);
11031128
}
11041129

1105-
for (const auto& column : settings.Columns().Cast()) {
1106-
const auto columnName = column.StringValue();
1130+
for (const auto& columnName : columns) {
11071131
const auto columnMeta = tableMeta->Columns.FindPtr(columnName);
1108-
YQL_ENSURE(columnMeta != nullptr, "Unknown column in sink: \"" + columnName + "\"");
1132+
YQL_ENSURE(columnMeta != nullptr, "Unknown column in sink: \"" + TString(columnName) + "\"");
11091133

11101134
auto columnProto = settingsProto.AddColumns();
11111135
fillColumnProto(columnName, columnMeta, columnProto);
@@ -1141,11 +1165,11 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
11411165
|| dataSinkCategory == NYql::KqpTableSinkName;
11421166
}
11431167

1144-
void FillSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap, TExprContext& ctx) {
1168+
void FillSink(const TDqSink& sink, NKqpProto::TKqpSink* protoSink, THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap, const TDqPhyStage& stage, TExprContext& ctx) {
11451169
Y_UNUSED(ctx);
11461170
const TStringBuf dataSinkCategory = sink.DataSink().Cast<TCoDataSink>().Category();
11471171
if (IsTableSink(dataSinkCategory)) {
1148-
FillKqpSink(sink, protoSink, tablesMap);
1172+
FillKqpSink(sink, protoSink, tablesMap, stage);
11491173
} else {
11501174
// Delegate sink filling to dq integration of specific provider
11511175
const auto provider = TypesCtx.DataSinkMap.find(dataSinkCategory);

ydb/core/kqp/ut/olap/delete_ut.cpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#include <ydb/core/kqp/ut/common/columnshard.h>
2+
3+
#include <library/cpp/testing/unittest/registar.h>
4+
5+
namespace NKikimr::NKqp {
6+
Y_UNIT_TEST_SUITE(KqpOlapDelete) {
7+
Y_UNIT_TEST_TWIN(DeleteWithDiffrentTypesPKColumns, isStream) {
8+
NKikimrConfig::TAppConfig appConfig;
9+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
10+
auto runnerSettings = TKikimrSettings().SetAppConfig(appConfig).SetWithSampleTables(true);
11+
12+
TTestHelper testHelper(runnerSettings);
13+
auto client = testHelper.GetKikimr().GetQueryClient();
14+
15+
TVector<TTestHelper::TColumnSchema> schema = {
16+
TTestHelper::TColumnSchema().SetName("time").SetType(NScheme::NTypeIds::Timestamp).SetNullable(false),
17+
TTestHelper::TColumnSchema().SetName("class").SetType(NScheme::NTypeIds::Utf8).SetNullable(false),
18+
TTestHelper::TColumnSchema().SetName("uniq").SetType(NScheme::NTypeIds::Utf8).SetNullable(false),
19+
};
20+
21+
TTestHelper::TColumnTable testTable;
22+
testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({ "time", "class", "uniq" }).SetSchema(schema);
23+
testHelper.CreateTable(testTable);
24+
25+
auto ts = TInstant::Now();
26+
{
27+
TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
28+
tableInserter.AddRow().Add(ts.MicroSeconds()).Add("test").Add("test");
29+
testHelper.BulkUpsert(testTable, tableInserter);
30+
}
31+
32+
33+
if (isStream) {
34+
auto deleteQuery = "DELETE FROM `/Root/ColumnTableTest` ON SELECT * FROM `/Root/ColumnTableTest`";
35+
auto deleteQueryResult = client.ExecuteQuery(deleteQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
36+
UNIT_ASSERT_C(deleteQueryResult.IsSuccess(), deleteQueryResult.GetIssues().ToString());
37+
} else {
38+
auto deleteQuery = TStringBuilder() << "DELETE FROM `/Root/ColumnTableTest` WHERE Cast(DateTime::MakeDate(DateTime::StartOfDay(time)) as String) == \""
39+
<< ts.FormatLocalTime("%Y-%m-%d")
40+
<< "\" and class == \"test\" and uniq = \"test\";";
41+
auto deleteQueryResult = client.ExecuteQuery(deleteQuery, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
42+
UNIT_ASSERT_C(deleteQueryResult.IsSuccess(), deleteQueryResult.GetIssues().ToString());
43+
}
44+
45+
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest`", "[]");
46+
}
47+
}
48+
}

ydb/core/kqp/ut/olap/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ ELSE()
1313
ENDIF()
1414

1515
SRCS(
16+
delete_ut.cpp
1617
kqp_olap_stats_ut.cpp
1718
GLOBAL kqp_olap_ut.cpp
1819
sys_view_ut.cpp

ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -144,21 +144,11 @@ Y_UNIT_TEST_SUITE(KqpSinkTx) {
144144
result = session.ExecuteQuery(Q_(R"(
145145
UPDATE `/Root/KV` SET Value = "third" WHERE Key = 4;
146146
)"), TTxControl::Tx(tx->GetId())).ExtractValueSync();
147-
if (GetIsOlap()) {
148-
// Olap has Reads in this query, so it breaks now.
149-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
150-
} else {
151-
// Oltp doesn't have Reads in this query, so it breaks later.
152-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
153-
}
147+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
154148

155149
auto commitResult = tx->Commit().ExtractValueSync();
156150

157-
if (GetIsOlap()) {
158-
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::NOT_FOUND, commitResult.GetIssues().ToString());
159-
} else {
160-
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString());
161-
}
151+
UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::NOT_FOUND, commitResult.GetIssues().ToString());
162152
}
163153
};
164154

0 commit comments

Comments
 (0)