Skip to content

Commit 27dab00

Browse files
kaiyuan-lifacebook-github-bot
authored andcommitted
stream stdout and stderr to StateActor (#430)
Summary: Pull Request resolved: #430 Example to show running an arbitrary command in rust and pipe the stderr and stdout out. Reviewed By: highker Differential Revision: D77377307 fbshipit-source-id: 91b4688b2dec78f168f7f2e41b2fdd410a8d037e
1 parent c85f6cf commit 27dab00

File tree

9 files changed

+742
-47
lines changed

9 files changed

+742
-47
lines changed

hyperactor_mesh/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ erased-serde = "0.3.27"
3535
futures = { version = "0.3.30", features = ["async-await", "compat"] }
3636
hyperactor = { version = "0.0.0", path = "../hyperactor" }
3737
hyperactor_mesh_macros = { version = "0.0.0", path = "../hyperactor_mesh_macros" }
38+
hyperactor_state = { version = "0.0.0", path = "../hyperactor_state" }
3839
hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" }
3940
libc = "0.2.139"
4041
mockall = "0.13.1"

hyperactor_mesh/src/alloc/process.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::sync::OnceLock;
1818

1919
use async_trait::async_trait;
2020
use enum_as_inner::EnumAsInner;
21+
use hyperactor::ActorRef;
2122
use hyperactor::ProcId;
2223
use hyperactor::WorldId;
2324
use hyperactor::channel;
@@ -28,8 +29,10 @@ use hyperactor::channel::ChannelTx;
2829
use hyperactor::channel::Rx;
2930
use hyperactor::channel::Tx;
3031
use hyperactor::channel::TxStatus;
32+
use hyperactor::id;
3133
use hyperactor::sync::flag;
3234
use hyperactor::sync::monitor;
35+
use hyperactor_state::state_actor::StateActor;
3336
use ndslice::Shape;
3437
use tokio::io;
3538
use tokio::process::Command;
@@ -143,18 +146,51 @@ impl Child {
143146
) -> (Self, impl Future<Output = ProcStopReason>) {
144147
let (group, handle) = monitor::group();
145148
let (exit_flag, exit_guard) = flag::guarded();
149+
let stop_reason = Arc::new(OnceLock::new());
150+
151+
// TODO(lky): enable state actor branch and remove this flag
152+
let use_state_actor = false;
153+
154+
// Set up stdout and stderr writers
155+
let mut stdout_tee: Box<dyn io::AsyncWrite + Send + Unpin + 'static> =
156+
Box::new(io::stdout());
157+
let mut stderr_tee: Box<dyn io::AsyncWrite + Send + Unpin + 'static> =
158+
Box::new(io::stderr());
159+
160+
// If state actor is enabled, try to set up LogWriter instances
161+
if use_state_actor {
162+
let state_actor_ref = ActorRef::<StateActor>::attest(id!(state_server[0].state[0]));
163+
// Parse the state actor address
164+
if let Ok(state_actor_addr) = "tcp![::]:3000".parse::<ChannelAddr>() {
165+
// Use the helper function to create both writers at once
166+
match hyperactor_state::log_writer::create_log_writers(
167+
state_actor_addr,
168+
state_actor_ref,
169+
) {
170+
Ok((stdout_writer, stderr_writer)) => {
171+
stdout_tee = stdout_writer;
172+
stderr_tee = stderr_writer;
173+
}
174+
Err(e) => {
175+
tracing::error!("failed to create log writers: {}", e);
176+
}
177+
}
178+
} else {
179+
tracing::error!("failed to parse state actor address");
180+
}
181+
}
146182

147183
let stdout = LogTailer::tee(
148184
MAX_TAIL_LOG_LINES,
149185
process.stdout.take().unwrap(),
150-
io::stdout(),
186+
stdout_tee,
151187
);
188+
152189
let stderr = LogTailer::tee(
153190
MAX_TAIL_LOG_LINES,
154191
process.stderr.take().unwrap(),
155-
io::stderr(),
192+
stderr_tee,
156193
);
157-
let stop_reason = Arc::new(OnceLock::new());
158194

159195
let child = Self {
160196
channel: ChannelState::NotConnected,

hyperactor_state/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ license = "BSD-3-Clause"
1010
[dependencies]
1111
anyhow = "1.0.98"
1212
async-trait = "0.1.86"
13+
futures = { version = "0.3.30", features = ["async-await", "compat"] }
14+
hostname = "0.3"
1315
hyperactor = { version = "0.0.0", path = "../hyperactor" }
1416
hyperactor_macros = { version = "0.0.0", path = "../hyperactor_macros" }
1517
serde = { version = "1.0.185", features = ["derive", "rc"] }

hyperactor_state/src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ mod tests {
7272

7373
use super::*;
7474
use crate::create_remote_client;
75-
use crate::spawn_actor;
7675
use crate::test_utils::log_items;
76+
use crate::test_utils::spawn_actor;
7777

7878
#[tracing_test::traced_test]
7979
#[tokio::test]
@@ -83,7 +83,7 @@ mod tests {
8383
let params = ClientActorParams { sender };
8484
let client_proc_id =
8585
hyperactor::reference::ProcId(hyperactor::WorldId("client_server".to_string()), 0);
86-
let (client_actor_addr, client_actor_handle) = spawn_actor::<ClientActor>(
86+
let (client_actor_addr, client_actor_handle, _client_mailbox) = spawn_actor::<ClientActor>(
8787
client_actor_addr.clone(),
8888
client_proc_id,
8989
"state_client",

hyperactor_state/src/lib.rs

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,35 +23,10 @@ use hyperactor::mailbox::MailboxServer;
2323
use hyperactor::proc::Proc;
2424

2525
pub mod client;
26+
pub mod log_writer;
2627
pub mod object;
2728
pub mod state_actor;
2829

29-
/// Creates a state actor server at given address. Returns the server address and a handle to the
30-
/// state actor.
31-
#[allow(dead_code)]
32-
pub async fn spawn_actor<T: Actor + RemoteActor + Binds<T>>(
33-
addr: ChannelAddr,
34-
proc_id: ProcId,
35-
actor_name: &str,
36-
params: T::Params,
37-
) -> Result<(ChannelAddr, ActorHandle<T>)> {
38-
// Use the provided ProcId directly
39-
let proc = Proc::new(proc_id, BoxedMailboxSender::new(DialMailboxRouter::new()));
40-
41-
// Set up the channel server
42-
let (local_addr, rx) = channel::serve(addr.clone()).await?;
43-
44-
// Spawn the actor with just a name - the system will generate the full actor ID
45-
let actor_handle: ActorHandle<T> = proc.spawn(actor_name, params).await?;
46-
47-
// Undeliverable messages encountered by the mailbox server
48-
// are to be returned to the system actor.
49-
let _mailbox_handle = proc.clone().serve(rx, actor_handle.port());
50-
51-
// Return the address and handle (not a ref)
52-
Ok((local_addr, actor_handle))
53-
}
54-
5530
/// Creates a remote client that can send message to actors in the remote addr.
5631
/// It is important to keep the client proc alive for the remote_client's lifetime.
5732
pub async fn create_remote_client(addr: ChannelAddr) -> Result<(Proc, Mailbox)> {
@@ -66,26 +41,57 @@ pub async fn create_remote_client(addr: ChannelAddr) -> Result<(Proc, Mailbox)>
6641
}
6742

6843
pub mod test_utils {
44+
use super::*;
6945
use crate::object::GenericStateObject;
46+
use crate::object::Kind;
7047
use crate::object::LogSpec;
7148
use crate::object::LogState;
49+
use crate::object::Name;
7250
use crate::object::StateMetadata;
7351
use crate::object::StateObject;
7452

75-
pub fn log_items(seq_low: usize, seq_high: usize) -> Vec<GenericStateObject> {
53+
pub fn log_items(seq_low: u64, seq_high: u64) -> Vec<GenericStateObject> {
7654
let mut log_items = vec![];
7755
let metadata = StateMetadata {
78-
name: "test".to_string(),
79-
kind: "log".to_string(),
56+
name: Name::StdoutLog(("test_host".to_string(), 12345)),
57+
kind: Kind::Log,
8058
};
8159
let spec = LogSpec {};
8260
for seq in seq_low..seq_high {
83-
let state = LogState::new(seq, format!("state {}", seq));
61+
let state = LogState::from_string(seq, format!("state {}", seq)).unwrap();
8462
let state_object =
8563
StateObject::<LogSpec, LogState>::new(metadata.clone(), spec.clone(), state);
8664
let generic_state_object = GenericStateObject::try_from(state_object).unwrap();
8765
log_items.push(generic_state_object);
8866
}
8967
log_items
9068
}
69+
70+
/// Creates a state actor server at given address. Returns the server address, a handle to the
71+
/// state actor, and a client mailbox for sending messages to the actor.
72+
pub async fn spawn_actor<T: Actor + RemoteActor + Binds<T>>(
73+
addr: ChannelAddr,
74+
proc_id: ProcId,
75+
actor_name: &str,
76+
params: T::Params,
77+
) -> Result<(ChannelAddr, ActorHandle<T>, Mailbox)> {
78+
// Use the provided ProcId directly
79+
let proc = Proc::new(proc_id, BoxedMailboxSender::new(DialMailboxRouter::new()));
80+
81+
// Set up the channel server
82+
let (local_addr, rx) = channel::serve(addr.clone()).await?;
83+
84+
// Spawn the actor with just a name - the system will generate the full actor ID
85+
let actor_handle: ActorHandle<T> = proc.spawn(actor_name, params).await?;
86+
87+
// Create a client mailbox for sending messages to the actor
88+
let client_mailbox = proc.attach("client")?;
89+
90+
// Undeliverable messages encountered by the mailbox server
91+
// are to be returned to the system actor.
92+
let _mailbox_handle = proc.clone().serve(rx, actor_handle.port());
93+
94+
// Return the address, handle, and client mailbox
95+
Ok((local_addr, actor_handle, client_mailbox))
96+
}
9197
}

0 commit comments

Comments
 (0)