@@ -4226,10 +4226,12 @@ void TPersQueue::PushTxInQueue(TDistributedTransaction& tx, TDistributedTransact
4226
4226
void TPersQueue::ChangeTxState (TDistributedTransaction& tx,
4227
4227
TDistributedTransaction::EState newState)
4228
4228
{
4229
- tx.State = newState;
4229
+ PQ_LOG_TX_I (" TxId " << tx.TxId << " moved from " <<
4230
+ NKikimrPQ::TTransaction_EState_Name (tx.State ) <<
4231
+ " to " <<
4232
+ NKikimrPQ::TTransaction_EState_Name (newState));
4230
4233
4231
- PQ_LOG_TX_I (" TxId " << tx.TxId <<
4232
- " , NewState " << NKikimrPQ::TTransaction_EState_Name (tx.State ));
4234
+ tx.State = newState;
4233
4235
}
4234
4236
4235
4237
bool TPersQueue::TryChangeTxState (TDistributedTransaction& tx,
@@ -4261,11 +4263,6 @@ bool TPersQueue::TryChangeTxState(TDistributedTransaction& tx,
4261
4263
PushTxInQueue (tx, newState);
4262
4264
}
4263
4265
4264
- PQ_LOG_TX_I (" TxId " << tx.TxId << " moved from " <<
4265
- NKikimrPQ::TTransaction_EState_Name (oldState) <<
4266
- " to " <<
4267
- NKikimrPQ::TTransaction_EState_Name (newState));
4268
-
4269
4266
return true ;
4270
4267
}
4271
4268
@@ -4347,6 +4344,12 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
4347
4344
PQ_LOG_TX_D (" Wait for TxId " << tx.TxId );
4348
4345
return ;
4349
4346
}
4347
+ if (tx.WriteInProgress ) {
4348
+ if (tx.State == NKikimrPQ::TTransaction::EXECUTED) {
4349
+ PQ_LOG_TX_I (" You cannot send TEvReadSetAck for TxId: " << tx.TxId << " until the EXECUTED state is saved" );
4350
+ }
4351
+ return ;
4352
+ }
4350
4353
4351
4354
switch (tx.State ) {
4352
4355
case NKikimrPQ::TTransaction::UNKNOWN:
@@ -4362,12 +4365,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
4362
4365
break ;
4363
4366
4364
4367
case NKikimrPQ::TTransaction::PREPARING:
4365
- Y_ABORT_UNLESS (tx.WriteInProgress ,
4368
+ Y_ABORT_UNLESS (! tx.WriteInProgress ,
4366
4369
" PQ %" PRIu64 " , TxId %" PRIu64,
4367
4370
TabletID (), tx.TxId );
4368
4371
4369
- tx.WriteInProgress = false ;
4370
-
4371
4372
// scheduled events will be sent to EndWriteTxs
4372
4373
4373
4374
ChangeTxState (tx, NKikimrPQ::TTransaction::PREPARED);
@@ -4386,12 +4387,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
4386
4387
break ;
4387
4388
4388
4389
case NKikimrPQ::TTransaction::PLANNING:
4389
- Y_ABORT_UNLESS (tx.WriteInProgress ,
4390
+ Y_ABORT_UNLESS (! tx.WriteInProgress ,
4390
4391
" PQ %" PRIu64 " , TxId %" PRIu64,
4391
4392
TabletID (), tx.TxId );
4392
4393
4393
- tx.WriteInProgress = false ;
4394
-
4395
4394
// scheduled events will be sent to EndWriteTxs
4396
4395
4397
4396
TryChangeTxState (tx, NKikimrPQ::TTransaction::PLANNED);
@@ -4529,6 +4528,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
4529
4528
Y_ABORT_UNLESS (tx.TxId == TxQueue.front ().second ,
4530
4529
" PQ %" PRIu64 " , TxId %" PRIu64 " , FrontTxId %" PRIu64,
4531
4530
TabletID (), tx.TxId , TxQueue.front ().second );
4531
+ Y_ABORT_UNLESS (!tx.WriteInProgress ,
4532
+ " PQ %" PRIu64 " , TxId %" PRIu64,
4533
+ TabletID (), tx.TxId );
4534
+
4532
4535
TxQueue.pop_front ();
4533
4536
SetTxCompleteLagCounter ();
4534
4537
@@ -4627,6 +4630,8 @@ void TPersQueue::CheckChangedTxStates(const TActorContext& ctx)
4627
4630
" PQ %" PRIu64 " , TxId %" PRIu64,
4628
4631
TabletID (), txId);
4629
4632
4633
+ tx->WriteInProgress = false ;
4634
+
4630
4635
TryExecuteTxs (ctx, *tx);
4631
4636
}
4632
4637
0 commit comments