Skip to content

Commit 15da817

Browse files
Fix broken txs (#8714)
1 parent 376e237 commit 15da817

File tree

7 files changed: 119 additions & 47 deletions

ydb/core/tx/columnshard/columnshard__progress_tx.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
6363
}
6464

6565
AFL_VERIFY(TxOperator->ProgressOnExecute(*Self, NOlap::TSnapshot(step, txId), txc));
66-
Self->ProgressTxController->FinishPlannedTx(txId, txc);
66+
Self->ProgressTxController->ProgressOnExecute(txId, txc);
6767
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_PLANNED_TX_COMPLETED);
6868
}
6969
Self->ProgressTxInFlight = std::nullopt;
@@ -84,7 +84,7 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
8484
Self->RescheduleWaitingReads();
8585
}
8686
if (PlannedQueueItem) {
87-
Self->GetProgressTxController().CompleteRunningTx(*PlannedQueueItem);
87+
Self->GetProgressTxController().ProgressOnComplete(*PlannedQueueItem);
8888
}
8989
if (LastCompletedTx) {
9090
Self->LastCompletedTx = std::max(*LastCompletedTx, Self->LastCompletedTx);

ydb/core/tx/columnshard/columnshard__propose_transaction.cpp

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
3131
const auto txKind = record.GetTxKind();
3232
const ui64 txId = record.GetTxId();
3333
const auto& txBody = record.GetTxBody();
34-
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("tablet_id", Self->TabletID())("tx_id", txId)("this", (ui64)this);
34+
NActors::TLogContextGuard lGuard =
35+
NActors::TLogContextBuilder::Build()("tablet_id", Self->TabletID())("tx_id", txId)("this", (ui64)this);
3536

3637
if (txKind == NKikimrTxColumnShard::TX_KIND_TTL) {
3738
auto proposeResult = ProposeTtlDeprecated(txBody);
@@ -51,7 +52,7 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
5152
Self->CurrentSchemeShardId = record.GetSchemeShardId();
5253
Schema::SaveSpecialValue(db, Schema::EValueIds::CurrentSchemeShardId, Self->CurrentSchemeShardId);
5354
} else {
54-
Y_ABORT_UNLESS(Self->CurrentSchemeShardId == record.GetSchemeShardId());
55+
AFL_VERIFY(Self->CurrentSchemeShardId == record.GetSchemeShardId());
5556
}
5657
}
5758
std::optional<TMessageSeqNo> msgSeqNo;
@@ -79,28 +80,32 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
7980
AFL_VERIFY(!!TxOperator);
8081
AFL_VERIFY(!!TxInfo);
8182
const ui64 txId = record.GetTxId();
82-
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("tablet_id", Self->TabletID())("request_tx", TxInfo->DebugString())(
83-
"this", (ui64)this)("op_tx", TxOperator->GetTxInfo().DebugString());
83+
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("tablet_id", Self->TabletID())(
84+
"request_tx", TxInfo->DebugString())("this", (ui64)this)("op_tx", TxOperator->GetTxInfo().DebugString());
85+
86+
Self->TryRegisterMediatorTimeCast();
8487

8588
if (TxOperator->IsFail()) {
8689
TxOperator->SendReply(*Self, ctx);
90+
return;
91+
}
92+
auto internalOp = Self->GetProgressTxController().GetTxOperatorOptional(txId);
93+
if (!internalOp) {
94+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "removed tx operator");
95+
}
96+
NActors::TLogContextGuard lGuardTx = NActors::TLogContextBuilder::Build()("int_op_tx", internalOp->GetTxInfo().DebugString());
97+
if (!internalOp->CheckTxInfoForReply(*TxInfo)) {
98+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "deprecated tx operator");
99+
return;
100+
}
101+
102+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "actual tx operator");
103+
if (internalOp->IsAsync()) {
104+
Self->GetProgressTxController().StartProposeOnComplete(*internalOp, ctx);
87105
} else {
88-
auto internalOp = Self->GetProgressTxController().GetTxOperatorVerified(TxOperator->GetTxId());
89-
NActors::TLogContextGuard lGuardTx = NActors::TLogContextBuilder::Build()("int_op_tx", internalOp->GetTxInfo().DebugString());
90-
if (!TxOperator->CheckTxInfoForReply(*TxInfo)) {
91-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "deprecated tx operator");
92-
return;
93-
} else {
94-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "actual tx operator");
95-
}
96-
if (TxOperator->IsAsync()) {
97-
Self->GetProgressTxController().StartProposeOnComplete(txId, ctx);
98-
} else {
99-
Self->GetProgressTxController().FinishProposeOnComplete(txId, ctx);
100-
}
106+
Self->GetProgressTxController().FinishProposeOnComplete(*internalOp, ctx);
101107
}
102108

