16
16
#include < ydb/library/actors/core/process_stats.h>
17
17
#include < ydb/library/services/services.pb.h>
18
18
#include < ydb/library/yql/minikql/aligned_page_pool.h>
19
+ #include < contrib/libs/apache/arrow/cpp/src/arrow/type.h>
20
+ #include < contrib/libs/apache/arrow/cpp/src/arrow/memory_pool.h>
21
+ #include < ydb/library/yql/public/udf/arrow/memory_pool.h>
19
22
20
23
namespace NKikimr ::NMemory {
21
24
@@ -98,7 +101,7 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
98
101
const TResourceBrokerConfig& resourceBrokerConfig,
99
102
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters)
100
103
: Interval(interval)
101
- , MemTables(std::make_shared<TMemTableMemoryConsumersCollection>(counters,
104
+ , MemTables(std::make_shared<TMemTableMemoryConsumersCollection>(counters,
102
105
Consumers.emplace(EMemoryConsumerKind::MemTable, MakeIntrusive<TMemoryConsumer>(EMemoryConsumerKind::MemTable, TActorId{})).first->second))
103
106
, ProcessMemoryInfoProvider(std::move(processMemoryInfoProvider))
104
107
, Config(config)
@@ -109,7 +112,7 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
109
112
void Bootstrap (const TActorContext& ctx) {
110
113
Become (&TThis::StateWork);
111
114
112
- Send (NConsole::MakeConfigsDispatcherID (SelfId ().NodeId ()),
115
+ Send (NConsole::MakeConfigsDispatcherID (SelfId ().NodeId ()),
113
116
new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest ({
114
117
NKikimrConsole::TConfigItem::MemoryControllerConfigItem}));
115
118
@@ -146,7 +149,7 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
146
149
ui64 hardLimitBytes = GetHardLimitBytes (Config, processMemoryInfo, hasMemTotalHardLimit);
147
150
ui64 softLimitBytes = GetSoftLimitBytes (Config, hardLimitBytes);
148
151
ui64 targetUtilizationBytes = GetTargetUtilizationBytes (Config, hardLimitBytes);
149
- ui64 activitiesLimitBytes = ResourceBrokerSelfConfig.LimitBytes
152
+ ui64 activitiesLimitBytes = ResourceBrokerSelfConfig.LimitBytes
150
153
? ResourceBrokerSelfConfig.LimitBytes // for backward compatibility
151
154
: GetActivitiesLimitBytes (Config, hardLimitBytes);
152
155
@@ -161,7 +164,7 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
161
164
ui64 otherConsumption = SafeDiff (processMemoryInfo.AllocatedMemory , consumersConsumption);
162
165
163
166
ui64 externalConsumption = 0 ;
164
- if (hasMemTotalHardLimit && processMemoryInfo.AnonRss .has_value ()
167
+ if (hasMemTotalHardLimit && processMemoryInfo.AnonRss .has_value ()
165
168
&& processMemoryInfo.MemTotal .has_value () && processMemoryInfo.MemAvailable .has_value ()) {
166
169
// externalConsumption + AnonRss + MemAvailable = MemTotal
167
170
externalConsumption = SafeDiff (processMemoryInfo.MemTotal .value (),
@@ -174,20 +177,20 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
174
177
// want to find maximum possible coefficient in range [0..1] so that
175
178
// Sum(
176
179
// Max(
177
- // consumers[i].Consumption,
180
+ // consumers[i].Consumption,
178
181
// consumers[i].MinBytes + coefficient * (consumers[i].MaxBytes - consumers[i].MinBytes
179
182
// )
180
183
// ) <= targetConsumersConsumption
181
184
auto coefficient = BinarySearchCoefficient (consumers, targetConsumersConsumption);
182
185
183
186
ui64 resultingConsumersConsumption = 0 ;
184
187
for (const auto & consumer : consumers) {
185
- // Note: take Max with current consumer consumption because memory free may happen with a delay, or don't happen at all
188
+ // Note: take Max with current consumer consumption because memory free may happen with a delay, or don't happen at all
186
189
resultingConsumersConsumption += Max (consumer.Consumption , consumer.GetLimit (coefficient));
187
190
}
188
191
189
192
LOG_INFO_S (ctx, NKikimrServices::MEMORY_CONTROLLER, " Periodic memory stats:"
190
- << " AnonRss: " << processMemoryInfo.AnonRss << " CGroupLimit: " << processMemoryInfo.CGroupLimit
193
+ << " AnonRss: " << processMemoryInfo.AnonRss << " CGroupLimit: " << processMemoryInfo.CGroupLimit
191
194
<< " MemTotal: " << processMemoryInfo.MemTotal << " MemAvailable: " << processMemoryInfo.MemAvailable
192
195
<< " AllocatedMemory: " << processMemoryInfo.AllocatedMemory << " AllocatorCachesMemory: " << processMemoryInfo.AllocatorCachesMemory
193
196
<< " HardLimit: " << hardLimitBytes << " SoftLimit: " << softLimitBytes << " TargetUtilization: " << targetUtilizationBytes
@@ -212,6 +215,8 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
212
215
Counters->GetCounter (" Stats/TargetConsumersConsumption" )->Set (targetConsumersConsumption);
213
216
Counters->GetCounter (" Stats/ResultingConsumersConsumption" )->Set (resultingConsumersConsumption);
214
217
Counters->GetCounter (" Stats/Coefficient" )->Set (coefficient * 1e9 );
218
+ Counters->GetCounter (" Stats/ArrowAllocatedMemory" )->Set (arrow::default_memory_pool ()->bytes_allocated ());
219
+ Counters->GetCounter (" Stats/ArrowYqlAllocatedMemory" )->Set (NYql::NUdf::GetYqlMemoryPool ()->bytes_allocated ());
215
220
216
221
auto *memoryStatsUpdate = new NNodeWhiteboard::TEvWhiteboard::TEvMemoryStatsUpdate ();
217
222
auto & memoryStats = memoryStatsUpdate->Record ;
@@ -251,7 +256,7 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
251
256
Counters->GetCounter (" Stats/ConsumersLimit" )->Set (consumersLimitBytes);
252
257
253
258
ui64 queryExecutionConsumption = TAlignedPagePool::GetGlobalPagePoolSize ();
254
- ui64 queryExecutionLimitBytes = ResourceBrokerSelfConfig.QueryExecutionLimitBytes
259
+ ui64 queryExecutionLimitBytes = ResourceBrokerSelfConfig.QueryExecutionLimitBytes
255
260
? ResourceBrokerSelfConfig.QueryExecutionLimitBytes // for backward compatibility
256
261
: GetQueryExecutionLimitBytes (Config, hardLimitBytes);
257
262
LOG_INFO_S (ctx, NKikimrServices::MEMORY_CONTROLLER, " Consumer QueryExecution state:"
@@ -303,9 +308,9 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
303
308
304
309
void Handle (TEvResourceBroker::TEvConfigureResult::TPtr &ev, const TActorContext& ctx) {
305
310
const auto *msg = ev->Get ();
306
- LOG_LOG_S (ctx,
307
- msg->Record .GetSuccess () ? NActors::NLog::PRI_INFO : NActors::NLog::PRI_ERROR,
308
- NKikimrServices::MEMORY_CONTROLLER,
311
+ LOG_LOG_S (ctx,
312
+ msg->Record .GetSuccess () ? NActors::NLog::PRI_INFO : NActors::NLog::PRI_ERROR,
313
+ NKikimrServices::MEMORY_CONTROLLER,
309
314
" ResourceBroker configure result " << msg->Record .ShortDebugString ());
310
315
}
311
316
@@ -404,7 +409,7 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
404
409
405
410
TConsumerState BuildConsumerState (const TMemoryConsumer& consumer, ui64 hardLimitBytes) const {
406
411
TConsumerState result (consumer);
407
-
412
+
408
413
switch (consumer.Kind ) {
409
414
case EMemoryConsumerKind::MemTable: {
410
415
result.MinBytes = GetMemTableMinBytes (Config, hardLimitBytes);
@@ -445,7 +450,7 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
445
450
IActor* CreateMemoryController (
446
451
TDuration interval,
447
452
TIntrusiveConstPtr<IProcessMemoryInfoProvider> processMemoryInfoProvider,
448
- const NKikimrConfig::TMemoryControllerConfig& config,
453
+ const NKikimrConfig::TMemoryControllerConfig& config,
449
454
const TResourceBrokerConfig& resourceBrokerSelfConfig,
450
455
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) {
451
456
return new TMemoryController (
0 commit comments