Skip to content

Commit 02ca8c0

Browse files
James Sunfacebook-github-bot
authored andcommitted
(4/n) Make log handler async (#485)
Summary: Pull Request resolved: #485 Forgot to anonotate with async_trait thus having the issue before Reviewed By: shayne-fletcher, kaiyuan-li Differential Revision: D78028382
1 parent 9b63cd1 commit 02ca8c0

File tree

4 files changed

+26
-35
lines changed

4 files changed

+26
-35
lines changed

hyperactor_mesh/src/alloc/remoteprocess.rs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -117,17 +117,12 @@ struct ForwarderLogHandler {
117117
forwarder: Mailbox,
118118
}
119119

120+
#[async_trait]
120121
impl LogHandler for ForwarderLogHandler {
121-
fn handle_log(&self, logs: Vec<GenericStateObject>) -> anyhow::Result<()> {
122+
async fn handle_log(&self, logs: Vec<GenericStateObject>) -> anyhow::Result<()> {
122123
let actor = self.parent_state_actor.clone();
123124
let forwarder = self.forwarder.clone();
124-
// TODO: (@jamessun) this is horribly wrong. ClientActor's log handler needs to be type erased
125-
// so that the state actor can be subscribed by different kinds of ClientActor.
126-
// However, async function and Box do not work well together.
127-
tokio::spawn(async move {
128-
actor.push_logs(&forwarder, logs).await.unwrap();
129-
});
130-
Ok(())
125+
actor.push_logs(&forwarder, logs).await
131126
}
132127
}
133128

@@ -1126,12 +1121,10 @@ mod test {
11261121
sender: Sender<Vec<GenericStateObject>>,
11271122
}
11281123

1124+
#[async_trait]
11291125
impl LogHandler for MpscLogHandler {
1130-
fn handle_log(&self, logs: Vec<GenericStateObject>) -> anyhow::Result<()> {
1131-
let sender = self.sender.clone();
1132-
tokio::spawn(async move {
1133-
sender.send(logs).await.unwrap();
1134-
});
1126+
async fn handle_log(&self, logs: Vec<GenericStateObject>) -> anyhow::Result<()> {
1127+
self.sender.send(logs).await.unwrap();
11351128
Ok(())
11361129
}
11371130
}

hyperactor_mesh/src/log_source.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ mod tests {
117117
use std::str::FromStr;
118118
use std::time::Duration;
119119

120+
use async_trait::async_trait;
120121
use hyperactor::channel;
121122
use hyperactor::channel::ChannelAddr;
122123
use hyperactor::clock::Clock;
@@ -162,12 +163,10 @@ mod tests {
162163
sender: Sender<Vec<GenericStateObject>>,
163164
}
164165

166+
#[async_trait]
165167
impl LogHandler for MpscLogHandler {
166-
fn handle_log(&self, logs: Vec<GenericStateObject>) -> anyhow::Result<()> {
167-
let sender = self.sender.clone();
168-
tokio::spawn(async move {
169-
sender.send(logs).await.unwrap();
170-
});
168+
async fn handle_log(&self, logs: Vec<GenericStateObject>) -> anyhow::Result<()> {
169+
self.sender.send(logs).await.unwrap();
171170
Ok(())
172171
}
173172
}

hyperactor_state/src/client.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,32 +24,34 @@ use crate::object::LogState;
2424
use crate::object::Name;
2525
use crate::object::StateObject;
2626

27+
/// A trait for handling logs received by the client actor.
28+
#[async_trait]
2729
pub trait LogHandler: Sync + Send + std::fmt::Debug + 'static {
28-
// we cannot call it handle here as it conflicts with hyperactor macro
29-
fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()>;
30+
/// Handle the logs received by the client actor.
31+
async fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()>;
3032
}
3133

3234
/// A log handler that flushes GenericStateObject to stdout.
3335
#[derive(Debug)]
3436
pub struct StdlogHandler;
3537

38+
#[async_trait]
3639
impl LogHandler for StdlogHandler {
37-
fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()> {
40+
async fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()> {
3841
for log in logs {
3942
let metadata = log.metadata();
4043
let deserialized_data: StateObject<LogSpec, LogState> = log.data().deserialized()?;
4144

4245
// Deserialize the message and process line by line with UTF-8
4346
let message_lines = deserialize_message_lines(&deserialized_data.state.message)?;
4447

45-
// TODO: @lky D77377307 do not use raw string to distinguish between stdout and stderr
4648
if metadata.kind != Kind::Log {
4749
continue;
4850
}
4951
match &metadata.name {
5052
Name::StdoutLog((hostname, pid)) => {
5153
for line in message_lines {
52-
// TODO: @lky hostname and pid should only be printed for non-aggregated logs. =
54+
// TODO: @lky hostname and pid should only be printed for non-aggregated logs.
5355
// For aggregated logs, we should leave as is for better aggregation.
5456
println!("[{} {}] {}", hostname, pid, line);
5557
}
@@ -90,7 +92,8 @@ fn deserialize_message_lines(
9092
handlers = [ClientMessage],
9193
)]
9294
pub struct ClientActor {
93-
// TODO: extend hyperactor macro to support a generic to avoid using Box here.
95+
// Use boxed log handler to erase types.
96+
// This is needed because the client actor ref needs to be sent over the wire and used by the state actor.
9497
log_handler: Box<dyn LogHandler>,
9598
}
9699

@@ -124,7 +127,7 @@ impl ClientMessageHandler for ClientActor {
124127
_cx: &Context<Self>,
125128
logs: Vec<GenericStateObject>,
126129
) -> Result<(), anyhow::Error> {
127-
self.log_handler.handle_log(logs)?;
130+
self.log_handler.handle_log(logs).await?;
128131
Ok(())
129132
}
130133
}
@@ -151,12 +154,10 @@ mod tests {
151154
sender: Sender<Vec<GenericStateObject>>,
152155
}
153156

157+
#[async_trait]
154158
impl LogHandler for MpscLogHandler {
155-
fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()> {
156-
let sender = self.sender.clone();
157-
tokio::spawn(async move {
158-
sender.send(logs).await.unwrap();
159-
});
159+
async fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()> {
160+
self.sender.send(logs).await.unwrap();
160161
Ok(())
161162
}
162163
}

hyperactor_state/src/state_actor.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,10 @@ mod tests {
112112
sender: Sender<Vec<GenericStateObject>>,
113113
}
114114

115+
#[async_trait]
115116
impl LogHandler for MpscLogHandler {
116-
fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()> {
117-
let sender = self.sender.clone();
118-
tokio::spawn(async move {
119-
sender.send(logs).await.unwrap();
120-
});
117+
async fn handle_log(&self, logs: Vec<GenericStateObject>) -> Result<()> {
118+
self.sender.send(logs).await.unwrap();
121119
Ok(())
122120
}
123121
}

0 commit comments

Comments
 (0)