Skip to content

Commit d03d512

Browse files
highkerfacebook-github-bot
authored andcommitted
(1/n) hook state client to client for logging
Summary: Launch logging client through proc mesh. State actor bootstrap needs to be manually setup. The following diffs will make things automated. Differential Revision: D77848230
1 parent b65857b commit d03d512

File tree

4 files changed

+120
-11
lines changed

4 files changed

+120
-11
lines changed

hyperactor_mesh/src/proc_mesh.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use hyperactor::actor::remote::Remote;
2525
use hyperactor::cap;
2626
use hyperactor::channel;
2727
use hyperactor::channel::ChannelAddr;
28+
use hyperactor::id;
2829
use hyperactor::mailbox;
2930
use hyperactor::mailbox::BoxableMailboxSender;
3031
use hyperactor::mailbox::BoxedMailboxSender;
@@ -38,6 +39,10 @@ use hyperactor::proc::Proc;
3839
use hyperactor::reference::ProcId;
3940
use hyperactor::reference::Reference;
4041
use hyperactor::supervision::ActorSupervisionEvent;
42+
use hyperactor_state::client::ClientActor;
43+
use hyperactor_state::client::ClientActorParams;
44+
use hyperactor_state::state_actor::StateActor;
45+
use hyperactor_state::state_actor::StateMessageClient;
4146
use ndslice::Range;
4247
use ndslice::Shape;
4348
use ndslice::ShapeError;
@@ -199,7 +204,7 @@ impl ProcMesh {
199204
client_proc
200205
.clone()
201206
.serve(client_rx, mailbox::monitored_return_handle());
202-
router.bind(client_proc_id.clone().into(), client_proc_addr);
207+
router.bind(client_proc_id.clone().into(), client_proc_addr.clone());
203208

204209
// Bind this router to the global router, to enable cross-mesh routing.
205210
// TODO: unbind this when we incorporate mesh destruction too.
@@ -296,6 +301,42 @@ impl ProcMesh {
296301
.map_err(anyhow::Error::from)?;
297302
}
298303

304+
// Get a reference to the state actor for streaming logs.
305+
// TODO: bind logging options to python API so that users can choose if to stream (optionally aggregated) logs back or not.
306+
// TODO: spin up state actor locally and remotely with names and addresses passed in here.
307+
let state_actor_id = id!(state_server[0].state[0]);
308+
let state_actor_ref = ActorRef::<StateActor>::attest(state_actor_id.clone());
309+
let state_actor_addr = "tcp![::]:3000".parse::<ChannelAddr>().unwrap();
310+
router.bind(state_actor_id.into(), state_actor_addr.clone());
311+
312+
let log_handler = Box::new(hyperactor_state::client::StdlogHandler {});
313+
let params = ClientActorParams { log_handler };
314+
315+
let client_logging_actor: ActorRef<ClientActor> = client_proc
316+
.spawn::<ClientActor>("logging_client", params)
317+
.await
318+
.unwrap()
319+
.bind();
320+
321+
match state_actor_ref
322+
.subscribe_logs(
323+
&client,
324+
client_proc_addr.clone(),
325+
client_logging_actor.clone(),
326+
)
327+
.await
328+
{
329+
Ok(_) => {}
330+
Err(err) => {
331+
// TODO: talking to the state actor is not on the critical path.
332+
// However, if the state actor is not reachable, we will receive spams of mailbox warnings.
333+
tracing::warn!(
334+
"failed to subscribe to state actor logs; remote logging will not be available on the client: {}",
335+
err
336+
);
337+
}
338+
}
339+
299340
let shape = alloc.shape().clone();
300341
let world_id = alloc.world_id().clone();
301342

hyperactor_state/src/client.rs

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,46 @@ use hyperactor_macros::HandleClient;
1616
use hyperactor_macros::RefClient;
1717
use serde::Deserialize;
1818
use serde::Serialize;
19-
use tokio::sync::mpsc::Sender;
2019

2120
use crate::object::GenericStateObject;
21+
use crate::object::LogSpec;
22+
use crate::object::LogState;
23+
use crate::object::StateObject;
24+
25+
pub trait LogHandler: Sync + Send + std::fmt::Debug + 'static {
26+
// we cannot call it handle here as it conflicts with hyperactor macro
27+
fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()>;
28+
}
29+
30+
/// A log handler that flushes GenericStateObject to stdout.
31+
#[derive(Debug)]
32+
pub struct StdlogHandler;
33+
34+
impl LogHandler for StdlogHandler {
35+
fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()> {
36+
for log in logs {
37+
let metadata = log.metadata();
38+
let deserialized_data: StateObject<LogSpec, LogState> = log.data().deserialized()?;
39+
// TODO: @lky D77377307 do not use raw string to distinguish between stdout and stderr
40+
if metadata.name == "stderr" {
41+
eprintln!("(remote logging) {}", deserialized_data.state.message);
42+
} else {
43+
println!("(remote logging) {}", deserialized_data.state.message);
44+
}
45+
println!();
46+
}
47+
Ok(())
48+
}
49+
}
2250

2351
/// A client to interact with the state actor.
2452
#[derive(Debug)]
2553
#[hyperactor::export(
2654
handlers = [ClientMessage],
2755
)]
2856
pub struct ClientActor {
29-
sender: Sender<Vec<GenericStateObject>>,
57+
// TODO: extend hyperactor macro to support a generic to avoid using Box here.
58+
log_handler: Box<dyn LogHandler>,
3059
}
3160

