Skip to content

Commit 9cc26ae

Browse files
committed
Writing basic tests for impulses
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent bbec9a3 commit 9cc26ae

File tree

9 files changed

+107
-50
lines changed

9 files changed

+107
-50
lines changed

src/callback.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use crate::{
1919
BlockingCallback, AsyncCallback, Channel, InnerChannel, ChannelQueue,
20-
OperationRoster, StreamPack, Input, Provider, ProvideOnce, UnhandledErrors,
20+
OperationRoster, StreamPack, Input, Provider, ProvideOnce,
2121
AddOperation, OperateCallback, ManageInput, OperationError, SetupFailure,
2222
OrBroken, OperateTask, Operation, OperationSetup,
2323
};
@@ -108,14 +108,8 @@ impl<'a> CallbackRequest<'a> {
108108
let sender = self.world.get_resource_or_insert_with(|| ChannelQueue::new()).sender.clone();
109109
let task = AsyncComputeTaskPool::get().spawn(task);
110110
let task_id = self.world.spawn(()).id();
111-
let operation = OperateTask::new(task_id, session, self.source, self.target, task, None, sender);
112-
let setup = OperationSetup { source: task_id, world: self.world };
113-
if let Err(error) = operation.setup(setup) {
114-
self.world.get_resource_or_insert_with(|| UnhandledErrors::default())
115-
.setup
116-
.push(SetupFailure { broken_node: self.source, error });
117-
}
118-
self.roster.queue(task_id);
111+
OperateTask::new(task_id, session, self.source, self.target, task, None, sender)
112+
.add(self.world, self.roster);
119113
Ok(())
120114
}
121115

src/errors.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,20 @@ pub struct UnhandledErrors {
4040
pub miscellaneous: Vec<MiscellaneousFailure>,
4141
}
4242

43+
impl UnhandledErrors {
44+
pub fn is_empty(&self) -> bool {
45+
self.setup.is_empty()
46+
&& self.cancellations.is_empty()
47+
&& self.operations.is_empty()
48+
&& self.disposals.is_empty()
49+
&& self.stop_tasks.is_empty()
50+
&& self.broken.is_empty()
51+
&& self.unused_targets.is_empty()
52+
&& self.connections.is_empty()
53+
&& self.miscellaneous.is_empty()
54+
}
55+
}
56+
4357
#[derive(Clone, Debug)]
4458
pub struct SetupFailure {
4559
pub broken_node: Entity,

src/impulse.rs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ where
8989
pub fn take(self) -> Recipient<Response, Streams> {
9090
let (response_sender, response_promise) = Promise::<Response>::new();
9191
self.commands.add(AddImpulse::new(
92-
self.target,
92+
dbg!(self.target),
9393
TakenResponse::<Response>::new(response_sender),
9494
));
9595
let mut map = StreamTargetMap::default();
@@ -107,7 +107,7 @@ where
107107
pub fn take_response(self) -> Promise<Response> {
108108
let (response_sender, response_promise) = Promise::<Response>::new();
109109
self.commands.add(AddImpulse::new(
110-
self.target,
110+
dbg!(self.target),
111111
TakenResponse::<Response>::new(response_sender),
112112
));
113113
response_promise
@@ -323,3 +323,43 @@ impl<T> Default for Collection<T> {
323323
Self { items: Default::default() }
324324
}
325325
}
326+
327+
#[cfg(test)]
328+
mod tests {
329+
use crate::{*, testing::*};
330+
use std::time::Duration;
331+
332+
#[test]
333+
fn test_provide() {
334+
let mut context = TestingContext::minimal_plugins();
335+
336+
let mut promise = context.build(|commands| {
337+
commands.provide("hello".to_owned()).take_response()
338+
});
339+
assert!(promise.peek().available().is_some_and(|v| v == "hello"));
340+
}
341+
342+
#[test]
343+
fn test_async_map() {
344+
let mut context = TestingContext::minimal_plugins();
345+
346+
let mut promise = context.build(|commands| {
347+
commands
348+
.request(
349+
WaitRequest {
350+
duration: Duration::from_secs_f64(0.001),
351+
value: "hello".to_owned(),
352+
},
353+
wait.into_async_map(),
354+
)
355+
.take_response()
356+
});
357+
358+
context.run_with_conditions(
359+
&mut promise,
360+
FlushConditions::new()
361+
.with_timeout(Duration::from_secs_f64(5.0)),
362+
);
363+
assert!(promise.peek().available().is_some_and(|v| v == "hello"));
364+
}
365+
}

src/impulse/map.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use crate::{
2626
Impulsive, OperationSetup, OperationRequest, SingleTargetStorage, StreamPack,
2727
InputBundle, OperationResult, OrBroken, Input, ManageInput,
2828
ChannelQueue, BlockingMap, AsyncMap, InnerChannel, OperateTask, ActiveTasksStorage,
29-
CallBlockingMapOnce, CallAsyncMapOnce, Operation,
29+
CallBlockingMapOnce, CallAsyncMapOnce,
3030
};
3131

3232
/// The key difference between this and [`crate::OperateBlockingMap`] is that
@@ -176,8 +176,7 @@ where
176176

177177
let task_source = world.spawn(()).id();
178178
OperateTask::new(task_source, session, source, target, task, None, sender)
179-
.setup(OperationSetup { source: task_source, world })?;
180-
roster.queue(task_source);
179+
.add(world, roster);
181180
Ok(())
182181
}
183182
}

