-
-
Notifications
You must be signed in to change notification settings - Fork 4k
Add BRP watch_id
and bevy/unwatch
#16407
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
332b630
e9889d9
3d8f74f
b85f6a5
5de8916
35cebdd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,7 +10,7 @@ use bevy_ecs::{ | |
query::QueryBuilder, | ||
reflect::{AppTypeRegistry, ReflectComponent}, | ||
removal_detection::RemovedComponentEntity, | ||
system::{In, Local}, | ||
system::{In, Local, Res}, | ||
world::{EntityRef, EntityWorldMut, FilteredEntityRef, World}, | ||
}; | ||
use bevy_hierarchy::BuildChildren as _; | ||
|
@@ -22,7 +22,10 @@ use bevy_utils::HashMap; | |
use serde::{de::DeserializeSeed as _, Deserialize, Serialize}; | ||
use serde_json::{Map, Value}; | ||
|
||
use crate::{error_codes, BrpError, BrpResult}; | ||
use crate::{ | ||
error_codes, BrpError, BrpResult, RemoteWatchingRequestId, RemoteWatchingRequests, | ||
RemoteWatchingSystemParams, | ||
}; | ||
|
||
/// The method path for a `bevy/get` request. | ||
pub const BRP_GET_METHOD: &str = "bevy/get"; | ||
|
@@ -54,6 +57,9 @@ pub const BRP_GET_AND_WATCH_METHOD: &str = "bevy/get+watch"; | |
/// The method path for a `bevy/list+watch` request. | ||
pub const BRP_LIST_AND_WATCH_METHOD: &str = "bevy/list+watch"; | ||
|
||
/// The method path for a `bevy/unwatch` request. | ||
pub const BRP_UNWATCH_METHOD: &str = "bevy/unwatch"; | ||
|
||
/// `bevy/get`: Retrieves one or more components from the entity with the given | ||
/// ID. | ||
/// | ||
|
@@ -187,6 +193,16 @@ pub struct BrpListParams { | |
pub entity: Entity, | ||
} | ||
|
||
/// `bevy/unwatch`: Cancels the watcher with the supplied `watch_id`. | ||
/// If no `watch_id` is supplied, all running watchers will be canceled. | ||
/// | ||
/// The server responds with a null. | ||
#[derive(Debug, Serialize, Deserialize, Clone)] | ||
pub struct BrpUnwatchParams { | ||
/// The ID of the watcher to unwatch. | ||
pub watch_id: Option<RemoteWatchingRequestId>, | ||
} | ||
|
||
/// Describes the data that is to be fetched in a query. | ||
#[derive(Debug, Serialize, Deserialize, Clone, Default)] | ||
pub struct BrpQuery { | ||
|
@@ -348,7 +364,7 @@ pub fn process_remote_get_request(In(params): In<Option<Value>>, world: &World) | |
|
||
/// Handles a `bevy/get+watch` request coming from a client. | ||
pub fn process_remote_get_watching_request( | ||
In(params): In<Option<Value>>, | ||
In((_, params)): In<RemoteWatchingSystemParams>, | ||
world: &World, | ||
mut removal_cursors: Local<HashMap<ComponentId, EventCursor<RemovedComponentEntity>>>, | ||
) -> BrpResult<Option<Value>> { | ||
|
@@ -749,7 +765,7 @@ pub fn process_remote_list_request(In(params): In<Option<Value>>, world: &World) | |
|
||
/// Handles a `bevy/list` request (list all components) coming from a client. | ||
pub fn process_remote_list_watching_request( | ||
In(params): In<Option<Value>>, | ||
In((_, params)): In<RemoteWatchingSystemParams>, | ||
world: &World, | ||
mut removal_cursors: Local<HashMap<ComponentId, EventCursor<RemovedComponentEntity>>>, | ||
) -> BrpResult<Option<Value>> { | ||
|
@@ -793,6 +809,28 @@ pub fn process_remote_list_watching_request( | |
} | ||
} | ||
|
||
/// Handles a `bevy/unwatch` request (unwatch a running `+watch`-suffixed request) coming from a client. | ||
pub fn process_unwatch_request( | ||
In(params): In<Option<Value>>, | ||
watching_requests: Res<RemoteWatchingRequests>, | ||
) -> BrpResult { | ||
let params_watch_id = match params.map(parse::<BrpUnwatchParams>) { | ||
Some(Ok(params)) => params.watch_id, | ||
Some(Err(e)) => { | ||
return Err(e); | ||
} | ||
_ => None, | ||
}; | ||
|
||
for (watch_id, message, _) in watching_requests.0.iter() { | ||
if params_watch_id.is_none() || params_watch_id.unwrap() == *watch_id { | ||
message.sender.close(); | ||
} | ||
} | ||
|
||
Ok(Value::Null) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We might want to return an error if a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks! That makes sense. |
||
} | ||
|
||
/// Immutably retrieves an entity from the [`World`], returning an error if the | ||
/// entity isn't present. | ||
fn get_entity(world: &World, entity: Entity) -> Result<EntityRef<'_>, BrpError> { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -256,6 +256,17 @@ | |
//! - `removed`: An array of fully-qualified type names of components removed from the entity | ||
//! in the last tick. | ||
//! | ||
//! ### bevy/unwatch | ||
//! | ||
//! Cancels the watcher (`+watch`-suffixed request) with the supplied `watch_id`. | ||
//! | ||
//! When `params` is not provided, all running watchers will be canceled. | ||
//! | ||
//! `params`: | ||
//! - `watch_id`: The ID of request to unwatch. | ||
//! | ||
//! `result`: null. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We might want to return an error if a watch_id is provided that doesn't exist |
||
//! | ||
//! | ||
//! ## Custom methods | ||
//! | ||
|
@@ -306,7 +317,7 @@ use bevy_derive::{Deref, DerefMut}; | |
use bevy_ecs::{ | ||
entity::Entity, | ||
schedule::{IntoSystemConfigs, ScheduleLabel}, | ||
system::{Commands, In, IntoSystem, ResMut, Resource, System, SystemId}, | ||
system::{Commands, In, IntoSystem, Local, ResMut, Resource, System, SystemId}, | ||
world::World, | ||
}; | ||
use bevy_utils::{prelude::default, HashMap}; | ||
|
@@ -359,7 +370,7 @@ impl RemotePlugin { | |
pub fn with_watching_method<M>( | ||
mut self, | ||
name: impl Into<String>, | ||
handler: impl IntoSystem<In<Option<Value>>, BrpResult<Option<Value>>, M>, | ||
handler: impl IntoSystem<In<RemoteWatchingSystemParams>, BrpResult<Option<Value>>, M>, | ||
) -> Self { | ||
self.methods.get_mut().unwrap().push(( | ||
name.into(), | ||
|
@@ -412,6 +423,10 @@ impl Default for RemotePlugin { | |
builtin_methods::BRP_LIST_AND_WATCH_METHOD, | ||
builtin_methods::process_remote_list_watching_request, | ||
) | ||
.with_method( | ||
builtin_methods::BRP_UNWATCH_METHOD, | ||
builtin_methods::process_unwatch_request, | ||
) | ||
} | ||
} | ||
|
||
|
@@ -464,7 +479,7 @@ pub enum RemoteMethodHandler { | |
/// A handler that only runs once and returns one response. | ||
Instant(Box<dyn System<In = In<Option<Value>>, Out = BrpResult>>), | ||
/// A handler that watches for changes and response when a change is detected. | ||
Watching(Box<dyn System<In = In<Option<Value>>, Out = BrpResult<Option<Value>>>>), | ||
Watching(Box<dyn System<In = In<RemoteWatchingSystemParams>, Out = BrpResult<Option<Value>>>>), | ||
} | ||
|
||
/// The [`SystemId`] of a function that implements a remote instant method (`bevy/get`, `bevy/query`, etc.) | ||
|
@@ -478,13 +493,22 @@ pub type RemoteInstantMethodSystemId = SystemId<In<Option<Value>>, BrpResult>; | |
|
||
/// The [`SystemId`] of a function that implements a remote watching method (`bevy/get+watch`, `bevy/list+watch`, etc.) | ||
/// | ||
/// The first parameter is the JSON value of the `params`. Typically, an | ||
/// implementation will deserialize these as the first thing they do. | ||
/// The first parameter is the [`RemoteWatchingSystemParams`]. | ||
/// | ||
/// The optional returned JSON value will be sent as a response. If no | ||
/// changes were detected this should be [`None`]. Re-running of this | ||
/// handler is done in the [`RemotePlugin`]. | ||
pub type RemoteWatchingMethodSystemId = SystemId<In<Option<Value>>, BrpResult<Option<Value>>>; | ||
pub type RemoteWatchingMethodSystemId = | ||
SystemId<In<RemoteWatchingSystemParams>, BrpResult<Option<Value>>>; | ||
|
||
/// The parameters passed as [`In`] params to a remote watching handler system. | ||
/// | ||
/// Tuple of: | ||
/// - The [`RemoteWatchingRequestId`] of this watching request. | ||
/// It can be used to distinguish different running watches of the same method. | ||
/// - The JSON value of the `params`. Typically, an | ||
/// implementation will deserialize these as the first thing they do. | ||
pub type RemoteWatchingSystemParams = (RemoteWatchingRequestId, Option<Value>); | ||
|
||
/// The [`SystemId`] of a function that can be used as a remote method. | ||
#[derive(Debug, Clone, Copy)] | ||
|
@@ -524,9 +548,38 @@ impl RemoteMethods { | |
} | ||
} | ||
|
||
/// Holds the [`BrpMessage`]'s of all ongoing watching requests along with their handlers. | ||
/// A server owned ID identifying a `+watch`-suffixed request. | ||
/// | ||
/// Sent in the initial response of all watching requests. | ||
/// | ||
/// Not to be confused with the `id` field in JSON-RPC 2.0, which is a client owned ID that might be unset or contain duplicates. | ||
pub type RemoteWatchingRequestId = u32; | ||
|
||
/// Holds the [`BrpMessage`]'s of all ongoing watching requests along with their IDs and handlers. | ||
#[derive(Debug, Resource, Default)] | ||
pub struct RemoteWatchingRequests(Vec<(BrpMessage, RemoteWatchingMethodSystemId)>); | ||
pub struct RemoteWatchingRequests( | ||
Vec<( | ||
villor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
RemoteWatchingRequestId, | ||
BrpMessage, | ||
RemoteWatchingMethodSystemId, | ||
)>, | ||
); | ||
|
||
/// A result for starting a watching request. | ||
/// | ||
/// Sent as the first message in the watch stream. | ||
#[derive(Debug, Serialize, Deserialize, Clone)] | ||
#[serde(rename_all = "snake_case")] | ||
pub struct BrpWatchResult { | ||
/// The server owned ID identifying the request. | ||
watch_id: RemoteWatchingRequestId, | ||
} | ||
|
||
impl BrpWatchResult { | ||
fn new(watch_id: RemoteWatchingRequestId) -> Self { | ||
BrpWatchResult { watch_id } | ||
} | ||
} | ||
|
||
/// A single request from a Bevy Remote Protocol client to the server, | ||
/// serialized in JSON. | ||
|
@@ -766,7 +819,7 @@ fn setup_mailbox_channel(mut commands: Commands) { | |
/// | ||
/// This needs exclusive access to the [`World`] because clients can manipulate | ||
/// anything in the ECS. | ||
fn process_remote_requests(world: &mut World) { | ||
fn process_remote_requests(world: &mut World, mut watch_id_counter: Local<u32>) { | ||
if !world.contains_resource::<BrpReceiver>() { | ||
return; | ||
} | ||
|
@@ -783,9 +836,9 @@ fn process_remote_requests(world: &mut World) { | |
return; | ||
}; | ||
|
||
match handler { | ||
let result = match handler { | ||
RemoteMethodSystemId::Instant(id) => { | ||
let result = match world.run_system_with_input(id, message.params) { | ||
match world.run_system_with_input(id, message.params) { | ||
Ok(result) => result, | ||
Err(error) => { | ||
let _ = message.sender.force_send(Err(BrpError { | ||
|
@@ -795,26 +848,33 @@ fn process_remote_requests(world: &mut World) { | |
})); | ||
continue; | ||
} | ||
}; | ||
|
||
let _ = message.sender.force_send(result); | ||
} | ||
} | ||
RemoteMethodSystemId::Watching(id) => { | ||
world | ||
.resource_mut::<RemoteWatchingRequests>() | ||
.0 | ||
.push((message, id)); | ||
*watch_id_counter += 1; | ||
let watch_id = *watch_id_counter; | ||
|
||
world.resource_mut::<RemoteWatchingRequests>().0.push(( | ||
watch_id, | ||
message.clone(), | ||
id, | ||
)); | ||
|
||
Ok(serde_json::to_value(BrpWatchResult::new(watch_id)).unwrap()) | ||
} | ||
} | ||
}; | ||
|
||
let _ = message.sender.force_send(result); | ||
} | ||
} | ||
|
||
/// A system that checks all ongoing watching requests for changes that should be sent | ||
/// and handles it if so. | ||
fn process_ongoing_watching_requests(world: &mut World) { | ||
world.resource_scope::<RemoteWatchingRequests, ()>(|world, requests| { | ||
for (message, system_id) in requests.0.iter() { | ||
let handler_result = process_single_ongoing_watching_request(world, message, system_id); | ||
for (watch_id, message, system_id) in requests.0.iter() { | ||
let handler_result = | ||
process_single_ongoing_watching_request(world, *watch_id, message, system_id); | ||
let sender_result = match handler_result { | ||
Ok(Some(value)) => message.sender.try_send(Ok(value)), | ||
Err(err) => message.sender.try_send(Err(err)), | ||
|
@@ -831,11 +891,12 @@ fn process_ongoing_watching_requests(world: &mut World) { | |
|
||
fn process_single_ongoing_watching_request( | ||
world: &mut World, | ||
watch_id: RemoteWatchingRequestId, | ||
message: &BrpMessage, | ||
system_id: &RemoteWatchingMethodSystemId, | ||
) -> BrpResult<Option<Value>> { | ||
world | ||
.run_system_with_input(*system_id, message.params.clone()) | ||
.run_system_with_input(*system_id, (watch_id, message.params.clone())) | ||
.map_err(|error| BrpError { | ||
code: error_codes::INTERNAL_ERROR, | ||
message: format!("Failed to run method handler: {error}"), | ||
|
@@ -845,7 +906,7 @@ fn process_single_ongoing_watching_request( | |
|
||
fn remove_closed_watching_requests(mut requests: ResMut<RemoteWatchingRequests>) { | ||
for i in (0..requests.0.len()).rev() { | ||
let Some((message, _)) = requests.0.get(i) else { | ||
let Some((_, message, _)) = requests.0.get(i) else { | ||
unreachable!() | ||
}; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we providing the
watch_id
if they are not used anywhere in the watch systems?In case they might be used in the future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea is to give watch handlers a way to identify which watcher it is handling, giving them a unique key for use with any needed state/cache that needs to be handled on watcher level, rather than method level.
As you’ve noticed it is not used by any of the built in methods, as they are not stateful. But I have encountered the need for watcher state while building some custom methods.
I’m not completely certain this is the best solution though. There are other ways. For example each watcher could have their own SystemState, allowing use of Local instead of managing state using ids and hash maps. Another benefit is cleanup would be automatic.