Skip to content

Commit f35612d

Browse files
committed
Add tests for multiple streams
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent 3c06628 commit f35612d

File tree

2 files changed

+205
-15
lines changed

2 files changed

+205
-15
lines changed

src/map.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use bevy::prelude::{Entity, Commands};
2525
use std::future::Future;
2626

2727
/// A newtype to indicate that the map definition is given directly by F.
28+
#[derive(Clone, Copy)]
2829
pub struct MapDef<F>(F);
2930

3031
/// Convert an [`FnMut`] that takes in a [`BlockingMap`] or an [`AsyncMap`] into
@@ -61,6 +62,15 @@ pub struct BlockingMapDef<Def, Request, Response, Streams> {
6162
_ignore: std::marker::PhantomData<(Request, Response, Streams)>,
6263
}
6364

65+
impl<Def: Clone, Request, Response, Streams> Clone for BlockingMapDef<Def, Request, Response, Streams> {
66+
fn clone(&self) -> Self {
67+
Self {
68+
def: self.def.clone(),
69+
_ignore: Default::default(),
70+
}
71+
}
72+
}
73+
6474
impl<Def, Request, Response, Streams> ProvideOnce for BlockingMapDef<Def, Request, Response, Streams>
6575
where
6676
Def: CallBlockingMap<Request, Response, Streams> + 'static + Send + Sync,
@@ -173,6 +183,15 @@ pub struct AsyncMapDef<Def, Request, Task, Streams> {
173183
_ignore: std::marker::PhantomData<(Request, Task, Streams)>,
174184
}
175185

186+
impl<Def: Clone, Request, Task, Streams> Clone for AsyncMapDef<Def, Request, Task, Streams> {
187+
fn clone(&self) -> Self {
188+
Self {
189+
def: self.def.clone(),
190+
_ignore: Default::default(),
191+
}
192+
}
193+
}
194+
176195
impl<Def, Request, Task, Streams> ProvideOnce for AsyncMapDef<Def, Request, Task, Streams>
177196
where
178197
Def: CallAsyncMap<Request, Task, Streams> + 'static + Send + Sync,

src/stream.rs

