Skip to content

Commit b65857b

Browse files
kaiyuan-lifacebook-github-bot
authored andcommitted
rust example to capture stdout and stderr
Differential Revision: D77377307
1 parent eb4d4de commit b65857b

File tree

5 files changed

+568
-3
lines changed

5 files changed

+568
-3
lines changed

hyperactor_mesh/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ erased-serde = "0.3.27"
3535
futures = { version = "0.3.30", features = ["async-await", "compat"] }
3636
hyperactor = { version = "0.0.0", path = "../hyperactor" }
3737
hyperactor_mesh_macros = { version = "0.0.0", path = "../hyperactor_mesh_macros" }
38+
hyperactor_state = { version = "0.0.0", path = "../hyperactor_state" }
3839
hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" }
3940
libc = "0.2.139"
4041
mockall = "0.13.1"

hyperactor_mesh/src/alloc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub mod local;
1313
pub(crate) mod logtailer;
1414
pub mod process;
1515
pub mod remoteprocess;
16+
pub mod state_actor_writer;
1617

1718
use std::collections::HashMap;
1819
use std::fmt;

hyperactor_mesh/src/alloc/process.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ use super::AllocatorError;
4343
use super::ProcState;
4444
use super::ProcStopReason;
4545
use super::logtailer::LogTailer;
46+
use super::state_actor_writer::OutputTarget;
47+
use super::state_actor_writer::StateActorWriter;
4648
use crate::assign::Ranks;
4749
use crate::bootstrap;
4850
use crate::bootstrap::Allocator2Process;
@@ -143,18 +145,49 @@ impl Child {
143145
) -> (Self, impl Future<Output = ProcStopReason>) {
144146
let (group, handle) = monitor::group();
145147
let (exit_flag, exit_guard) = flag::guarded();
148+
let stop_reason = Arc::new(OnceLock::new());
149+
150+
// TODO(lky): enable state actor branch
151+
let use_state_actor = true;
152+
153+
let (stdout_tee, stderr_tee): (
154+
Box<dyn io::AsyncWrite + Send + Unpin + 'static>,
155+
Box<dyn io::AsyncWrite + Send + Unpin + 'static>,
156+
) = if use_state_actor {
157+
// Parse the state actor address
158+
let state_actor_addr = match "tcp![::]:3000".parse::<ChannelAddr>() {
159+
Ok(addr) => addr,
160+
Err(e) => {
161+
tracing::warn!("Failed to parse state actor address: {}", e);
162+
// Create a dummy address that won't be used
163+
ChannelAddr::any(ChannelTransport::Unix)
164+
}
165+
};
166+
(
167+
Box::new(StateActorWriter::new(
168+
OutputTarget::Stdout,
169+
state_actor_addr.clone(),
170+
)),
171+
Box::new(StateActorWriter::new(
172+
OutputTarget::Stderr,
173+
state_actor_addr,
174+
)),
175+
)
176+
} else {
177+
(Box::new(io::stdout()), Box::new(io::stderr()))
178+
};
146179

147180
let stdout = LogTailer::tee(
148181
MAX_TAIL_LOG_LINES,
149182
process.stdout.take().unwrap(),
150-
io::stdout(),
183+
stdout_tee,
151184
);
185+
152186
let stderr = LogTailer::tee(
153187
MAX_TAIL_LOG_LINES,
154188
process.stderr.take().unwrap(),
155-
io::stderr(),
189+
stderr_tee,
156190
);
157-
let stop_reason = Arc::new(OnceLock::new());
158191

159192
let child = Self {
160193
channel: ChannelState::NotConnected,

0 commit comments

Comments
 (0)