Skip to content

Commit 91f24b1

Browse files
James Sunfacebook-github-bot
authored andcommitted
(6/n) delete usage of state actor (#521)
Summary: Pull Request resolved: #521 we are streaming log directly back now Reviewed By: shayne-fletcher Differential Revision: D78199598 fbshipit-source-id: 6a993b7a5c2317e3ffd6c2d3a014a80ab6cdbe48
1 parent 62b2e32 commit 91f24b1

File tree

11 files changed

+17
-611
lines changed

11 files changed

+17
-611
lines changed

hyperactor_mesh/src/alloc.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ use serde::Deserialize;
3636
use serde::Serialize;
3737

3838
use crate::alloc::test_utils::MockAllocWrapper;
39-
use crate::log_source::LogSource;
4039
use crate::proc_mesh::mesh_agent::MeshAgent;
4140

4241
/// Errors that occur during allocation operations.
@@ -220,12 +219,6 @@ pub trait Alloc {
220219
/// The channel transport used the procs in this alloc.
221220
fn transport(&self) -> ChannelTransport;
222221

223-
/// The log source for this alloc.
224-
/// It allows remote processes to stream stdout and stderr back to the client.
225-
/// A client can connect to the log source to obtain the streamed logs.
226-
/// A log source is allocation specific. Each allocator can decide how to stream the logs back.
227-
async fn log_source(&self) -> Result<LogSource, AllocatorError>;
228-
229222
/// Stop this alloc, shutting down all of its procs. A clean
230223
/// shutdown should result in Stop events from all allocs,
231224
/// followed by the end of the event stream.
@@ -360,10 +353,6 @@ pub mod test_utils {
360353
self.alloc.transport()
361354
}
362355

363-
async fn log_source(&self) -> Result<LogSource, AllocatorError> {
364-
self.alloc.log_source().await
365-
}
366-
367356
async fn stop(&mut self) -> Result<(), AllocatorError> {
368357
self.alloc.stop().await
369358
}

hyperactor_mesh/src/alloc/local.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use crate::alloc::AllocSpec;
3333
use crate::alloc::Allocator;
3434
use crate::alloc::AllocatorError;
3535
use crate::alloc::ProcState;
36-
use crate::log_source::LogSource;
3736
use crate::proc_mesh::mesh_agent::MeshAgent;
3837
use crate::shortuuid::ShortUuid;
3938

@@ -259,14 +258,6 @@ impl Alloc for LocalAlloc {
259258
self.transport.clone()
260259
}
261260

262-
async fn log_source(&self) -> Result<LogSource, AllocatorError> {
263-
// Local alloc does not need to stream logs back.
264-
// The client can subscribe to it but local actors will not stream logs into it.
265-
LogSource::new_with_local_actor()
266-
.await
267-
.map_err(AllocatorError::from)
268-
}
269-
270261
async fn stop(&mut self) -> Result<(), AllocatorError> {
271262
for rank in 0..self.size() {
272263
self.todo_tx

hyperactor_mesh/src/alloc/process.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ use crate::bootstrap;
5050
use crate::bootstrap::Allocator2Process;
5151
use crate::bootstrap::Process2Allocator;
5252
use crate::bootstrap::Process2AllocatorMessage;
53-
use crate::log_source::LogSource;
5453
use crate::shortuuid::ShortUuid;
5554

5655
/// The maximum number of log lines to tail keep for managed processes.
@@ -87,9 +86,6 @@ impl Allocator for ProcessAllocator {
8786
let (bootstrap_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix))
8887
.await
8988
.map_err(anyhow::Error::from)?;
90-
let log_source = LogSource::new_with_local_actor()
91-
.await
92-
.map_err(AllocatorError::from)?;
9389

9490
let name = ShortUuid::generate();
9591
let n = spec.shape.slice().len();
@@ -98,7 +94,6 @@ impl Allocator for ProcessAllocator {
9894
world_id: WorldId(name.to_string()),
9995
spec: spec.clone(),
10096
bootstrap_addr,
101-
log_source,
10297
rx,
10398
index: 0,
10499
active: HashMap::new(),
@@ -117,7 +112,6 @@ pub struct ProcessAlloc {
117112
world_id: WorldId, // to provide storage
118113
spec: AllocSpec,
119114
bootstrap_addr: ChannelAddr,
120-
log_source: LogSource,
121115
rx: channel::ChannelRx<Process2Allocator>,
122116
index: usize,
123117
active: HashMap<usize, Child>,
@@ -492,10 +486,6 @@ impl Alloc for ProcessAlloc {
492486
ChannelTransport::Unix
493487
}
494488

495-
async fn log_source(&self) -> Result<LogSource, AllocatorError> {
496-
Ok(self.log_source.clone())
497-
}
498-
499489
async fn stop(&mut self) -> Result<(), AllocatorError> {
500490
// We rely on the teardown here, and that the process should
501491
// exit on its own. We should have a hard timeout here as well,

0 commit comments

Comments
 (0)