Skip to content

Commit daaee6d

Browse files
James Sunfacebook-github-bot
authored andcommitted
(3/n) Specialize state actor bootstrap for each alloc (#459)
Summary: Different alloc should have different setup of state actor. The remote allocator should bootstrap the state actor inside the initializer. This will be done in the follow-up diffs. Differential Revision: D77914042
1 parent 34802be commit daaee6d

File tree

9 files changed

+414
-32
lines changed

9 files changed

+414
-32
lines changed

hyperactor_extension/src/alloc.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use hyperactor_mesh::alloc::AllocConstraints;
1818
use hyperactor_mesh::alloc::AllocSpec;
1919
use hyperactor_mesh::alloc::AllocatorError;
2020
use hyperactor_mesh::alloc::ProcState;
21+
use hyperactor_mesh::log_source::LogSource;
2122
use hyperactor_mesh::shape::Shape;
2223
use ndslice::Slice;
2324
use pyo3::exceptions::PyValueError;
@@ -84,6 +85,10 @@ impl Alloc for PyAllocWrapper {
8485
self.inner.transport()
8586
}
8687

88+
async fn log_source(&self) -> Result<LogSource, AllocatorError> {
89+
self.inner.log_source().await
90+
}
91+
8792
async fn stop(&mut self) -> Result<(), AllocatorError> {
8893
self.inner.stop().await
8994
}

hyperactor_mesh/src/alloc.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -223,15 +223,7 @@ pub trait Alloc {
223223
/// It allows remote processes to stream stdout and stderr back to the client.
224224
/// A client can connect to the log source to obtain the streamed logs.
225225
/// A log source is allocation specific. Each allocator can decide how to stream the logs back.
226-
async fn log_source(&self) -> Result<LogSource, AllocatorError> {
227-
// TODO: this should be implemented based on different allocators.
228-
// Having this temporarily here so that the client can connect to the log source.
229-
// But the client will not get anything.
230-
// The following diffs will gradually implement this for different allocators.
231-
LogSource::new_with_local_actor()
232-
.await
233-
.map_err(AllocatorError::from)
234-
}
226+
async fn log_source(&self) -> Result<LogSource, AllocatorError>;
235227

236228
/// Stop this alloc, shutting down all of its procs. A clean
237229
/// shutdown should result in Stop events from all allocs,
@@ -367,6 +359,10 @@ pub mod test_utils {
367359
self.alloc.transport()
368360
}
369361

362+
async fn log_source(&self) -> Result<LogSource, AllocatorError> {
363+
self.alloc.log_source().await
364+
}
365+
370366
async fn stop(&mut self) -> Result<(), AllocatorError> {
371367
self.alloc.stop().await
372368
}

hyperactor_mesh/src/alloc/local.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::alloc::AllocSpec;
3333
use crate::alloc::Allocator;
3434
use crate::alloc::AllocatorError;
3535
use crate::alloc::ProcState;
36+
use crate::log_source::LogSource;
3637
use crate::proc_mesh::mesh_agent::MeshAgent;
3738
use crate::shortuuid::ShortUuid;
3839

@@ -252,6 +253,14 @@ impl Alloc for LocalAlloc {
252253
ChannelTransport::Local
253254
}
254255

256+
async fn log_source(&self) -> Result<LogSource, AllocatorError> {
257+
// Local alloc does not need to stream logs back.
258+
// The client can subscribe to it but local actors will not stream logs into it.
259+
LogSource::new_with_local_actor()
260+
.await
261+
.map_err(AllocatorError::from)
262+
}
263+
255264
async fn stop(&mut self) -> Result<(), AllocatorError> {
256265
for rank in 0..self.size() {
257266
self.todo_tx

hyperactor_mesh/src/alloc/process.rs

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use hyperactor::channel::ChannelTx;
2929
use hyperactor::channel::Rx;
3030
use hyperactor::channel::Tx;
3131
use hyperactor::channel::TxStatus;
32-
use hyperactor::id;
3332
use hyperactor::sync::flag;
3433
use hyperactor::sync::monitor;
3534
use hyperactor_state::state_actor::StateActor;
@@ -51,6 +50,8 @@ use crate::bootstrap;
5150
use crate::bootstrap::Allocator2Process;
5251
use crate::bootstrap::Process2Allocator;
5352
use crate::bootstrap::Process2AllocatorMessage;
53+
use crate::log_source::LogSource;
54+
use crate::log_source::StateServerInfo;
5455
use crate::shortuuid::ShortUuid;
5556

5657
/// The maximum number of log lines to tail keep for managed processes.
@@ -87,6 +88,9 @@ impl Allocator for ProcessAllocator {
8788
let (bootstrap_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix))
8889
.await
8990
.map_err(anyhow::Error::from)?;
91+
let log_source = LogSource::new_with_local_actor()
92+
.await
93+
.map_err(AllocatorError::from)?;
9094

9195
let name = ShortUuid::generate();
9296
let n = spec.shape.slice().len();
@@ -95,6 +99,7 @@ impl Allocator for ProcessAllocator {
9599
world_id: WorldId(name.to_string()),
96100
spec: spec.clone(),
97101
bootstrap_addr,
102+
log_source,
98103
rx,
99104
index: 0,
100105
active: HashMap::new(),
@@ -113,6 +118,7 @@ pub struct ProcessAlloc {
113118
world_id: WorldId, // to provide storage
114119
spec: AllocSpec,
115120
bootstrap_addr: ChannelAddr,
121+
log_source: LogSource,
116122
rx: channel::ChannelRx<Process2Allocator>,
117123
index: usize,
118124
active: HashMap<usize, Child>,
@@ -143,13 +149,14 @@ struct Child {
143149
impl Child {
144150
fn monitored(
145151
mut process: tokio::process::Child,
152+
state_server_info: StateServerInfo,
146153
) -> (Self, impl Future<Output = ProcStopReason>) {
147154
let (group, handle) = monitor::group();
148155
let (exit_flag, exit_guard) = flag::guarded();
149156
let stop_reason = Arc::new(OnceLock::new());
150157

151158
// TODO(lky): enable state actor branch and remove this flag
152-
let use_state_actor = false;
159+
let use_state_actor = true;
153160

154161
// Set up stdout and stderr writers
155162
let mut stdout_tee: Box<dyn io::AsyncWrite + Send + Unpin + 'static> =
@@ -159,24 +166,20 @@ impl Child {
159166

160167
// If state actor is enabled, try to set up LogWriter instances
161168
if use_state_actor {
162-
let state_actor_ref = ActorRef::<StateActor>::attest(id!(state_server[0].state[0]));
163-
// Parse the state actor address
164-
if let Ok(state_actor_addr) = "tcp![::]:3000".parse::<ChannelAddr>() {
165-
// Use the helper function to create both writers at once
166-
match hyperactor_state::log_writer::create_log_writers(
167-
state_actor_addr,
168-
state_actor_ref,
169-
) {
170-
Ok((stdout_writer, stderr_writer)) => {
171-
stdout_tee = stdout_writer;
172-
stderr_tee = stderr_writer;
173-
}
174-
Err(e) => {
175-
tracing::error!("failed to create log writers: {}", e);
176-
}
169+
let state_actor_ref = ActorRef::<StateActor>::attest(state_server_info.state_actor_id);
170+
let state_actor_addr = state_server_info.state_proc_addr;
171+
// Use the helper function to create both writers at once
172+
match hyperactor_state::log_writer::create_log_writers(
173+
state_actor_addr,
174+
state_actor_ref,
175+
) {
176+
Ok((stdout_writer, stderr_writer)) => {
177+
stdout_tee = stdout_writer;
178+
stderr_tee = stderr_writer;
179+
}
180+
Err(e) => {
181+
tracing::error!("failed to create log writers: {}", e);
177182
}
178-
} else {
179-
tracing::error!("failed to parse state actor address");
180183
}
181184
}
182185

@@ -389,7 +392,8 @@ impl ProcessAlloc {
389392
None
390393
}
391394
Ok(rank) => {
392-
let (handle, monitor) = Child::monitored(process);
395+
let (handle, monitor) =
396+
Child::monitored(process, self.log_source.server_info());
393397
self.children.spawn(async move { (index, monitor.await) });
394398
self.active.insert(index, handle);
395399
// Adjust for shape slice offset for non-zero shapes (sub-shapes).
@@ -493,6 +497,10 @@ impl Alloc for ProcessAlloc {
493497
ChannelTransport::Unix
494498
}
495499

500+
async fn log_source(&self) -> Result<LogSource, AllocatorError> {
501+
Ok(self.log_source.clone())
502+
}
503+
496504
async fn stop(&mut self) -> Result<(), AllocatorError> {
497505
// We rely on the teardown here, and that the process should
498506
// exit on its own. We should have a hard timeout here as well,

0 commit comments

Comments
 (0)