21
21
#include " blobs_action/transaction/tx_remove_blobs.h"
22
22
#include " blobs_action/transaction/tx_gc_insert_table.h"
23
23
#include " blobs_action/transaction/tx_gc_indexed.h"
24
+ #include " blobs_reader/actor.h"
24
25
#include " bg_tasks/events/events.h"
25
26
26
27
#include " data_accessor/manager.h"
@@ -579,8 +580,13 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {
579
580
580
581
protected:
581
582
virtual void DoOnDataReady (const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override {
583
+ if (!!resourcesGuard) {
584
+ AFL_VERIFY (!TxEvent->IndexChanges ->ResourcesGuard );
585
+ TxEvent->IndexChanges ->ResourcesGuard = resourcesGuard;
586
+ } else {
587
+ AFL_VERIFY (TxEvent->IndexChanges ->ResourcesGuard );
588
+ }
582
589
TxEvent->IndexChanges ->Blobs = ExtractBlobsData ();
583
- TxEvent->IndexChanges ->ResourcesGuard = resourcesGuard;
584
590
const bool isInsert = !!dynamic_pointer_cast<NOlap::TInsertColumnEngineChanges>(TxEvent->IndexChanges );
585
591
std::shared_ptr<NConveyor::ITask> task = std::make_shared<TChangesTask>(std::move (TxEvent), Counters, TabletId, ParentActorId, LastCompletedTx);
586
592
if (isInsert) {
@@ -615,6 +621,7 @@ class TDataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubscriber {
615
621
const NActors::TActorId ShardActorId;
616
622
std::shared_ptr<NOlap::TColumnEngineChanges> Changes;
617
623
std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
624
+ std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
618
625
619
626
virtual void DoOnRequestsFinishedImpl () = 0;
620
627
@@ -624,6 +631,16 @@ class TDataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubscriber {
624
631
}
625
632
626
633
public:
634
+ void SetResourcesGuard (const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& guard) {
635
+ AFL_VERIFY (!ResourcesGuard);
636
+ ResourcesGuard = guard;
637
+ }
638
+
639
+ std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>&& ExtractResourcesGuard() {
640
+ AFL_VERIFY (ResourcesGuard);
641
+ return std::move (ResourcesGuard);
642
+ }
643
+
627
644
TDataAccessorsSubscriber (const NActors::TActorId& shardActorId, const std::shared_ptr<NOlap::TColumnEngineChanges>& changes,
628
645
const std::shared_ptr<NOlap::TVersionedIndex>& versionedIndex)
629
646
: ShardActorId(shardActorId)
@@ -801,6 +818,30 @@ void TColumnShard::SetupCompaction(const std::set<ui64>& pathIds) {
801
818
}
802
819
}
803
820
821
+ class TAccessorsMemorySubscriber : public NOlap ::NResourceBroker::NSubscribe::ITask {
822
+ private:
823
+ using TBase = NOlap::NResourceBroker::NSubscribe::ITask;
824
+ std::shared_ptr<NOlap::TDataAccessorsRequest> Request;
825
+ std::shared_ptr<TDataAccessorsSubscriber> Subscriber;
826
+ std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager> DataAccessorsManager;
827
+
828
+ virtual void DoOnAllocationSuccess (const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& guard) override {
829
+ Subscriber->SetResourcesGuard (guard);
830
+ Request->RegisterSubscriber (Subscriber);
831
+ DataAccessorsManager->AskData (Request);
832
+ }
833
+
834
+ public:
835
+ TAccessorsMemorySubscriber (const ui64 memory, const TString& externalTaskId, const NOlap::NResourceBroker::NSubscribe::TTaskContext& context,
836
+ std::shared_ptr<NOlap::TDataAccessorsRequest>&& request, const std::shared_ptr<TDataAccessorsSubscriber>& subscriber,
837
+ const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
838
+ : TBase(0 , memory, externalTaskId, context)
839
+ , Request(std::move(request))
840
+ , Subscriber(subscriber)
841
+ , DataAccessorsManager(dataAccessorsManager) {
842
+ }
843
+ };
844
+
804
845
class TCompactionDataAccessorsSubscriber : public TDataAccessorsSubscriberWithRead {
805
846
private:
806
847
using TBase = TDataAccessorsSubscriberWithRead;
@@ -811,10 +852,9 @@ class TCompactionDataAccessorsSubscriber: public TDataAccessorsSubscriberWithRea
811
852
AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD)(" event" , " compaction" )(" external_task_id" , externalTaskId);
812
853
813
854
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, CacheDataAfterWrite);
814
- auto readSubscriber = std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
815
- std::make_shared<TCompactChangesReadTask>(std::move (ev), ShardActorId, ShardTabletId, Counters, SnapshotModification), 0 ,
816
- Changes->CalcMemoryForUsage (), externalTaskId, TaskSubscriptionContext);
817
- NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription (ResourceSubscribeActor, readSubscriber);
855
+ ev->IndexChanges ->ResourcesGuard = ExtractResourcesGuard ();
856
+ TActorContext::AsActorContext ().Register (new NOlap::NBlobOperations::NRead::TActor (
857
+ std::make_shared<TCompactChangesReadTask>(std::move (ev), ShardActorId, ShardTabletId, Counters, SnapshotModification)));
818
858
}
819
859
820
860
public:
@@ -837,10 +877,14 @@ void TColumnShard::StartCompaction(const std::shared_ptr<NPrioritiesQueue::TAllo
837
877
838
878
auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex ()->GetVersionedIndex ());
839
879
auto request = compaction->ExtractDataAccessorsRequest ();
840
- request->RegisterSubscriber (std::make_shared<TCompactionDataAccessorsSubscriber>(ResourceSubscribeActor, indexChanges, actualIndexInfo,
880
+ const ui64 accessorsMemory = request->PredictAccessorsMemory (TablesManager.GetPrimaryIndex ()->GetVersionedIndex ().GetLastSchema ()) +
881
+ indexChanges->CalcMemoryForUsage ();
882
+ const auto subscriber = std::make_shared<TCompactionDataAccessorsSubscriber>(ResourceSubscribeActor, indexChanges, actualIndexInfo,
841
883
Settings.CacheDataAfterCompaction , SelfId (), TabletID (), Counters.GetCompactionCounters (), GetLastCompletedTx (),
842
- CompactTaskSubscription));
843
- TablesManager.GetPrimaryIndex ()->FetchDataAccessors (request);
884
+ CompactTaskSubscription);
885
+ NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription (
886
+ ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, indexChanges->GetTaskIdentifier (),
887
+ CompactTaskSubscription, std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
844
888
}
845
889
846
890
class TWriteEvictPortionsDataAccessorsSubscriber : public TDataAccessorsSubscriberWithRead {
@@ -851,11 +895,9 @@ class TWriteEvictPortionsDataAccessorsSubscriber: public TDataAccessorsSubscribe
851
895
virtual void DoOnRequestsFinishedImpl () override {
852
896
ACFL_DEBUG (" background" , " ttl" )(" need_writes" , true );
853
897
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, false );
854
- auto readSubscriber = std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
855
- std::make_shared<TTTLChangesReadTask>(std::move (ev), ShardActorId, ShardTabletId, Counters, SnapshotModification), 0 ,
856
- Changes->CalcMemoryForUsage (), Changes->GetTaskIdentifier (), TaskSubscriptionContext);
857
-
858
- NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription (ResourceSubscribeActor, readSubscriber);
898
+ ev->IndexChanges ->ResourcesGuard = ExtractResourcesGuard ();
899
+ TActorContext::AsActorContext ().Register (new NOlap::NBlobOperations::NRead::TActor (
900
+ std::make_shared<TTTLChangesReadTask>(std::move (ev), ShardActorId, ShardTabletId, Counters, SnapshotModification)));
859
901
}
860
902
861
903
public:
@@ -911,7 +953,8 @@ void TColumnShard::SetupMetadata() {
911
953
}
912
954
913
955
bool TColumnShard::SetupTtl (const THashMap<ui64, NOlap::TTiering>& pathTtls) {
914
- if (!AppDataVerified ().ColumnShardConfig .GetTTLEnabled () || !NYDBTest::TControllers::GetColumnShardController ()->IsBackgroundEnabled (NYDBTest::ICSController::EBackground::TTL)) {
956
+ if (!AppDataVerified ().ColumnShardConfig .GetTTLEnabled () ||
957
+ !NYDBTest::TControllers::GetColumnShardController ()->IsBackgroundEnabled (NYDBTest::ICSController::EBackground::TTL)) {
915
958
AFL_WARN (NKikimrServices::TX_COLUMNSHARD)(" event" , " skip_ttl" )(" reason" , " disabled" );
916
959
return false ;
917
960
}
@@ -922,7 +965,8 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) {
922
965
}
923
966
924
967
const ui64 memoryUsageLimit = HasAppData () ? AppDataVerified ().ColumnShardConfig .GetTieringsMemoryLimit () : ((ui64)512 * 1024 * 1024 );
925
- std::vector<std::shared_ptr<NOlap::TTTLColumnEngineChanges>> indexChanges = TablesManager.MutablePrimaryIndex ().StartTtl (eviction, DataLocksManager, memoryUsageLimit);
968
+ std::vector<std::shared_ptr<NOlap::TTTLColumnEngineChanges>> indexChanges =
969
+ TablesManager.MutablePrimaryIndex ().StartTtl (eviction, DataLocksManager, memoryUsageLimit);
926
970
927
971
if (indexChanges.empty ()) {
928
972
ACFL_DEBUG (" background" , " ttl" )(" skip_reason" , " no_changes" );
@@ -933,14 +977,21 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) {
933
977
for (auto && i : indexChanges) {
934
978
i->Start (*this );
935
979
auto request = i->ExtractDataAccessorsRequest ();
980
+ ui64 memoryUsage = 0 ;
981
+ std::shared_ptr<TDataAccessorsSubscriber> subscriber;
936
982
if (i->NeedConstruction ()) {
937
- request->RegisterSubscriber (std::make_shared<TWriteEvictPortionsDataAccessorsSubscriber>(ResourceSubscribeActor, i,
938
- actualIndexInfo, Settings.CacheDataAfterCompaction , SelfId (), TabletID (), Counters.GetEvictionCounters (), GetLastCompletedTx (),
939
- TTLTaskSubscription));
983
+ subscriber = std::make_shared<TWriteEvictPortionsDataAccessorsSubscriber>(ResourceSubscribeActor, i, actualIndexInfo,
984
+ Settings.CacheDataAfterCompaction , SelfId (), TabletID (), Counters.GetEvictionCounters (), GetLastCompletedTx (),
985
+ TTLTaskSubscription);
986
+ memoryUsage = i->CalcMemoryForUsage ();
940
987
} else {
941
- request-> RegisterSubscriber ( std::make_shared<TNoWriteEvictPortionsDataAccessorsSubscriber>(SelfId (), i, actualIndexInfo) );
988
+ subscriber = std::make_shared<TNoWriteEvictPortionsDataAccessorsSubscriber>(SelfId (), i, actualIndexInfo);
942
989
}
943
- TablesManager.GetPrimaryIndex ()->FetchDataAccessors (request);
990
+ const ui64 accessorsMemory =
991
+ request->PredictAccessorsMemory (TablesManager.GetPrimaryIndex ()->GetVersionedIndex ().GetLastSchema ()) + memoryUsage;
992
+ NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription (
993
+ ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, i->GetTaskIdentifier (), TTLTaskSubscription,
994
+ std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
944
995
}
945
996
return true ;
946
997
}
@@ -953,6 +1004,7 @@ class TCleanupPortionsDataAccessorsSubscriber: public TDataAccessorsSubscriber {
953
1004
virtual void DoOnRequestsFinishedImpl () override {
954
1005
AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD)(" background" , " cleanup" )(" changes_info" , Changes->DebugString ());
955
1006
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, false );
1007
+ ev->IndexChanges ->ResourcesGuard = ExtractResourcesGuard ();
956
1008
ev->SetPutStatus (NKikimrProto::OK); // No new blobs to write
957
1009
NActors::TActivationContext::Send (ShardActorId, std::move (ev));
958
1010
}
@@ -982,8 +1034,12 @@ void TColumnShard::SetupCleanupPortions() {
982
1034
983
1035
auto request = changes->ExtractDataAccessorsRequest ();
984
1036
auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex ()->GetVersionedIndex ());
985
- request->RegisterSubscriber (std::make_shared<TCleanupPortionsDataAccessorsSubscriber>(SelfId (), changes, actualIndexInfo));
986
- TablesManager.GetPrimaryIndex ()->FetchDataAccessors (request);
1037
+ const ui64 accessorsMemory = request->PredictAccessorsMemory (TablesManager.GetPrimaryIndex ()->GetVersionedIndex ().GetLastSchema ());
1038
+ const auto subscriber = std::make_shared<TCleanupPortionsDataAccessorsSubscriber>(SelfId (), changes, actualIndexInfo);
1039
+
1040
+ NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription (
1041
+ ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, changes->GetTaskIdentifier (), TTLTaskSubscription,
1042
+ std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
987
1043
}
988
1044
989
1045
void TColumnShard::SetupCleanupTables () {
0 commit comments