103-
Self->TryRegisterMediatorTimeCast();
104109
}
105110

106111
TTxType GetTxType() const override {

ydb/core/tx/columnshard/normalizer/tablet/broken_txs.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#include "broken_txs.h"
2+
3+
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
4+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5+
6+
namespace NKikimr::NOlap {
7+
8+
TConclusion<std::vector<INormalizerTask::TPtr>> TBrokenTxsNormalizer::DoInit(
9+
const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
10+
NIceDb::TNiceDb db(txc.DB);
11+
12+
using namespace NColumnShard;
13+
auto rowset = db.Table<Schema::TxInfo>().GreaterOrEqual(0).Select();
14+
if (!rowset.IsReady()) {
15+
return TConclusionStatus::Fail("cannot read TxInfo");
16+
}
17+
while (!rowset.EndOfSet()) {
18+
const ui64 txId = rowset.GetValue<Schema::TxInfo::TxId>();
19+
if (!rowset.HaveValue<Schema::TxInfo::TxKind>()) {
20+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("tx_id", txId)("event", "removed_by_normalizer")("condition", "no_kind");
21+
Schema::EraseTxInfo(db, txId);
22+
}
23+
24+
if (!rowset.Next()) {
25+
return TConclusionStatus::Fail("cannot read TxInfo");
26+
}
27+
}
28+
return std::vector<INormalizerTask::TPtr>();
29+
}
30+
31+
}

ydb/core/tx/columnshard/normalizer/tablet/broken_txs.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#pragma once
2+
3+
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
4+
#include <ydb/core/tx/columnshard/columnshard_schema.h>
5+
6+
7+
namespace NKikimr::NOlap {
8+
9+
class TBrokenTxsNormalizer: public TNormalizationController::INormalizerComponent {
10+
public:
11+
static TString GetClassNameStatic() {
12+
return "BrokenTxsNormalizer";
13+
}
14+
private:
15+
class TNormalizerResult;
16+
17+
static const inline INormalizerComponent::TFactory::TRegistrator<TBrokenTxsNormalizer> Registrator =
18+
INormalizerComponent::TFactory::TRegistrator<TBrokenTxsNormalizer>(GetClassNameStatic());
19+
20+
public:
21+
TBrokenTxsNormalizer(const TNormalizationController::TInitContext&) {
22+
}
23+
24+
virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
25+
return {};
26+
}
27+
28+
virtual TString GetClassName() const override {
29+
return GetClassNameStatic();
30+
}
31+
32+
virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
33+
};
34+
35+
}

ydb/core/tx/columnshard/normalizer/tablet/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ LIBRARY()
22

33
SRCS(
44
GLOBAL gc_counters.cpp
5+
GLOBAL broken_txs.cpp
56
)
67

78
PEERDIR(

ydb/core/tx/columnshard/transactions/tx_controller.cpp

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -201,17 +201,16 @@ std::optional<TTxController::TTxInfo> TTxController::PopFirstPlannedTx() {
201201
return std::nullopt;
202202
}
203203

204-
void TTxController::FinishPlannedTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
204+
void TTxController::ProgressOnExecute(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
205205
NIceDb::TNiceDb db(txc.DB);
206206
auto opIt = Operators.find(txId);
207-
if (opIt != Operators.end()) {
208-
Counters.OnFinishPlannedTx(opIt->second->GetOpType());
209-
}
207+
AFL_VERIFY(opIt != Operators.end())("tx_id", txId);
208+
Counters.OnFinishPlannedTx(opIt->second->GetOpType());
209+
AFL_VERIFY(Operators.erase(txId));
210210
Schema::EraseTxInfo(db, txId);
211211
}
212212

213-
void TTxController::CompleteRunningTx(const TPlanQueueItem& txItem) {
214-
AFL_VERIFY(Operators.erase(txItem.TxId));
213+
void TTxController::ProgressOnComplete(const TPlanQueueItem& txItem) {
215214
AFL_VERIFY(RunningQueue.erase(txItem))("info", txItem.DebugString());
216215
}
217216

@@ -347,15 +346,12 @@ std::shared_ptr<TTxController::ITransactionOperator> TTxController::StartPropose
347346
}
348347
}
349348

