@@ -149,9 +149,9 @@ impl Actor for ControllerActor {
149
149
} )
150
150
}
151
151
152
- async fn init ( & mut self , this : & hyperactor:: Instance < Self > ) -> Result < ( ) , anyhow:: Error > {
152
+ async fn init ( & mut self , cx : & hyperactor:: Instance < Self > ) -> Result < ( ) , anyhow:: Error > {
153
153
self . comm_actor_ref . send (
154
- this ,
154
+ cx ,
155
155
CommActorMode :: ImplicitWithWorldId ( self . worker_gang_ref . gang_id ( ) . world_id ( ) . clone ( ) ) ,
156
156
) ?;
157
157
Ok ( ( ) )
@@ -213,12 +213,12 @@ impl ControllerActor {
213
213
// M = self.worker_progress_check_interval
214
214
async fn request_status_if_needed (
215
215
& mut self ,
216
- this : & Context < ' _ , Self > ,
216
+ cx : & Context < ' _ , Self > ,
217
217
) -> Result < ( ) , anyhow:: Error > {
218
218
if let Some ( ( expected_seq, ..) ) = self . history . deadline (
219
219
self . operations_per_worker_progress_request ,
220
220
self . operation_timeout ,
221
- this . clock ( ) ,
221
+ cx . clock ( ) ,
222
222
) {
223
223
if self . last_controller_request_status . is_none_or (
224
224
|( last_requested_seq, last_requested_time) | {
@@ -232,7 +232,7 @@ impl ControllerActor {
232
232
) {
233
233
// Send to all workers.
234
234
self . send (
235
- this ,
235
+ cx ,
236
236
Ranks :: Slice (
237
237
ndslice:: Slice :: new ( 0 , vec ! [ self . history. world_size( ) ] , vec ! [ 1 ] ) . unwrap ( ) ,
238
238
) ,
@@ -245,7 +245,7 @@ impl ControllerActor {
245
245
. await ?;
246
246
247
247
self . last_controller_request_status =
248
- Some ( ( expected_seq. clone ( ) , this . clock ( ) . now ( ) ) ) ;
248
+ Some ( ( expected_seq. clone ( ) , cx . clock ( ) . now ( ) ) ) ;
249
249
}
250
250
}
251
251
@@ -260,18 +260,18 @@ struct CheckWorkerProgress;
260
260
impl Handler < CheckWorkerProgress > for ControllerActor {
261
261
async fn handle (
262
262
& mut self ,
263
- this : & Context < Self > ,
263
+ cx : & Context < Self > ,
264
264
_check_worker_progress : CheckWorkerProgress ,
265
265
) -> Result < ( ) , anyhow:: Error > {
266
266
let client = self . client ( ) ?;
267
267
268
268
if let Some ( ( expected_seq, deadline, reported) ) = self . history . deadline (
269
269
self . operations_per_worker_progress_request ,
270
270
self . operation_timeout ,
271
- this . clock ( ) ,
271
+ cx . clock ( ) ,
272
272
) {
273
273
if !reported
274
- && this . clock ( ) . now ( ) > deadline
274
+ && cx . clock ( ) . now ( ) > deadline
275
275
&& expected_seq >= self . history . min_incomplete_seq_reported ( )
276
276
{
277
277
let timed_out_ranks = self
@@ -297,7 +297,7 @@ impl Handler<CheckWorkerProgress> for ControllerActor {
297
297
self . operation_timeout. as_secs( )
298
298
) ;
299
299
if client
300
- . log ( this , LogLevel :: Warn , message. clone ( ) )
300
+ . log ( cx , LogLevel :: Warn , message. clone ( ) )
301
301
. await
302
302
. is_ok ( )
303
303
{
@@ -307,7 +307,7 @@ impl Handler<CheckWorkerProgress> for ControllerActor {
307
307
if self . fail_on_worker_timeout {
308
308
client
309
309
. result (
310
- this ,
310
+ cx ,
311
311
expected_seq,
312
312
Some ( Err ( Exception :: Failure ( DeviceFailure {
313
313
actor_id : self . worker_gang_ref . rank ( failed_rank) . actor_id ( ) . clone ( ) ,
@@ -318,10 +318,10 @@ impl Handler<CheckWorkerProgress> for ControllerActor {
318
318
. await ?;
319
319
}
320
320
}
321
- self . request_status_if_needed ( this ) . await ?;
321
+ self . request_status_if_needed ( cx ) . await ?;
322
322
}
323
323
324
- this . self_message_with_delay ( CheckWorkerProgress , self . worker_progress_check_interval ) ?;
324
+ cx . self_message_with_delay ( CheckWorkerProgress , self . worker_progress_check_interval ) ?;
325
325
Ok ( ( ) )
326
326
}
327
327
}
@@ -360,7 +360,7 @@ fn slice_to_selection(slice: Slice) -> Selection {
360
360
impl ControllerMessageHandler for ControllerActor {
361
361
async fn attach (
362
362
& mut self ,
363
- this : & Context < Self > ,
363
+ cx : & Context < Self > ,
364
364
client_actor : ActorRef < ClientActor > ,
365
365
) -> Result < ( ) , anyhow:: Error > {
366
366
tracing:: debug!( "attaching client actor {}" , client_actor) ;
@@ -369,34 +369,34 @@ impl ControllerMessageHandler for ControllerActor {
369
369
. map_err ( |actor_ref| anyhow:: anyhow!( "client actor {} already attached" , actor_ref) ) ?;
370
370
371
371
// Trigger periodical checking of supervision status and worker progress.
372
- this . self_message_with_delay (
372
+ cx . self_message_with_delay (
373
373
ControllerMessage :: CheckSupervision { } ,
374
374
self . supervision_query_interval ,
375
375
) ?;
376
- this . self_message_with_delay ( CheckWorkerProgress , self . worker_progress_check_interval ) ?;
376
+ cx . self_message_with_delay ( CheckWorkerProgress , self . worker_progress_check_interval ) ?;
377
377
Ok ( ( ) )
378
378
}
379
379
380
380
async fn node (
381
381
& mut self ,
382
- this : & Context < Self > ,
382
+ cx : & Context < Self > ,
383
383
seq : Seq ,
384
384
defs : Vec < Ref > ,
385
385
uses : Vec < Ref > ,
386
386
) -> Result < ( ) , anyhow:: Error > {
387
387
let failures = self . history . add_invocation ( seq, uses, defs) ;
388
388
let client = self . client ( ) ?;
389
389
for ( seq, failure) in failures {
390
- let _ = client. result ( this , seq, failure) . await ;
390
+ let _ = client. result ( cx , seq, failure) . await ;
391
391
}
392
- self . request_status_if_needed ( this ) . await ?;
392
+ self . request_status_if_needed ( cx ) . await ?;
393
393
394
394
Ok ( ( ) )
395
395
}
396
396
397
397
async fn drop_refs (
398
398
& mut self ,
399
- _this : & Context < Self > ,
399
+ _cx : & Context < Self > ,
400
400
refs : Vec < Ref > ,
401
401
) -> Result < ( ) , anyhow:: Error > {
402
402
self . history . delete_invocations_for_refs ( refs) ;
@@ -405,7 +405,7 @@ impl ControllerMessageHandler for ControllerActor {
405
405
406
406
async fn send (
407
407
& mut self ,
408
- this : & Context < Self > ,
408
+ cx : & Context < Self > ,
409
409
ranks : Ranks ,
410
410
message : Serialized ,
411
411
) -> Result < ( ) , anyhow:: Error > {
@@ -423,7 +423,7 @@ impl ControllerMessageHandler for ControllerActor {
423
423
} ) ,
424
424
} ;
425
425
let message = CastMessageEnvelope :: from_serialized (
426
- this . self_id ( ) . clone ( ) ,
426
+ cx . self_id ( ) . clone ( ) ,
427
427
DestinationPort :: new :: < WorkerActor , WorkerMessage > (
428
428
// This is awkward, but goes away entirely with meshes.
429
429
self . worker_gang_ref
@@ -442,7 +442,7 @@ impl ControllerMessageHandler for ControllerActor {
442
442
. reshape_with_limit ( Limit :: from ( CASTING_FANOUT_SIZE ) ) ;
443
443
444
444
self . comm_actor_ref . send (
445
- this ,
445
+ cx ,
446
446
CastMessage {
447
447
dest : Uslice {
448
448
// TODO: pass both slice and selection from client side
@@ -457,20 +457,20 @@ impl ControllerMessageHandler for ControllerActor {
457
457
458
458
async fn remote_function_failed (
459
459
& mut self ,
460
- this : & Context < Self > ,
460
+ cx : & Context < Self > ,
461
461
seq : Seq ,
462
462
error : WorkerError ,
463
463
) -> Result < ( ) , anyhow:: Error > {
464
464
let rank = error. worker_actor_id . rank ( ) ;
465
465
self . history
466
466
. propagate_exception ( seq, Exception :: Error ( seq, seq, error. clone ( ) ) ) ;
467
- mark_worker_complete_and_propagate_exceptions ( self , this , rank, & seq) . await ?;
467
+ mark_worker_complete_and_propagate_exceptions ( self , cx , rank, & seq) . await ?;
468
468
Ok ( ( ) )
469
469
}
470
470
471
471
async fn status (
472
472
& mut self ,
473
- _this : & Context < Self > ,
473
+ cx : & Context < Self > ,
474
474
seq : Seq ,
475
475
worker_actor_id : ActorId ,
476
476
controller : bool ,
@@ -480,26 +480,26 @@ impl ControllerMessageHandler for ControllerActor {
480
480
if controller {
481
481
self . history . update_deadline_tracking ( rank, seq) ;
482
482
} else {
483
- mark_worker_complete_and_propagate_exceptions ( self , _this , rank, & seq) . await ?;
483
+ mark_worker_complete_and_propagate_exceptions ( self , cx , rank, & seq) . await ?;
484
484
}
485
485
Ok ( ( ) )
486
486
}
487
487
488
488
async fn fetch_result (
489
489
& mut self ,
490
- _this : & Context < Self > ,
490
+ _cx : & Context < Self > ,
491
491
seq : Seq ,
492
492
result : Result < Serialized , WorkerError > ,
493
493
) -> Result < ( ) , anyhow:: Error > {
494
494
self . history . set_result ( seq, result) ;
495
495
Ok ( ( ) )
496
496
}
497
497
498
- async fn check_supervision ( & mut self , this : & Context < Self > ) -> Result < ( ) , anyhow:: Error > {
498
+ async fn check_supervision ( & mut self , cx : & Context < Self > ) -> Result < ( ) , anyhow:: Error > {
499
499
let gang_id: GangId = self . worker_gang_ref . clone ( ) . into ( ) ;
500
500
let world_state = self
501
501
. system_supervision_actor_ref
502
- . state ( this , gang_id. world_id ( ) . clone ( ) )
502
+ . state ( cx , gang_id. world_id ( ) . clone ( ) )
503
503
. await ?;
504
504
505
505
if let Some ( world_state) = world_state {
@@ -545,7 +545,7 @@ impl ControllerMessageHandler for ControllerActor {
545
545
tracing:: error!( "Sending failure to client: {exc:?}" ) ;
546
546
// Seq does not matter as the client will raise device error immediately before setting the results.
547
547
self . client ( ) ?
548
- . result ( this , Seq :: default ( ) , Some ( Err ( exc) ) )
548
+ . result ( cx , Seq :: default ( ) , Some ( Err ( exc) ) )
549
549
. await ?;
550
550
tracing:: error!( "Failure successfully sent to client" ) ;
551
551
@@ -554,7 +554,7 @@ impl ControllerMessageHandler for ControllerActor {
554
554
}
555
555
556
556
// Schedule the next supervision check.
557
- this . self_message_with_delay (
557
+ cx . self_message_with_delay (
558
558
ControllerMessage :: CheckSupervision { } ,
559
559
self . supervision_query_interval ,
560
560
) ?;
@@ -563,27 +563,27 @@ impl ControllerMessageHandler for ControllerActor {
563
563
564
564
async fn debugger_message (
565
565
& mut self ,
566
- this : & Context < Self > ,
566
+ cx : & Context < Self > ,
567
567
debugger_actor_id : ActorId ,
568
568
action : DebuggerAction ,
569
569
) -> Result < ( ) , anyhow:: Error > {
570
570
self . client ( ) ?
571
- . debugger_message ( this , debugger_actor_id, action)
571
+ . debugger_message ( cx , debugger_actor_id, action)
572
572
. await
573
573
}
574
574
575
575
#[ cfg( test) ]
576
576
async fn get_first_incomplete_seqs_unit_tests_only (
577
577
& mut self ,
578
- _this : & Context < Self > ,
578
+ _cx : & Context < Self > ,
579
579
) -> Result < Vec < Seq > , anyhow:: Error > {
580
580
Ok ( self . history . first_incomplete_seqs ( ) . to_vec ( ) )
581
581
}
582
582
583
583
#[ cfg( not( test) ) ]
584
584
async fn get_first_incomplete_seqs_unit_tests_only (
585
585
& mut self ,
586
- _this : & Context < Self > ,
586
+ _cx : & Context < Self > ,
587
587
) -> Result < Vec < Seq > , anyhow:: Error > {
588
588
unimplemented ! ( "get_first_incomplete_seqs_unit_tests_only is only for unit tests" )
589
589
}
@@ -1798,7 +1798,7 @@ mod tests {
1798
1798
impl PanickingMessageHandler for PanickingActor {
1799
1799
async fn panic (
1800
1800
& mut self ,
1801
- _this : & Context < Self > ,
1801
+ _cx : & Context < Self > ,
1802
1802
err_msg : String ,
1803
1803
) -> Result < ( ) , anyhow:: Error > {
1804
1804
panic ! ( "{}" , err_msg) ;
0 commit comments