Skip to content

Commit 754e018

Browse files
James Sun and facebook-github-bot
authored and committed
(1/n) hook state client to client for logging (#445)
Summary: Launch the logging client through the proc mesh. The state actor bootstrap still needs to be set up manually; the following diffs will automate this. Reviewed By: shayne-fletcher Differential Revision: D77848230
1 parent 8bfb112 commit 754e018

File tree

3 files changed

+206
-11
lines changed

3 files changed

+206
-11
lines changed

hyperactor_mesh/src/proc_mesh.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use hyperactor::actor::remote::Remote;
2525
use hyperactor::cap;
2626
use hyperactor::channel;
2727
use hyperactor::channel::ChannelAddr;
28+
use hyperactor::id;
2829
use hyperactor::mailbox;
2930
use hyperactor::mailbox::BoxableMailboxSender;
3031
use hyperactor::mailbox::BoxedMailboxSender;
@@ -38,6 +39,10 @@ use hyperactor::proc::Proc;
3839
use hyperactor::reference::ProcId;
3940
use hyperactor::reference::Reference;
4041
use hyperactor::supervision::ActorSupervisionEvent;
42+
use hyperactor_state::client::ClientActor;
43+
use hyperactor_state::client::ClientActorParams;
44+
use hyperactor_state::state_actor::StateActor;
45+
use hyperactor_state::state_actor::StateMessageClient;
4146
use ndslice::Range;
4247
use ndslice::Shape;
4348
use ndslice::ShapeError;
@@ -200,7 +205,7 @@ impl ProcMesh {
200205
client_proc
201206
.clone()
202207
.serve(client_rx, mailbox::monitored_return_handle());
203-
router.bind(client_proc_id.clone().into(), client_proc_addr);
208+
router.bind(client_proc_id.clone().into(), client_proc_addr.clone());
204209

205210
// Bind this router to the global router, to enable cross-mesh routing.
206211
// TODO: unbind this when we incorporate mesh destruction too.
@@ -297,6 +302,42 @@ impl ProcMesh {
297302
.map_err(anyhow::Error::from)?;
298303
}
299304

305+
// Get a reference to the state actor for streaming logs.
306+
// TODO: bind logging options to the Python API so that users can choose whether to stream (optionally aggregated) logs back.
307+
// TODO: spin up state actor locally and remotely with names and addresses passed in here.
308+
let state_actor_id = id!(state_server[0].state[0]);
309+
let state_actor_ref = ActorRef::<StateActor>::attest(state_actor_id.clone());
310+
let state_actor_addr = "tcp![::]:3000".parse::<ChannelAddr>().unwrap();
311+
router.bind(state_actor_id.into(), state_actor_addr.clone());
312+
313+
let log_handler = Box::new(hyperactor_state::client::StdlogHandler {});
314+
let params = ClientActorParams { log_handler };
315+
316+
let client_logging_actor: ActorRef<ClientActor> = client_proc
317+
.spawn::<ClientActor>("logging_client", params)
318+
.await
319+
.unwrap()
320+
.bind();
321+
322+
match state_actor_ref
323+
.subscribe_logs(
324+
&client,
325+
client_proc_addr.clone(),
326+
client_logging_actor.clone(),
327+
)
328+
.await
329+
{
330+
Ok(_) => {}
331+
Err(err) => {
332+
// TODO: talking to the state actor is not on the critical path.
333+
// However, if the state actor is not reachable, we will receive a flood of mailbox warnings.
334+
tracing::warn!(
335+
"failed to subscribe to state actor logs; remote logging will not be available on the client: {}",
336+
err
337+
);
338+
}
339+
}
340+
300341
let shape = alloc.shape().clone();
301342
let world_id = alloc.world_id().clone();
302343

hyperactor_state/src/client.rs

Lines changed: 144 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,82 @@ use hyperactor_macros::HandleClient;
1616
use hyperactor_macros::RefClient;
1717
use serde::Deserialize;
1818
use serde::Serialize;
19-
use tokio::sync::mpsc::Sender;
2019

2120
use crate::object::GenericStateObject;
21+
use crate::object::Kind;
22+
use crate::object::LogSpec;
23+
use crate::object::LogState;
24+
use crate::object::Name;
25+
use crate::object::StateObject;
26+
27+
pub trait LogHandler: Sync + Send + std::fmt::Debug + 'static {
28+
// we cannot call it handle here as it conflicts with hyperactor macro
29+
fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()>;
30+
}
31+
32+
/// A log handler that prints `GenericStateObject` logs to stdout or stderr, depending on which stream they were captured from.
33+
#[derive(Debug)]
34+
pub struct StdlogHandler;
35+
36+
impl LogHandler for StdlogHandler {
37+
fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()> {
38+
for log in logs {
39+
let metadata = log.metadata();
40+
let deserialized_data: StateObject<LogSpec, LogState> = log.data().deserialized()?;
41+
42+
// Deserialize the message and process line by line with UTF-8
43+
let message_lines = deserialize_message_lines(&deserialized_data.state.message)?;
44+
45+
// TODO: @lky D77377307 do not use raw string to distinguish between stdout and stderr
46+
if metadata.kind != Kind::Log {
47+
continue;
48+
}
49+
match &metadata.name {
50+
Name::StdoutLog((hostname, pid)) => {
51+
for line in message_lines {
52+
// TODO: @lky hostname and pid should only be printed for non-aggregated logs. =
53+
// For aggregated logs, we should leave as is for better aggregation.
54+
println!("[{} {}] {}", hostname, pid, line);
55+
}
56+
}
57+
Name::StderrLog((hostname, pid)) => {
58+
for line in message_lines {
59+
eprintln!("[{} {}] {}", hostname, pid, line);
60+
}
61+
}
62+
}
63+
}
64+
Ok(())
65+
}
66+
}
67+
68+
/// Deserialize a Serialized message and split it into UTF-8 lines
69+
fn deserialize_message_lines(
70+
serialized_message: &hyperactor::data::Serialized,
71+
) -> Result<Vec<String>> {
72+
// Try to deserialize as String first
73+
if let Ok(message_str) = serialized_message.deserialized::<String>() {
74+
return Ok(message_str.lines().map(|s| s.to_string()).collect());
75+
}
76+
77+
// If that fails, try to deserialize as Vec<u8> and convert to UTF-8
78+
if let Ok(message_bytes) = serialized_message.deserialized::<Vec<u8>>() {
79+
let message_str = String::from_utf8(message_bytes)?;
80+
return Ok(message_str.lines().map(|s| s.to_string()).collect());
81+
}
82+
83+
// If both fail, return an error
84+
anyhow::bail!("Failed to deserialize message as either String or Vec<u8>")
85+
}
2286

2387
/// A client to interact with the state actor.
2488
#[derive(Debug)]
2589
#[hyperactor::export(
2690
handlers = [ClientMessage],
2791
)]
2892
pub struct ClientActor {
29-
sender: Sender<Vec<GenericStateObject>>,
93+
// TODO: extend hyperactor macro to support a generic to avoid using Box here.
94+
log_handler: Box<dyn LogHandler>,
3095
}
3196

