Skip to content

Add BRP watch_id and bevy/unwatch #16407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 42 additions & 4 deletions crates/bevy_remote/src/builtin_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use bevy_ecs::{
query::QueryBuilder,
reflect::{AppTypeRegistry, ReflectComponent},
removal_detection::RemovedComponentEntity,
system::{In, Local},
system::{In, Local, Res},
world::{EntityRef, EntityWorldMut, FilteredEntityRef, World},
};
use bevy_hierarchy::BuildChildren as _;
Expand All @@ -22,7 +22,10 @@ use bevy_utils::HashMap;
use serde::{de::DeserializeSeed as _, Deserialize, Serialize};
use serde_json::{Map, Value};

use crate::{error_codes, BrpError, BrpResult};
use crate::{
error_codes, BrpError, BrpResult, RemoteWatchingRequestId, RemoteWatchingRequests,
RemoteWatchingSystemParams,
};

/// The method path for a `bevy/get` request.
pub const BRP_GET_METHOD: &str = "bevy/get";
Expand Down Expand Up @@ -54,6 +57,9 @@ pub const BRP_GET_AND_WATCH_METHOD: &str = "bevy/get+watch";
/// The method path for a `bevy/list+watch` request.
pub const BRP_LIST_AND_WATCH_METHOD: &str = "bevy/list+watch";

/// The method path for a `bevy/unwatch` request.
pub const BRP_UNWATCH_METHOD: &str = "bevy/unwatch";

/// `bevy/get`: Retrieves one or more components from the entity with the given
/// ID.
///
Expand Down Expand Up @@ -187,6 +193,16 @@ pub struct BrpListParams {
pub entity: Entity,
}

/// `bevy/unwatch`: Cancels the watcher with the supplied `watch_id`.
/// If no `watch_id` is supplied, all running watchers will be canceled.
///
/// The server responds with a null.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BrpUnwatchParams {
/// The ID of the watcher to unwatch.
pub watch_id: Option<RemoteWatchingRequestId>,
}

