Skip to content

Commit 5e18f9f

Browse files
compaction tracing (#19369)
1 parent 6888c54 commit 5e18f9f

File tree

12 files changed

+138
-68
lines changed

12 files changed

+138
-68
lines changed

ydb/core/tx/columnshard/columnshard__write_index.cpp

Lines changed: 14 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -17,17 +17,21 @@ class TDiskResourcesRequest: public NLimiter::IResourceRequest {
1717
private:
1818
using TBase = NLimiter::IResourceRequest;
1919
std::shared_ptr<NOlap::TCompactedWriteController> WriteController;
20+
const std::shared_ptr<NOlap::TColumnEngineChanges> Changes;
2021
const ui64 TabletId;
2122

2223
private:
2324
virtual void DoOnResourceAllocated() override {
25+
Changes->SetStage(NOlap::NChanges::EStage::Writing);
2426
NActors::TActivationContext::AsActorContext().Register(CreateWriteActor(TabletId, WriteController, TInstant::Max()));
2527
}
2628

2729
public:
28-
TDiskResourcesRequest(const std::shared_ptr<NOlap::TCompactedWriteController>& writeController, const ui64 tabletId)
30+
TDiskResourcesRequest(const std::shared_ptr<NOlap::TCompactedWriteController>& writeController, const ui64 tabletId,
31+
const std::shared_ptr<NOlap::TColumnEngineChanges>& changes)
2932
: TBase(writeController->GetWriteVolume())
3033
, WriteController(writeController)
34+
, Changes(changes)
3135
, TabletId(tabletId)
3236
{
3337

@@ -38,29 +42,33 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte
3842
auto putStatus = ev->Get()->GetPutStatus();
3943

4044
if (putStatus == NKikimrProto::UNKNOWN) {
45+
const auto change = ev->Get()->IndexChanges;
4146
if (IsAnyChannelYellowStop()) {
4247
ACFL_ERROR("event", "TEvWriteIndex failed")("reason", "channel yellow stop");
4348

4449
Counters.GetTabletCounters()->IncCounter(COUNTER_OUT_OF_SPACE);
4550
ev->Get()->SetPutStatus(NKikimrProto::TRYLATER);
4651
NOlap::TChangesFinishContext context("out of disk space");
47-
ev->Get()->IndexChanges->Abort(*this, context);
52+
change->Abort(*this, context);
4853
ctx.Schedule(FailActivationDelay, new TEvPrivate::TEvPeriodicWakeup(true));
4954
} else {
50-
ACFL_DEBUG("event", "TEvWriteIndex")("count", ev->Get()->IndexChanges->GetWritePortionsCount());
51-
AFL_VERIFY(ev->Get()->IndexChanges->GetWritePortionsCount());
52-
const bool needDiskLimiter = ev->Get()->IndexChanges->NeedDiskWriteLimiter();
55+
ACFL_DEBUG("event", "TEvWriteIndex")("count", change->GetWritePortionsCount());
56+
AFL_VERIFY(change->GetWritePortionsCount());
57+
const bool needDiskLimiter = change->NeedDiskWriteLimiter();
5358
auto writeController = std::make_shared<NOlap::TCompactedWriteController>(ctx.SelfID, ev->Release());
5459
const TConclusion<bool> needDraftTransaction = writeController->GetBlobsAction().NeedDraftWritingTransaction();
5560
AFL_VERIFY(needDraftTransaction.IsSuccess())("error", needDraftTransaction.GetErrorMessage());
5661
if (*needDraftTransaction) {
5762
ACFL_DEBUG("event", "TTxWriteDraft");
63+
change->SetStage(NOlap::NChanges::EStage::WriteDraft);
5864
Execute(new TTxWriteDraft(this, writeController));
5965
} else if (needDiskLimiter) {
6066
ACFL_DEBUG("event", "Limiter");
61-
NLimiter::TCompDiskOperator::AskResource(std::make_shared<TDiskResourcesRequest>(writeController, TabletID()));
67+
change->SetStage(NOlap::NChanges::EStage::AskDiskQuota);
68+
NLimiter::TCompDiskOperator::AskResource(std::make_shared<TDiskResourcesRequest>(writeController, TabletID(), change));
6269
} else {
6370
ACFL_DEBUG("event", "WriteActor");
71+
change->SetStage(NOlap::NChanges::EStage::Writing);
6472
Register(CreateWriteActor(TabletID(), writeController, TInstant::Max()));
6573
}
6674
}

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 24 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -592,7 +592,9 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {
592592
}
593593
TxEvent->IndexChanges->Blobs = ExtractBlobsData();
594594
const bool isInsert = !!dynamic_pointer_cast<NOlap::TInsertColumnEngineChanges>(TxEvent->IndexChanges);
595-
std::shared_ptr<NConveyor::ITask> task = std::make_shared<TChangesTask>(std::move(TxEvent), Counters, TabletId, ParentActorId, LastCompletedTx);
595+
TxEvent->IndexChanges->SetStage(NOlap::NChanges::EStage::ReadyForConstruct);
596+
std::shared_ptr<NConveyor::ITask> task =
597+
std::make_shared<TChangesTask>(std::move(TxEvent), Counters, TabletId, ParentActorId, LastCompletedTx);
596598
if (isInsert) {
597599
NConveyor::TInsertServiceOperator::SendTaskToExecute(task);
598600
} else {
@@ -849,21 +851,27 @@ void TColumnShard::SetupCompaction(const std::set<TInternalPathId>& pathIds) {
849851
class TAccessorsMemorySubscriber: public NOlap::NResourceBroker::NSubscribe::ITask {
850852
private:
851853
using TBase = NOlap::NResourceBroker::NSubscribe::ITask;
854+
std::shared_ptr<NOlap::TColumnEngineChanges> ChangeTask;
852855
std::shared_ptr<NOlap::TDataAccessorsRequest> Request;
853856
std::shared_ptr<TDataAccessorsSubscriberBase> Subscriber;
854857
std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager> DataAccessorsManager;
855858

856859
virtual void DoOnAllocationSuccess(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& guard) override {
857860
Subscriber->SetResourcesGuard(guard);
858861
Request->RegisterSubscriber(Subscriber);
862+
if (ChangeTask) {
863+
ChangeTask->SetStage(NOlap::NChanges::EStage::AskAccessors);
864+
}
859865
DataAccessorsManager->AskData(Request);
860866
}
861867

862868
public:
863869
TAccessorsMemorySubscriber(const ui64 memory, const TString& externalTaskId, const NOlap::NResourceBroker::NSubscribe::TTaskContext& context,
864870
std::shared_ptr<NOlap::TDataAccessorsRequest>&& request, const std::shared_ptr<TDataAccessorsSubscriberBase>& subscriber,
865-
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
871+
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
872+
const std::shared_ptr<NOlap::TColumnEngineChanges>& changeTask)
866873
: TBase(0, memory, externalTaskId, context)
874+
, ChangeTask(changeTask)
867875
, Request(std::move(request))
868876
, Subscriber(subscriber)
869877
, DataAccessorsManager(dataAccessorsManager) {
@@ -879,6 +887,7 @@ class TCompactionDataAccessorsSubscriber: public TDataAccessorsSubscriberWithRea
879887
const TString externalTaskId = Changes->GetTaskIdentifier();
880888
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "compaction")("external_task_id", externalTaskId);
881889

890+
Changes->SetStage(NOlap::NChanges::EStage::ReadBlobs);
882891
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, CacheDataAfterWrite);
883892
TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(
884893
std::make_shared<TCompactChangesReadTask>(std::move(ev), ShardActorId, ShardTabletId, Counters, SnapshotModification)));
@@ -898,21 +907,23 @@ void TColumnShard::StartCompaction(const std::shared_ptr<NPrioritiesQueue::TAllo
898907
return;
899908
}
900909

901-
auto compaction = dynamic_pointer_cast<NOlap::NCompaction::TGeneralCompactColumnEngineChanges>(indexChanges);
902-
compaction->SetActivityFlag(GetTabletActivity());
903-
compaction->SetQueueGuard(guard);
904-
compaction->Start(*this);
910+
auto& compaction = *VerifyDynamicCast<NOlap::NCompaction::TGeneralCompactColumnEngineChanges*>(indexChanges.get());
911+
compaction.SetActivityFlag(GetTabletActivity());
912+
compaction.SetQueueGuard(guard);
913+
compaction.Start(*this);
905914

906915
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
907-
auto request = compaction->ExtractDataAccessorsRequest();
916+
auto request = compaction.ExtractDataAccessorsRequest();
908917
const ui64 accessorsMemory = request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()) +
909918
indexChanges->CalcMemoryForUsage();
910919
const auto subscriber = std::make_shared<TCompactionDataAccessorsSubscriber>(ResourceSubscribeActor, indexChanges, actualIndexInfo,
911920
Settings.CacheDataAfterCompaction, SelfId(), TabletID(), Counters.GetCompactionCounters(), GetLastCompletedTx(),
912921
CompactTaskSubscription);
913-
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
914-
ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, indexChanges->GetTaskIdentifier(),
915-
CompactTaskSubscription, std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
922+
compaction.SetStage(NOlap::NChanges::EStage::AskResources);
923+
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(ResourceSubscribeActor,
924+
std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, indexChanges->GetTaskIdentifier(), CompactTaskSubscription,
925+
std::move(request),
926+
subscriber, DataAccessorsManager.GetObjectPtrVerified(), indexChanges));
916927
}
917928

918929
class TWriteEvictPortionsDataAccessorsSubscriber: public TDataAccessorsSubscriberWithRead {
@@ -981,7 +992,7 @@ void TColumnShard::SetupMetadata() {
981992
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(ResourceSubscribeActor,
982993
std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, i.GetRequest()->GetTaskId(), TTLTaskSubscription,
983994
std::shared_ptr<NOlap::TDataAccessorsRequest>(i.GetRequest()),
984-
std::make_shared<TCSMetadataSubscriber>(SelfId(), i.GetProcessor(), Generation()), DataAccessorsManager.GetObjectPtrVerified()));
995+
std::make_shared<TCSMetadataSubscriber>(SelfId(), i.GetProcessor(), Generation()), DataAccessorsManager.GetObjectPtrVerified(), nullptr));
985996
}
986997
}
987998