3261
/// Endpoints for the client actor.
@@ -37,15 +66,17 @@ pub enum ClientMessage {
3766
}
3867

3968
pub struct ClientActorParams {
40-
pub sender: Sender<Vec<GenericStateObject>>,
69+
pub log_handler: Box<dyn LogHandler>,
4170
}
4271

4372
#[async_trait]
4473
impl Actor for ClientActor {
4574
type Params = ClientActorParams;
4675

47-
async fn new(ClientActorParams { sender }: ClientActorParams) -> Result<Self, anyhow::Error> {
48-
Ok(Self { sender })
76+
async fn new(
77+
ClientActorParams { log_handler }: ClientActorParams,
78+
) -> Result<Self, anyhow::Error> {
79+
Ok(Self { log_handler })
4980
}
5081
}
5182

@@ -57,7 +88,7 @@ impl ClientMessageHandler for ClientActor {
5788
_cx: &Context<Self>,
5889
logs: Vec<GenericStateObject>,
5990
) -> Result<(), anyhow::Error> {
60-
self.sender.send(logs).await?;
91+
self.log_handler.handle_log(logs)?;
6192
Ok(())
6293
}
6394
}
@@ -69,18 +100,36 @@ mod tests {
69100
use hyperactor::ActorRef;
70101
use hyperactor::channel;
71102
use hyperactor::channel::ChannelAddr;
103+
use tokio::sync::mpsc::Sender;
72104

73105
use super::*;
74106
use crate::create_remote_client;
75107
use crate::spawn_actor;
76108
use crate::test_utils::log_items;
77109

110+
/// A log handler that flushes GenericStateObject to a mpsc channel.
111+
#[derive(Debug)]
112+
struct MpscLogHandler {
113+
sender: Sender<Vec<GenericStateObject>>,
114+
}
115+
116+
impl LogHandler for MpscLogHandler {
117+
fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()> {
118+
let sender = self.sender.clone();
119+
tokio::spawn(async move {
120+
sender.send(logs).await.unwrap();
121+
});
122+
Ok(())
123+
}
124+
}
125+
78126
#[tracing_test::traced_test]
79127
#[tokio::test]
80128
async fn test_client_basics() {
81129
let client_actor_addr = ChannelAddr::any(channel::ChannelTransport::Unix);
82130
let (sender, mut receiver) = tokio::sync::mpsc::channel::<Vec<GenericStateObject>>(10);
83-
let params = ClientActorParams { sender };
131+
let log_handler = Box::new(MpscLogHandler { sender });
132+
let params = ClientActorParams { log_handler };
84133
let client_proc_id =
85134
hyperactor::reference::ProcId(hyperactor::WorldId("client_server".to_string()), 0);
86135
let (client_actor_addr, client_actor_handle) = spawn_actor::<ClientActor>(

hyperactor_state/src/object.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub struct StateMetadata {
2323
pub struct StateObject<S, T> {
2424
metadata: StateMetadata,
2525
spec: S,
26-
state: T,
26+
pub(crate) state: T,
2727
}
2828

2929
impl<S, T> StateObject<S, T> {
@@ -46,7 +46,7 @@ pub struct LogState {
4646
/// A monotonically increasing sequence number.
4747
seq: usize,
4848
/// The message in the log.
49-
message: String,
49+
pub(crate) message: String,
5050
}
5151

5252
impl LogState {

hyperactor_state/src/state_actor.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,31 @@ mod tests {
9292
use std::time::Duration;
9393

9494
use hyperactor::channel;
95+
use tokio::sync::mpsc::Sender;
9596

9697
use super::*;
9798
use crate::client::ClientActorParams;
99+
use crate::client::LogHandler;
98100
use crate::create_remote_client;
99101
use crate::spawn_actor;
100102
use crate::test_utils::log_items;
101103

104+
/// A log handler that flushes GenericStateObject to a mpsc channel.
105+
#[derive(Debug)]
106+
struct MpscLogHandler {
107+
sender: Sender<Vec<GenericStateObject>>,
108+
}
109+
110+
impl LogHandler for MpscLogHandler {
111+
fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()> {
112+
let sender = self.sender.clone();
113+
tokio::spawn(async move {
114+
sender.send(logs).await.unwrap();
115+
});
116+
Ok(())
117+
}
118+
}
119+
102120
#[tokio::test]
103121
async fn test_subscribe_logs() {
104122
let state_actor_addr = ChannelAddr::any(channel::ChannelTransport::Unix);
@@ -112,7 +130,8 @@ mod tests {
112130

113131
let client_actor_addr = ChannelAddr::any(channel::ChannelTransport::Unix);
114132
let (sender, mut receiver) = tokio::sync::mpsc::channel::<Vec<GenericStateObject>>(20);
115-
let params = ClientActorParams { sender };
133+
let log_handler = Box::new(MpscLogHandler { sender });
134+
let params = ClientActorParams { log_handler };
116135
let client_proc_id =
117136
hyperactor::reference::ProcId(hyperactor::WorldId("client_server".to_string()), 0);
118137
let (client_actor_addr, client_actor_handle) = spawn_actor::<ClientActor>(

0 commit comments

Comments
 (0)