Skip to content

Commit 57d03e8

Browse files
committed
Provide unique ids across Coordinator instances
Previously we tried to do this from inside the `Coordinator` / `CoordinatorFactory` types, but that didn't handle the case when two `CoordinatorFactory` instances were created (almost) simultaneously. Instead, push the job of determining a unique ID outside of these types, allowing it to be somewhere higher in the call stack (ideally nigh-global).
1 parent 91012b5 commit 57d03e8

File tree

2 files changed

+63
-40
lines changed

2 files changed

+63
-40
lines changed

compiler/base/orchestrator/src/coordinator.rs

Lines changed: 56 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -820,25 +820,33 @@ enum DemultiplexCommand {
820820
ListenOnce(JobId, oneshot::Sender<WorkerMessage>),
821821
}
822822

823-
#[derive(Debug, Copy, Clone)]
824-
pub struct CoordinatorId {
825-
start: u64,
826-
id: u64,
823+
/// The [`Coordinator`][] usually represents a mostly-global resource,
824+
/// such as a Docker container. To avoid conflicts, each container
825+
/// must have a unique name, but that uniqueness can only be
826+
/// guaranteed by whoever is creating [`Coordinator`][]s via the
827+
/// [`CoordinatorFactory`][].
828+
pub trait IdProvider: Send + Sync + fmt::Debug + 'static {
829+
fn next(&self) -> String;
827830
}
828831

