12
12
#include < yt/yt/core/concurrency/thread_pool_poller.h>
13
13
14
14
#include < yt/yt/core/misc/finally.h>
15
+ #include < yt/yt/core/misc/memory_usage_tracker.h>
16
+ #include < yt/yt/core/misc/public.h>
15
17
16
18
#include < yt/yt/core/ytree/convert.h>
17
19
@@ -56,20 +58,22 @@ class TServer
56
58
{
57
59
public:
58
60
TServer (
59
- const TServerConfigPtr& config,
60
- const IListenerPtr& listener,
61
- const IPollerPtr& poller,
62
- const IPollerPtr& acceptor,
63
- const IInvokerPtr& invoker,
64
- const IRequestPathMatcherPtr& requestPathMatcher,
61
+ TServerConfigPtr config,
62
+ IListenerPtr listener,
63
+ IPollerPtr poller,
64
+ IPollerPtr acceptor,
65
+ IInvokerPtr invoker,
66
+ IMemoryUsageTrackerPtr memoryUsageTracker,
67
+ IRequestPathMatcherPtr requestPathMatcher,
65
68
bool ownPoller = false )
66
- : Config_(config)
67
- , Listener_(listener)
68
- , Poller_(poller)
69
- , Acceptor_(acceptor)
70
- , Invoker_(invoker)
71
- , RequestPathMatcher_(requestPathMatcher )
69
+ : Config_(std::move( config) )
70
+ , Listener_(std::move( listener) )
71
+ , Poller_(std::move( poller) )
72
+ , Acceptor_(std::move( acceptor) )
73
+ , Invoker_(std::move( invoker) )
74
+ , MemoryUsageTracker_(std::move(memoryUsageTracker) )
72
75
, OwnPoller_(ownPoller)
76
+ , RequestPathMatcher_(std::move(requestPathMatcher))
73
77
{ }
74
78
75
79
void AddHandler (const TString& path, const IHttpHandlerPtr& handler) override
@@ -122,9 +126,10 @@ class TServer
122
126
const IPollerPtr Poller_;
123
127
const IPollerPtr Acceptor_;
124
128
const IInvokerPtr Invoker_;
125
- IRequestPathMatcherPtr RequestPathMatcher_ ;
129
+ const IMemoryUsageTrackerPtr MemoryUsageTracker_ ;
126
130
const bool OwnPoller_ = false ;
127
131
132
+ IRequestPathMatcherPtr RequestPathMatcher_;
128
133
bool Started_ = false ;
129
134
std::atomic<bool > Stopped_ = false ;
130
135
@@ -220,6 +225,14 @@ class TServer
220
225
221
226
SetRequestId (response, request->GetRequestId ());
222
227
228
+ if (MemoryUsageTracker_ && MemoryUsageTracker_->IsExceeded ()) {
229
+ THROW_ERROR_EXCEPTION (
230
+ EStatusCode::TooManyRequests,
231
+ " Request is dropped due to high memory pressure" )
232
+ << TErrorAttribute (" total_memory_limit" , MemoryUsageTracker_->GetLimit ())
233
+ << TErrorAttribute (" memory_usage" , MemoryUsageTracker_->GetUsed ());
234
+ }
235
+
223
236
handler->HandleRequest (request, response);
224
237
225
238
NTracing::FlushCurrentTraceContextElapsedTime ();
@@ -379,36 +392,46 @@ class TServer
379
392
// //////////////////////////////////////////////////////////////////////////////
380
393
381
394
IServerPtr CreateServer (
382
- const TServerConfigPtr& config,
383
- const IListenerPtr& listener,
384
- const IPollerPtr& poller,
385
- const IPollerPtr& acceptor,
386
- const IInvokerPtr& invoker,
395
+ TServerConfigPtr config,
396
+ IListenerPtr listener,
397
+ IPollerPtr poller,
398
+ IPollerPtr acceptor,
399
+ IInvokerPtr invoker,
400
+ IMemoryUsageTrackerPtr memoryUsageTracker,
387
401
bool ownPoller)
388
402
{
389
403
auto handlers = New<TRequestPathMatcher>();
390
404
return New<TServer>(
391
- config,
392
- listener,
393
- poller,
394
- acceptor,
395
- invoker,
396
- handlers,
405
+ std::move (config),
406
+ std::move (listener),
407
+ std::move (poller),
408
+ std::move (acceptor),
409
+ std::move (invoker),
410
+ std::move (memoryUsageTracker),
411
+ std::move (handlers),
397
412
ownPoller);
398
413
}
399
414
400
415
IServerPtr CreateServer (
401
- const TServerConfigPtr& config,
402
- const IPollerPtr& poller,
403
- const IPollerPtr& acceptor,
404
- const IInvokerPtr& invoker,
416
+ TServerConfigPtr config,
417
+ IPollerPtr poller,
418
+ IPollerPtr acceptor,
419
+ IInvokerPtr invoker,
420
+ IMemoryUsageTrackerPtr memoryUsageTracker,
405
421
bool ownPoller)
406
422
{
407
423
auto address = TNetworkAddress::CreateIPv6Any (config->Port );
408
424
for (int i = 0 ;; ++i) {
409
425
try {
410
426
auto listener = CreateListener (address, poller, acceptor, config->MaxBacklogSize );
411
- return CreateServer (config, listener, poller, acceptor, invoker, ownPoller);
427
+ return CreateServer (
428
+ std::move (config),
429
+ std::move (listener),
430
+ std::move (poller),
431
+ std::move (acceptor),
432
+ std::move (invoker),
433
+ std::move (memoryUsageTracker),
434
+ ownPoller);
412
435
} catch (const std::exception& ex) {
413
436
if (i + 1 == config->BindRetryCount ) {
414
437
throw ;
@@ -425,80 +448,98 @@ IServerPtr CreateServer(
425
448
// //////////////////////////////////////////////////////////////////////////////
426
449
427
450
IServerPtr CreateServer (
428
- const TServerConfigPtr& config,
429
- const IListenerPtr& listener,
430
- const IPollerPtr& poller)
451
+ TServerConfigPtr config,
452
+ IListenerPtr listener,
453
+ IPollerPtr poller)
431
454
{
455
+ auto acceptor = poller;
456
+ auto invoker = poller->GetInvoker ();
432
457
return CreateServer (
433
- config,
434
- listener,
435
- poller,
436
- poller,
437
- poller->GetInvoker (),
458
+ std::move (config),
459
+ std::move (listener),
460
+ std::move (poller),
461
+ std::move (acceptor),
462
+ std::move (invoker),
463
+ /* memoryUsageTracker*/ GetNullMemoryUsageTracker (),
438
464
/* ownPoller*/ false );
439
465
}
440
466
441
467
IServerPtr CreateServer (
442
- const TServerConfigPtr& config,
443
- const IListenerPtr& listener,
444
- const IPollerPtr& poller,
445
- const IPollerPtr& acceptor)
468
+ TServerConfigPtr config,
469
+ IListenerPtr listener,
470
+ IPollerPtr poller,
471
+ IPollerPtr acceptor,
472
+ IMemoryUsageTrackerPtr memoryUsageTracker)
446
473
{
474
+ auto invoker = poller->GetInvoker ();
447
475
return CreateServer (
448
- config,
449
- listener,
450
- poller,
451
- acceptor,
452
- poller->GetInvoker (),
476
+ std::move (config),
477
+ std::move (listener),
478
+ std::move (poller),
479
+ std::move (acceptor),
480
+ std::move (invoker),
481
+ std::move (memoryUsageTracker),
453
482
/* ownPoller*/ false );
454
483
}
455
484
456
- IServerPtr CreateServer (const TServerConfigPtr& config, const IPollerPtr& poller, const IPollerPtr& acceptor)
485
+ IServerPtr CreateServer (
486
+ TServerConfigPtr config,
487
+ IPollerPtr poller,
488
+ IPollerPtr acceptor,
489
+ IMemoryUsageTrackerPtr memoryUsageTracker)
457
490
{
491
+ auto invoker = poller->GetInvoker ();
458
492
return CreateServer (
459
- config,
460
- poller,
461
- acceptor,
462
- poller->GetInvoker (),
493
+ std::move (config),
494
+ std::move (poller),
495
+ std::move (acceptor),
496
+ std::move (invoker),
497
+ std::move (memoryUsageTracker),
463
498
/* ownPoller*/ false );
464
499
}
465
500
466
- IServerPtr CreateServer (const TServerConfigPtr& config, const IPollerPtr& poller)
501
+ IServerPtr CreateServer (TServerConfigPtr config, IPollerPtr poller)
467
502
{
503
+ auto acceptor = poller;
468
504
return CreateServer (
469
- config,
470
- poller,
471
- poller );
505
+ std::move ( config) ,
506
+ std::move ( poller) ,
507
+ std::move (acceptor) );
472
508
}
473
509
474
- IServerPtr CreateServer (int port, const IPollerPtr& poller)
510
+ IServerPtr CreateServer (int port, IPollerPtr poller)
475
511
{
476
512
auto config = New<TServerConfig>();
477
513
config->Port = port;
478
- return CreateServer (config, poller);
514
+ return CreateServer (std::move ( config), std::move ( poller) );
479
515
}
480
516
481
- IServerPtr CreateServer (const TServerConfigPtr& config, int pollerThreadCount)
517
+ IServerPtr CreateServer (TServerConfigPtr config, int pollerThreadCount)
482
518
{
483
519
auto poller = CreateThreadPoolPoller (pollerThreadCount, config->ServerName );
520
+ auto acceptor = poller;
521
+ auto invoker = poller->GetInvoker ();
484
522
return CreateServer (
485
- config,
486
- poller,
487
- poller,
488
- poller->GetInvoker (),
523
+ std::move (config),
524
+ std::move (poller),
525
+ std::move (acceptor),
526
+ std::move (invoker),
527
+ /* memoryUsageTracker*/ GetNullMemoryUsageTracker (),
489
528
/* ownPoller*/ true );
490
529
}
491
530
492
531
IServerPtr CreateServer (
493
- const TServerConfigPtr& config,
494
- const NConcurrency::IPollerPtr& poller,
495
- const IInvokerPtr& invoker)
532
+ TServerConfigPtr config,
533
+ NConcurrency::IPollerPtr poller,
534
+ IInvokerPtr invoker)
496
535
{
536
+ auto acceptor = poller;
497
537
return CreateServer (
498
- config,
499
- poller,
500
- poller,
501
- invoker,
538
+ std::move (config),
539
+ std::move (poller),
540
+ std::move (acceptor),
541
+ std::move (invoker),
542
+ /* memoryUsageTracker*/ GetNullMemoryUsageTracker (),
502
543
/* ownPoller*/ false );
503
544
}
504
545
0 commit comments