16
16
#include < ydb/core/tx/columnshard/blob_cache.h>
17
17
#include < ydb/core/tx/columnshard/common/limits.h>
18
18
#include < ydb/core/tx/columnshard/data_accessor/cache_policy/policy.h>
19
- #include < ydb/core/tx/limiter/grouped_memory/usage/events.h>
20
- #include < ydb/core/tx/limiter/grouped_memory/usage/service.h>
21
19
#include < ydb/library/actors/core/actor_bootstrapped.h>
22
20
#include < ydb/library/actors/core/log.h>
23
21
#include < ydb/library/actors/core/process_stats.h>
@@ -86,6 +84,7 @@ struct TConsumerState {
86
84
ui64 MinBytes = 0 ;
87
85
ui64 MaxBytes = 0 ;
88
86
bool CanZeroLimit = false ;
87
+ std::optional<ui64> ExactLimit;
89
88
90
89
TConsumerState (const TMemoryConsumer& consumer)
91
90
: Kind(consumer.Kind)
@@ -266,21 +265,31 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
266
265
267
266
ui64 consumersLimitBytes = 0 ;
268
267
for (const auto & consumer : consumers) {
269
- ui64 limitBytes = consumer.GetLimit (coefficient);
270
- if (resultingConsumersConsumption + otherConsumption + externalConsumption > softLimitBytes && consumer.CanZeroLimit ) {
271
- limitBytes = SafeDiff (limitBytes, resultingConsumersConsumption + otherConsumption + externalConsumption - softLimitBytes);
268
+ const bool isExactLimitConsumer = consumer.ExactLimit .has_value ();
269
+ ui64 limitBytes;
270
+ if (isExactLimitConsumer) {
271
+ limitBytes = consumer.ExactLimit .value ();
272
+ } else {
273
+ limitBytes = consumer.GetLimit (coefficient);
274
+ if (resultingConsumersConsumption + otherConsumption + externalConsumption > softLimitBytes && consumer.CanZeroLimit ) {
275
+ limitBytes = SafeDiff (limitBytes, resultingConsumersConsumption + otherConsumption + externalConsumption - softLimitBytes);
276
+ }
272
277
}
273
278
consumersLimitBytes += limitBytes;
274
279
275
280
LOG_INFO_S (ctx, NKikimrServices::MEMORY_CONTROLLER, " Consumer " << consumer.Kind << " state:"
276
281
<< " Consumption: " << HumanReadableBytes (consumer.Consumption ) << " Limit: " << HumanReadableBytes (limitBytes)
277
282
<< " Min: " << HumanReadableBytes (consumer.MinBytes ) << " Max: " << HumanReadableBytes (consumer.MaxBytes ));
278
- auto & counters = GetConsumerCounters (consumer.Kind );
283
+ auto & counters = GetConsumerCounters (consumer.Kind , !isExactLimitConsumer );
279
284
counters.Consumption ->Set (consumer.Consumption );
280
285
counters.Reservation ->Set (SafeDiff (limitBytes, consumer.Consumption ));
281
286
counters.LimitBytes ->Set (limitBytes);
282
- counters.LimitMinBytes ->Set (consumer.MinBytes );
283
- counters.LimitMaxBytes ->Set (consumer.MaxBytes );
287
+ if (counters.LimitMinBytes ) {
288
+ counters.LimitMinBytes ->Set (consumer.MinBytes );
289
+ }
290
+ if (counters.LimitMaxBytes ) {
291
+ counters.LimitMaxBytes ->Set (consumer.MaxBytes );
292
+ }
284
293
SetMemoryStats (consumer, memoryStats, limitBytes);
285
294
286
295
ApplyLimit (consumer, limitBytes);
@@ -289,8 +298,6 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
289
298
Counters->GetCounter (" Stats/ConsumersLimit" )->Set (consumersLimitBytes);
290
299
291
300
ProcessResourceBrokerConfig (ctx, memoryStats, hardLimitBytes, activitiesLimitBytes);
292
- ProcessGroupedMemoryLimiterConfig (ctx, memoryStats, hardLimitBytes);
293
- ProcessCacheConfig (ctx, memoryStats, hardLimitBytes);
294
301
295
302
Send (NNodeWhiteboard::MakeNodeWhiteboardServiceId (SelfId ().NodeId ()), memoryStatsUpdate);
296
303
@@ -361,10 +368,15 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
361
368
case EMemoryConsumerKind::MemTable:
362
369
ApplyMemTableLimit (limitBytes);
363
370
break ;
364
- default :
371
+ case EMemoryConsumerKind::SharedCache:
372
+ case EMemoryConsumerKind::BlobCache:
373
+ case EMemoryConsumerKind::DataAccessorCache:
365
374
Send (consumer.ActorId , new TEvConsumerLimit (limitBytes));
366
375
break ;
367
-
376
+ case EMemoryConsumerKind::ScanGroupedMemoryLimiter:
377
+ case EMemoryConsumerKind::CompGroupedMemoryLimiter:
378
+ Send (consumer.ActorId , new TEvConsumerLimit (limitBytes * NKikimr::NOlap::TGlobalLimits::GroupedMemoryLimiterSoftLimitCoefficient, limitBytes));
379
+ break ;
368
380
}
369
381
}
370
382
@@ -377,43 +389,6 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
377
389
}
378
390
}
379
391
380
- void ProcessGroupedMemoryLimiterConfig (const TActorContext& /* ctx*/ , NKikimrMemory::TMemoryStats& /* memoryStats*/ , ui64 hardLimitBytes) {
381
- ui64 columnTablesScanLimitBytes = GetColumnTablesReadExecutionLimitBytes (Config, hardLimitBytes);
382
- ui64 columnTablesCompactionLimitBytes = GetColumnTablesCompactionLimitBytes (Config, hardLimitBytes) *
383
- NKikimr::NOlap::TGlobalLimits::GroupedMemoryLimiterCompactionLimitCoefficient;
384
-
385
- ApplyGroupedMemoryLimiterConfig (columnTablesScanLimitBytes, columnTablesCompactionLimitBytes);
386
- }
387
-
388
- void ApplyGroupedMemoryLimiterConfig (const ui64 scanHardLimitBytes, const ui64 compactionHardLimitBytes) {
389
- namespace NGroupedMemoryManager = ::NKikimr::NOlap::NGroupedMemoryManager;
390
- using UpdateMemoryLimitsEv = NGroupedMemoryManager::NEvents::TEvExternal::TEvUpdateMemoryLimits;
391
-
392
- Send (NGroupedMemoryManager::TScanMemoryLimiterOperator::MakeServiceId (SelfId ().NodeId ()),
393
- new UpdateMemoryLimitsEv (scanHardLimitBytes * NKikimr::NOlap::TGlobalLimits::GroupedMemoryLimiterSoftLimitCoefficient,
394
- scanHardLimitBytes));
395
-
396
- Send (NGroupedMemoryManager::TCompMemoryLimiterOperator::MakeServiceId (SelfId ().NodeId ()),
397
- new UpdateMemoryLimitsEv (compactionHardLimitBytes * NKikimr::NOlap::TGlobalLimits::GroupedMemoryLimiterSoftLimitCoefficient,
398
- compactionHardLimitBytes));
399
- }
400
-
401
- void ProcessCacheConfig (const TActorContext& /* ctx*/ , NKikimrMemory::TMemoryStats& /* memoryStats*/ , ui64 hardLimitBytes) {
402
- ui64 columnTablesBlobCacheLimitBytes = GetColumnTablesCacheLimitBytes (Config, hardLimitBytes);
403
-
404
- ApplyCacheConfig (columnTablesBlobCacheLimitBytes);
405
- }
406
-
407
- void ApplyCacheConfig (const ui64 cacheLimitBytes) {
408
- using TUpdateMaxBlobCacheDataSizeEv = NBlobCache::TEvBlobCache::TEvUpdateMaxCacheDataSize;
409
- using TGeneralCache = NKikimr::NOlap::NDataAccessorControl::TGeneralCache;
410
- using TGlobalLimits = NKikimr::NOlap::TGlobalLimits;
411
-
412
- Send (NKikimr::NBlobCache::MakeBlobCacheServiceId (), new TUpdateMaxBlobCacheDataSizeEv (cacheLimitBytes * TGlobalLimits::BlobCacheCoefficient));
413
-
414
- TGeneralCache::UpdateMaxCacheSize (cacheLimitBytes * TGlobalLimits::DataAccessorCoefficient);
415
- }
416
-
417
392
void ProcessResourceBrokerConfig (const TActorContext& ctx, NKikimrMemory::TMemoryStats& memoryStats, ui64 hardLimitBytes,
418
393
ui64 activitiesLimitBytes) {
419
394
ui64 queryExecutionConsumption = TAlignedPagePool::GetGlobalPagePoolSize ();
@@ -470,18 +445,21 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
470
445
queue->MutableLimit ()->SetMemory (limitBytes);
471
446
}
472
447
473
- TConsumerCounters& GetConsumerCounters (EMemoryConsumerKind consumer) {
448
+ TConsumerCounters& GetConsumerCounters (EMemoryConsumerKind consumer, const bool minMaxRequired ) {
474
449
auto it = ConsumerCounters.FindPtr (consumer);
475
450
if (it) {
476
451
return *it;
477
452
}
478
453
454
+ TCounterPtr limitMinBytes = minMaxRequired ? Counters->GetCounter (TStringBuilder () << " Consumer/" << consumer << " /LimitMin" ) : nullptr ;
455
+ TCounterPtr limitMaxBytes = minMaxRequired ? Counters->GetCounter (TStringBuilder () << " Consumer/" << consumer << " /LimitMax" ) : nullptr ;
456
+
479
457
return ConsumerCounters.emplace (consumer, TConsumerCounters{
480
458
Counters->GetCounter (TStringBuilder () << " Consumer/" << consumer << " /Consumption" ),
481
459
Counters->GetCounter (TStringBuilder () << " Consumer/" << consumer << " /Reservation" ),
482
460
Counters->GetCounter (TStringBuilder () << " Consumer/" << consumer << " /Limit" ),
483
- Counters-> GetCounter ( TStringBuilder () << " Consumer/ " << consumer << " /LimitMin " ) ,
484
- Counters-> GetCounter ( TStringBuilder () << " Consumer/ " << consumer << " /LimitMax " ) ,
461
+ limitMinBytes ,
462
+ limitMaxBytes ,
485
463
}).first ->second ;
486
464
}
487
465
@@ -497,6 +475,23 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
497
475
stats.SetSharedCacheLimit (limitBytes);
498
476
break ;
499
477
}
478
+ case EMemoryConsumerKind::ScanGroupedMemoryLimiter: {
479
+ stats.SetColumnTablesReadExecutionConsumption (consumer.Consumption );
480
+ stats.SetColumnTablesReadExecutionLimit (limitBytes);
481
+ break ;
482
+ }
483
+ case EMemoryConsumerKind::CompGroupedMemoryLimiter: {
484
+ stats.SetColumnTablesCompactionConsumption (consumer.Consumption );
485
+ stats.SetColumnTablesCompactionLimit (limitBytes);
486
+ break ;
487
+ }
488
+ case EMemoryConsumerKind::DataAccessorCache:
489
+ case EMemoryConsumerKind::BlobCache: {
490
+ stats.SetColumnTablesCacheConsumption (
491
+ (stats.HasColumnTablesCacheConsumption () ? stats.GetColumnTablesCacheConsumption () : 0 ) + consumer.Consumption );
492
+ stats.SetColumnTablesCacheLimit ((stats.HasColumnTablesCacheLimit () ? stats.GetColumnTablesCacheLimit () : 0 ) + limitBytes);
493
+ break ;
494
+ }
500
495
default :
501
496
Y_ABORT (" Unhandled consumer" );
502
497
}
@@ -517,6 +512,23 @@ class TMemoryController : public TActorBootstrapped<TMemoryController> {
517
512
result.CanZeroLimit = true ;
518
513
break ;
519
514
}
515
+ case EMemoryConsumerKind::ScanGroupedMemoryLimiter: {
516
+ result.ExactLimit = GetColumnTablesReadExecutionLimitBytes (Config, hardLimitBytes);
517
+ break ;
518
+ }
519
+ case EMemoryConsumerKind::CompGroupedMemoryLimiter: {
520
+ result.ExactLimit = GetColumnTablesCompactionLimitBytes (Config, hardLimitBytes) *
521
+ NKikimr::NOlap::TGlobalLimits::GroupedMemoryLimiterCompactionLimitCoefficient;
522
+ break ;
523
+ }
524
+ case EMemoryConsumerKind::BlobCache: {
525
+ result.ExactLimit = GetColumnTablesCacheLimitBytes (Config, hardLimitBytes) * NKikimr::NOlap::TGlobalLimits::BlobCacheCoefficient;
526
+ break ;
527
+ }
528
+ case EMemoryConsumerKind::DataAccessorCache: {
529
+ result.ExactLimit = GetColumnTablesCacheLimitBytes (Config, hardLimitBytes) * NKikimr::NOlap::TGlobalLimits::DataAccessorCoefficient;
530
+ break ;
531
+ }
520
532
default :
521
533
Y_ABORT (" Unhandled consumer" );
522
534
}
0 commit comments