@@ -112,14 +112,12 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
112
112
}
113
113
114
114
TOperation::TPtr operation = new TOperation (txId);
115
- Operations[txId] = operation; // record is erased at ApplyOnExecute if all parts are done at propose
116
115
117
116
for (const auto & transaction : record.GetTransaction ()) {
118
117
auto quotaResult = operation->ConsumeQuota (transaction, context);
119
118
if (quotaResult.Status != NKikimrScheme::StatusSuccess) {
120
119
response.Reset (new TProposeResponse (quotaResult.Status , ui64 (txId), ui64 (selfId)));
121
120
response->SetError (quotaResult.Status , quotaResult.Reason );
122
- Operations.erase (txId);
123
121
return std::move (response);
124
122
}
125
123
}
@@ -139,7 +137,6 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
139
137
if (splitResult.Status != NKikimrScheme::StatusSuccess) {
140
138
response.Reset (new TProposeResponse (splitResult.Status , ui64 (txId), ui64 (selfId)));
141
139
response->SetError (splitResult.Status , splitResult.Reason );
142
- Operations.erase (txId);
143
140
return std::move (response);
144
141
}
145
142
@@ -148,11 +145,15 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
148
145
149
146
const TString owner = record.HasOwner () ? record.GetOwner () : BUILTIN_ACL_ROOT;
150
147
148
+ bool prevProposeUndoSafe = true ;
149
+
150
+ Operations[txId] = operation; // record is erased at ApplyOnExecute if all parts are done at propose
151
+
151
152
for (const auto & transaction : transactions) {
152
153
auto parts = operation->ConstructParts (transaction, context);
153
154
154
155
if (parts.size () > 1 ) {
155
- // les't allow altering impl index tables as part of consistent operation
156
+ // allow altering impl index tables as part of consistent operation
156
157
context.IsAllowedPrivateTables = true ;
157
158
}
158
159
@@ -206,25 +207,77 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
206
207
<< " , with reason: " << response->Record .GetReason ()
207
208
<< " , tx message: " << PrintSecurely (record));
208
209
209
- context.OnComplete = {}; // recreate
210
- context.DbChanges = {};
210
+ AbortOperationPropose (txId, context);
211
211
212
- for (auto & toAbort : operation->Parts ) {
213
- toAbort->AbortPropose (context);
214
- }
212
+ return std::move (response);
213
+ }
215
214
216
- context. MemChanges . UnDo (context. SS );
217
- context.OnComplete . ApplyOnExecute (context. SS , context. GetTxc (), context. Ctx );
218
- Operations. erase (txId) ;
215
+ // Check suboperations for undo safety. Log first unsafe suboperation in the schema transaction.
216
+ if (prevProposeUndoSafe && ! context.IsUndoChangesSafe ()) {
217
+ prevProposeUndoSafe = false ;
219
218
220
- return std::move (response);
219
+ LOG_WARN_S (context.Ctx , NKikimrServices::FLAT_TX_SCHEMESHARD,
220
+ " Operation part proposed ok, but propose itself is undo unsafe"
221
+ << " , suboperation type: " << NKikimrSchemeOp::EOperationType_Name (part->GetTransaction ().GetOperationType ())
222
+ << " , opId: " << part->GetOperationId ()
223
+ << " , at schemeshard: " << selfId
224
+ );
221
225
}
222
226
}
223
227
}
224
228
225
229
return std::move (response);
226
230
}
227
231
232
+ void TSchemeShard::AbortOperationPropose (const TTxId txId, TOperationContext& context) {
233
+ Y_ABORT_UNLESS (Operations.contains (txId));
234
+ TOperation::TPtr operation = Operations.at (txId);
235
+
236
+ // Drop operation side effects, undo memory changes
237
+ // (Local db changes were already applied)
238
+ context.OnComplete = {};
239
+ context.DbChanges = {};
240
+
241
+ for (auto & i : operation->Parts ) {
242
+ i->AbortPropose (context);
243
+ }
244
+
245
+ context.MemChanges .UnDo (context.SS );
246
+
247
+ // And remove aborted operation from existence
248
+ Operations.erase (txId);
249
+ }
250
+
251
+ void AbortOperation (TOperationContext& context, const TTxId txId, const TString& reason) {
252
+ LOG_ERROR_S (context.Ctx , NKikimrServices::FLAT_TX_SCHEMESHARD, " TTxOperationPropose Execute"
253
+ << " , txId: " << txId
254
+ << " , operation is rejected and all changes reverted"
255
+ << " , " << reason
256
+ << " , at schemeshard: " << context.SS ->SelfTabletId ()
257
+ );
258
+
259
+ context.GetTxc ().DB .RollbackChanges ();
260
+ context.SS ->AbortOperationPropose (txId, context);
261
+ }
262
+
263
+ bool IsCommitRedoSizeOverLimit (TString* reason, TOperationContext& context) {
264
+ // MaxCommitRedoMB is the ICB control shared with NTabletFlatExecutor::TExecutor.
265
+ // We subtract from MaxCommitRedoMB additional 1MB for anything extra
266
+ // that executor/tablet may (or may not) add under the hood
267
+ const ui64 limitBytes = (context.SS ->MaxCommitRedoMB - 1 ) << 20 ; // MB to bytes
268
+ const ui64 commitRedoBytes = context.GetTxc ().DB .GetCommitRedoBytes ();
269
+ if (commitRedoBytes >= limitBytes) {
270
+ *reason = TStringBuilder ()
271
+ << " local tx commit redo size generated by IgniteOperation() is more than allowed limit: "
272
+ << " commit redo size " << commitRedoBytes
273
+ << " , limit " << limitBytes
274
+ << " , excess " << (commitRedoBytes - limitBytes)
275
+ ;
276
+ return true ;
277
+ }
278
+ return false ;
279
+ }
280
+
228
281
struct TSchemeShard ::TTxOperationPropose: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
229
282
using TBase = NTabletFlatExecutor::TTransactionBase<TSchemeShard>;
230
283
@@ -244,6 +297,7 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti
244
297
245
298
bool Execute (NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
246
299
TTabletId selfId = Self->SelfTabletId ();
300
+ auto txId = TTxId (Request->Get ()->Record .GetTxId ());
247
301
248
302
LOG_DEBUG_S (ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
249
303
" TTxOperationPropose Execute"
@@ -254,7 +308,6 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti
254
308
255
309
auto [userToken, tokenParseError] = ParseUserToken (Request->Get ()->Record .GetUserToken ());
256
310
if (tokenParseError) {
257
- auto txId = Request->Get ()->Record .GetTxId ();
258
311
Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusInvalidParameter, ui64 (txId), ui64 (selfId), " Failed to parse user token" );
259
312
return true ;
260
313
}
@@ -266,10 +319,52 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti
266
319
TStorageChanges dbChanges;
267
320
TOperationContext context{Self, txc, ctx, OnComplete, memChanges, dbChanges, std::move (userToken)};
268
321
322
+ // NOTE: Successful IgniteOperation will leave created operation in Self->Operations and accumulated changes in the context.
323
+ // Unsuccessful IgniteOperation will leave no operation and context will also be clean.
269
324
Response = Self->IgniteOperation (*Request->Get (), context);
270
325
271
- OnComplete.ApplyOnExecute (Self, txc, ctx);
326
+ // NOTE: Successfully created operation also must be checked for the size of this local tx.
327
+ //
328
+ // Limitation on a commit redo size of local transactions is imposed at the tablet executor level
329
+ // (See ydb/core/tablet_flat/flat_executor.cpp, NTabletFlatExecutor::TExecutor::ExecuteTransaction()).
330
+ // And a tablet violating that limit is considered broken and will be stopped unconditionally and immediately.
331
+ //
332
+ // So even if operation was ignited successfully, its local tx size still must be checked
333
+ // as a precaution measure to avoid infinite loop of schemeshard restarting, attempting to propose
334
+ // persisted operation again, hitting commit redo size limit and restarting again.
335
+ //
336
+ // On unsuccessful check, local tx should be rolled back, operation should be rejected and
337
+ // all accumulated changes dropped or reverted.
338
+ //
339
+
340
+ // Actually build commit redo (dbChanges could be empty)
272
341
dbChanges.Apply (Self, txc, ctx);
342
+
343
+ if (Self->Operations .contains (txId)) {
344
+ Y_ABORT_UNLESS (Response->IsDone () || Response->IsAccepted () || Response->IsConditionalAccepted ());
345
+
346
+ // Check local tx commit redo size
347
+ TString reason;
348
+ if (IsCommitRedoSizeOverLimit (&reason, context)) {
349
+ Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusSchemeError, ui64 (txId), ui64 (selfId), reason);
350
+
351
+ AbortOperation (context, txId, reason);
352
+
353
+ if (!context.IsUndoChangesSafe ()) {
354
+ LOG_ERROR_S (context.Ctx , NKikimrServices::FLAT_TX_SCHEMESHARD, " TTxOperationPropose Execute"
355
+ << " , opId: " << txId
356
+ << " , operation should be rejected and all changes be reverted"
357
+ << " , but context.IsUndoChangesSafe is false, which means some direct writes have been done"
358
+ << " , message: " << PrintSecurely (Request->Get ()->Record )
359
+ << " , at schemeshard: " << context.SS ->SelfTabletId ()
360
+ );
361
+ }
362
+ }
363
+ }
364
+
365
+ // Apply accumulated changes (changes could be empty)
366
+ OnComplete.ApplyOnExecute (Self, txc, ctx);
367
+
273
368
return true ;
274
369
}
275
370
0 commit comments