Skip to content

Commit 892228d

Browse files
fix race on enqueue tx_commit in case long synchronization (#8588)
1 parent 946c37e commit 892228d

File tree

7 files changed

+18
-15
lines changed

7 files changed

+18
-15
lines changed

ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
4343
LOG_S_ERROR(TxPrefix() << " (" << changes->TypeString() << ") cannot write index blobs" << TxSuffix());
4444
}
4545

46-
Self->EnqueueProgressTx(ctx);
46+
Self->EnqueueProgressTx(ctx, std::nullopt);
4747
return true;
4848
}
4949

ydb/core/tx/columnshard/columnshard.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
6161
SignalTabletActive(ctx);
6262
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "SignalTabletActive");
6363
TryRegisterMediatorTimeCast();
64-
EnqueueProgressTx(ctx);
64+
EnqueueProgressTx(ctx, std::nullopt);
6565
}
6666
Counters.GetCSCounters().OnIndexMetadataLimit(NOlap::IColumnEngine::GetMetadataLimit());
6767
EnqueueBackgroundActivities();

ydb/core/tx/columnshard/columnshard__plan_step.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ bool TTxPlanStep::Execute(TTransactionContext& txc, const TActorContext& ctx) {
105105
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_PLAN_STEP_ACCEPTED);
106106

107107
if (plannedCount > 0 || Self->ProgressTxController->HaveOutdatedTxs()) {
108-
Self->EnqueueProgressTx(ctx);
108+
Self->EnqueueProgressTx(ctx, std::nullopt);
109109
}
110110
return true;
111111
}

ydb/core/tx/columnshard/columnshard__progress_tx.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
4848
TxOperator = Self->ProgressTxController->GetTxOperatorVerified(txId);
4949
if (auto txPrepare = TxOperator->BuildTxPrepareForProgress(Self)) {
5050
AbortedThroughRemoveExpired = true;
51-
Self->ProgressTxInFlight = false;
51+
Self->ProgressTxInFlight = txId;
5252
Self->Execute(txPrepare.release(), ctx);
5353
return true;
5454
} else {
@@ -66,9 +66,9 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
6666
Self->ProgressTxController->FinishPlannedTx(txId, txc);
6767
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_PLANNED_TX_COMPLETED);
6868
}
69-
Self->ProgressTxInFlight = false;
69+
Self->ProgressTxInFlight = std::nullopt;
7070
if (!!Self->ProgressTxController->GetPlannedTx()) {
71-
Self->EnqueueProgressTx(ctx);
71+
Self->EnqueueProgressTx(ctx, std::nullopt);
7272
}
7373
return true;
7474
}
@@ -93,10 +93,13 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
9393
}
9494
};
9595

96-
void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) {
96+
void TColumnShard::EnqueueProgressTx(const TActorContext& ctx, const std::optional<ui64> continueTxId) {
9797
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "EnqueueProgressTx")("tablet_id", TabletID());
98-
if (!ProgressTxInFlight) {
99-
ProgressTxInFlight = true;
98+
if (continueTxId) {
99+
AFL_VERIFY(!ProgressTxInFlight || ProgressTxInFlight == continueTxId)("current", ProgressTxInFlight)("expected", continueTxId);
100+
}
101+
if (!ProgressTxInFlight || ProgressTxInFlight == continueTxId) {
102+
ProgressTxInFlight = continueTxId.value_or(0);
100103
Execute(new TTxProgressTx(this), ctx);
101104
}
102105
}

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,7 @@ class TColumnShard
468468
NOlap::NResourceBroker::NSubscribe::TTaskContext CompactTaskSubscription;
469469
NOlap::NResourceBroker::NSubscribe::TTaskContext TTLTaskSubscription;
470470

471-
bool ProgressTxInFlight = false;
471+
std::optional<ui64> ProgressTxInFlight;
472472
THashMap<ui64, TInstant> ScanTxInFlight;
473473
THashMap<TWriteId, TLongTxWriteInfo> LongTxWrites;
474474
using TPartsForLTXShard = THashMap<ui32, TLongTxWriteInfo*>;
@@ -539,7 +539,7 @@ class TColumnShard
539539
public:
540540
ui64 TabletTxCounter = 0;
541541

542-
void EnqueueProgressTx(const TActorContext& ctx);
542+
void EnqueueProgressTx(const TActorContext& ctx, const std::optional<ui64> continueTxId);
543543
NOlap::TSnapshot GetLastTxSnapshot() const {
544544
return NOlap::TSnapshot(LastPlannedStep, LastPlannedTxId);
545545
}

ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
166166
void CheckFinished(TColumnShard& owner) {
167167
if (WaitShardsResultAck.empty()) {
168168
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "finished");
169-
owner.EnqueueProgressTx(NActors::TActivationContext::AsActorContext());
169+
owner.EnqueueProgressTx(NActors::TActivationContext::AsActorContext(), GetTxId());
170170
}
171171
}
172172

ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
7777
op->ReceiveAck = true;
7878
if (!op->NeedReceiveBroken) {
7979
op->TxBroken = false;
80-
Self->EnqueueProgressTx(ctx);
80+
Self->EnqueueProgressTx(ctx, TxId);
8181
}
8282
}
8383

@@ -118,7 +118,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
118118
if (BrokenFlag) {
119119
Self->GetProgressTxController().CompleteOnCancel(TxId, ctx);
120120
}
121-
Self->EnqueueProgressTx(ctx);
121+
Self->EnqueueProgressTx(ctx, TxId);
122122
}
123123

124124
public:
@@ -154,7 +154,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
154154

155155
virtual void DoOnTabletInit(TColumnShard& owner) override {
156156
if (TxBroken || (ReceiveAck && !NeedReceiveBroken)) {
157-
owner.EnqueueProgressTx(NActors::TActivationContext::AsActorContext());
157+
owner.EnqueueProgressTx(NActors::TActivationContext::AsActorContext(), GetTxId());
158158
} else if (!ReceiveAck) {
159159
SendResult(owner);
160160
}

0 commit comments

Comments
 (0)