src/operation/operate_map.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@
1515
*
1616
*/
1717

18+
use bevy::{
19+
prelude::{Component, Entity, Bundle},
20+
tasks::AsyncComputeTaskPool,
21+
};
22+
23+
use std::future::Future;
24+
1825
use crate::{
1926
BlockingMap, AsyncMap, Operation, ChannelQueue, InnerChannel,
2027
SingleTargetStorage, StreamPack, Input, ManageInput, OperationCleanup,
@@ -23,13 +30,6 @@ use crate::{
2330
OperationReachability, ReachabilityResult, InputBundle,
2431
};
2532

26-
use bevy::{
27-
prelude::{Component, Entity, Bundle},
28-
tasks::AsyncComputeTaskPool,
29-
};
30-
31-
use std::future::Future;
32-
3333
#[derive(Bundle)]
3434
pub(crate) struct OperateBlockingMap<F, Request, Response, Streams>
3535
where
@@ -189,9 +189,9 @@ where
189189
.f = Some(f);
190190

191191
let task_source = world.spawn(()).id();
192-
OperateTask::new(task_source, session, source, target, task, None, sender)
193-
.setup(OperationSetup { source: task_source, world })?;
194-
roster.queue(task_source);
192+
OperateTask::new(
193+
dbg!(task_source), session, source, dbg!(target), task, None, sender
194+
).add(world, roster);
195195
Ok(())
196196
}
197197

src/operation/operate_task.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use bevy::{
1919
prelude::{Component, Entity, World, Resource, BuildWorldChildren},
2020
tasks::{Task as BevyTask, AsyncComputeTaskPool},
21+
ecs::system::Command,
2122
};
2223

2324
use std::{
@@ -36,7 +37,7 @@ use smallvec::SmallVec;
3637

3738
use crate::{
3839
OperationRoster, Blocker, ManageInput, ChannelQueue, UnhandledErrors,
39-
OperationSetup, OperationRequest, OperationResult, Operation,
40+
OperationSetup, OperationRequest, OperationResult, Operation, AddOperation,
4041
OrBroken, OperationCleanup, ChannelItem, OperationError, Broken,
4142
OperationReachability, ReachabilityResult, emit_disposal, Disposal,
4243
};
@@ -92,6 +93,12 @@ impl<Response: 'static + Send + Sync> OperateTask<Response> {
9293
) -> Self {
9394
Self { source, session, node, target, task: Some(task), blocker, sender, disposal: None }
9495
}
96+
97+
pub(crate) fn add(self, world: &mut World, roster: &mut OperationRoster) {
98+
let source = self.source;
99+
AddOperation::new(source, self).apply(world);
100+
roster.queue(source);
101+
}
95102
}
96103

97104
impl<Response: 'static + Send + Sync> Drop for OperateTask<Response> {

src/request.rs

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ use bevy::prelude::{Commands, BuildChildren};
2020
use std::future::Future;
2121

2222
use crate::{
23-
UnusedTarget, InputCommand, StreamPack, ProvideOnce, IntoAsyncMap, Impulse,
24-
Detached,
23+
UnusedTarget, StreamPack, ProvideOnce, IntoBlockingMapOnce, IntoAsyncMap,
24+
Impulse, Detached, InputCommand,
2525
};
2626

