| 
1 | 1 | from __future__ import annotations  | 
 | 2 | + | 
 | 3 | +import asyncio  | 
2 | 4 | import os  | 
3 | 5 | import time  | 
4 | 6 | from asyncio import get_event_loop_policy  | 
 | 
30 | 32 | from .secrets.secrets_rest_api import SecretsRestAPI  | 
31 | 33 | 
 
  | 
32 | 34 | if TYPE_CHECKING:  | 
33 |  | -    from typing import Any, Optional  | 
 | 35 | +    from typing import Any  | 
34 | 36 |     from asyncio import AbstractEventLoop  | 
35 | 37 | 
 
  | 
36 | 38 | from jupyter_collaboration import (  # type:ignore[import-untyped]  # isort:skip  | 
@@ -216,14 +218,15 @@ def initialize(self, argv: Any = None) -> None:  | 
216 | 218 |         self.ychats_by_room: dict[str, YChat] = {}  | 
217 | 219 |         """Cache of YChat instances, indexed by room ID."""  | 
218 | 220 | 
 
  | 
219 |  | -        if self.serverapp is not None:  | 
220 |  | -            self.event_logger = self.serverapp.web_app.settings["event_logger"]  | 
221 |  | -            self.event_logger.add_listener(  | 
222 |  | -                schema_id=JUPYTER_COLLABORATION_EVENTS_URI, listener=self.connect_chat  | 
223 |  | -            )  | 
 | 221 | +        # Original direct event listener (commented out for router integration)  | 
 | 222 | +        # if self.serverapp is not None:  | 
 | 223 | +        #     self.event_logger = self.serverapp.web_app.settings["event_logger"]  | 
 | 224 | +        #     self.event_logger.add_listener(  | 
 | 225 | +        #         schema_id=JUPYTER_COLLABORATION_EVENTS_URI, listener=self.connect_chat  | 
 | 226 | +        #     )  | 
224 | 227 | 
 
  | 
225 | 228 |     @property  | 
226 |  | -    def event_loop(self) -> "AbstractEventLoop":  | 
 | 229 | +    def event_loop(self) -> AbstractEventLoop:  | 
227 | 230 |         """  | 
228 | 231 |         Returns a reference to the asyncio event loop.  | 
229 | 232 |         """  | 
@@ -347,6 +350,8 @@ def initialize_settings(self):  | 
347 | 350 |         # When a message's interrupt event is set, the response is halted.  | 
348 | 351 |         self.settings["jai_message_interrupted"] = {}  | 
349 | 352 | 
 
  | 
 | 353 | +        self.event_loop.create_task(self._setup_router_integration())  | 
 | 354 | + | 
350 | 355 |         # Log server extension startup time  | 
351 | 356 |         self.log.info(f"Registered {self.name} server extension")  | 
352 | 357 |         startup_time = round((time.time() - start) * 1000)  | 
@@ -377,14 +382,14 @@ async def _stop_extension(self):  | 
377 | 382 | 
 
  | 
378 | 383 |     def _init_persona_manager(  | 
379 | 384 |         self, room_id: str, ychat: YChat  | 
380 |  | -    ) -> Optional[PersonaManager]:  | 
 | 385 | +    ) -> PersonaManager | None:  | 
381 | 386 |         """  | 
382 | 387 |         Initializes a `PersonaManager` instance scoped to a `YChat`.  | 
383 | 388 | 
  | 
384 | 389 |         This method should not raise an exception. Upon encountering an  | 
385 | 390 |         exception, this method will catch it, log it, and return `None`.  | 
386 | 391 |         """  | 
387 |  | -        persona_manager: Optional[PersonaManager] = None  | 
 | 392 | +        persona_manager: PersonaManager | None = None  | 
388 | 393 | 
 
  | 
389 | 394 |         try:  | 
390 | 395 |             config_manager = self.settings.get("jai_config_manager", None)  | 
@@ -428,6 +433,56 @@ def _init_persona_manager(  | 
428 | 433 |         finally:  | 
429 | 434 |             return persona_manager  | 
430 | 435 | 
 
  | 
 | 436 | +    async def _setup_router_integration(self) -> None:  | 
 | 437 | +        """  | 
 | 438 | +        Set up integration with jupyter-ai-router if available.  | 
 | 439 | +        This allows persona manager to work through the centralized MessageRouter.  | 
 | 440 | +        """  | 
 | 441 | +        self.log.info("Waiting for the router to be ready")  | 
 | 442 | + | 
 | 443 | +        # Wait until the router field is available  | 
 | 444 | +        while True:  | 
 | 445 | +            router = self.serverapp.web_app.settings.get("jupyter-ai", {}).get("router")  | 
 | 446 | +            if router is not None:  | 
 | 447 | +                self.log.info("Router is ready, continuing with the integration")  | 
 | 448 | +                break  | 
 | 449 | +            await asyncio.sleep(0.1)  # Check every 100ms  | 
 | 450 | + | 
 | 451 | +        try:  | 
 | 452 | +            self.log.info("Found jupyter-ai-router, registering callbacks")  | 
 | 453 | + | 
 | 454 | +            # Register callback for new chat initialization  | 
 | 455 | +            router.observe_chat_init(self._on_router_chat_init)  | 
 | 456 | + | 
 | 457 | +            # Store reference to router for later use  | 
 | 458 | +            self.router = router  | 
 | 459 | + | 
 | 460 | +        except Exception as e:  | 
 | 461 | +            self.log.error(f"Error setting up router integration: {e}")  | 
 | 462 | + | 
 | 463 | +    def _on_router_chat_init(self, room_id: str, ychat: YChat) -> None:  | 
 | 464 | +        """  | 
 | 465 | +        Callback for when router detects a new chat initialization.  | 
 | 466 | +        This replaces the direct event listener approach.  | 
 | 467 | +        """  | 
 | 468 | +        self.log.info(f"Router detected new chat room: {room_id}")  | 
 | 469 | + | 
 | 470 | +        # Cache the YChat instance  | 
 | 471 | +        self.ychats_by_room[room_id] = ychat  | 
 | 472 | + | 
 | 473 | +        # Initialize persona manager for this chat  | 
 | 474 | +        persona_manager = self._init_persona_manager(room_id, ychat)  | 
 | 475 | +        if not persona_manager:  | 
 | 476 | +            self.log.error(  | 
 | 477 | +                "Jupyter AI was unable to initialize its AI personas. They are not available for use in chat until this error is resolved. "  | 
 | 478 | +                + "Please verify your configuration and open a new issue on GitHub if this error persists."  | 
 | 479 | +            )  | 
 | 480 | +            return  | 
 | 481 | + | 
 | 482 | +        # Register persona manager callbacks with router  | 
 | 483 | +        self.router.observe_chat_msg(room_id, persona_manager.on_chat_message)  | 
 | 484 | +        self.router.observe_slash_cmd_msg(room_id, persona_manager.on_slash_cmd_message)  | 
 | 485 | + | 
431 | 486 |     def _link_jupyter_server_extension(self, server_app: ServerApp):  | 
432 | 487 |         """Setup custom config needed by this extension."""  | 
433 | 488 |         c = Config()  | 
 | 
0 commit comments