Skip to content

Commit 73d21f0

Browse files
committed
Implementing surface API for requests and targets
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent 620dd8a commit 73d21f0

21 files changed

+712
-228
lines changed

src/cancel.rs

Lines changed: 7 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717

1818
use bevy::{
19-
prelude::{Entity, Component, Bundle, Resource, World},
19+
prelude::{Entity, Component, Bundle, World},
2020
ecs::world::EntityMut,
2121
};
2222

@@ -27,8 +27,8 @@ use smallvec::SmallVec;
2727
use std::sync::Arc;
2828

2929
use crate::{
30-
Disposal, DisposalFailure, Filtered, OperationError, ScopeStorage, OrBroken,
31-
OperationResult, SingleTargetStorage, OperationRoster, Supplanted,
30+
Disposal, Filtered, OperationError, ScopeStorage, OrBroken, CancelFailure,
31+
OperationResult, OperationRoster, Supplanted, UnhandledErrors,
3232
};
3333

3434
/// Information about the cancellation that occurred.
@@ -282,9 +282,10 @@ fn try_emit_cancel(
282282
// the scope
283283
let scope = scope.get();
284284
roster.cancel(Cancel { source, target: scope, session, cancellation });
285-
} else if let Some(target) = source_mut.get::<SingleTargetStorage>() {
286-
let target = target.get();
287-
roster.cancel(Cancel { source, target, session, cancellation });
285+
} else if let Some(session) = session {
286+
// The cancellation is not happening inside a scope, so we should tell
287+
// the session itself to cancel.
288+
roster.cancel(Cancel { source, target: session, session: Some(session), cancellation });
288289
} else {
289290
return Err(CancelFailure::new(
290291
OperationError::Broken(Some(Backtrace::new())),
@@ -300,32 +301,6 @@ fn try_emit_cancel(
300301
Ok(())
301302
}
302303

303-
pub struct CancelFailure {
304-
/// The error produced while the cancellation was happening
305-
pub error: OperationError,
306-
/// The cancellation that was being emitted
307-
pub cancel: Cancel,
308-
}
309-
310-
impl CancelFailure {
311-
fn new(
312-
error: OperationError,
313-
cancel: Cancel,
314-
) -> Self {
315-
Self { error, cancel }
316-
}
317-
}
318-
319-
// TODO(@mxgrey): Consider moving this into its own module since more than just
320-
// cancellation will use this resource.
321-
#[derive(Resource, Default)]
322-
pub struct UnhandledErrors {
323-
pub cancellations: Vec<CancelFailure>,
324-
pub operations: Vec<OperationError>,
325-
pub disposals: Vec<DisposalFailure>,
326-
pub stop_tasks: Vec<StopTaskFailure>,
327-
}
328-
329304
pub struct OperationCancel<'a> {
330305
pub cancel: Cancel,
331306
pub world: &'a mut World,
@@ -346,10 +321,3 @@ impl CancellableBundle {
346321
CancellableBundle { storage: Default::default(), cancel: OperationCancelStorage(cancel) }
347322
}
348323
}
349-
350-
pub struct StopTaskFailure {
351-
/// The task that was unable to be stopped
352-
pub task: Entity,
353-
/// The backtrace to indicate why it failed
354-
pub backtrace: Option<Backtrace>,
355-
}

src/chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ impl<'w, 's, 'a, Response: 'static + Send + Sync, Streams, L, C> Chain<'w, 's, '
180180
{
181181
let source = self.target;
182182
let target = self.commands.spawn(UnusedTarget).id();
183-
provider.provide(source, target, self.commands);
183+
provider.connect(source, target, self.commands);
184184
Chain::new(source, target, self.commands)
185185
}
186186

src/channel.rs

Lines changed: 58 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,52 +17,44 @@
1717

1818
use bevy::{
1919
prelude::{Entity, Resource, World},
20-
ecs::system::{Command, CommandQueue, Commands},
20+
ecs::system::{CommandQueue, Commands},
2121
};
2222

2323
use crossbeam::channel::{unbounded, Sender as CbSender, Receiver as CbReceiver};
2424

25-
use crate::{StreamPack, Provider, Promise, RequestExt, OperationRoster};
25+
use std::sync::Arc;
26+
27+
use crate::{
28+
Stream, StreamPack, StreamRequest, Provider, Promise, RequestExt,
29+
OperationRoster, OperationError,
30+
};
2631

2732
#[derive(Clone)]
28-
pub struct Channel<Streams = ()> {
29-
inner: InnerChannel,
33+
pub struct Channel<Streams: StreamPack = ()> {
34+
inner: Arc<InnerChannel>,
35+
streams: Streams::Channel,
3036
_ignore: std::marker::PhantomData<Streams>,
3137
}
3238

33-
impl<Streams> Channel<Streams> {
34-
pub fn push<C: Command>(&self, command: C) {
35-
let mut queue = CommandQueue::default();
36-
queue.push(command);
37-
self.push_batch(queue);
38-
}
39-
40-
pub fn push_batch(&self, mut queue: CommandQueue) {
41-
self.inner.sender.send(Box::new(
42-
move |world: &mut World, _: &mut OperationRoster| {
43-
queue.apply(world);
44-
}
45-
)).ok();
46-
}
47-
39+
impl<Streams: StreamPack> Channel<Streams> {
4840
pub fn query<P: Provider>(&self, request: P::Request, provider: P) -> Promise<P::Response>
4941
where
5042
P::Request: 'static + Send + Sync,
5143
P::Response: 'static + Send + Sync,
5244
P::Streams: 'static + StreamPack,
53-
P: 'static + Send,
45+
P: 'static + Send + Sync,
5446
{
55-
self.build(move |commands| {
56-
commands.request(request, provider).take()
47+
self.command(move |commands| {
48+
commands.request(request, provider).take().response
5749
}).flatten()
5850
}
5951

60-
pub fn build<F, U>(&self, f: F) -> Promise<U>
52+
pub fn command<F, U>(&self, f: F) -> Promise<U>
6153
where
6254
F: FnOnce(&mut Commands) -> U + 'static + Send,
6355
U: 'static + Send,
6456
{
65-
let (promise, sender) = Promise::new();
57+
let (sender, promise) = Promise::new();
6658
self.inner.sender.send(Box::new(
6759
move |world: &mut World, _: &mut OperationRoster| {
6860
let mut command_queue = CommandQueue::default();
@@ -80,16 +72,34 @@ impl<Streams> Channel<Streams> {
8072
#[derive(Clone)]
8173
pub(crate) struct InnerChannel {
8274
source: Entity,
75+
session: Entity,
8376
sender: CbSender<ChannelItem>,
8477
}
8578

8679
impl InnerChannel {
87-
pub(crate) fn new(source: Entity, sender: CbSender<ChannelItem>) -> Self {
88-
InnerChannel { source, sender }
80+
pub fn source(&self) -> Entity {
81+
self.source
82+
}
83+
84+
pub fn sender(&self) -> &CbSender<ChannelItem> {
85+
&self.sender
86+
}
87+
88+
pub(crate) fn new(
89+
source: Entity,
90+
session: Entity,
91+
sender: CbSender<ChannelItem>,
92+
) -> Self {
93+
InnerChannel { source, session, sender }
8994
}
9095

91-
pub(crate) fn into_specific<Streams>(self) -> Channel<Streams> {
92-
Channel { inner: self, _ignore: Default::default() }
96+
pub(crate) fn into_specific<Streams: StreamPack>(
97+
self,
98+
world: &World,
99+
) -> Result<Channel<Streams>, OperationError> {
100+
let inner = Arc::new(self);
101+
let streams = Streams::make_channel(&inner, world)?;
102+
Ok(Channel { inner, streams, _ignore: Default::default() })
93103
}
94104
}
95105

@@ -114,17 +124,28 @@ impl Default for ChannelQueue {
114124
}
115125
}
116126

117-
struct StreamCommand<T> {
118-
source: Entity,
119-
data: T,
127+
/// Use this channel to stream data using the [`StreamChannel::send`] method.
128+
pub struct StreamChannel<T> {
129+
target: Entity,
130+
inner: Arc<InnerChannel>,
131+
_ignore: std::marker::PhantomData<T>,
120132
}
121133

122-
impl<T: StreamPack> Command for StreamCommand<T> {
123-
fn apply(self, world: &mut World) {
124-
let Some(mut source_mut) = world.get_entity_mut(self.source) else {
125-
return;
126-
};
134+
impl<T: Stream> StreamChannel<T> {
135+
/// Send an instance of data out over a stream.
136+
pub fn send(&self, data: T) {
137+
let source = self.inner.source;
138+
let session = self.inner.session;
139+
let target = self.target;
140+
self.inner.sender.send(Box::new(
141+
move |world: &mut World, roster: &mut OperationRoster| {
142+
data.send(StreamRequest { source, session, target, world, roster });
143+
}
144+
));
145+
}
127146

147+
pub(crate) fn new(target: Entity, inner: Arc<InnerChannel>) -> Self {
148+
Self { target, inner, _ignore: Default::default() }
128149
}
129150
}
130151

@@ -162,7 +183,7 @@ mod tests {
162183
commands.request(
163184
RepeatRequest { service: hello, count: 5 },
164185
repeat,
165-
).take()
186+
).take().response
166187
});
167188
context.run_while_pending(&mut promise);
168189
assert!(promise.peek().is_available());

src/disposal.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use std::collections::HashMap;
2828

2929
use crate::{
3030
OperationRoster, operation::ScopeStorage, Cancellation, UnhandledErrors,
31+
DisposalFailure,
3132
};
3233

3334
#[derive(Debug, Clone)]
@@ -299,15 +300,3 @@ struct DisposalStorage {
299300
/// A map from a session to all the disposals that occurred for the session
300301
disposals: HashMap<Entity, Vec<Disposal>>,
301302
}
302-
303-
/// When it is impossible for some reason to perform a disposal, the incident
304-
/// will be logged in this resource. This may happen if a node somehow gets
305-
/// despawned while its service is attempting to dispose a request.
306-
pub struct DisposalFailure {
307-
/// The disposal that was attempted
308-
pub disposal: Disposal,
309-
/// The node which was attempting to report the disposal
310-
pub broken_node: Entity,
311-
/// The backtrace indicating what led up to the failure
312-
pub backtrace: Option<Backtrace>,
313-
}

src/errors.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright (C) 2024 Open Source Robotics Foundation
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
use bevy::prelude::{Resource, Entity};
19+
20+
use backtrace::Backtrace;
21+
22+
use anyhow::Error as Anyhow;
23+
24+
use crate::{OperationError, Cancel, Disposal};
25+
26+
/// This resource stores errors that have occurred that could not be handled
27+
/// internally or communicated to the user by any other means.
28+
#[derive(Resource, Default)]
29+
pub struct UnhandledErrors {
30+
pub cancellations: Vec<CancelFailure>,
31+
pub operations: Vec<OperationError>,
32+
pub disposals: Vec<DisposalFailure>,
33+
pub stop_tasks: Vec<StopTaskFailure>,
34+
pub miscellaneous: Vec<MiscellaneousFailure>,
35+
}
36+
37+
38+
pub struct CancelFailure {
39+
/// The error produced while the cancellation was happening
40+
pub error: OperationError,
41+
/// The cancellation that was being emitted
42+
pub cancel: Cancel,
43+
}
44+
45+
impl CancelFailure {
46+
pub fn new(
47+
error: OperationError,
48+
cancel: Cancel,
49+
) -> Self {
50+
Self { error, cancel }
51+
}
52+
}
53+
54+
/// When it is impossible for some reason to perform a disposal, the incident
55+
/// will be logged in this resource. This may happen if a node somehow gets
56+
/// despawned while its service is attempting to dispose a request.
57+
pub struct DisposalFailure {
58+
/// The disposal that was attempted
59+
pub disposal: Disposal,
60+
/// The node which was attempting to report the disposal
61+
pub broken_node: Entity,
62+
/// The backtrace indicating what led up to the failure
63+
pub backtrace: Option<Backtrace>,
64+
}
65+
66+
pub struct StopTaskFailure {
67+
/// The task that was unable to be stopped
68+
pub task: Entity,
69+
/// The backtrace to indicate why it failed
70+
pub backtrace: Option<Backtrace>,
71+
}
72+
73+
/// Use this for any failures that are not covered by the other categories
74+
pub struct MiscellaneousFailure {
75+
pub error: Anyhow,
76+
pub backtrace: Option<Backtrace>,
77+
}

src/flush.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
*/
1717

1818
use bevy::{
19-
prelude::{Entity, World, Query, QueryState, Added, With},
19+
prelude::{
20+
Entity, World, Query, QueryState, Added, With, Resource, Deref, DerefMut,
21+
},
2022
ecs::system::SystemState,
2123
};
2224

@@ -116,3 +118,8 @@ pub fn flush_impulses(
116118
}
117119
}
118120
}
121+
122+
/// This resource is used to queue up operations in the roster in situations
123+
/// where the regular roster is not available.
124+
#[derive(Resource, Default, Deref, DerefMut)]
125+
pub(crate) struct DeferredRoster(pub OperationRoster);

0 commit comments

Comments
 (0)