@@ -359,3 +359,96 @@ fn test_load_shedding_pprof() {
359
359
assert_eq ! ( StatusCode :: OK , ok_request. await . unwrap( ) )
360
360
} ) ;
361
361
}
362
+
363
+ /// Test concurrency limiter for `/call` endpoint and that when the load shedder kicks in
364
+ /// we return 429.
365
+ /// Test scenario:
366
+ /// 1. Set the concurrency limiter for the call service, `max_call_concurrent_requests`, to 1.
367
+ /// 2. Use [`Agent`] to make an update calls where we wait with responding for the update call
368
+ /// inside the ingress filter service handle.
369
+ /// 3. Concurrently make another update call, and assert it hits the load shedder.
370
+ #[ test]
371
+ fn test_load_shedding_update_call ( ) {
372
+ let rt = Runtime :: new ( ) . unwrap ( ) ;
373
+ let addr = get_free_localhost_socket_addr ( ) ;
374
+
375
+ let config = Config {
376
+ listen_addr : addr,
377
+ max_call_concurrent_requests : 1 ,
378
+ ..Default :: default ( )
379
+ } ;
380
+
381
+ let mock_state_manager = basic_state_manager_mock ( ) ;
382
+ let mock_consensus_cache = basic_consensus_pool_cache ( ) ;
383
+ let mock_registry_client = basic_registry_client ( ) ;
384
+
385
+ let canister = Principal :: from_text ( "223xb-saaaa-aaaaf-arlqa-cai" ) . unwrap ( ) ;
386
+
387
+ let ( mut ingress_filter, mut ingress_sender, _) = start_http_endpoint (
388
+ rt. handle ( ) . clone ( ) ,
389
+ config,
390
+ Arc :: new ( mock_state_manager) ,
391
+ Arc :: new ( mock_consensus_cache) ,
392
+ Arc :: new ( mock_registry_client) ,
393
+ Arc :: new ( Pprof :: default ( ) ) ,
394
+ ) ;
395
+
396
+ let ingress_filter_running = Arc :: new ( Notify :: new ( ) ) ;
397
+ let load_shedder_returned = Arc :: new ( Notify :: new ( ) ) ;
398
+
399
+ let ok_agent = Agent :: builder ( )
400
+ . with_transport ( ReqwestHttpReplicaV2Transport :: create ( format ! ( "http://{}" , addr) ) . unwrap ( ) )
401
+ . build ( )
402
+ . unwrap ( ) ;
403
+
404
+ let load_shedded_agent = ok_agent. clone ( ) ;
405
+
406
+ let ingress_filter_running_clone = ingress_filter_running. clone ( ) ;
407
+ let load_shedder_returned_clone = load_shedder_returned. clone ( ) ;
408
+
409
+ let load_shedded_agent_handle = rt. spawn ( async move {
410
+ ingress_filter_running_clone. notified ( ) . await ;
411
+ let resp = load_shedded_agent
412
+ . update ( & canister, "some method" )
413
+ . call ( )
414
+ . await ;
415
+ load_shedder_returned_clone. notify_one ( ) ;
416
+ resp
417
+ } ) ;
418
+
419
+ // Ingress sender mock that returns empty Ok(()) response.
420
+ rt. spawn ( async move {
421
+ loop {
422
+ let ( _, resp) = ingress_sender. next_request ( ) . await . unwrap ( ) ;
423
+ resp. send_response ( Ok ( ( ) ) )
424
+ }
425
+ } ) ;
426
+
427
+ // Mock ingress filter
428
+ rt. spawn ( async move {
429
+ let ( _, resp) = ingress_filter. next_request ( ) . await . unwrap ( ) ;
430
+ ingress_filter_running. notify_one ( ) ;
431
+ load_shedder_returned. notified ( ) . await ;
432
+ resp. send_response ( Ok ( ( ) ) )
433
+ } ) ;
434
+
435
+ rt. block_on ( async {
436
+ wait_for_status_healthy ( & ok_agent) . await . unwrap ( ) ;
437
+ let resp = ok_agent. update ( & canister, "some method" ) . call ( ) . await ;
438
+
439
+ assert ! ( resp. is_ok( ) , "Received unexpeceted response: {:?}" , resp) ;
440
+
441
+ let resp = load_shedded_agent_handle. await . unwrap ( ) ;
442
+ let expected_resp = StatusCode :: TOO_MANY_REQUESTS ;
443
+
444
+ match resp {
445
+ Err ( AgentError :: HttpError ( HttpErrorPayload { status, .. } ) ) => {
446
+ assert_eq ! ( expected_resp, status)
447
+ }
448
+ _ => panic ! (
449
+ "Load shedder did not kick in. Received unexpeceted response: {:?}" ,
450
+ resp
451
+ ) ,
452
+ }
453
+ } ) ;
454
+ }
0 commit comments