Skip to content

Commit f116c28

Browse files
committed
Implement caching via tasks instead of shared mutexes
This helps prevent a rare suboptimal case where a visitor to the playground triggered the crates / versions request but then left the site before the request finished, resulting in the request being canceled and the work being wasted without being cached. This mostly showed up when running heavy load tests locally to try and suss out other bugs. Other benefits: - This will also result in reduced contention when multiple requests would have triggered a cache refresh. Only one computation should occur. - The cache value is now computed out-of-band and requests should not block on it.
1 parent db8a083 commit f116c28

File tree

2 files changed

+358
-159
lines changed

2 files changed

+358
-159
lines changed

ui/src/server_axum.rs

Lines changed: 92 additions & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use axum_extra::{
2525
headers::{authorization::Bearer, Authorization, CacheControl, ETag, IfNoneMatch},
2626
TypedHeader,
2727
};
28-
use futures::{future::BoxFuture, FutureExt};
28+
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
2929
use orchestrator::coordinator::{self, CoordinatorFactory, DockerBackend, TRACKED_CONTAINERS};
3030
use snafu::prelude::*;
3131
use std::{
@@ -34,9 +34,9 @@ use std::{
3434
mem, path,
3535
str::FromStr,
3636
sync::{Arc, LazyLock},
37-
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
37+
time::{Duration, Instant, UNIX_EPOCH},
3838
};
39-
use tokio::{select, sync::Mutex};
39+
use tokio::{select, sync::mpsc};
4040
use tower_http::{
4141
cors::{self, CorsLayer},
4242
request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer},
@@ -48,25 +48,35 @@ use tracing::{error, error_span, field};
4848

4949
use crate::{env::PLAYGROUND_GITHUB_TOKEN, public_http_api as api};
5050

51+
use cache::{
52+
cache_task, CacheTaskItem, CacheTx, CacheTxError, Stamped, SANDBOX_CACHE_TIME_TO_LIVE,
53+
};
54+
5155
const ONE_HOUR: Duration = Duration::from_secs(60 * 60);
52-
const CORS_CACHE_TIME_TO_LIVE: Duration = ONE_HOUR;
5356

54-
const TEN_MINUTES: Duration = Duration::from_secs(10 * 60);
55-
const SANDBOX_CACHE_TIME_TO_LIVE: Duration = TEN_MINUTES;
57+
const CORS_CACHE_TIME_TO_LIVE: Duration = ONE_HOUR;
5658

5759
const MAX_AGE_ONE_DAY: HeaderValue = HeaderValue::from_static("public, max-age=86400");
5860
const MAX_AGE_ONE_YEAR: HeaderValue = HeaderValue::from_static("public, max-age=31536000");
5961

6062
const DOCKER_PROCESS_TIMEOUT_SOFT: Duration = Duration::from_secs(10);
6163

64+
mod cache;
6265
mod websocket;
6366

6467
#[derive(Debug, Clone)]
6568
struct Factory(Arc<CoordinatorFactory>);
6669

6770
#[tokio::main]
6871
pub(crate) async fn serve(config: Config) {
69-
let factory = Factory(Arc::new(config.coordinator_factory()));
72+
let factory = Arc::new(config.coordinator_factory());
73+
74+
let (cache_crates_task, cache_crates_tx) =
75+
CacheTx::spawn(|rx| cache_crates_task(factory.clone(), rx));
76+
let (cache_versions_task, cache_versions_tx) =
77+
CacheTx::spawn(|rx| cache_versions_task(factory.clone(), rx));
78+
79+
let factory = Factory(factory);
7080

7181
let request_db = config.request_database();
7282
let (db_task, db_handle) = request_db.spawn();
@@ -101,7 +111,8 @@ pub(crate) async fn serve(config: Config) {
101111
)
102112
.layer(Extension(factory))
103113
.layer(Extension(db_handle))
104-
.layer(Extension(Arc::new(SandboxCache::default())))
114+
.layer(Extension(cache_crates_tx))
115+
.layer(Extension(cache_versions_tx))
105116
.layer(Extension(config.github_token()))
106117
.layer(Extension(config.feature_flags))
107118
.layer(Extension(config.websocket_config));
@@ -167,6 +178,8 @@ pub(crate) async fn serve(config: Config) {
167178
select! {
168179
v = server => v.unwrap(),
169180
v = db_task => v.unwrap(),
181+
v = cache_crates_task => v.unwrap(),
182+
v = cache_versions_task => v.unwrap(),
170183
}
171184
}
172185

@@ -477,24 +490,22 @@ where
477490
}
478491

479492
async fn meta_crates(
480-
Extension(factory): Extension<Factory>,
481-
Extension(cache): Extension<Arc<SandboxCache>>,
493+
Extension(tx): Extension<CacheCratesTx>,
482494
if_none_match: Option<TypedHeader<IfNoneMatch>>,
483495
) -> Result<impl IntoResponse> {
484-
let value =
485-
track_metric_no_request_async(Endpoint::MetaCrates, || cache.crates(&factory.0)).await?;
486-
496+
let value = track_metric_no_request_async(Endpoint::MetaCrates, || tx.get())
497+
.await
498+
.context(CratesSnafu)?;
487499
apply_timestamped_caching(value, if_none_match)
488500
}
489501

490502
async fn meta_versions(
491-
Extension(factory): Extension<Factory>,
492-
Extension(cache): Extension<Arc<SandboxCache>>,
503+
Extension(tx): Extension<CacheVersionsTx>,
493504
if_none_match: Option<TypedHeader<IfNoneMatch>>,
494505
) -> Result<impl IntoResponse> {
495-
let value =
496-
track_metric_no_request_async(Endpoint::MetaVersions, || cache.versions(&factory.0))
497-
.await?;
506+
let value = track_metric_no_request_async(Endpoint::MetaVersions, || tx.get())
507+
.await
508+
.context(VersionsSnafu)?;
498509
apply_timestamped_caching(value, if_none_match)
499510
}
500511

@@ -640,6 +651,67 @@ impl MetricsAuthorization {
640651
const FAILURE: MetricsAuthorizationRejection = (StatusCode::UNAUTHORIZED, "Wrong credentials");
641652
}
642653

654+
type CacheCratesTx = CacheTx<api::MetaCratesResponse, CacheCratesError>;
655+
type CacheCratesItem = CacheTaskItem<api::MetaCratesResponse, CacheCratesError>;
656+
657+
#[tracing::instrument(skip_all)]
658+
async fn cache_crates_task(factory: Arc<CoordinatorFactory>, rx: mpsc::Receiver<CacheCratesItem>) {
659+
cache_task(rx, move || {
660+
let coordinator = factory.build::<DockerBackend>();
661+
662+
async move {
663+
let crates = coordinator.crates().map_ok(From::from).await?;
664+
665+
coordinator.shutdown().await?;
666+
667+
Ok::<_, CacheCratesError>(crates)
668+
}
669+
.boxed()
670+
})
671+
.await
672+
}
673+
674+
#[derive(Debug, Snafu)]
675+
enum CacheCratesError {
676+
#[snafu(transparent)]
677+
Crates { source: coordinator::CratesError },
678+
679+
#[snafu(transparent)]
680+
Shutdown { source: coordinator::Error },
681+
}
682+
683+
type CacheVersionsTx = CacheTx<api::MetaVersionsResponse, CacheVersionsError>;
684+
type CacheVersionsItem = CacheTaskItem<api::MetaVersionsResponse, CacheVersionsError>;
685+
686+
#[tracing::instrument(skip_all)]
687+
async fn cache_versions_task(
688+
factory: Arc<CoordinatorFactory>,
689+
rx: mpsc::Receiver<CacheVersionsItem>,
690+
) {
691+
cache_task(rx, move || {
692+
let coordinator = factory.build::<DockerBackend>();
693+
694+
async move {
695+
let versions = coordinator.versions().map_ok(From::from).await?;
696+
697+
coordinator.shutdown().await?;
698+
699+
Ok::<_, CacheVersionsError>(versions)
700+
}
701+
.boxed()
702+
})
703+
.await
704+
}
705+
706+
#[derive(Debug, Snafu)]
707+
enum CacheVersionsError {
708+
#[snafu(transparent)]
709+
Versions { source: coordinator::VersionsError },
710+
711+
#[snafu(transparent)]
712+
Shutdown { source: coordinator::Error },
713+
}
714+
643715
#[async_trait]
644716
impl<S> extract::FromRequestParts<S> for MetricsAuthorization
645717
where
@@ -667,145 +739,6 @@ where
667739
}
668740
}
669741