2727
/// Extensions for creating impulse chains by making a request to a provider or
@@ -97,9 +97,9 @@ impl<'w, 's> RequestExt<'w, 's> for Commands<'w, 's> {
9797
.set_parent(target)
9898
.id();
9999

100-
provider.connect(source, target, self);
100+
provider.connect(source, dbg!(target), self);
101101

102-
self.add(InputCommand { session: source, target: source, data: request });
102+
self.add(InputCommand { session: dbg!(source), target: dbg!(source), data: request });
103103

104104
Impulse {
105105
source,
@@ -113,22 +113,7 @@ impl<'w, 's> RequestExt<'w, 's> for Commands<'w, 's> {
113113
&'a mut self,
114114
value: T,
115115
) -> Impulse<'w, 's, 'a, T, ()> {
116-
let target = self.spawn((
117-
Detached::default(),
118-
UnusedTarget,
119-
)).id();
120-
121-
self.add(InputCommand { session: target, target, data: value });
122-
123-
Impulse {
124-
target,
125-
// The source field won't actually matter for an impulse produced by
126-
// this provide method, so we'll just use the session value as a
127-
// placeholder
128-
source: target,
129-
commands: self,
130-
_ignore: Default::default(),
131-
}
116+
self.request(value, provide_value.into_blocking_map_once())
132117
}
133118

134119
fn serve<'a, T: 'static + Send + Sync + Future>(
@@ -142,6 +127,10 @@ impl<'w, 's> RequestExt<'w, 's> for Commands<'w, 's> {
142127
}
143128
}
144129

130+
fn provide_value<T>(value: T) -> T {
131+
value
132+
}
133+
145134
async fn async_server<T: Future>(value: T) -> T::Output {
146135
value.await
147136
}

src/service/async_srv.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use crate::{
1919
AsyncService, InAsyncService, IntoService, ServiceTrait, ServiceBundle, ServiceRequest,
2020
InnerChannel, ChannelQueue, OperationRoster, Blocker,
2121
StreamPack, ServiceBuilder, ChooseAsyncServiceDelivery, OperationRequest,
22-
OperationError, OrBroken, ManageInput, Input, OperateTask, Operation,
23-
OperationSetup, SingleTargetStorage, dispose_for_despawned_service,
22+
OperationError, OrBroken, ManageInput, Input, OperateTask,
23+
SingleTargetStorage, dispose_for_despawned_service,
2424
service::builder::{SerialChosen, ParallelChosen}, Disposal, emit_disposal,
2525
StopTask, UnhandledErrors, StopTaskFailure, Delivery, DeliveryInstructions,
2626
Deliver, DeliveryOrder, DeliveryUpdate, insert_new_order, pop_next_delivery,
@@ -238,8 +238,7 @@ where
238238
let task = AsyncComputeTaskPool::get().spawn(job);
239239

240240
OperateTask::new(task_id, session, source, target, task, blocker, sender)
241-
.setup(OperationSetup { source: task_id, world })?;
242-
roster.queue(task_id);
241+
.add(world, roster);
243242
Ok(())
244243
}
245244

src/testing.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ pub use bevy::{
2424
ecs::system::CommandQueue,
2525
};
2626

27-
use crate::{Promise, Service, InAsyncService, InBlockingService, flush_impulses};
27+
use crate::{
28+
Promise, Service, InAsyncService, InBlockingService, UnhandledErrors,
29+
flush_impulses,
30+
};
2831

2932
pub struct TestingContext {
3033
pub app: App,
@@ -109,6 +112,18 @@ impl TestingContext {
109112

110113
return true;
111114
}
115+
116+
pub fn no_unhandled_errors(&self) -> bool {
117+
let Some(errors) = self.app.world.get_resource::<UnhandledErrors>() else {
118+
return true;
119+
};
120+
121+
errors.is_empty()
122+
}
123+
124+
pub fn get_unhandled_errors(&self) -> Option<&UnhandledErrors> {
125+
self.app.world.get_resource::<UnhandledErrors>()
126+
}
112127
}
113128

114129
#[derive(Default, Clone)]

0 commit comments

Comments
 (0)