@@ -1020,7 +1031,7 @@ bool TColumnShard::SetupTtl() {
10201031
request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()) + memoryUsage;
10211032
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
10221033
ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, i->GetTaskIdentifier(), TTLTaskSubscription,
1023-
std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
1034+
std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified(), i));
10241035
}
10251036
return true;
10261037
}
@@ -1069,7 +1080,7 @@ void TColumnShard::SetupCleanupPortions() {
10691080

10701081
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
10711082
ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, changes->GetTaskIdentifier(), TTLTaskSubscription,
1072-
std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
1083+
std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified(), changes));
10731084
}
10741085

10751086
void TColumnShard::SetupCleanupTables() {

ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp

Lines changed: 23 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -1,18 +1,16 @@
11
#include "abstract.h"
2-
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
2+
33
#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
44
#include <ydb/core/tx/columnshard/columnshard_impl.h>
5+
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
56
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
7+
68
#include <ydb/library/actors/core/actor.h>
79

810
namespace NKikimr::NOlap {
911

1012
void TColumnEngineChanges::SetStage(const NChanges::EStage stage) {
11-
AFL_VERIFY(stage >= Stage);
12-
if (Stage != stage) {
13-
Counters->OnStageChanged(stage, GetWritePortionsCount());
14-
}
15-
Stage = stage;
13+
Counters->SetStage(stage);
1614
}
1715

1816
TString TColumnEngineChanges::DebugString() const {
@@ -25,7 +23,8 @@ TString TColumnEngineChanges::DebugString() const {
2523

2624
TConclusionStatus TColumnEngineChanges::ConstructBlobs(TConstructionContext& context) noexcept {
2725
const NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("task_id", GetTaskIdentifier())("task_class", TypeString());
28-
Y_ABORT_UNLESS(Stage == NChanges::EStage::Started);
26+
AFL_VERIFY(Counters->GetStage() == NChanges::EStage::ReadyForConstruct || Counters->GetStage() == NChanges::EStage::Started)(
27+
"actual_stage", Counters->GetStage());
2928

3029
context.Counters.CompactionInputSize(Blobs.GetTotalBlobsSize());
3130
const TMonotonic start = TMonotonic::Now();
@@ -40,29 +39,27 @@ TConclusionStatus TColumnEngineChanges::ConstructBlobs(TConstructionContext& con
4039
}
4140

4241
void TColumnEngineChanges::WriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) {
43-
Y_ABORT_UNLESS(Stage != NChanges::EStage::Aborted);
44-
Y_ABORT_UNLESS(Stage <= NChanges::EStage::Written);
45-
Y_ABORT_UNLESS(Stage >= NChanges::EStage::Compiled);
42+
AFL_VERIFY(Counters->GetStage() != NChanges::EStage::Aborted);
43+
AFL_VERIFY(Counters->GetStage() <= NChanges::EStage::Written);
44+
AFL_VERIFY(Counters->GetStage() >= NChanges::EStage::Compiled);
4645

4746
DoWriteIndexOnExecute(self, context);
4847
SetStage(NChanges::EStage::Written);
4948
}
5049

5150
void TColumnEngineChanges::WriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
52-
Y_ABORT_UNLESS(Stage == NChanges::EStage::Written || !self);
51+
Y_ABORT_UNLESS(Counters->GetStage() == NChanges::EStage::Written || !self);
5352
SetStage(NChanges::EStage::Finished);
5453
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("type", TypeString())("success", context.FinishedSuccessfully);
5554
DoWriteIndexOnComplete(self, context);
5655
if (self) {
5756
OnFinish(*self, context);
5857
self->Counters.GetTabletCounters()->IncCounter(GetCounterIndex(context.FinishedSuccessfully));
5958
}
60-
6159
}
6260

6361
void TColumnEngineChanges::Compile(TFinalizationContext& context) noexcept {
64-
AFL_VERIFY(Stage != NChanges::EStage::Aborted);
65-
AFL_VERIFY(Stage == NChanges::EStage::Constructed)("real", Stage);
62+
AFL_VERIFY(Counters->GetStage() != NChanges::EStage::Aborted);
6663

6764
DoCompile(context);
6865
DoOnAfterCompile();
@@ -76,7 +73,8 @@ TColumnEngineChanges::~TColumnEngineChanges() {
7673

7774
void TColumnEngineChanges::Abort(NColumnShard::TColumnShard& self, TChangesFinishContext& context) {
7875
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Abort")("reason", context.ErrorMessage);
79-
AFL_VERIFY(Stage != NChanges::EStage::Finished && Stage != NChanges::EStage::Created && Stage != NChanges::EStage::Aborted)("stage", Stage)("reason", context.ErrorMessage)("prev_reason", AbortedReason);
76+
AFL_VERIFY(Counters->GetStage() != NChanges::EStage::Finished && Counters->GetStage() != NChanges::EStage::Created && Counters->GetStage() != NChanges::EStage::Aborted)("stage", Counters->GetStage())(
77+
"reason", context.ErrorMessage)("prev_reason", AbortedReason);
8078
SetStage(NChanges::EStage::Aborted);
8179
AbortedReason = context.ErrorMessage;
8280
OnFinish(self, context);
@@ -85,26 +83,26 @@ void TColumnEngineChanges::Abort(NColumnShard::TColumnShard& self, TChangesFinis
8583
void TColumnEngineChanges::Start(NColumnShard::TColumnShard& self) {
8684
AFL_VERIFY(!LockGuard);
8785
LockGuard = self.DataLocksManager->RegisterLock(BuildDataLock());
88-
Y_ABORT_UNLESS(Stage == NChanges::EStage::Created);
86+
Y_ABORT_UNLESS(Counters->GetStage() == NChanges::EStage::Created);
8987
NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexStart(self.TabletID(), *this);
9088
DoStart(self);
9189
SetStage(NChanges::EStage::Started);
92-
if (!NeedConstruction()) {
93-
SetStage(NChanges::EStage::Constructed);
94-
}
90+
// if (!NeedConstruction()) {
91+
// SetStage(NChanges::EStage::Constructed);
92+
// }
9593
}
9694

9795
void TColumnEngineChanges::StartEmergency() {
98-
Y_ABORT_UNLESS(Stage == NChanges::EStage::Created);
96+
Y_ABORT_UNLESS(Counters->GetStage() == NChanges::EStage::Created);
9997
SetStage(NChanges::EStage::Started);
100-
if (!NeedConstruction()) {
101-
SetStage(NChanges::EStage::Constructed);
102-
}
98+
// if (!NeedConstruction()) {
99+
// SetStage(NChanges::EStage::Constructed);
100+
// }
103101
}
104102

105103
void TColumnEngineChanges::AbortEmergency(const TString& reason) {
106104
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "AbortEmergency")("reason", reason)("prev_reason", AbortedReason);
107-
if (Stage == NChanges::EStage::Aborted) {
105+
if (Counters->GetStage() == NChanges::EStage::Aborted) {
108106
AbortedReason += "; AnotherReason: " + reason;
109107
} else {
110108
SetStage(NChanges::EStage::Aborted);
@@ -128,7 +126,6 @@ TWriteIndexContext::TWriteIndexContext(NTable::TDatabase* db, IDbWrapper& dbWrap
128126
, DBWrapper(dbWrapper)
129127
, EngineLogs(engineLogs)
130128
, Snapshot(snapshot) {
131-
132129
}
133130

134-
}
131+
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/changes/abstract/abstract.h

Lines changed: 4 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -249,14 +249,11 @@ class TDataAccessorsInitializationContext {
249249

250250
class TColumnEngineChanges: public TMoveOnly {
251251
private:
252-
NChanges::EStage Stage = NChanges::EStage::Created;
253252
std::shared_ptr<NDataLocks::TManager::TGuard> LockGuard;
254253
TString AbortedReason;
255254
const TString TaskIdentifier = TGUID::CreateTimebased().AsGuidString();
256255
std::shared_ptr<const TAtomicCounter> ActivityFlag;
257-
std::shared_ptr<NChanges::TChangesCounters::TStageCounters> Counters;
258-
259-
void SetStage(const NChanges::EStage stage);
256+
std::shared_ptr<NChanges::TChangesCounters::TStageCountersGuard> Counters;
260257

261258
protected:
262259
std::optional<TDataAccessorsResult> FetchedDataAccessors;
@@ -290,6 +287,8 @@ class TColumnEngineChanges: public TMoveOnly {
290287
virtual void OnDataAccessorsInitialized(const TDataAccessorsInitializationContext& context) = 0;
291288

292289
public:
290+
void SetStage(const NChanges::EStage stage);
291+
293292
bool IsActive() const {
294293
return !ActivityFlag || ActivityFlag->Val();
295294
}
@@ -355,14 +354,13 @@ class TColumnEngineChanges: public TMoveOnly {
355354
TColumnEngineChanges(const std::shared_ptr<IStoragesManager>& storagesManager, const NBlobOperations::EConsumer consumerId)
356355
: Counters(NChanges::TChangesCounters::GetStageCounters(consumerId))
357356
, BlobsAction(storagesManager, consumerId) {
358-
Counters->OnStageChanged(Stage, 0);
359357
}
360358

361359
TConclusionStatus ConstructBlobs(TConstructionContext& context) noexcept;
362360
virtual ~TColumnEngineChanges();
363361

364362
bool IsAborted() const {
365-
return Stage == NChanges::EStage::Aborted;
363+
return Counters->GetCurrentStage() == NChanges::EStage::Aborted;
366364
}
367365

368366
void StartEmergency();

0 commit comments

Comments
 (0)