Skip to content

Commit 62b2e32

Browse files
James Sunfacebook-github-bot
authored andcommitted
(5/n) simplify log streaming without state actor (pytorch-labs#519)
Summary: Pull Request resolved: pytorch-labs#519 Similar to many other actors (rsync, pdb), let's just stream logs directly back to the client without state actor. Reviewed By: shayne-fletcher Differential Revision: D78142248 fbshipit-source-id: c488012b1621a5f0a116c5d76facc5955f2f3b17
1 parent 54471df commit 62b2e32

File tree

11 files changed

+618
-185
lines changed

11 files changed

+618
-185
lines changed

hyperactor_mesh/src/alloc/process.rs

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use std::sync::OnceLock;
1818

1919
use async_trait::async_trait;
2020
use enum_as_inner::EnumAsInner;
21-
use hyperactor::ActorRef;
2221
use hyperactor::ProcId;
2322
use hyperactor::WorldId;
2423
use hyperactor::channel;
@@ -31,7 +30,6 @@ use hyperactor::channel::Tx;
3130
use hyperactor::channel::TxStatus;
3231
use hyperactor::sync::flag;
3332
use hyperactor::sync::monitor;
34-
use hyperactor_state::state_actor::StateActor;
3533
use ndslice::Shape;
3634
use nix::sys::signal;
3735
use nix::unistd::Pid;
@@ -53,7 +51,6 @@ use crate::bootstrap::Allocator2Process;
5351
use crate::bootstrap::Process2Allocator;
5452
use crate::bootstrap::Process2AllocatorMessage;
5553
use crate::log_source::LogSource;
56-
use crate::log_source::StateServerInfo;
5754
use crate::shortuuid::ShortUuid;
5855

5956
/// The maximum number of log lines to tail keep for managed processes.
@@ -151,38 +148,29 @@ struct Child {
151148
impl Child {
152149
fn monitored(
153150
mut process: tokio::process::Child,
154-
state_server_info: StateServerInfo,
151+
log_channel: ChannelAddr,
155152
) -> (Self, impl Future<Output = ProcStopReason>) {
156153
let (group, handle) = monitor::group();
157154
let (exit_flag, exit_guard) = flag::guarded();
158155
let stop_reason = Arc::new(OnceLock::new());
159156

160-
// TODO(lky): enable state actor branch and remove this flag
161-
let use_state_actor = false;
162-
163157
// Set up stdout and stderr writers
164158
let mut stdout_tee: Box<dyn io::AsyncWrite + Send + Unpin + 'static> =
165159
Box::new(io::stdout());
166160
let mut stderr_tee: Box<dyn io::AsyncWrite + Send + Unpin + 'static> =
167161
Box::new(io::stderr());
168162

169-
// If state actor is enabled, try to set up LogWriter instances
170-
if use_state_actor {
171-
let state_actor_ref = ActorRef::<StateActor>::attest(state_server_info.state_actor_id);
172-
let state_actor_addr = state_server_info.state_proc_addr;
173-
// Use the helper function to create both writers at once
174-
match hyperactor_state::log_writer::create_log_writers(
175-
state_actor_addr,
176-
state_actor_ref,
177-
process.id().unwrap_or(0),
178-
) {
179-
Ok((stdout_writer, stderr_writer)) => {
180-
stdout_tee = stdout_writer;
181-
stderr_tee = stderr_writer;
182-
}
183-
Err(e) => {
184-
tracing::error!("failed to create log writers: {}", e);
185-
}
163+
// Use the helper function to create both writers at once
164+
match hyperactor_state::log_writer::create_log_writers(
165+
log_channel,
166+
process.id().unwrap_or(0),
167+
) {
168+
Ok((stdout_writer, stderr_writer)) => {
169+
stdout_tee = stdout_writer;
170+
stderr_tee = stderr_writer;
171+
}
172+
Err(e) => {
173+
tracing::error!("failed to create log writers: {}", e);
186174
}
187175
}
188176

@@ -367,12 +355,14 @@ impl ProcessAlloc {
367355
let mut cmd = self.cmd.lock().await;
368356
let index = self.index;
369357
self.index += 1;
358+
let log_channel: ChannelAddr = ChannelAddr::any(ChannelTransport::Unix);
370359

371360
cmd.env(
372361
bootstrap::BOOTSTRAP_ADDR_ENV,
373362
self.bootstrap_addr.to_string(),
374363
);
375364
cmd.env(bootstrap::BOOTSTRAP_INDEX_ENV, index.to_string());
365+
cmd.env(bootstrap::BOOTSTRAP_LOG_CHANNEL, log_channel.to_string());
376366
cmd.stdout(Stdio::piped());
377367
cmd.stderr(Stdio::piped());
378368

@@ -398,8 +388,7 @@ impl ProcessAlloc {
398388
None
399389
}
400390
Ok(rank) => {
401-
let (handle, monitor) =
402-
Child::monitored(process, self.log_source.server_info());
391+
let (handle, monitor) = Child::monitored(process, log_channel);
403392
self.children.spawn(async move { (index, monitor.await) });
404393
self.active.insert(index, handle);
405394
// Adjust for shape slice offset for non-zero shapes (sub-shapes).

hyperactor_mesh/src/bootstrap.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ use crate::proc_mesh::mesh_agent::MeshAgent;
2828

2929
pub(crate) const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR";
3030
pub(crate) const BOOTSTRAP_INDEX_ENV: &str = "HYPERACTOR_MESH_INDEX";
31+
/// A channel used by each process to receive its own stdout and stderr
32+
/// Because stdout and stderr can only be obtained by the parent process,
33+
/// they need to be streamed back to the process.
34+
pub(crate) const BOOTSTRAP_LOG_CHANNEL: &str = "BOOTSTRAP_LOG_CHANNEL";
3135

3236
/// Messages sent from the process to the allocator. This is an envelope
3337
/// containing the index of the process (i.e., its "address" assigned by

0 commit comments

Comments
 (0)