Lines changed: 186 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -717,21 +717,6 @@ impl<T1: StreamPack, T2: StreamPack, T3: StreamPack> StreamPack for (T1, T2, T3)
717717
mod tests {
718718
use crate::{*, testing::*};
719719

720-
fn test_counting_stream(
721-
provider: impl Provider<Request = u32, Response = u32, Streams = StreamOf<u32>>,
722-
context: &mut TestingContext,
723-
) {
724-
let mut recipient = context.command(|commands| {
725-
commands.request(10, provider).take()
726-
});
727-
728-
context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
729-
assert!(recipient.response.peek().available().is_some_and(|v| *v == 10));
730-
let stream: Vec<u32> = recipient.streams.into_iter().map(|v| v.0).collect();
731-
assert_eq!(stream, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
732-
assert!(context.no_unhandled_errors());
733-
}
734-
735720
#[test]
736721
fn test_single_stream() {
737722
let mut context = TestingContext::minimal_plugins();
@@ -812,4 +797,190 @@ mod tests {
812797

813798
test_counting_stream(count_async_map, &mut context);
814799
}
800+
801+
fn test_counting_stream(
802+
provider: impl Provider<Request = u32, Response = u32, Streams = StreamOf<u32>>,
803+
context: &mut TestingContext,
804+
) {
805+
let mut recipient = context.command(|commands| {
806+
commands.request(10, provider).take()
807+
});
808+
809+
context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
810+
assert!(recipient.response.peek().available().is_some_and(|v| *v == 10));
811+
let stream: Vec<u32> = recipient.streams.into_iter().map(|v| v.0).collect();
812+
assert_eq!(stream, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
813+
assert!(context.no_unhandled_errors());
814+
}
815+
816+
type FormatStreams = (StreamOf<u32>, StreamOf<i32>, StreamOf<f32>);
817+
#[test]
818+
fn test_tuple_stream() {
819+
let mut context = TestingContext::minimal_plugins();
820+
821+
let parse_blocking_srv = context.command(|commands| {
822+
commands.spawn_service(
823+
|In(input): BlockingServiceInput<String, FormatStreams>| {
824+
impl_formatting_streams_blocking(input.request, input.streams);
825+
}
826+
)
827+
});
828+
829+
test_formatting_stream(parse_blocking_srv, &mut context);
830+
831+
let parse_async_srv = context.command(|commands| {
832+
commands.spawn_service(
833+
|In(input): AsyncServiceInput<String, FormatStreams>| {
834+
async move {
835+
impl_formatting_streams_async(input.request, input.streams);
836+
}
837+
}
838+
)
839+
});
840+
841+
test_formatting_stream(parse_async_srv, &mut context);
842+
843+
let parse_blocking_callback = (
844+
|In(input): BlockingCallbackInput<String, FormatStreams>| {
845+
impl_formatting_streams_blocking(input.request, input.streams);
846+
}
847+
).as_callback();
848+
849+
test_formatting_stream(parse_blocking_callback, &mut context);
850+
851+
let parse_async_callback = (
852+
|In(input): AsyncCallbackInput<String, FormatStreams>| {
853+
async move {
854+
impl_formatting_streams_async(input.request, input.streams);
855+
}
856+
}
857+
).as_callback();
858+
859+
test_formatting_stream(parse_async_callback, &mut context);
860+
861+
let parse_blocking_map = (
862+
|input: BlockingMap<String, FormatStreams>| {
863+
impl_formatting_streams_blocking(input.request, input.streams);
864+
}
865+
).as_map();
866+
867+
test_formatting_stream(parse_blocking_map, &mut context);
868+
869+
let parse_async_map = (
870+
|input: AsyncMap<String, FormatStreams>| {
871+
async move {
872+
impl_formatting_streams_async(input.request, input.streams);
873+
}
874+
}
875+
).as_map();
876+
877+
test_formatting_stream(parse_async_map, &mut context);
878+
}
879+
880+
fn impl_formatting_streams_blocking(
881+
request: String,
882+
streams: <FormatStreams as StreamPack>::Buffer,
883+
) {
884+
if let Ok(value) = request.parse::<u32>() {
885+
streams.0.send(StreamOf(value));
886+
}
887+
888+
if let Ok(value) = request.parse::<i32>() {
889+
streams.1.send(StreamOf(value));
890+
}
891+
892+
if let Ok(value) = request.parse::<f32>() {
893+
streams.2.send(StreamOf(value));
894+
}
895+
}
896+
897+
fn impl_formatting_streams_async(
898+
request: String,
899+
streams: <FormatStreams as StreamPack>::Channel,
900+
) {
901+
if let Ok(value) = request.parse::<u32>() {
902+
streams.0.send(StreamOf(value));
903+
}
904+
905+
if let Ok(value) = request.parse::<i32>() {
906+
streams.1.send(StreamOf(value));
907+
}
908+
909+
if let Ok(value) = request.parse::<f32>() {
910+
streams.2.send(StreamOf(value));
911+
}
912+
}
913+
914+
fn test_formatting_stream(
915+
provider: impl Provider<Request = String, Response = (), Streams = FormatStreams> + Clone,
916+
context: &mut TestingContext,
917+
) {
918+
let mut recipient = context.command(|commands| {
919+
commands.request("5".to_owned(), provider.clone()).take()
920+
});
921+
922+
context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
923+
assert!(recipient.response.peek().available().is_some());
924+
assert!(context.no_unhandled_errors());
925+
926+
let outcome: FormatOutcome = recipient.into();
927+
assert_eq!(outcome.stream_u32, [5]);
928+
assert_eq!(outcome.stream_i32, [5]);
929+
assert_eq!(outcome.stream_f32, [5.0]);
930+
931+
let mut recipient = context.command(|commands| {
932+
commands.request("-2".to_owned(), provider.clone()).take()
933+
});
934+
935+
context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
936+
assert!(recipient.response.peek().available().is_some());
937+
assert!(context.no_unhandled_errors());
938+
939+
let outcome: FormatOutcome = recipient.into();
940+
assert!(outcome.stream_u32.is_empty());
941+
assert_eq!(outcome.stream_i32, [-2]);
942+
assert_eq!(outcome.stream_f32, [-2.0]);
943+
944+
let mut recipient = context.command(|commands| {
945+
commands.request("6.7".to_owned(), provider.clone()).take()
946+
});
947+
948+
context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
949+
assert!(recipient.response.peek().available().is_some());
950+
assert!(context.no_unhandled_errors());
951+
952+
let outcome: FormatOutcome = recipient.into();
953+
assert!(outcome.stream_u32.is_empty());
954+
assert!(outcome.stream_i32.is_empty());
955+
assert_eq!(outcome.stream_f32, [6.7]);
956+
957+
let mut recipient = context.command(|commands| {
958+
commands.request("hello".to_owned(), provider.clone()).take()
959+
});
960+
961+
context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
962+
assert!(recipient.response.peek().available().is_some());
963+
assert!(context.no_unhandled_errors());
964+
965+
let outcome: FormatOutcome = recipient.into();
966+
assert!(outcome.stream_u32.is_empty());
967+
assert!(outcome.stream_i32.is_empty());
968+
assert!(outcome.stream_f32.is_empty());
969+
}
970+
971+
struct FormatOutcome {
972+
stream_u32: Vec<u32>,
973+
stream_i32: Vec<i32>,
974+
stream_f32: Vec<f32>,
975+
}
976+
977+
impl From<Recipient<(), FormatStreams>> for FormatOutcome {
978+
fn from(recipient: Recipient<(), FormatStreams>) -> Self {
979+
Self {
980+
stream_u32: recipient.streams.0.into_iter().map(|v| v.0).collect(),
981+
stream_i32: recipient.streams.1.into_iter().map(|v| v.0).collect(),
982+
stream_f32: recipient.streams.2.into_iter().map(|v| v.0).collect()
983+
}
984+
}
985+
}
815986
}

0 commit comments

Comments
 (0)