Skip to content

Commit 57ae3a3

Browse files
authored
Reset pipeline in datashard init (#11483)
1 parent 790417d commit 57ae3a3

File tree

3 files changed

+35
-0
lines changed

3 files changed

+35
-0
lines changed

ydb/core/tx/datashard/datashard__init.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ bool TDataShard::TTxInit::Execute(TTransactionContext& txc, const TActorContext&
2626
Self->NextSeqno = 1;
2727
Self->NextChangeRecordOrder = 1;
2828
Self->LastChangeRecordGroup = 1;
29+
Self->Pipeline.Reset();
2930
Self->TransQueue.Reset();
3031
Self->SnapshotManager.Reset();
3132
Self->SchemaSnapshotManager.Reset();

ydb/core/tx/datashard/datashard_pipeline.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,34 @@ TPipeline::~TPipeline()
4242
}
4343
}
4444

45+
void TPipeline::Reset() {
46+
ImmediateOps.clear();
47+
ActiveOps.clear();
48+
ActivePlannedOps.clear();
49+
DataTxCache.clear();
50+
DelayedAcks.clear();
51+
LastPlannedTx = {0, 0};
52+
LastCompleteTx = {0, 0};
53+
UtmostCompleteTx = {0, 0};
54+
KeepSchemaStep = 0;
55+
LastCleanupTime = 0;
56+
SchemaTx = nullptr;
57+
ExecuteBlockers.clear();
58+
CandidateOps.clear();
59+
CandidateUnits.clear();
60+
NextActiveOp = {};
61+
SlowOpProfiles.clear();
62+
ActiveStreamingTxs.clear();
63+
PredictedPlan.clear();
64+
WaitingSchemeOpsOrder.clear();
65+
WaitingSchemeOps.clear();
66+
WaitingDataTxOps.clear();
67+
CommittingOps.Reset();
68+
CompletingOps.clear();
69+
WaitingDataReadIterators.clear();
70+
WaitingReadIteratorsById.clear();
71+
}
72+
4573
bool TPipeline::Load(NIceDb::TNiceDb& db) {
4674
using Schema = TDataShard::Schema;
4775

ydb/core/tx/datashard/datashard_pipeline.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class TPipeline : TNonCopyable {
8585
TPipeline(TDataShard * self);
8686
~TPipeline();
8787

88+
void Reset();
8889
bool Load(NIceDb::TNiceDb& db);
8990
void UpdateConfig(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TPipelineConfig& cfg);
9091

@@ -489,6 +490,11 @@ class TPipeline : TNonCopyable {
489490
}
490491
}
491492

493+
void Reset() {
494+
TxIdMap.clear();
495+
ItemsSet.clear();
496+
}
497+
492498
inline bool HasOpsBelow(TRowVersion upperBound) const {
493499
return bool(ItemsSet) && *ItemsSet.begin() <= upperBound;
494500
}

0 commit comments

Comments
 (0)