829-
/// Enforces a limited number of concurrent `Coordinator`s.
832+
/// A reasonable choice when there's a single [`IdProvider`][] in the
833+
/// entire process.
834+
///
835+
/// This represents uniqueness via a combination of
836+
///
837+
/// 1. **process start time** — this helps avoid conflicts from other
838+
/// processes, assuming they were started at least one second apart.
839+
///
840+
/// 2. **instance counter** — this avoids conflicts from other
841+
/// [`Coordinator`][]s started inside this process.
830842
#[derive(Debug)]
831-
pub struct CoordinatorFactory {
832-
semaphore: Arc<Semaphore>,
833-
843+
pub struct GlobalIdProvider {
834844
start: u64,
835845
id: AtomicU64,
836846
}
837847

838-
impl CoordinatorFactory {
839-
pub fn new(maximum: usize) -> Self {
840-
let semaphore = Arc::new(Semaphore::new(maximum));
841-
848+
impl GlobalIdProvider {
849+
pub fn new() -> Self {
842850
let now = std::time::SystemTime::now();
843851
let start = now
844852
.duration_since(std::time::UNIX_EPOCH)
@@ -847,28 +855,40 @@ impl CoordinatorFactory {
847855

848856
let id = AtomicU64::new(0);
849857

850-
Self {
851-
semaphore,
852-
start,
853-
id,
854-
}
858+
Self { start, id }
855859
}
860+
}
856861

857-
fn next_id(&self) -> CoordinatorId {
862+
impl IdProvider for GlobalIdProvider {
863+
fn next(&self) -> String {
858864
let start = self.start;
859865
let id = self.id.fetch_add(1, Ordering::SeqCst);
860866

861-
CoordinatorId { start, id }
867+
format!("{start}-{id}")
868+
}
869+
}
870+
871+
/// Enforces a limited number of concurrent `Coordinator`s.
872+
#[derive(Debug)]
873+
pub struct CoordinatorFactory {
874+
semaphore: Arc<Semaphore>,
875+
ids: Arc<dyn IdProvider>,
876+
}
877+
878+
impl CoordinatorFactory {
879+
pub fn new(ids: Arc<dyn IdProvider>, maximum: usize) -> Self {
880+
let semaphore = Arc::new(Semaphore::new(maximum));
881+
882+
Self { semaphore, ids }
862883
}
863884

864885
pub fn build<B>(&self) -> Coordinator<B>
865886
where
866-
B: Backend + From<CoordinatorId>,
887+
B: Backend + From<Arc<dyn IdProvider>>,
867888
{
868889
let semaphore = self.semaphore.clone();
869890

870-
let id = self.next_id();
871-
let backend = B::from(id);
891+
let backend = B::from(self.ids.clone());
872892

873893
Coordinator::new(semaphore, backend)
874894
}
@@ -2586,25 +2606,20 @@ fn basic_secure_docker_command() -> Command {
25862606
}
25872607

25882608
pub struct DockerBackend {
2589-
id: CoordinatorId,
2590-
instance: AtomicU64,
2609+
ids: Arc<dyn IdProvider>,
25912610
}
25922611

2593-
impl From<CoordinatorId> for DockerBackend {
2594-
fn from(id: CoordinatorId) -> Self {
2595-
Self {
2596-
id,
2597-
instance: Default::default(),
2598-
}
2612+
impl From<Arc<dyn IdProvider>> for DockerBackend {
2613+
fn from(ids: Arc<dyn IdProvider>) -> Self {
2614+
Self { ids }
25992615
}
26002616
}
26012617

26022618
impl DockerBackend {
26032619
fn next_name(&self) -> String {
2604-
let CoordinatorId { start, id } = self.id;
2605-
let instance = self.instance.fetch_add(1, Ordering::SeqCst);
2620+
let id = self.ids.next();
26062621

2607-
format!("playground-{start}-{id}-{instance}")
2622+
format!("playground-{id}")
26082623
}
26092624
}
26102625

@@ -2794,8 +2809,8 @@ mod tests {
27942809
project_dir: TempDir,
27952810
}
27962811

2797-
impl From<CoordinatorId> for TestBackend {
2798-
fn from(_id: CoordinatorId) -> Self {
2812+
impl From<Arc<dyn IdProvider>> for TestBackend {
2813+
fn from(_ids: Arc<dyn IdProvider>) -> Self {
27992814
static COMPILE_WORKER_ONCE: Once = Once::new();
28002815

28012816
COMPILE_WORKER_ONCE.call_once(|| {
@@ -2849,8 +2864,12 @@ mod tests {
28492864
.unwrap_or(5)
28502865
});
28512866

2852-
static TEST_COORDINATOR_FACTORY: Lazy<CoordinatorFactory> =
2853-
Lazy::new(|| CoordinatorFactory::new(*MAX_CONCURRENT_TESTS));
2867+
static TEST_COORDINATOR_ID_PROVIDER: Lazy<Arc<GlobalIdProvider>> =
2868+
Lazy::new(|| Arc::new(GlobalIdProvider::new()));
2869+
2870+
static TEST_COORDINATOR_FACTORY: Lazy<CoordinatorFactory> = Lazy::new(|| {
2871+
CoordinatorFactory::new(TEST_COORDINATOR_ID_PROVIDER.clone(), *MAX_CONCURRENT_TESTS)
2872+
});
28542873

28552874
fn new_coordinator_test() -> Coordinator<TestBackend> {
28562875
TEST_COORDINATOR_FACTORY.build()

ui/src/main.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#![deny(rust_2018_idioms)]
22

3-
use orchestrator::coordinator::CoordinatorFactory;
3+
use orchestrator::coordinator::{CoordinatorFactory, GlobalIdProvider, IdProvider};
44
use std::{
55
net::SocketAddr,
66
path::{Path, PathBuf},
@@ -47,6 +47,7 @@ struct Config {
4747
metrics_token: Option<String>,
4848
feature_flags: FeatureFlags,
4949
request_db_path: Option<PathBuf>,
50+
id_provider: Arc<dyn IdProvider>,
5051
coordinators_one_off_limit: usize,
5152
coordinators_websocket_limit: usize,
5253
port: u16,
@@ -106,6 +107,8 @@ impl Config {
106107

107108
let request_db_path = env::var_os("PLAYGROUND_REQUEST_DATABASE").map(Into::into);
108109

110+
let id_provider = Arc::new(GlobalIdProvider::new());
111+
109112
let coordinators_one_off_limit = env::var("PLAYGROUND_COORDINATORS_ONE_OFF_LIMIT")
110113
.ok()
111114
.and_then(|l| l.parse().ok())
@@ -123,6 +126,7 @@ impl Config {
123126
metrics_token,
124127
feature_flags,
125128
request_db_path,
129+
id_provider,
126130
coordinators_one_off_limit,
127131
coordinators_websocket_limit,
128132
port,
@@ -162,11 +166,11 @@ impl Config {
162166
}
163167

164168
fn coordinator_one_off_factory(&self) -> CoordinatorFactory {
165-
CoordinatorFactory::new(self.coordinators_one_off_limit)
169+
CoordinatorFactory::new(self.id_provider.clone(), self.coordinators_one_off_limit)
166170
}
167171

168172
fn coordinator_websocket_factory(&self) -> CoordinatorFactory {
169-
CoordinatorFactory::new(self.coordinators_websocket_limit)
173+
CoordinatorFactory::new(self.id_provider.clone(), self.coordinators_websocket_limit)
170174
}
171175

172176
fn server_socket_addr(&self) -> SocketAddr {

0 commit comments

Comments
 (0)