@@ -15,11 +15,55 @@ using namespace NYql::NNodes;
15
15
16
16
namespace {
17
17
18
+ TVector<TString> GetMissingInputColumnsForReturning (
19
+ const TKiWriteTable& write, const TCoAtomList& inputColumns)
20
+ {
21
+ auto returnColumns = write.ReturningColumns ().Cast <TCoAtomList>();
22
+ if (returnColumns.Ref ().ChildrenSize () == 0 ) {
23
+ return {};
24
+ }
25
+
26
+ THashSet<TStringBuf> currentInput;
27
+ for (const auto & name : inputColumns) {
28
+ currentInput.insert (name.Value ());
29
+ }
30
+
31
+ TVector<TString> result;
32
+ for (const auto & returnCol : returnColumns) {
33
+ if (!currentInput.contains (returnCol.Value ())) {
34
+ result.push_back (TString (returnCol.Value ()));
35
+ }
36
+ }
37
+
38
+ return result;
39
+ }
40
+
18
41
// Replace absent input columns to NULL to perform REPLACE via UPSERT
19
- std::pair<TExprBase, TCoAtomList> CreateRowsToReplace ( const TExprBase& input,
42
+ std::pair<TExprBase, TCoAtomList> ExtendInputRowsWithAbsentNullColumns ( const TKiWriteTable& write, const TExprBase& input,
20
43
const TCoAtomList& inputColumns, const TKikimrTableDescription& tableDesc,
21
44
TPositionHandle pos, TExprContext& ctx)
22
45
{
46
+ TVector<TString> maybeMissingColumnsToReplace;
47
+ const auto op = GetTableOp (write);
48
+ if (op == TYdbOperation::Replace) {
49
+ for (const auto &[name, _]: tableDesc.Metadata ->Columns ) {
50
+ maybeMissingColumnsToReplace.push_back (name);
51
+ }
52
+ }
53
+
54
+ if (op == TYdbOperation::InsertAbort || op == TYdbOperation::InsertRevert || op == TYdbOperation::Upsert) {
55
+ maybeMissingColumnsToReplace = GetMissingInputColumnsForReturning (write, inputColumns);
56
+ if (maybeMissingColumnsToReplace.size () > 0 ) {
57
+ for (const auto & inputCol: inputColumns) {
58
+ maybeMissingColumnsToReplace.push_back (TString (inputCol.Value ()));
59
+ }
60
+ }
61
+ }
62
+
63
+ if (maybeMissingColumnsToReplace.size () == 0 ) {
64
+ return {input, inputColumns};
65
+ }
66
+
23
67
THashSet<TStringBuf> inputColumnsSet;
24
68
for (const auto & name : inputColumns) {
25
69
inputColumnsSet.insert (name.Value ());
@@ -32,7 +76,7 @@ std::pair<TExprBase, TCoAtomList> CreateRowsToReplace(const TExprBase& input,
32
76
TVector<TCoAtom> writeColumns;
33
77
TVector<TExprBase> writeMembers;
34
78
35
- for (const auto & [ name, _] : tableDesc. Metadata -> Columns ) {
79
+ for (const auto & name : maybeMissingColumnsToReplace ) {
36
80
TMaybeNode<TExprBase> memberValue;
37
81
if (tableDesc.GetKeyColumnIndex (name) || inputColumnsSet.contains (name)) {
38
82
memberValue = Build<TCoMember>(ctx, pos)
@@ -225,11 +269,7 @@ std::pair<TExprBase, TCoAtomList> BuildWriteInput(const TKiWriteTable& write, co
225
269
input = BuildKqlSequencer (input, table, inputCols, autoIncrement, pos, ctx);
226
270
}
227
271
228
- const bool isWriteReplace = (GetTableOp (write) == TYdbOperation::Replace);
229
- if (isWriteReplace) {
230
- // TODO: don't need it for sinks (can be disabled when secondary indexes are supported inside write actor)
231
- std::tie (input, inputCols) = CreateRowsToReplace (input, inputCols, table, write.Pos (), ctx);
232
- }
272
+ std::tie (input, inputCols) = ExtendInputRowsWithAbsentNullColumns (write, input, inputCols, table, write.Pos (), ctx);
233
273
234
274
auto baseInput = Build<TKqpWriteConstraint>(ctx, pos)
235
275
.Input (input)
@@ -239,15 +279,35 @@ std::pair<TExprBase, TCoAtomList> BuildWriteInput(const TKiWriteTable& write, co
239
279
return {baseInput, inputCols};
240
280
}
241
281
282
+
283
+ TCoAtomList ExtendGenerateOnInsertColumnsList (const TKiWriteTable& write, TCoAtomList& generateColumnsIfInsert, const TCoAtomList& inputColumns, const TCoAtomList& autoincrement, TExprContext& ctx) {
284
+ auto inputCols = BuildUpsertInputColumns (inputColumns, autoincrement, write.Pos (), ctx);
285
+ auto maybeMissingColumnsToReplace = GetMissingInputColumnsForReturning (write, inputCols);
286
+ TVector<TExprNode::TPtr> result;
287
+ result.reserve (generateColumnsIfInsert.Ref ().ChildrenSize () + maybeMissingColumnsToReplace.size ());
288
+ for (const auto & item: generateColumnsIfInsert) {
289
+ result.push_back (item.Ptr ());
290
+ }
291
+
292
+ for (auto & name: maybeMissingColumnsToReplace) {
293
+ auto atom = TCoAtom (ctx.NewAtom (write.Pos (), name));
294
+ result.push_back (atom.Ptr ());
295
+ }
296
+
297
+ return Build<TCoAtomList>(ctx, write.Pos ()).Add (result).Done ();
298
+ }
299
+
242
300
TExprBase BuildUpsertTable (const TKiWriteTable& write, const TCoAtomList& inputColumns,
243
301
const TCoAtomList& autoincrement, const bool isSink,
244
302
const TKikimrTableDescription& table, TExprContext& ctx)
245
303
{
246
304
auto generateColumnsIfInsertNode = GetSetting (write.Settings ().Ref (), " generate_columns_if_insert" );
247
305
YQL_ENSURE (generateColumnsIfInsertNode);
248
306
TCoAtomList generateColumnsIfInsert = TCoNameValueTuple (generateColumnsIfInsertNode).Value ().Cast <TCoAtomList>();
249
-
250
307
auto settings = FilterSettings (write.Settings ().Ref (), {" AllowInconsistentWrites" }, ctx);
308
+
309
+ generateColumnsIfInsert = ExtendGenerateOnInsertColumnsList (write, generateColumnsIfInsert, inputColumns, autoincrement, ctx);
310
+
251
311
settings = AddSetting (*settings, write.Pos (), " Mode" , Build<TCoAtom>(ctx, write.Pos ()).Value (" upsert" ).Done ().Ptr (), ctx);
252
312
const auto [input, columns] = BuildWriteInput (write, table, inputColumns, autoincrement, isSink, write.Pos (), ctx);
253
313
if (generateColumnsIfInsert.Ref ().ChildrenSize () > 0 ) {
@@ -283,6 +343,8 @@ TExprBase BuildUpsertTableWithIndex(const TKiWriteTable& write, const TCoAtomLis
283
343
YQL_ENSURE (generateColumnsIfInsertNode);
284
344
TCoAtomList generateColumnsIfInsert = TCoNameValueTuple (generateColumnsIfInsertNode).Value ().Cast <TCoAtomList>();
285
345
346
+ generateColumnsIfInsert = ExtendGenerateOnInsertColumnsList (write, generateColumnsIfInsert, inputColumns, autoincrement, ctx);
347
+
286
348
auto effect = Build<TKqlUpsertRowsIndex>(ctx, write.Pos ())
287
349
.Table (BuildTableMeta (table, write.Pos (), ctx))
288
350
.Input (input.Ptr ())
0 commit comments