Skip to content

Commit c010d74

Browse files
James Sunfacebook-github-bot
authored andcommitted
(3/n) Specialize state actor bootstrap for each alloc
Summary: Different alloc should have different setup of state actor. The remote allocator should bootstrap the state actor inside the initializer. This will be done in the follow-up diffs. Differential Revision: D77914042
1 parent 0ab4c85 commit c010d74

File tree

6 files changed

+165
-19
lines changed

6 files changed

+165
-19
lines changed

hyperactor_extension/src/alloc.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use hyperactor_mesh::alloc::AllocConstraints;
1818
use hyperactor_mesh::alloc::AllocSpec;
1919
use hyperactor_mesh::alloc::AllocatorError;
2020
use hyperactor_mesh::alloc::ProcState;
21+
use hyperactor_mesh::log_source::LogSource;
2122
use hyperactor_mesh::shape::Shape;
2223
use ndslice::Slice;
2324
use pyo3::exceptions::PyValueError;
@@ -84,6 +85,12 @@ impl Alloc for PyAllocWrapper {
8485
self.inner.transport()
8586
}
8687

88+
async fn log_source(&self) -> Result<LogSource, AllocatorError> {
89+
LogSource::new_with_local_actor()
90+
.await
91+
.map_err(AllocatorError::from)
92+
}
93+
8794
async fn stop(&mut self) -> Result<(), AllocatorError> {
8895
self.inner.stop().await
8996
}

hyperactor_mesh/src/alloc.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -224,15 +224,7 @@ pub trait Alloc {
224224
/// It allows remote processes to stream stdout and stderr back to the client.
225225
/// A client can connect to the log source to obtain the streamed logs.
226226
/// A log source is allocation specific. Each allocator can decide how to stream the logs back.
227-
async fn log_source(&self) -> Result<LogSource, AllocatorError> {
228-
// TODO: this should be implemented based on different allocators.
229-
// Having this temporarily here so that the client can connect to the log source.
230-
// But the client will not get anything.
231-
// The following diffs will gradually implement this for different allocators.
232-
LogSource::new_with_local_actor()
233-
.await
234-
.map_err(AllocatorError::from)
235-
}
227+
async fn log_source(&self) -> Result<LogSource, AllocatorError>;
236228

237229
/// Stop this alloc, shutting down all of its procs. A clean
238230
/// shutdown should result in Stop events from all allocs,
@@ -368,6 +360,12 @@ pub mod test_utils {
368360
self.alloc.transport()
369361
}
370362

363+
async fn log_source(&self) -> Result<LogSource, AllocatorError> {
364+
LogSource::new_with_local_actor()
365+
.await
366+
.map_err(AllocatorError::from)
367+
}
368+
371369
async fn stop(&mut self) -> Result<(), AllocatorError> {
372370
self.alloc.stop().await
373371
}

hyperactor_mesh/src/alloc/local.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::alloc::AllocSpec;
3333
use crate::alloc::Allocator;
3434
use crate::alloc::AllocatorError;
3535
use crate::alloc::ProcState;
36+
use crate::log_source::LogSource;
3637
use crate::proc_mesh::mesh_agent::MeshAgent;
3738
use crate::shortuuid::ShortUuid;
3839

@@ -252,6 +253,14 @@ impl Alloc for LocalAlloc {
252253
ChannelTransport::Local
253254
}
254255

256+
async fn log_source(&self) -> Result<LogSource, AllocatorError> {
257+
// Local alloc does not need to stream logs back.
258+
// The client can subscribe to it but local actors will not stream logs into it.
259+
LogSource::new_with_local_actor()
260+
.await
261+
.map_err(AllocatorError::from)
262+
}
263+
255264
async fn stop(&mut self) -> Result<(), AllocatorError> {
256265
for rank in 0..self.size() {
257266
self.todo_tx

hyperactor_mesh/src/alloc/process.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ use crate::bootstrap;
5050
use crate::bootstrap::Allocator2Process;
5151
use crate::bootstrap::Process2Allocator;
5252
use crate::bootstrap::Process2AllocatorMessage;
53+
use crate::log_source::LogSource;
54+
use crate::log_source::StateServerInfo;
5355
use crate::shortuuid::ShortUuid;
5456

5557
/// The maximum number of log lines to tail keep for managed processes.
@@ -86,6 +88,9 @@ impl Allocator for ProcessAllocator {
8688
let (bootstrap_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix))
8789
.await
8890
.map_err(anyhow::Error::from)?;
91+
let log_source = LogSource::new_with_local_actor()
92+
.await
93+
.map_err(AllocatorError::from)?;
8994

9095
let name = ShortUuid::generate();
9196
let n = spec.shape.slice().len();
@@ -94,6 +99,7 @@ impl Allocator for ProcessAllocator {
9499
world_id: WorldId(name.to_string()),
95100
spec: spec.clone(),
96101
bootstrap_addr,
102+
log_source,
97103
rx,
98104
index: 0,
99105
active: HashMap::new(),
@@ -112,6 +118,7 @@ pub struct ProcessAlloc {
112118
world_id: WorldId, // to provide storage
113119
spec: AllocSpec,
114120
bootstrap_addr: ChannelAddr,
121+
log_source: LogSource,
115122
rx: channel::ChannelRx<Process2Allocator>,
116123
index: usize,
117124
active: HashMap<usize, Child>,
@@ -142,6 +149,7 @@ struct Child {
142149
impl Child {
143150
fn monitored(
144151
mut process: tokio::process::Child,
152+
state_server_info: StateServerInfo,
145153
) -> (Self, impl Future<Output = ProcStopReason>) {
146154
let (group, handle) = monitor::group();
147155
let (exit_flag, exit_guard) = flag::guarded();
@@ -155,14 +163,8 @@ impl Child {
155163
Box<dyn io::AsyncWrite + Send + Unpin + 'static>,
156164
) = if use_state_actor {
157165
// Parse the state actor address
158-
let state_actor_addr = match "tcp![::]:3000".parse::<ChannelAddr>() {
159-
Ok(addr) => addr,
160-
Err(e) => {
161-
tracing::warn!("Failed to parse state actor address: {}", e);
162-
// Create a dummy address that won't be used
163-
ChannelAddr::any(ChannelTransport::Unix)
164-
}
165-
};
166+
// TODO (@lky): do not hardcode actor id here; use state_server_info
167+
let state_actor_addr = state_server_info.state_proc_addr;
166168
(
167169
Box::new(StateActorWriter::new(
168170
OutputTarget::Stdout,
@@ -386,7 +388,8 @@ impl ProcessAlloc {
386388
None
387389
}
388390
Ok(rank) => {
389-
let (handle, monitor) = Child::monitored(process);
391+
let (handle, monitor) =
392+
Child::monitored(process, self.log_source.server_info());
390393
self.children.spawn(async move { (index, monitor.await) });
391394
self.active.insert(index, handle);
392395
// Adjust for shape slice offset for non-zero shapes (sub-shapes).
@@ -490,6 +493,10 @@ impl Alloc for ProcessAlloc {
490493
ChannelTransport::Unix
491494
}
492495

496+
async fn log_source(&self) -> Result<LogSource, AllocatorError> {
497+
Ok(self.log_source.clone())
498+
}
499+
493500
async fn stop(&mut self) -> Result<(), AllocatorError> {
494501
// We rely on the teardown here, and that the process should
495502
// exit on its own. We should have a hard timeout here as well,

0 commit comments

Comments
 (0)