Skip to content

feat: use axum WS #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 30, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@
//! Routers can also be served over axum websockets. When both `axum` and
//! `pubsub` features are enabled, the `pubsub` module provides
//! [`pubsub::AxumWsCfg`] and the [`pubsub::ajj_websocket`] axum handler. This
//! handler will serve the router over websockets at a specific route.
//! handler will serve the router over websockets at a specific route. The
//! router is a property of the `AxumWsCfg` object, and is passed to the
//! handler via axum's `State` extractor.
//!
//! ```no_run
//! # #[cfg(all(feature = "axum", feature = "pubsub"))]
Expand All @@ -110,7 +112,10 @@
//! implementations of the `Connect` trait for [`std::net::SocketAddr`] to
//! create simple WS servers, and
//! [`interprocess::local_socket::ListenerOptions`] to create simple IPC
//! servers.
//! servers. We generally recommend using `axum` for WebSocket connections, as
//! it provides a more complete and robust implementation, however, users
//! needing additional control, or wanting to avoid the `axum` dependency
//! can use the `pubsub` module directly.
//!
//! ```no_run
//! # #[cfg(feature = "pubsub")]
Expand Down
83 changes: 80 additions & 3 deletions src/pubsub/axum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,34 @@ impl Listener for AxumListener {
///
/// The main points of configuration are:
/// - The runtime [`Handle`] on which to execute tasks, which can be set with
/// [`Self::with_handle`].
/// [`Self::with_handle`]. This defaults to the current thread's runtime
/// handle.
/// - The notification buffer size per client, which can be set with
/// [`Self::with_notification_buffer_per_client`]. See the [`crate::pubsub`]
/// module documentation for more details.
///
/// This struct is used as the [`State`] for the [`ajj_websocket`] handler, and
/// should be created from a fully-configured [`Router<()>`].
///
/// # Note
///
/// If [`AxumWsCfg`] is NOT used within a `tokio` runtime,
/// [`AxumWsCfg::with_handle`] MUST be called to set the runtime handle before
/// any requests are routed. Attempting to execute a task without an active
/// runtime will result in a panic.
///
/// # Example
///
/// ```no_run
/// # #[cfg(all(feature = "axum", feature = "pubsub"))]
/// # use ajj::{Router, pubsub::{ajj_websocket, AxumWsCfg}};
/// # {
/// # async fn _main(router: Router<()>, axum: axum::Router<AxumWsCfg>, handle: tokio::runtime::Handle) {
/// let cfg = AxumWsCfg::from(router)
/// .with_handle(handle)
/// .with_notification_buffer_per_client(10);
/// # }}
/// ```
#[derive(Clone)]
pub struct AxumWsCfg {
inner: Arc<ConnectionManager>,
Expand Down Expand Up @@ -113,7 +137,8 @@ impl AxumWsCfg {
}
}

/// Set the notification buffer size per client.
/// Set the notification buffer size per client. See the [`crate::pubsub`]
/// module documentation for more details.
pub fn with_notification_buffer_per_client(
self,
notification_buffer_per_client: usize,
Expand All @@ -127,7 +152,15 @@ impl AxumWsCfg {
}
}

/// Axum handler for WebSocket connections. Used to serve
/// Axum handler for WebSocket connections.
///
/// Used to serve [`crate::Router`]s over WebSocket connections via [`axum`]'s
/// built-in WebSocket support. This handler is used in conjunction with
/// [`AxumWsCfg`], which is passed as the [`State`] to the handler.
///
/// # Examples
///
/// Basic usage:
///
/// ```no_run
/// # #[cfg(all(feature = "axum", feature = "pubsub"))]
Expand All @@ -143,6 +176,50 @@ impl AxumWsCfg {
/// .with_state(cfg)
/// # }}
/// ```
///
/// The [`Router`] is a property of the [`AxumWsCfg`]. This means it is not
/// paramterized until the [`axum::Router::with_state`] method is called. This
/// has two significant consequences:
/// 1. You can easily register the same [`Router`] with multiple handlers.
/// 2. In order to register a second [`Router`] you need a second [`AxumWsCfg`].
///
/// Registering the same [`Router`] with multiple handlers:
///
/// ```no_run
/// # #[cfg(all(feature = "axum", feature = "pubsub"))]
/// # use ajj::{Router, pubsub::{ajj_websocket, AxumWsCfg}};
/// # {
/// # async fn _main(router: Router<()>, axum: axum::Router<AxumWsCfg>) -> axum::Router<()>{
/// // The config object contains the tokio runtime handle, and the
/// // notification buffer size.
/// let cfg = AxumWsCfg::new(router);
///
/// axum
/// .route("/ws", axum::routing::any(ajj_websocket))
/// .route("/super-secret-ws", axum::routing::any(ajj_websocket))
/// .with_state(cfg)
/// # }}
/// ```
///
/// Registering a second [`Router`] at a different path:
///
/// ```no_run
/// # #[cfg(all(feature = "axum", feature = "pubsub"))]
/// # use ajj::{Router, pubsub::{ajj_websocket, AxumWsCfg}};
/// # {
/// # async fn _main(router: Router<()>, other_router: Router<()>, axum: axum::Router<AxumWsCfg>) -> axum::Router<()>{
/// // The config object contains the tokio runtime handle, and the
/// // notification buffer size.
/// let cfg = AxumWsCfg::new(router);
/// let other_cfg = AxumWsCfg::new(other_router);
///
/// axum
/// .route("/really-cool-ws-1", axum::routing::any(ajj_websocket))
/// .with_state(cfg)
/// .route("/even-cooler-ws-2", axum::routing::any(ajj_websocket))
/// .with_state(other_cfg)
/// # }}
/// ```
pub async fn ajj_websocket(ws: WebSocketUpgrade, State(state): State<AxumWsCfg>) -> Response {
ws.on_upgrade(move |ws| {
let (sink, stream) = ws.split();
Expand Down
5 changes: 3 additions & 2 deletions src/pubsub/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl ConnectionManager {
/// Create a new [`ConnectionManager`] with the given [`crate::Router`].
pub(crate) fn new(router: crate::Router<()>) -> Self {
Self {
root_tasks: Handle::current().into(),
root_tasks: Default::default(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

caught me a little bug

next_id: AtomicU64::new(0).into(),
router,
notification_buffer_per_task: DEFAULT_NOTIFICATION_BUFFER_PER_CLIENT,
Expand All @@ -86,7 +86,8 @@ impl ConnectionManager {
self
}

/// Set the handle, overriding the root tasks.
/// Set the handle, overriding the root tasks. This should generally not be
/// used after tasks have been spawned.
pub(crate) fn with_handle(mut self, handle: Handle) -> Self {
self.root_tasks = handle.into();
self
Expand Down