9
9
use std:: collections:: BTreeMap ;
10
10
use std:: collections:: HashMap ;
11
11
use std:: collections:: HashSet ;
12
+ use std:: collections:: VecDeque ;
12
13
use std:: error:: Error ;
13
14
use std:: fmt:: Debug ;
14
15
use std:: fmt:: Formatter ;
@@ -21,6 +22,7 @@ use std::sync::atomic::AtomicUsize;
21
22
use async_trait:: async_trait;
22
23
use hyperactor:: Actor ;
23
24
use hyperactor:: ActorHandle ;
25
+ use hyperactor:: ActorId ;
24
26
use hyperactor:: ActorRef ;
25
27
use hyperactor:: Context ;
26
28
use hyperactor:: HandleClient ;
@@ -30,7 +32,6 @@ use hyperactor::PortRef;
30
32
use hyperactor:: cap:: CanSend ;
31
33
use hyperactor:: mailbox:: MailboxSenderError ;
32
34
use hyperactor_mesh:: Mesh ;
33
- use hyperactor_mesh:: ProcMesh ;
34
35
use hyperactor_mesh:: actor_mesh:: RootActorMesh ;
35
36
use hyperactor_mesh:: shared_cell:: SharedCell ;
36
37
use hyperactor_mesh:: shared_cell:: SharedCellRef ;
@@ -44,6 +45,9 @@ use monarch_messages::controller::ControllerActor;
44
45
use monarch_messages:: controller:: ControllerMessage ;
45
46
use monarch_messages:: controller:: Seq ;
46
47
use monarch_messages:: controller:: WorkerError ;
48
+ use monarch_messages:: debugger:: DebuggerAction ;
49
+ use monarch_messages:: debugger:: DebuggerActor ;
50
+ use monarch_messages:: debugger:: DebuggerMessage ;
47
51
use monarch_messages:: worker:: Ref ;
48
52
use monarch_messages:: worker:: WorkerMessage ;
49
53
use monarch_messages:: worker:: WorkerParams ;
@@ -611,12 +615,84 @@ struct MeshControllerActor {
611
615
workers : Option < SharedCell < RootActorMesh < ' static , WorkerActor > > > ,
612
616
history : History ,
613
617
id : usize ,
618
+ debugger_active : Option < ActorRef < DebuggerActor > > ,
619
+ debugger_paused : VecDeque < ActorRef < DebuggerActor > > ,
614
620
}
615
621
616
622
impl MeshControllerActor {
617
623
fn workers ( & self ) -> SharedCellRef < RootActorMesh < ' static , WorkerActor > > {
618
624
self . workers . as_ref ( ) . unwrap ( ) . borrow ( ) . unwrap ( )
619
625
}
626
+ fn handle_debug (
627
+ & mut self ,
628
+ this : & Context < Self > ,
629
+ debugger_actor_id : ActorId ,
630
+ action : DebuggerAction ,
631
+ ) -> anyhow:: Result < ( ) > {
632
+ if matches ! ( action, DebuggerAction :: Paused ( ) ) {
633
+ self . debugger_paused
634
+ . push_back ( ActorRef :: attest ( debugger_actor_id) ) ;
635
+ } else {
636
+ let debugger_actor = self
637
+ . debugger_active
638
+ . as_ref ( )
639
+ . ok_or_else ( || anyhow:: anyhow!( "no active debugger" ) ) ?;
640
+ if debugger_actor_id != * debugger_actor. actor_id ( ) {
641
+ anyhow:: bail!( "debugger action for wrong actor" ) ;
642
+ }
643
+ match action {
644
+ DebuggerAction :: Detach ( ) => {
645
+ self . debugger_active = None ;
646
+ }
647
+ DebuggerAction :: Read { requested_size } => {
648
+ Python :: with_gil ( |py| {
649
+ let read = py
650
+ . import ( "monarch.controller.debugger" )
651
+ . unwrap ( )
652
+ . getattr ( "read" )
653
+ . unwrap ( ) ;
654
+ let bytes: Vec < u8 > =
655
+ read. call1 ( ( requested_size, ) ) . unwrap ( ) . extract ( ) . unwrap ( ) ;
656
+
657
+ debugger_actor. send (
658
+ this,
659
+ DebuggerMessage :: Action {
660
+ action : DebuggerAction :: Write { bytes } ,
661
+ } ,
662
+ )
663
+ } ) ?;
664
+ }
665
+ DebuggerAction :: Write { bytes } => {
666
+ Python :: with_gil ( |py| -> Result < ( ) , anyhow:: Error > {
667
+ let write = py
668
+ . import ( "monarch.controller.debugger" )
669
+ . unwrap ( )
670
+ . getattr ( "write" )
671
+ . unwrap ( ) ;
672
+ write. call1 ( ( String :: from_utf8 ( bytes) ?, ) ) . unwrap ( ) ;
673
+ Ok ( ( ) )
674
+ } ) ?;
675
+ }
676
+ _ => {
677
+ anyhow:: bail!( "unexpected action: {:?}" , action) ;
678
+ }
679
+ }
680
+ }
681
+ if self . debugger_active . is_none ( ) {
682
+ self . debugger_active = self . debugger_paused . pop_front ( ) . and_then ( |pdb_actor| {
683
+ pdb_actor
684
+ . send (
685
+ this,
686
+ DebuggerMessage :: Action {
687
+ action : DebuggerAction :: Attach ( ) ,
688
+ } ,
689
+ )
690
+ . map ( |_| pdb_actor)
691
+ . ok ( )
692
+ } ) ;
693
+ }
694
+ Ok ( ( ) )
695
+ }
620
696
}
621
697
622
698
impl Debug for MeshControllerActor {
@@ -642,6 +718,8 @@ impl Actor for MeshControllerActor {
642
718
workers : None ,
643
719
history : History :: new ( world_size) ,
644
720
id,
721
+ debugger_active : None ,
722
+ debugger_paused : VecDeque :: new ( ) ,
645
723
} )
646
724
}
647
725
async fn init ( & mut self , this : & Instance < Self > ) -> Result < ( ) , anyhow:: Error > {
@@ -681,8 +759,7 @@ impl Handler<ControllerMessage> for MeshControllerActor {
681
759
debugger_actor_id,
682
760
action,
683
761
} => {
684
- let dm = crate :: client:: DebuggerMessage :: new ( debugger_actor_id. into ( ) , action) ?;
685
- panic ! ( "NYI: debugger message handling" ) ;
762
+ self . handle_debug ( this, debugger_actor_id, action) ?;
686
763
}
687
764
ControllerMessage :: Status {
688
765
seq,
0 commit comments