Skip to content

Commit a2e8760

Browse files
change memory planner for CS scan (#7372)
1 parent f93ea93 commit a2e8760

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+2492
-371
lines changed

ydb/core/base/events.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ struct TKikimrEvents : TEvents {
180180
ES_SS_BG_TASKS = 4257,
181181
ES_LIMITER = 4258,
182182
ES_MEMORY = 4259,
183+
ES_GROUPED_ALLOCATIONS_MANAGER = 4260,
183184
};
184185
};
185186

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,9 @@
187187
#include <ydb/core/tx/limiter/usage/config.h>
188188
#include <ydb/core/tx/limiter/usage/service.h>
189189

190+
#include <ydb/core/tx/limiter/grouped_memory/usage/config.h>
191+
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
192+
190193
#include <ydb/core/backup/controller/tablet.h>
191194

192195
#include <ydb/services/ext_index/common/config.h>
@@ -2216,6 +2219,26 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
22162219
}
22172220
}
22182221

2222+
TGroupedMemoryLimiterInitializer::TGroupedMemoryLimiterInitializer(const TKikimrRunConfig& runConfig)
2223+
: IKikimrServicesInitializer(runConfig) {
2224+
}
2225+
2226+
void TGroupedMemoryLimiterInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
2227+
NOlap::NGroupedMemoryManager::TConfig serviceConfig;
2228+
Y_ABORT_UNLESS(serviceConfig.DeserializeFromProto(Config.GetGroupedMemoryLimiterConfig()));
2229+
2230+
if (serviceConfig.IsEnabled()) {
2231+
TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
2232+
TIntrusivePtr<::NMonitoring::TDynamicCounters> countersGroup = tabletGroup->GetSubgroup("type", "TX_GROUPED_MEMORY_LIMITER");
2233+
2234+
auto service = NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::CreateService(serviceConfig, countersGroup);
2235+
2236+
setup->LocalServices.push_back(std::make_pair(
2237+
NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::MakeServiceId(NodeId),
2238+
TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
2239+
}
2240+
}
2241+
22192242
TCompDiskLimiterInitializer::TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig)
22202243
: IKikimrServicesInitializer(runConfig) {
22212244
}

ydb/core/driver_lib/run/kikimr_services_initializers.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,12 @@ class TCompDiskLimiterInitializer: public IKikimrServicesInitializer {
404404
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
405405
};
406406

407+
class TGroupedMemoryLimiterInitializer: public IKikimrServicesInitializer {
408+
public:
409+
TGroupedMemoryLimiterInitializer(const TKikimrRunConfig& runConfig);
410+
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
411+
};
412+
407413
class TCompConveyorInitializer: public IKikimrServicesInitializer {
408414
public:
409415
TCompConveyorInitializer(const TKikimrRunConfig& runConfig);

ydb/core/driver_lib/run/run.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1556,6 +1556,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
15561556
sil->AddServiceInitializer(new TCompDiskLimiterInitializer(runConfig));
15571557
}
15581558

1559+
if (serviceMask.EnableGroupedMemoryLimiter) {
1560+
sil->AddServiceInitializer(new TGroupedMemoryLimiterInitializer(runConfig));
1561+
}
1562+
15591563
if (serviceMask.EnableScanConveyor) {
15601564
sil->AddServiceInitializer(new TScanConveyorInitializer(runConfig));
15611565
}

ydb/core/driver_lib/run/service_mask.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ union TBasicKikimrServicesMask {
7878
bool EnableDatabaseMetadataCache:1;
7979
bool EnableGraphService:1;
8080
bool EnableCompDiskLimiter:1;
81+
bool EnableGroupedMemoryLimiter:1;
8182
};
8283

