Skip to content

Commit f652c0c

Browse files
committed
Fix existing tests
Signed-off-by: Michael X. Grey <mxgrey@intrinsic.ai>
1 parent 9cc26ae commit f652c0c

File tree

7 files changed

+116
-25
lines changed

7 files changed

+116
-25
lines changed

src/callback.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
use crate::{
1919
BlockingCallback, AsyncCallback, Channel, InnerChannel, ChannelQueue,
2020
OperationRoster, StreamPack, Input, Provider, ProvideOnce,
21-
AddOperation, OperateCallback, ManageInput, OperationError, SetupFailure,
22-
OrBroken, OperateTask, Operation, OperationSetup,
21+
AddOperation, OperateCallback, ManageInput, OperationError,
22+
OrBroken, OperateTask,
2323
};
2424

2525
use bevy::{

src/channel.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ impl<T: Stream> StreamChannel<T> {
160160
mod tests {
161161
use crate::{*, testing::*};
162162
use bevy::ecs::system::EntityCommands;
163+
use std::time::Duration;
163164

164165
#[test]
165166
fn test_channel_request() {
@@ -192,8 +193,14 @@ mod tests {
192193
repeat,
193194
).take().response
194195
});
195-
context.run_while_pending(&mut promise);
196+
197+
context.run_with_conditions(
198+
&mut promise,
199+
FlushConditions::new().with_timeout(Duration::from_secs(5)),
200+
);
201+
196202
assert!(promise.peek().is_available());
203+
assert!(context.no_unhandled_errors());
197204
}
198205

199206
let count = context.app.world.get::<RunCount>(hello.provider()).unwrap().0;

src/flush.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,14 @@ fn collect_from_channels(
131131
// TODO(@mxgrey): Make sure this works for services which are spawned by
132132
// providers that are being flushed.
133133
for (e, mut hook) in new_service_query.iter_mut(world) {
134-
hook.lifecycle = Some(ServiceLifecycle::new(e, lifecycles.sender.clone()));
134+
if hook.lifecycle.is_none() {
135+
// Check if the lifecycle is none, because collect_from_channels
136+
// can be run multiple times per flush, in which case we will
137+
// iterate over the query again, and end up dropping the lifecycle
138+
// managers that we just created. When that happens, the service
139+
// gets treated as despawned prematurely.
140+
hook.lifecycle = Some(ServiceLifecycle::new(e, lifecycles.sender.clone()));
141+
}
135142
}
136143
});
137144

src/impulse.rs

