@@ -29,7 +29,6 @@ use hyperactor::channel::ChannelTx;
29
29
use hyperactor:: channel:: Rx ;
30
30
use hyperactor:: channel:: Tx ;
31
31
use hyperactor:: channel:: TxStatus ;
32
- use hyperactor:: id;
33
32
use hyperactor:: sync:: flag;
34
33
use hyperactor:: sync:: monitor;
35
34
use hyperactor_state:: state_actor:: StateActor ;
@@ -53,6 +52,8 @@ use crate::bootstrap;
53
52
use crate :: bootstrap:: Allocator2Process ;
54
53
use crate :: bootstrap:: Process2Allocator ;
55
54
use crate :: bootstrap:: Process2AllocatorMessage ;
55
+ use crate :: log_source:: LogSource ;
56
+ use crate :: log_source:: StateServerInfo ;
56
57
use crate :: shortuuid:: ShortUuid ;
57
58
58
59
/// The maximum number of log lines to tail keep for managed processes.
@@ -89,6 +90,9 @@ impl Allocator for ProcessAllocator {
89
90
let ( bootstrap_addr, rx) = channel:: serve ( ChannelAddr :: any ( ChannelTransport :: Unix ) )
90
91
. await
91
92
. map_err ( anyhow:: Error :: from) ?;
93
+ let log_source = LogSource :: new_with_local_actor ( )
94
+ . await
95
+ . map_err ( AllocatorError :: from) ?;
92
96
93
97
let name = ShortUuid :: generate ( ) ;
94
98
let n = spec. shape . slice ( ) . len ( ) ;
@@ -97,6 +101,7 @@ impl Allocator for ProcessAllocator {
97
101
world_id : WorldId ( name. to_string ( ) ) ,
98
102
spec : spec. clone ( ) ,
99
103
bootstrap_addr,
104
+ log_source,
100
105
rx,
101
106
index : 0 ,
102
107
active : HashMap :: new ( ) ,
@@ -115,6 +120,7 @@ pub struct ProcessAlloc {
115
120
world_id : WorldId , // to provide storage
116
121
spec : AllocSpec ,
117
122
bootstrap_addr : ChannelAddr ,
123
+ log_source : LogSource ,
118
124
rx : channel:: ChannelRx < Process2Allocator > ,
119
125
index : usize ,
120
126
active : HashMap < usize , Child > ,
@@ -145,13 +151,14 @@ struct Child {
145
151
impl Child {
146
152
fn monitored (
147
153
mut process : tokio:: process:: Child ,
154
+ state_server_info : StateServerInfo ,
148
155
) -> ( Self , impl Future < Output = ProcStopReason > ) {
149
156
let ( group, handle) = monitor:: group ( ) ;
150
157
let ( exit_flag, exit_guard) = flag:: guarded ( ) ;
151
158
let stop_reason = Arc :: new ( OnceLock :: new ( ) ) ;
152
159
153
160
// TODO(lky): enable state actor branch and remove this flag
154
- let use_state_actor = false ;
161
+ let use_state_actor = true ;
155
162
156
163
// Set up stdout and stderr writers
157
164
let mut stdout_tee: Box < dyn io:: AsyncWrite + Send + Unpin + ' static > =
@@ -161,24 +168,20 @@ impl Child {
161
168
162
169
// If state actor is enabled, try to set up LogWriter instances
163
170
if use_state_actor {
164
- let state_actor_ref = ActorRef :: < StateActor > :: attest ( id ! ( state_server[ 0 ] . state[ 0 ] ) ) ;
165
- // Parse the state actor address
166
- if let Ok ( state_actor_addr) = "tcp![::]:3000" . parse :: < ChannelAddr > ( ) {
167
- // Use the helper function to create both writers at once
168
- match hyperactor_state:: log_writer:: create_log_writers (
169
- state_actor_addr,
170
- state_actor_ref,
171
- ) {
172
- Ok ( ( stdout_writer, stderr_writer) ) => {
173
- stdout_tee = stdout_writer;
174
- stderr_tee = stderr_writer;
175
- }
176
- Err ( e) => {
177
- tracing:: error!( "failed to create log writers: {}" , e) ;
178
- }
171
+ let state_actor_ref = ActorRef :: < StateActor > :: attest ( state_server_info. state_actor_id ) ;
172
+ let state_actor_addr = state_server_info. state_proc_addr ;
173
+ // Use the helper function to create both writers at once
174
+ match hyperactor_state:: log_writer:: create_log_writers (
175
+ state_actor_addr,
176
+ state_actor_ref,
177
+ ) {
178
+ Ok ( ( stdout_writer, stderr_writer) ) => {
179
+ stdout_tee = stdout_writer;
180
+ stderr_tee = stderr_writer;
181
+ }
182
+ Err ( e) => {
183
+ tracing:: error!( "failed to create log writers: {}" , e) ;
179
184
}
180
- } else {
181
- tracing:: error!( "failed to parse state actor address" ) ;
182
185
}
183
186
}
184
187
@@ -394,7 +397,8 @@ impl ProcessAlloc {
394
397
None
395
398
}
396
399
Ok ( rank) => {
397
- let ( handle, monitor) = Child :: monitored ( process) ;
400
+ let ( handle, monitor) =
401
+ Child :: monitored ( process, self . log_source . server_info ( ) ) ;
398
402
self . children . spawn ( async move { ( index, monitor. await ) } ) ;
399
403
self . active . insert ( index, handle) ;
400
404
// Adjust for shape slice offset for non-zero shapes (sub-shapes).
@@ -498,6 +502,10 @@ impl Alloc for ProcessAlloc {
498
502
ChannelTransport :: Unix
499
503
}
500
504
505
+ async fn log_source ( & self ) -> Result < LogSource , AllocatorError > {
506
+ Ok ( self . log_source . clone ( ) )
507
+ }
508
+
501
509
async fn stop ( & mut self ) -> Result < ( ) , AllocatorError > {
502
510
// We rely on the teardown here, and that the process should
503
511
// exit on its own. We should have a hard timeout here as well,
0 commit comments