670-
type Stamped<T> = (T, SystemTime);
671-
672-
#[derive(Debug, Default)]
673-
struct SandboxCache {
674-
crates: CacheOne<api::MetaCratesResponse>,
675-
versions: CacheOne<api::MetaVersionsResponse>,
676-
}
677-
678-
impl SandboxCache {
679-
async fn crates(
680-
&self,
681-
factory: &CoordinatorFactory,
682-
) -> Result<Stamped<api::MetaCratesResponse>> {
683-
let coordinator = factory.build::<DockerBackend>();
684-
685-
let c = self
686-
.crates
687-
.fetch(|| async { Ok(coordinator.crates().await.context(CratesSnafu)?.into()) })
688-
.await;
689-
690-
coordinator
691-
.shutdown()
692-
.await
693-
.context(ShutdownCoordinatorSnafu)?;
694-
695-
c
696-
}
697-
698-
async fn versions(
699-
&self,
700-
factory: &CoordinatorFactory,
701-
) -> Result<Stamped<api::MetaVersionsResponse>> {
702-
let coordinator = factory.build::<DockerBackend>();
703-
704-
let v = self
705-
.versions
706-
.fetch(|| async { Ok(coordinator.versions().await.context(VersionsSnafu)?.into()) })
707-
.await;
708-
709-
coordinator
710-
.shutdown()
711-
.await
712-
.context(ShutdownCoordinatorSnafu)?;
713-
714-
v
715-
}
716-
}
717-
718-
#[derive(Debug)]
719-
struct CacheOne<T>(Mutex<Option<CacheInfo<T>>>);
720-
721-
impl<T> Default for CacheOne<T> {
722-
fn default() -> Self {
723-
Self(Default::default())
724-
}
725-
}
726-
727-
impl<T> CacheOne<T>
728-
where
729-
T: Clone + PartialEq,
730-
{
731-
async fn fetch<F, FFut>(&self, generator: F) -> Result<Stamped<T>>
732-
where
733-
F: FnOnce() -> FFut,
734-
FFut: Future<Output = Result<T>>,
735-
{
736-
let data = &mut *self.0.lock().await;
737-
match data {
738-
Some(info) => {
739-
if info.validation_time.elapsed() <= SANDBOX_CACHE_TIME_TO_LIVE {
740-
Ok(info.stamped_value())
741-
} else {
742-
Self::set_value(data, generator).await
743-
}
744-
}
745-
None => Self::set_value(data, generator).await,
746-
}
747-
}
748-
749-
async fn set_value<F, FFut>(data: &mut Option<CacheInfo<T>>, generator: F) -> Result<Stamped<T>>
750-
where
751-
F: FnOnce() -> FFut,
752-
FFut: Future<Output = Result<T>>,
753-
{
754-
let value = generator().await?;
755-
756-
let old_info = data.take();
757-
let new_info = CacheInfo::build(value);
758-
759-
let info = match old_info {
760-
Some(mut old_value) => {
761-
if old_value.value == new_info.value {
762-
// The value hasn't changed; record that we have
763-
// checked recently, but keep the creation time to
764-
// preserve caching.
765-
old_value.validation_time = new_info.validation_time;
766-
old_value
767-
} else {
768-
new_info
769-
}
770-
}
771-
None => new_info,
772-
};
773-
774-
let value = info.stamped_value();
775-
776-
*data = Some(info);
777-
778-
Ok(value)
779-
}
780-
}
781-
782-
#[derive(Debug)]
783-
struct CacheInfo<T> {
784-
value: T,
785-
creation_time: SystemTime,
786-
validation_time: Instant,
787-
}
788-
789-
impl<T> CacheInfo<T> {
790-
fn build(value: T) -> Self {
791-
let creation_time = SystemTime::now();
792-
let validation_time = Instant::now();
793-
794-
Self {
795-
value,
796-
creation_time,
797-
validation_time,
798-
}
799-
}
800-
801-
fn stamped_value(&self) -> Stamped<T>
802-
where
803-
T: Clone,
804-
{
805-
(self.value.clone(), self.creation_time)
806-
}
807-
}
808-
809742
impl IntoResponse for Error {
810743
fn into_response(self) -> axum::response::Response {
811744
let error = snafu::CleanedErrorText::new(&self)
@@ -901,12 +834,12 @@ enum Error {
901834

902835
#[snafu(display("Unable to find the available crates"))]
903836
Crates {
904-
source: orchestrator::coordinator::CratesError,
837+
source: CacheTxError<CacheCratesError>,
905838
},
906839

907840
#[snafu(display("Unable to find the available versions"))]
908841
Versions {
909-
source: orchestrator::coordinator::VersionsError,
842+
source: CacheTxError<CacheVersionsError>,
910843
},
911844

912845
#[snafu(display("Unable to shutdown the coordinator"))]

0 commit comments

Comments
 (0)