Skip to content

Commit 16b6ffa

Browse files
committed
Testing races to exit scope
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent 34a2cd1 commit 16b6ffa

File tree

8 files changed

+242
-70
lines changed

8 files changed

+242
-70
lines changed

src/builder.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::{
2323
Provider, UnusedTarget, StreamPack, Node, InputSlot, Output, StreamTargetMap,
2424
Buffer, BufferSettings, AddOperation, OperateBuffer, Scope, OperateScope,
2525
ScopeSettings, BeginCancel, ScopeEndpoints, IntoBlockingMap, IntoAsyncMap,
26+
AsMap, ProvideOnce,
2627
};
2728

2829
pub(crate) mod connect;
@@ -99,6 +100,29 @@ impl<'w, 's, 'a> Builder<'w, 's, 'a> {
99100
self.create_node(f.into_async_map())
100101
}
101102

103+
/// Create a map (either a [blocking map][1] or an
104+
/// [async map][2]) by providing a function that takes [`BlockingMap`][1] or
105+
/// [AsyncMap][2] as its only argument.
106+
///
107+
/// [1]: crate::BlockingMap
108+
/// [2]: crate::AsyncMap
109+
pub fn create_map<M, F: AsMap<M>>(
110+
&mut self,
111+
f: F
112+
) -> Node<
113+
<F::MapType as ProvideOnce>::Request,
114+
<F::MapType as ProvideOnce>::Response,
115+
<F::MapType as ProvideOnce>::Streams,
116+
>
117+
where
118+
F::MapType: Provider,
119+
<F::MapType as ProvideOnce>::Request: 'static + Send + Sync,
120+
<F::MapType as ProvideOnce>::Response: 'static + Send + Sync,
121+
<F::MapType as ProvideOnce>::Streams: StreamPack,
122+
{
123+
self.create_node(f.as_map())
124+
}
125+
102126
/// Connect the output of one into the input slot of another node.
103127
pub fn connect<T: 'static + Send + Sync>(
104128
&mut self,

src/builder/connect.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,27 @@ fn try_connect(connect: Connect, world: &mut World) -> OperationResult {
6767
return Ok(());
6868
}
6969

70+
println!("Connecting {:?} to {:?}", connect.original_target, connect.new_target);
71+
7072
let old_inputs = world.get_entity_mut(connect.original_target).or_broken()?
7173
.take::<SingleInputStorage>().or_broken()?
7274
.take();
7375

