Skip to content

Commit fcf6826

Browse files
authored
Stable-24-3: Reset pipeline in datashard init (#11488)
1 parent e3d6f61 commit fcf6826

File tree

3 files changed

+35
-0
lines changed

3 files changed

+35
-0
lines changed

ydb/core/tx/datashard/datashard__init.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ bool TDataShard::TTxInit::Execute(TTransactionContext& txc, const TActorContext&
2626
Self->NextSeqno = 1;
2727
Self->NextChangeRecordOrder = 1;
2828
Self->LastChangeRecordGroup = 1;
29+
Self->Pipeline.Reset();
2930
Self->TransQueue.Reset();
3031
Self->SnapshotManager.Reset();
3132
Self->SchemaSnapshotManager.Reset();

ydb/core/tx/datashard/datashard_pipeline.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,34 @@ TPipeline::~TPipeline()
4343
}
4444
}
4545

46+
void TPipeline::Reset() {
47+
ImmediateOps.clear();
48+
ActiveOps.clear();
49+
ActivePlannedOps.clear();
50+
DataTxCache.clear();
51+
DelayedAcks.clear();
52+
LastPlannedTx = {0, 0};
53+
LastCompleteTx = {0, 0};
54+
UtmostCompleteTx = {0, 0};
55+
KeepSchemaStep = 0;
56+
LastCleanupTime = 0;
57+
SchemaTx = nullptr;
58+
ExecuteBlockers.clear();
59+
CandidateOps.clear();
60+
CandidateUnits.clear();
61+
NextActiveOp = {};
62+
SlowOpProfiles.clear();
63+
ActiveStreamingTxs.clear();
64+
PredictedPlan.clear();
65+
WaitingSchemeOpsOrder.clear();
66+
WaitingSchemeOps.clear();
67+
WaitingDataTxOps.clear();
68+
CommittingOps.Reset();
69+
CompletingOps.clear();
70+
WaitingDataReadIterators.clear();
71+
WaitingReadIteratorsById.clear();
72+
}
73+
4674
bool TPipeline::Load(NIceDb::TNiceDb& db) {
4775
using Schema = TDataShard::Schema;
4876

ydb/core/tx/datashard/datashard_pipeline.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class TPipeline : TNonCopyable {
8585
TPipeline(TDataShard * self);
8686
~TPipeline();
8787

88+
void Reset();
8889
bool Load(NIceDb::TNiceDb& db);
8990
void UpdateConfig(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TPipelineConfig& cfg);
9091

@@ -487,6 +488,11 @@ class TPipeline : TNonCopyable {
487488
}
488489
}
489490

491+
void Reset() {
492+
TxIdMap.clear();
493+
ItemsSet.clear();
494+
}
495+
490496
inline bool HasOpsBelow(TRowVersion upperBound) const {
491497
return bool(ItemsSet) && *ItemsSet.begin() <= upperBound;
492498
}

0 commit comments

Comments
 (0)