Lines changed: 94 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ where
8989
pub fn take(self) -> Recipient<Response, Streams> {
9090
let (response_sender, response_promise) = Promise::<Response>::new();
9191
self.commands.add(AddImpulse::new(
92-
dbg!(self.target),
92+
self.target,
9393
TakenResponse::<Response>::new(response_sender),
9494
));
9595
let mut map = StreamTargetMap::default();
@@ -107,7 +107,7 @@ where
107107
pub fn take_response(self) -> Promise<Response> {
108108
let (response_sender, response_promise) = Promise::<Response>::new();
109109
self.commands.add(AddImpulse::new(
110-
dbg!(self.target),
110+
self.target,
111111
TakenResponse::<Response>::new(response_sender),
112112
));
113113
response_promise
@@ -327,39 +327,115 @@ impl<T> Default for Collection<T> {
327327
#[cfg(test)]
328328
mod tests {
329329
use crate::{*, testing::*};
330-
use std::time::Duration;
330+
use std::time::{Instant, Duration};
331331

332332
#[test]
333-
fn test_provide() {
333+
fn test_blocking_map() {
334334
let mut context = TestingContext::minimal_plugins();
335335

336336
let mut promise = context.build(|commands| {
337-
commands.provide("hello".to_owned()).take_response()
337+
commands
338+
.request("hello".to_owned(), to_uppercase.into_blocking_map())
339+
.take_response()
338340
});
339-
assert!(promise.peek().available().is_some_and(|v| v == "hello"));
341+
342+
context.run_while_pending(&mut promise);
343+
assert!(promise.peek().available().is_some_and(|v| v == "HELLO"));
344+
assert!(context.no_unhandled_errors());
345+
346+
let mut promise = context.build(|commands| {
347+
commands
348+
.request("hello".to_owned(), to_uppercase.into_blocking_map_once())
349+
.take_response()
350+
});
351+
352+
context.run_while_pending(&mut promise);
353+
assert!(promise.peek().available().is_some_and(|v| v == "HELLO"));
354+
assert!(context.no_unhandled_errors());
355+
356+
let mut promise = context.build(|commands| {
357+
commands
358+
.provide("hello".to_owned())
359+
.map_block(to_uppercase)
360+
.take_response()
361+
});
362+
363+
context.run_while_pending(&mut promise);
364+
assert!(promise.peek().available().is_some_and(|v| v == "HELLO"));
365+
assert!(context.no_unhandled_errors());
366+
367+
let mut promise = context.build(|commands| {
368+
commands
369+
.provide("hello".to_owned())
370+
.map_block(|request| request.to_uppercase())
371+
.take_response()
372+
});
373+
374+
context.run_while_pending(&mut promise);
375+
assert!(promise.peek().available().is_some_and(|v| v == "HELLO"));
376+
assert!(context.no_unhandled_errors());
340377
}
341378

342379
#[test]
343380
fn test_async_map() {
344381
let mut context = TestingContext::minimal_plugins();
345382

383+
let request = WaitRequest {
384+
duration: Duration::from_secs_f64(0.001),
385+
value: "hello".to_owned(),
386+
};
387+
388+
let conditions = FlushConditions::new()
389+
.with_timeout(Duration::from_secs_f64(5.0));
390+
346391
let mut promise = context.build(|commands| {
347392
commands
348-
.request(
349-
WaitRequest {
350-
duration: Duration::from_secs_f64(0.001),
351-
value: "hello".to_owned(),
352-
},
353-
wait.into_async_map(),
354-
)
393+
.request(request.clone(), wait.into_async_map())
355394
.take_response()
356395
});
357396

358-
context.run_with_conditions(
359-
&mut promise,
360-
FlushConditions::new()
361-
.with_timeout(Duration::from_secs_f64(5.0)),
362-
);
397+
assert!(context.run_with_conditions(&mut promise, conditions.clone()));
398+
assert!(promise.peek().available().is_some_and(|v| v == "hello"));
399+
assert!(context.no_unhandled_errors());
400+
401+
let mut promise = context.build(|commands| {
402+
commands
403+
.request(request.clone(), wait.into_async_map_once())
404+
.take_response()
405+
});
406+
407+
assert!(context.run_with_conditions(&mut promise, conditions.clone()));
408+
assert!(promise.peek().available().is_some_and(|v| v == "hello"));
409+
assert!(context.no_unhandled_errors());
410+
411+
let mut promise = context.build(|commands| {
412+
commands
413+
.provide(request.clone())
414+
.map_async(wait)
415+
.take_response()
416+
});
417+
418+
assert!(context.run_with_conditions(&mut promise, conditions.clone()));
419+
assert!(promise.peek().available().is_some_and(|v| v == "hello"));
420+
assert!(context.no_unhandled_errors());
421+
422+
let mut promise = context.build(|commands| {
423+
commands
424+
.provide(request.clone())
425+
.map_async(|request| {
426+
async move {
427+
let t = Instant::now();
428+
while t.elapsed() < request.duration {
429+
// Busy wait
430+
}
431+
request.value
432+
}
433+
})
434+
.take_response()
435+
});
436+
437+
assert!(context.run_with_conditions(&mut promise, conditions.clone()));
363438
assert!(promise.peek().available().is_some_and(|v| v == "hello"));
439+
assert!(context.no_unhandled_errors());
364440
}
365441
}

src/operation/operate_map.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ where
190190

191191
let task_source = world.spawn(()).id();
192192
OperateTask::new(
193-
dbg!(task_source), session, source, dbg!(target), task, None, sender
193+
task_source, session, source, target, task, None, sender,
194194
).add(world, roster);
195195
Ok(())
196196
}

src/request.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,9 @@ impl<'w, 's> RequestExt<'w, 's> for Commands<'w, 's> {
9797
.set_parent(target)
9898
.id();
9999

100-
provider.connect(source, dbg!(target), self);
100+
provider.connect(source, target, self);
101101

102-
self.add(InputCommand { session: dbg!(source), target: dbg!(source), data: request });
102+
self.add(InputCommand { session: source, target: source, data: request });
103103

104104
Impulse {
105105
source,

src/testing.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ pub fn to_uppercase(value: String) -> String {
210210
value.to_uppercase()
211211
}
212212

213+
#[derive(Clone, Copy)]
213214
pub struct WaitRequest<Value> {
214215
pub duration: std::time::Duration,
215216
pub value: Value

0 commit comments

Comments
 (0)