Skip to content

Commit 6796e3f

Browse files
[-] flag tx.WriteInProgress
1 parent 7f296f3 commit 6796e3f

File tree

1 file changed

+19
-14
lines changed

1 file changed

+19
-14
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4226,10 +4226,12 @@ void TPersQueue::PushTxInQueue(TDistributedTransaction& tx, TDistributedTransact
42264226
void TPersQueue::ChangeTxState(TDistributedTransaction& tx,
42274227
TDistributedTransaction::EState newState)
42284228
{
4229-
tx.State = newState;
4229+
PQ_LOG_TX_I("TxId " << tx.TxId << " moved from " <<
4230+
NKikimrPQ::TTransaction_EState_Name(tx.State) <<
4231+
" to " <<
4232+
NKikimrPQ::TTransaction_EState_Name(newState));
42304233

4231-
PQ_LOG_TX_I("TxId " << tx.TxId <<
4232-
", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
4234+
tx.State = newState;
42334235
}
42344236

42354237
bool TPersQueue::TryChangeTxState(TDistributedTransaction& tx,
@@ -4261,11 +4263,6 @@ bool TPersQueue::TryChangeTxState(TDistributedTransaction& tx,
42614263
PushTxInQueue(tx, newState);
42624264
}
42634265

4264-
PQ_LOG_TX_I("TxId " << tx.TxId << " moved from " <<
4265-
NKikimrPQ::TTransaction_EState_Name(oldState) <<
4266-
" to " <<
4267-
NKikimrPQ::TTransaction_EState_Name(newState));
4268-
42694266
return true;
42704267
}
42714268

@@ -4347,6 +4344,12 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
43474344
PQ_LOG_TX_D("Wait for TxId " << tx.TxId);
43484345
return;
43494346
}
4347+
if (tx.WriteInProgress) {
4348+
if (tx.State == NKikimrPQ::TTransaction::EXECUTED) {
4349+
PQ_LOG_TX_I("You cannot send TEvReadSetAck for TxId: " << tx.TxId << " until the EXECUTED state is saved");
4350+
}
4351+
return;
4352+
}
43504353

43514354
switch (tx.State) {
43524355
case NKikimrPQ::TTransaction::UNKNOWN:
@@ -4362,12 +4365,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
43624365
break;
43634366

43644367
case NKikimrPQ::TTransaction::PREPARING:
4365-
Y_ABORT_UNLESS(tx.WriteInProgress,
4368+
Y_ABORT_UNLESS(!tx.WriteInProgress,
43664369
"PQ %" PRIu64 ", TxId %" PRIu64,
43674370
TabletID(), tx.TxId);
43684371

4369-
tx.WriteInProgress = false;
4370-
43714372
// scheduled events will be sent to EndWriteTxs
43724373

43734374
ChangeTxState(tx, NKikimrPQ::TTransaction::PREPARED);
@@ -4386,12 +4387,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
43864387
break;
43874388

43884389
case NKikimrPQ::TTransaction::PLANNING:
4389-
Y_ABORT_UNLESS(tx.WriteInProgress,
4390+
Y_ABORT_UNLESS(!tx.WriteInProgress,
43904391
"PQ %" PRIu64 ", TxId %" PRIu64,
43914392
TabletID(), tx.TxId);
43924393

4393-
tx.WriteInProgress = false;
4394-
43954394
// scheduled events will be sent to EndWriteTxs
43964395

43974396
TryChangeTxState(tx, NKikimrPQ::TTransaction::PLANNED);
@@ -4529,6 +4528,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
45294528
Y_ABORT_UNLESS(tx.TxId == TxQueue.front().second,
45304529
"PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64,
45314530
TabletID(), tx.TxId, TxQueue.front().second);
4531+
Y_ABORT_UNLESS(!tx.WriteInProgress,
4532+
"PQ %" PRIu64 ", TxId %" PRIu64,
4533+
TabletID(), tx.TxId);
4534+
45324535
TxQueue.pop_front();
45334536
SetTxCompleteLagCounter();
45344537

@@ -4627,6 +4630,8 @@ void TPersQueue::CheckChangedTxStates(const TActorContext& ctx)
46274630
"PQ %" PRIu64 ", TxId %" PRIu64,
46284631
TabletID(), txId);
46294632

4633+
tx->WriteInProgress = false;
4634+
46304635
TryExecuteTxs(ctx, *tx);
46314636
}
46324637

0 commit comments

Comments
 (0)