8384
struct {

ydb/core/driver_lib/run/ya.make

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ PEERDIR(
101101
ydb/core/security
102102
ydb/core/security/ldap_auth_provider
103103
ydb/core/statistics/aggregator
104-
ydb/core/statistics/service
104+
ydb/core/statistics/service
105105
ydb/core/sys_view/processor
106106
ydb/core/sys_view/service
107107
ydb/core/tablet
@@ -113,6 +113,7 @@ PEERDIR(
113113
ydb/core/tx/coordinator
114114
ydb/core/tx/conveyor/service
115115
ydb/core/tx/limiter/service
116+
ydb/core/tx/limiter/grouped_memory/usage
116117
ydb/core/tx/datashard
117118
ydb/core/tx/long_tx_service
118119
ydb/core/tx/long_tx_service/public

ydb/core/formats/arrow/arrow_helpers.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,18 @@ std::shared_ptr<arrow::RecordBatch> ReallocateBatch(std::shared_ptr<arrow::Recor
875875
return DeserializeBatch(SerializeBatch(original, arrow::ipc::IpcWriteOptions::Defaults()), original->schema());
876876
}
877877

878+
std::shared_ptr<arrow::Table> ReallocateBatch(const std::shared_ptr<arrow::Table>& original) {
879+
if (!original) {
880+
return original;
881+
}
882+
auto batches = NArrow::SliceToRecordBatches(original);
883+
for (auto&& i : batches) {
884+
i = NArrow::TStatusValidator::GetValid(
885+
NArrow::NSerialization::TNativeSerializer().Deserialize(NArrow::NSerialization::TNativeSerializer().SerializeFull(i)));
886+
}
887+
return NArrow::TStatusValidator::GetValid(arrow::Table::FromRecordBatches(batches));
888+
}
889+
878890
std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches) {
879891
std::vector<std::shared_ptr<arrow::Array>> columns;
880892
std::vector<std::shared_ptr<arrow::Field>> fields;

ydb/core/formats/arrow/arrow_helpers.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ std::partial_ordering ColumnsCompare(const std::vector<std::shared_ptr<arrow::Ar
103103
bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
104104
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y);
105105
std::shared_ptr<arrow::RecordBatch> ReallocateBatch(std::shared_ptr<arrow::RecordBatch> original);
106+
std::shared_ptr<arrow::Table> ReallocateBatch(const std::shared_ptr<arrow::Table>& original);
106107

107108
bool HasNulls(const std::shared_ptr<arrow::Array>& column);
108109

ydb/core/kqp/ut/olap/datatime64_ut.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ Y_UNIT_TEST_SUITE(KqpDatetime64ColumnShard) {
170170
runnerSettings.WithSampleTables = false;
171171

172172
TTestHelper testHelper(runnerSettings);
173+
Tests::NCommon::TLoggerInit(testHelper.GetKikimr()).SetComponents({ NKikimrServices::GROUPED_MEMORY_LIMITER }, "CS").Initialize();
173174

174175
TVector<TTestHelper::TColumnSchema> schema = {
175176
TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int64).SetNullable(false),

ydb/core/kqp/ut/olap/helpers/aggregation.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ void TestAggregationsBase(const std::vector<TAggregationTestCase>& cases) {
1313

1414
TLocalHelper(kikimr).CreateTestOlapTable();
1515
auto tableClient = kikimr.GetTableClient();
16+
Tests::NCommon::TLoggerInit(kikimr).SetComponents({ NKikimrServices::GROUPED_MEMORY_LIMITER, NKikimrServices::TX_COLUMNSHARD_SCAN }, "CS").Initialize();
1617

1718
{
1819
WriteTestData(kikimr, "/Root/olapStore/olapTable", 10000, 3000000, 1000);
@@ -49,10 +50,11 @@ void TestAggregationsInternal(const std::vector<TAggregationTestCase>& cases) {
4950
Tests::TServer::TPtr server = new Tests::TServer(settings);
5051

5152
auto runtime = server->GetRuntime();
53+
Tests::NCommon::TLoggerInit(runtime).Initialize();
54+
Tests::NCommon::TLoggerInit(runtime).SetComponents({ NKikimrServices::GROUPED_MEMORY_LIMITER }, "CS").Initialize();
5255
auto sender = runtime->AllocateEdgeActor();
5356

5457
InitRoot(server, sender);
55-
Tests::NCommon::TLoggerInit(runtime).Initialize();
5658

5759
ui32 numShards = 1;
5860
ui32 numIterations = 10;

0 commit comments

Comments
 (0)