3297
/// Endpoints for the client actor.
@@ -37,15 +102,17 @@ pub enum ClientMessage {
37102
}
38103

39104
pub struct ClientActorParams {
40-
pub sender: Sender<Vec<GenericStateObject>>,
105+
pub log_handler: Box<dyn LogHandler>,
41106
}
42107

43108
#[async_trait]
44109
impl Actor for ClientActor {
45110
type Params = ClientActorParams;
46111

47-
async fn new(ClientActorParams { sender }: ClientActorParams) -> Result<Self, anyhow::Error> {
48-
Ok(Self { sender })
112+
async fn new(
113+
ClientActorParams { log_handler }: ClientActorParams,
114+
) -> Result<Self, anyhow::Error> {
115+
Ok(Self { log_handler })
49116
}
50117
}
51118

@@ -57,7 +124,7 @@ impl ClientMessageHandler for ClientActor {
57124
_cx: &Context<Self>,
58125
logs: Vec<GenericStateObject>,
59126
) -> Result<(), anyhow::Error> {
60-
self.sender.send(logs).await?;
127+
self.log_handler.handle_log(logs)?;
61128
Ok(())
62129
}
63130
}
@@ -69,18 +136,38 @@ mod tests {
69136
use hyperactor::ActorRef;
70137
use hyperactor::channel;
71138
use hyperactor::channel::ChannelAddr;
139+
use hyperactor::clock::Clock;
140+
use hyperactor::data::Serialized;
141+
use tokio::sync::mpsc::Sender;
72142

73143
use super::*;
74144
use crate::create_remote_client;
75145
use crate::test_utils::log_items;
76146
use crate::test_utils::spawn_actor;
77147

148+
/// A log handler that flushes GenericStateObject to a mpsc channel.
149+
#[derive(Debug)]
150+
struct MpscLogHandler {
151+
sender: Sender<Vec<GenericStateObject>>,
152+
}
153+
154+
impl LogHandler for MpscLogHandler {
155+
fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()> {
156+
let sender = self.sender.clone();
157+
tokio::spawn(async move {
158+
sender.send(logs).await.unwrap();
159+
});
160+
Ok(())
161+
}
162+
}
163+
78164
#[tracing_test::traced_test]
79165
#[tokio::test]
80166
async fn test_client_basics() {
81167
let client_actor_addr = ChannelAddr::any(channel::ChannelTransport::Unix);
82168
let (sender, mut receiver) = tokio::sync::mpsc::channel::<Vec<GenericStateObject>>(10);
83-
let params = ClientActorParams { sender };
169+
let log_handler = Box::new(MpscLogHandler { sender });
170+
let params = ClientActorParams { log_handler };
84171
let client_proc_id =
85172
hyperactor::reference::ProcId(hyperactor::WorldId("client_server".to_string()), 0);
86173
let (client_actor_addr, client_actor_handle, _client_mailbox) = spawn_actor::<ClientActor>(
@@ -102,7 +189,9 @@ mod tests {
102189
.unwrap();
103190

104191
// Collect received messages with timeout
105-
let fetched_logs = tokio::time::timeout(Duration::from_secs(1), receiver.recv())
192+
let clock = hyperactor::clock::ClockKind::default();
193+
let fetched_logs = clock
194+
.timeout(Duration::from_secs(1), receiver.recv())
106195
.await
107196
.expect("timed out waiting for message")
108197
.expect("channel closed unexpectedly");
@@ -112,7 +201,53 @@ mod tests {
112201
assert_eq!(fetched_logs, log_items_0_10);
113202

114203
// Now test that no extra message is waiting
115-
let extra = tokio::time::timeout(Duration::from_millis(100), receiver.recv()).await;
204+
let extra = clock
205+
.timeout(Duration::from_millis(100), receiver.recv())
206+
.await;
116207
assert!(extra.is_err(), "expected no more messages");
117208
}
209+
210+
#[test]
fn test_deserialize_message_lines_string() {
    // Serialize a `String` payload and run it through the function under test.
    let lines_for_string = |text: &str| {
        let payload = Serialized::serialize_anon(&text.to_string()).unwrap();
        deserialize_message_lines(&payload)
    };
    // Same, but for a raw byte payload.
    let lines_for_bytes = |bytes: Vec<u8>| {
        let payload = Serialized::serialize_anon(&bytes).unwrap();
        deserialize_message_lines(&payload)
    };

    // A String message with multiple lines.
    assert_eq!(
        lines_for_string("Line 1\nLine 2\nLine 3").unwrap(),
        vec!["Line 1", "Line 2", "Line 3"]
    );

    // A Vec<u8> message with UTF-8 content.
    assert_eq!(
        lines_for_bytes("Hello\nWorld\nUTF-8 \u{1F980}".as_bytes().to_vec()).unwrap(),
        vec!["Hello", "World", "UTF-8 \u{1F980}"]
    );

    // A single-line message.
    assert_eq!(
        lines_for_string("Single line message").unwrap(),
        vec!["Single line message"]
    );

    // A message consisting only of empty lines.
    assert_eq!(lines_for_string("\n\n").unwrap(), vec!["", ""]);

    // Invalid UTF-8 bytes must surface as an error.
    let failure = lines_for_bytes(vec![0xFF, 0xFE, 0xFD]);
    assert!(failure.is_err());
    assert!(failure.unwrap_err().to_string().contains("invalid utf-8"));
}
118253
}

hyperactor_state/src/state_actor.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,31 @@ mod tests {
9292
use std::time::Duration;
9393

9494
use hyperactor::channel;
95+
use tokio::sync::mpsc::Sender;
9596

9697
use super::*;
9798
use crate::client::ClientActorParams;
99+
use crate::client::LogHandler;
98100
use crate::create_remote_client;
99101
use crate::test_utils::log_items;
100102
use crate::test_utils::spawn_actor;
101103

104+
/// A log handler that flushes GenericStateObject to a mpsc channel.
105+
#[derive(Debug)]
106+
struct MpscLogHandler {
107+
sender: Sender<Vec<GenericStateObject>>,
108+
}
109+
110+
impl LogHandler for MpscLogHandler {
111+
fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()> {
112+
let sender = self.sender.clone();
113+
tokio::spawn(async move {
114+
sender.send(logs).await.unwrap();
115+
});
116+
Ok(())
117+
}
118+
}
119+
102120
#[tokio::test]
103121
async fn test_subscribe_logs() {
104122
let state_actor_addr = ChannelAddr::any(channel::ChannelTransport::Unix);
@@ -112,7 +130,8 @@ mod tests {
112130

113131
let client_actor_addr = ChannelAddr::any(channel::ChannelTransport::Unix);
114132
let (sender, mut receiver) = tokio::sync::mpsc::channel::<Vec<GenericStateObject>>(20);
115-
let params = ClientActorParams { sender };
133+
let log_handler = Box::new(MpscLogHandler { sender });
134+
let params = ClientActorParams { log_handler };
116135
let client_proc_id =
117136
hyperactor::reference::ProcId(hyperactor::WorldId("client_server".to_string()), 0);
118137
let (client_actor_addr, client_actor_handle, _client_mailbox) = spawn_actor::<ClientActor>(

0 commit comments

Comments
 (0)