Skip to content

Commit f6d410d

Browse files
committed
test: [NET-1444] Test load shedding on call endpoint
1 parent bc32d91 commit f6d410d

File tree

3 files changed

+98
-11
lines changed

3 files changed

+98
-11
lines changed

rs/http_endpoints/public/src/call.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
use crate::{
44
body::BodyReceiverLayer,
55
common::{
6-
get_cors_headers, make_plaintext_response, make_response, map_box_error_to_response,
7-
remove_effective_canister_id,
6+
get_cors_headers, make_plaintext_response, make_response, remove_effective_canister_id,
87
},
98
metrics::LABEL_UNKNOWN,
109
types::ApiReqType,
@@ -33,8 +32,7 @@ use std::pin::Pin;
3332
use std::sync::Arc;
3433
use std::task::{Context, Poll};
3534
use tower::{
36-
limit::GlobalConcurrencyLimitLayer, load_shed::LoadShed, util::BoxCloneService, Service,
37-
ServiceBuilder, ServiceExt,
35+
limit::GlobalConcurrencyLimitLayer, util::BoxCloneService, Service, ServiceBuilder, ServiceExt,
3836
};
3937

4038
#[derive(Clone)]
@@ -45,7 +43,7 @@ pub(crate) struct CallService {
4543
registry_client: Arc<dyn RegistryClient>,
4644
validator_executor: ValidatorExecutor,
4745
ingress_sender: IngressIngestionService,
48-
ingress_filter: LoadShed<IngressFilterService>,
46+
ingress_filter: IngressFilterService,
4947
malicious_flags: MaliciousFlags,
5048
}
5149

@@ -74,7 +72,7 @@ impl CallService {
7472
registry_client,
7573
validator_executor,
7674
ingress_sender,
77-
ingress_filter: ServiceBuilder::new().load_shed().service(ingress_filter),
75+
ingress_filter,
7876
malicious_flags,
7977
}),
8078
);
@@ -262,9 +260,7 @@ impl Service<Request<Vec<u8>>> for CallService {
262260
.call((provisional_whitelist, msg.content().clone()))
263261
.await
264262
{
265-
Err(err) => {
266-
return Ok(map_box_error_to_response(err));
267-
}
263+
Err(_) => panic!("Can't panic on Infallible"),
268264
Ok(Err(err)) => {
269265
return Ok(make_response(err));
270266
}

rs/http_endpoints/public/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,6 @@ pub fn start_server(
226226
metrics_registry: &MetricsRegistry,
227227
config: Config,
228228
ingress_filter: IngressFilterService,
229-
// ingress_sender and query_execution_service are external services with a concurrency limiter.
230-
// It is safe to clone them and pass them to a single-threaded context.
231229
ingress_sender: IngressIngestionService,
232230
query_execution_service: QueryExecutionService,
233231
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,

rs/http_endpoints/public/tests/load_shed_test.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,3 +359,96 @@ fn test_load_shedding_pprof() {
359359
assert_eq!(StatusCode::OK, ok_request.await.unwrap())
360360
});
361361
}
362+
363+
/// Test concurrency limiter for `/call` endpoint and that when the load shedder kicks in
364+
/// we return 429.
365+
/// Test scenario:
366+
/// 1. Set the concurrency limiter for the call service, `max_call_concurrent_requests`, to 1.
367+
/// 2. Use [`Agent`] to make an update calls where we wait with responding for the update call
368+
/// inside the ingress filter service handle.
369+
/// 3. Concurrently make another update call, and assert it hits the load shedder.
370+
#[test]
371+
fn test_load_shedding_update_call() {
372+
let rt = Runtime::new().unwrap();
373+
let addr = get_free_localhost_socket_addr();
374+
375+
let config = Config {
376+
listen_addr: addr,
377+
max_call_concurrent_requests: 1,
378+
..Default::default()
379+
};
380+
381+
let mock_state_manager = basic_state_manager_mock();
382+
let mock_consensus_cache = basic_consensus_pool_cache();
383+
let mock_registry_client = basic_registry_client();
384+
385+
let canister = Principal::from_text("223xb-saaaa-aaaaf-arlqa-cai").unwrap();
386+
387+
let (mut ingress_filter, mut ingress_sender, _) = start_http_endpoint(
388+
rt.handle().clone(),
389+
config,
390+
Arc::new(mock_state_manager),
391+
Arc::new(mock_consensus_cache),
392+
Arc::new(mock_registry_client),
393+
Arc::new(Pprof::default()),
394+
);
395+
396+
let ingress_filter_running = Arc::new(Notify::new());
397+
let load_shedder_returned = Arc::new(Notify::new());
398+
399+
let ok_agent = Agent::builder()
400+
.with_transport(ReqwestHttpReplicaV2Transport::create(format!("http://{}", addr)).unwrap())
401+
.build()
402+
.unwrap();
403+
404+
let load_shedded_agent = ok_agent.clone();
405+
406+
let ingress_filter_running_clone = ingress_filter_running.clone();
407+
let load_shedder_returned_clone = load_shedder_returned.clone();
408+
409+
let load_shedded_agent_handle = rt.spawn(async move {
410+
ingress_filter_running_clone.notified().await;
411+
let resp = load_shedded_agent
412+
.update(&canister, "some method")
413+
.call()
414+
.await;
415+
load_shedder_returned_clone.notify_one();
416+
resp
417+
});
418+
419+
// Ingress sender mock that returns empty Ok(()) response.
420+
rt.spawn(async move {
421+
loop {
422+
let (_, resp) = ingress_sender.next_request().await.unwrap();
423+
resp.send_response(Ok(()))
424+
}
425+
});
426+
427+
// Mock ingress filter
428+
rt.spawn(async move {
429+
let (_, resp) = ingress_filter.next_request().await.unwrap();
430+
ingress_filter_running.notify_one();
431+
load_shedder_returned.notified().await;
432+
resp.send_response(Ok(()))
433+
});
434+
435+
rt.block_on(async {
436+
wait_for_status_healthy(&ok_agent).await.unwrap();
437+
let resp = ok_agent.update(&canister, "some method").call().await;
438+
439+
assert!(resp.is_ok(), "Received unexpeceted response: {:?}", resp);
440+
441+
let resp = load_shedded_agent_handle.await.unwrap();
442+
let expected_resp = StatusCode::TOO_MANY_REQUESTS;
443+
444+
match resp {
445+
Err(AgentError::HttpError(HttpErrorPayload { status, .. })) => {
446+
assert_eq!(expected_resp, status)
447+
}
448+
_ => panic!(
449+
"Load shedder did not kick in. Received unexpeceted response: {:?}",
450+
resp
451+
),
452+
}
453+
});
454+
}

0 commit comments

Comments
 (0)