350-
void TTxController::StartProposeOnComplete(const ui64 txId, const TActorContext& ctx) {
351-
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnComplete")("tx_id", txId);
352-
if (auto txOperator = GetTxOperatorOptional(txId)) {
353-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start");
354-
txOperator->StartProposeOnComplete(Owner, ctx);
355-
Counters.OnStartProposeOnComplete(txOperator->GetOpType());
356-
} else {
357-
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction base")("tx_id", txId);
358-
}
349+
void TTxController::StartProposeOnComplete(ITransactionOperator& txOperator, const TActorContext& ctx) {
350+
NActors::TLogContextGuard lGuard =
351+
NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnComplete")("tx_id", txOperator.GetTxId());
352+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start");
353+
txOperator.StartProposeOnComplete(Owner, ctx);
354+
Counters.OnStartProposeOnComplete(txOperator.GetOpType());
359355
}
360356

361357
void TTxController::FinishProposeOnExecute(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
@@ -369,19 +365,24 @@ void TTxController::FinishProposeOnExecute(const ui64 txId, NTabletFlatExecutor:
369365
}
370366
}
371367

368+
void TTxController::FinishProposeOnComplete(ITransactionOperator& txOperator, const TActorContext& ctx) {
369+
NActors::TLogContextGuard lGuard =
370+
NActors::TLogContextBuilder::Build()("method", "TTxController::FinishProposeOnComplete")("tx_id", txOperator.GetTxId());
371+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start")("tx_info", txOperator.GetTxInfo().DebugString());
372+
TTxController::TProposeResult proposeResult = txOperator.GetProposeStartInfoVerified();
373+
AFL_VERIFY(!txOperator.IsFail());
374+
txOperator.FinishProposeOnComplete(Owner, ctx);
375+
txOperator.SendReply(Owner, ctx);
376+
Counters.OnFinishProposeOnComplete(txOperator.GetOpType());
377+
}
378+
372379
void TTxController::FinishProposeOnComplete(const ui64 txId, const TActorContext& ctx) {
373-
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::FinishProposeOnComplete")("tx_id", txId);
374380
auto txOperator = GetTxOperatorOptional(txId);
375381
if (!txOperator) {
376382
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction finish")("tx_id", txId);
377383
return;
378384
}
379-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start")("tx_info", txOperator->GetTxInfo().DebugString());
380-
TTxController::TProposeResult proposeResult = txOperator->GetProposeStartInfoVerified();
381-
AFL_VERIFY(!txOperator->IsFail());
382-
txOperator->FinishProposeOnComplete(Owner, ctx);
383-
txOperator->SendReply(Owner, ctx);
384-
Counters.OnFinishProposeOnComplete(txOperator->GetOpType());
385+
return FinishProposeOnComplete(*txOperator, ctx);
385386
}
386387

387388
void TTxController::ITransactionOperator::SwitchStateVerified(const EStatus from, const EStatus to) {

ydb/core/tx/columnshard/transactions/tx_controller.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -441,10 +441,9 @@ class TTxController {
441441

442442
[[nodiscard]] std::shared_ptr<TTxController::ITransactionOperator> StartProposeOnExecute(
443443
const TTxController::TTxInfo& txInfo, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc);
444-
void StartProposeOnComplete(const ui64 txId, const TActorContext& ctx);
445-
444+
void StartProposeOnComplete(ITransactionOperator& txOperator, const TActorContext& ctx);
446445
void FinishProposeOnExecute(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
447-
446+
void FinishProposeOnComplete(ITransactionOperator& txOperator, const TActorContext& ctx);
448447
void FinishProposeOnComplete(const ui64 txId, const TActorContext& ctx);
449448

450449
void WriteTxOperatorInfo(NTabletFlatExecutor::TTransactionContext& txc, const ui64 txId, const TString& data) {
@@ -456,8 +455,8 @@ class TTxController {
456455

457456
std::optional<TTxInfo> GetFirstPlannedTx() const;
458457
std::optional<TTxInfo> PopFirstPlannedTx();
459-
void FinishPlannedTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
460-
void CompleteRunningTx(const TPlanQueueItem& tx);
458+
void ProgressOnExecute(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
459+
void ProgressOnComplete(const TPlanQueueItem& tx);
461460

462461
std::optional<TPlanQueueItem> GetPlannedTx() const;
463462
TPlanQueueItem GetFrontTx() const;

0 commit comments

Comments
 (0)