76+
dbg!(&old_inputs);
77+
7478
for input in old_inputs.into_iter() {
79+
dbg!(input);
7580
let mut input_mut = world.get_entity_mut(input).or_broken()?;
7681

7782
if let Some(mut target) = input_mut.get_mut::<SingleTargetStorage>() {
7883
target.set(connect.new_target);
84+
dbg!(&target);
7985
}
8086

8187
if let Some(mut targets) = input_mut.get_mut::<ForkTargetStorage>() {
8288
for target in &mut targets.0 {
8389
if *target == connect.original_target {
90+
dbg!(&target, connect.new_target);
8491
*target = connect.new_target;
8592
}
8693
}
@@ -89,6 +96,7 @@ fn try_connect(connect: Connect, world: &mut World) -> OperationResult {
8996
if let Some(mut targets) = input_mut.get_mut::<StreamTargetMap>() {
9097
for target in &mut targets.map {
9198
if *target == connect.original_target {
99+
dbg!(&target, connect.new_target);
92100
*target = connect.new_target;
93101
}
94102
}

src/input.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,9 @@ impl<'w> ManageInput for EntityMut<'w> {
186186
session: Entity,
187187
data: T,
188188
) -> Result<(), OperationError> {
189+
dbg!(self.id());
189190
let mut storage = self.get_mut::<InputStorage<T>>().or_broken()?;
191+
dbg!(session);
190192
storage.reverse_queue.insert(0, Input { session, data });
191193
Ok(())
192194
}

src/node.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use bevy::prelude::Entity;
1919

2020
use crate::{
2121
StreamPack, Chain, Builder, UnusedTarget, AddBranchToForkClone, ForkClone,
22-
AddOperation, ForkTargetStorage,
22+
AddOperation, ForkTargetStorage, SingleInputStorage,
2323
};
2424

2525
/// A collection of all the inputs and outputs for a node within a workflow.
@@ -87,7 +87,7 @@ impl<Response> std::fmt::Debug for Output<Response> {
8787
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
8888
f.debug_struct(format!("Output<{}>", std::any::type_name::<Response>()).as_str())
8989
.field("scope", &self.scope)
90-
.field("targret", &self.target)
90+
.field("target", &self.target)
9191
.finish()
9292
}
9393
}
@@ -151,7 +151,10 @@ pub struct ForkCloneOutput<Response> {
151151
impl<Response: 'static + Send + Sync> ForkCloneOutput<Response> {
152152
pub fn clone_output(&self, builder: &mut Builder) -> Output<Response> {
153153
assert_eq!(self.scope, builder.scope);
154-
let target = builder.commands.spawn(UnusedTarget).id();
154+
let target = builder.commands.spawn((
155+
SingleInputStorage::new(self.id()),
156+
UnusedTarget,
157+
)).id();
155158
builder.commands.add(AddBranchToForkClone {
156159
source: self.source,
157160
target,

src/operation.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ impl From<SmallVec<[Entity; 8]>> for FunnelInputStorage {
144144
}
145145

146146
/// Keep track of the target for a link in a impulse chain
147-
#[derive(Component, Clone, Copy)]
147+
#[derive(Component, Clone, Copy, Debug)]
148148
pub struct SingleTargetStorage(Entity);
149149

150150
impl SingleTargetStorage {
@@ -327,9 +327,19 @@ pub struct OperationCleanup<'a> {
327327
impl<'a> OperationCleanup<'a> {
328328

329329
pub fn clean(&mut self) {
330-
let Some(cleanup) = self.world.get::<OperationCleanupStorage>(self.source) else {
330+
let Some(source_ref) = self.world.get_entity(self.source) else {
331+
dbg!(self.source);
331332
return;
332333
};
334+
// let Some(cleanup) = self.world.get::<OperationCleanupStorage>(self.source) else {
335+
// dbg!(self.source);
336+
// return;
337+
// };
338+
let Some(cleanup) = source_ref.get::<OperationCleanupStorage>() else {
339+
dbg!(self.source);
340+
return;
341+
};
342+
333343
let cleanup = cleanup.0;
334344
if let Err(error) = cleanup(OperationCleanup {
335345
source: self.source,
@@ -359,11 +369,14 @@ impl<'a> OperationCleanup<'a> {
359369
}
360370

361371
pub fn notify_cleaned(&mut self) -> OperationResult {
372+
dbg!(self.source);
362373
let source_mut = self.world.get_entity_mut(self.source).or_broken()?;
363374
let scope = source_mut.get::<ScopeStorage>().or_not_ready()?.get();
375+
dbg!(scope);
364376
let mut scope_mut = self.world.get_entity_mut(scope).or_broken()?;
365377
let mut scope_contents = scope_mut.get_mut::<ScopeContents>().or_broken()?;
366378
if scope_contents.register_cleanup_of_node(self.session, self.source) {
379+
dbg!();
367380
self.roster.cleanup_finished(
368381
CleanupFinished { scope, session: self.session }
369382
);
@@ -565,6 +578,7 @@ impl<Op: Operation + 'static + Sync + Send> Command for AddOperation<Op> {
565578
}
566579

567580
let mut source_mut = world.entity_mut(self.source);
581+
dbg!(self.source);
568582
source_mut
569583
.insert((
570584
OperationExecuteStorage(perform_operation::<Op>),

0 commit comments

Comments
 (0)