/// Describes the data that is to be fetched in a query.
#[derive(Debug, Serialize, Deserialize, Clone, Default)]
pub struct BrpQuery {
Expand Down Expand Up @@ -348,7 +364,7 @@ pub fn process_remote_get_request(In(params): In<Option<Value>>, world: &World)

/// Handles a `bevy/get+watch` request coming from a client.
pub fn process_remote_get_watching_request(
In(params): In<Option<Value>>,
In((_, params)): In<RemoteWatchingSystemParams>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we providing the watch_id if they are not used anywhere in the watch systems?
In case they might be used in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to give watch handlers a way to identify which watcher it is handling, giving them a unique key for use with any needed state/cache that needs to be handled on watcher level, rather than method level.

As you’ve noticed it is not used by any of the built in methods, as they are not stateful. But I have encountered the need for watcher state while building some custom methods.

I’m not completely certain this is the best solution though. There are other ways. For example each watcher could have their own SystemState, allowing use of Local instead of managing state using ids and hash maps. Another benefit is cleanup would be automatic.

world: &World,
mut removal_cursors: Local<HashMap<ComponentId, EventCursor<RemovedComponentEntity>>>,
) -> BrpResult<Option<Value>> {
Expand Down Expand Up @@ -749,7 +765,7 @@ pub fn process_remote_list_request(In(params): In<Option<Value>>, world: &World)

/// Handles a `bevy/list` request (list all components) coming from a client.
pub fn process_remote_list_watching_request(
In(params): In<Option<Value>>,
In((_, params)): In<RemoteWatchingSystemParams>,
world: &World,
mut removal_cursors: Local<HashMap<ComponentId, EventCursor<RemovedComponentEntity>>>,
) -> BrpResult<Option<Value>> {
Expand Down Expand Up @@ -793,6 +809,28 @@ pub fn process_remote_list_watching_request(
}
}

/// Handles a `bevy/unwatch` request (unwatch a running `+watch`-suffixed request) coming from a client.
pub fn process_unwatch_request(
In(params): In<Option<Value>>,
watching_requests: Res<RemoteWatchingRequests>,
) -> BrpResult {
let params_watch_id = match params.map(parse::<BrpUnwatchParams>) {
Some(Ok(params)) => params.watch_id,
Some(Err(e)) => {
return Err(e);
}
_ => None,
};

for (watch_id, message, _) in watching_requests.0.iter() {
if params_watch_id.is_none() || params_watch_id.unwrap() == *watch_id {
message.sender.close();
}
}

Ok(Value::Null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to return an error if a watch_id is provided that doesn't exist

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! That makes sense.

}

/// Immutably retrieves an entity from the [`World`], returning an error if the
/// entity isn't present.
fn get_entity(world: &World, entity: Entity) -> Result<EntityRef<'_>, BrpError> {
Expand Down
107 changes: 84 additions & 23 deletions crates/bevy_remote/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,17 @@
//! - `removed`: An array of fully-qualified type names of components removed from the entity
//! in the last tick.
//!
//! ### bevy/unwatch
//!
//! Cancels the watcher (`+watch`-suffixed request) with the supplied `watch_id`.
//!
//! When `params` is not provided, all running watchers will be canceled.
//!
//! `params`:
//! - `watch_id`: The ID of request to unwatch.
//!
//! `result`: null.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to return an error if a watch_id is provided that doesn't exist

//!
//!
//! ## Custom methods
//!
Expand Down Expand Up @@ -306,7 +317,7 @@ use bevy_derive::{Deref, DerefMut};
use bevy_ecs::{
entity::Entity,
schedule::{IntoSystemConfigs, ScheduleLabel},
system::{Commands, In, IntoSystem, ResMut, Resource, System, SystemId},
system::{Commands, In, IntoSystem, Local, ResMut, Resource, System, SystemId},
world::World,
};
use bevy_utils::{prelude::default, HashMap};
Expand Down Expand Up @@ -359,7 +370,7 @@ impl RemotePlugin {
pub fn with_watching_method<M>(
mut self,
name: impl Into<String>,
handler: impl IntoSystem<In<Option<Value>>, BrpResult<Option<Value>>, M>,
handler: impl IntoSystem<In<RemoteWatchingSystemParams>, BrpResult<Option<Value>>, M>,
) -> Self {
self.methods.get_mut().unwrap().push((
name.into(),
Expand Down Expand Up @@ -412,6 +423,10 @@ impl Default for RemotePlugin {
builtin_methods::BRP_LIST_AND_WATCH_METHOD,
builtin_methods::process_remote_list_watching_request,
)
.with_method(
builtin_methods::BRP_UNWATCH_METHOD,
builtin_methods::process_unwatch_request,
)
}
}

Expand Down Expand Up @@ -464,7 +479,7 @@ pub enum RemoteMethodHandler {
/// A handler that only runs once and returns one response.
Instant(Box<dyn System<In = In<Option<Value>>, Out = BrpResult>>),
/// A handler that watches for changes and response when a change is detected.
Watching(Box<dyn System<In = In<Option<Value>>, Out = BrpResult<Option<Value>>>>),
Watching(Box<dyn System<In = In<RemoteWatchingSystemParams>, Out = BrpResult<Option<Value>>>>),
}

/// The [`SystemId`] of a function that implements a remote instant method (`bevy/get`, `bevy/query`, etc.)
Expand All @@ -478,13 +493,22 @@ pub type RemoteInstantMethodSystemId = SystemId<In<Option<Value>>, BrpResult>;

/// The [`SystemId`] of a function that implements a remote watching method (`bevy/get+watch`, `bevy/list+watch`, etc.)
///
/// The first parameter is the JSON value of the `params`. Typically, an
/// implementation will deserialize these as the first thing they do.
/// The first parameter is the [`RemoteWatchingSystemParams`].
///
/// The optional returned JSON value will be sent as a response. If no
/// changes were detected this should be [`None`]. Re-running of this
/// handler is done in the [`RemotePlugin`].
pub type RemoteWatchingMethodSystemId = SystemId<In<Option<Value>>, BrpResult<Option<Value>>>;
pub type RemoteWatchingMethodSystemId =
SystemId<In<RemoteWatchingSystemParams>, BrpResult<Option<Value>>>;

/// The parameters passed as [`In`] params to a remote watching handler system.
///
/// Tuple of:
/// - The [`RemoteWatchingRequestId`] of this watching request.
/// It can be used to distinguish different running watches of the same method.
/// - The JSON value of the `params`. Typically, an
/// implementation will deserialize these as the first thing they do.
pub type RemoteWatchingSystemParams = (RemoteWatchingRequestId, Option<Value>);

/// The [`SystemId`] of a function that can be used as a remote method.
#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -524,9 +548,38 @@ impl RemoteMethods {
}
}

/// Holds the [`BrpMessage`]'s of all ongoing watching requests along with their handlers.
/// A server owned ID identifying a `+watch`-suffixed request.
///
/// Sent in the initial response of all watching requests.
///
/// Not to be confused with the `id` field in JSON-RPC 2.0, which is a client owned ID that might be unset or contain duplicates.
pub type RemoteWatchingRequestId = u32;

/// Holds the [`BrpMessage`]'s of all ongoing watching requests along with their IDs and handlers.
#[derive(Debug, Resource, Default)]
pub struct RemoteWatchingRequests(Vec<(BrpMessage, RemoteWatchingMethodSystemId)>);
pub struct RemoteWatchingRequests(
Vec<(
RemoteWatchingRequestId,
BrpMessage,
RemoteWatchingMethodSystemId,
)>,
);

/// A result for starting a watching request.
///
/// Sent as the first message in the watch stream.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub struct BrpWatchResult {
/// The server owned ID identifying the request.
watch_id: RemoteWatchingRequestId,
}

impl BrpWatchResult {
fn new(watch_id: RemoteWatchingRequestId) -> Self {
BrpWatchResult { watch_id }
}
}

/// A single request from a Bevy Remote Protocol client to the server,
/// serialized in JSON.
Expand Down Expand Up @@ -766,7 +819,7 @@ fn setup_mailbox_channel(mut commands: Commands) {
///
/// This needs exclusive access to the [`World`] because clients can manipulate
/// anything in the ECS.
fn process_remote_requests(world: &mut World) {
fn process_remote_requests(world: &mut World, mut watch_id_counter: Local<u32>) {
if !world.contains_resource::<BrpReceiver>() {
return;
}
Expand All @@ -783,9 +836,9 @@ fn process_remote_requests(world: &mut World) {
return;
};

match handler {
let result = match handler {
RemoteMethodSystemId::Instant(id) => {
let result = match world.run_system_with_input(id, message.params) {
match world.run_system_with_input(id, message.params) {
Ok(result) => result,
Err(error) => {
let _ = message.sender.force_send(Err(BrpError {
Expand All @@ -795,26 +848,33 @@ fn process_remote_requests(world: &mut World) {
}));
continue;
}
};

let _ = message.sender.force_send(result);
}
}
RemoteMethodSystemId::Watching(id) => {
world
.resource_mut::<RemoteWatchingRequests>()
.0
.push((message, id));
*watch_id_counter += 1;
let watch_id = *watch_id_counter;

world.resource_mut::<RemoteWatchingRequests>().0.push((
watch_id,
message.clone(),
id,
));

Ok(serde_json::to_value(BrpWatchResult::new(watch_id)).unwrap())
}
}
};

let _ = message.sender.force_send(result);
}
}

/// A system that checks all ongoing watching requests for changes that should be sent
/// and handles it if so.
fn process_ongoing_watching_requests(world: &mut World) {
world.resource_scope::<RemoteWatchingRequests, ()>(|world, requests| {
for (message, system_id) in requests.0.iter() {
let handler_result = process_single_ongoing_watching_request(world, message, system_id);
for (watch_id, message, system_id) in requests.0.iter() {
let handler_result =
process_single_ongoing_watching_request(world, *watch_id, message, system_id);
let sender_result = match handler_result {
Ok(Some(value)) => message.sender.try_send(Ok(value)),
Err(err) => message.sender.try_send(Err(err)),
Expand All @@ -831,11 +891,12 @@ fn process_ongoing_watching_requests(world: &mut World) {

fn process_single_ongoing_watching_request(
world: &mut World,
watch_id: RemoteWatchingRequestId,
message: &BrpMessage,
system_id: &RemoteWatchingMethodSystemId,
) -> BrpResult<Option<Value>> {
world
.run_system_with_input(*system_id, message.params.clone())
.run_system_with_input(*system_id, (watch_id, message.params.clone()))
.map_err(|error| BrpError {
code: error_codes::INTERNAL_ERROR,
message: format!("Failed to run method handler: {error}"),
Expand All @@ -845,7 +906,7 @@ fn process_single_ongoing_watching_request(

fn remove_closed_watching_requests(mut requests: ResMut<RemoteWatchingRequests>) {
for i in (0..requests.0.len()).rev() {
let Some((message, _)) = requests.0.get(i) else {
let Some((_, message, _)) = requests.0.get(i) else {
unreachable!()
};

Expand Down