Skip to content

Commit 60adc3f

Browse files
committed
Add metadata to provider inputs
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
1 parent 9dd64e6 commit 60adc3f

File tree

6 files changed

+55
-9
lines changed

6 files changed

+55
-9
lines changed

src/handler.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,9 @@ where
175175

176176
let streams = Streams::make_buffer(input.source, input.world);
177177

178-
let response = self.system.run(BlockingHandler { request, streams: streams.clone() }, input.world);
178+
let response = self.system.run(BlockingHandler {
179+
request, streams: streams.clone(), source: input.source, session,
180+
}, input.world);
179181
self.system.apply_deferred(&mut input.world);
180182

181183
Streams::process_buffer(streams, input.source, session, input.world, input.roster)?;
@@ -207,7 +209,9 @@ where
207209
self.system.initialize(&mut input.world);
208210
}
209211

210-
let task = self.system.run(AsyncHandler { request, channel }, &mut input.world);
212+
let task = self.system.run(AsyncHandler {
213+
request, channel, source: input.source, session,
214+
}, &mut input.world);
211215
self.system.apply_deferred(&mut input.world);
212216

213217
input.give_task(session, task)
@@ -293,7 +297,9 @@ where
293297
let callback = move |mut input: HandleRequest| {
294298
let Input { session, data: request } = input.get_request::<Self::Request>()?;
295299
let streams = Streams::make_buffer(input.source, input.world);
296-
let response = (self)(BlockingHandler { request, streams: streams.clone() });
300+
let response = (self)(BlockingHandler {
301+
request, streams: streams.clone(), source: input.source, session,
302+
});
297303
Streams::process_buffer(streams, input.source, session, input.world, input.roster)?;
298304
input.give_response(session, response)
299305
};
@@ -317,7 +323,9 @@ where
317323
let callback = move |mut input: HandleRequest| {
318324
let Input { session, data: request } = input.get_request::<Self::Request>()?;
319325
let channel = input.get_channel(session)?;
320-
let task = (self)(AsyncHandler { request, channel });
326+
let task = (self)(AsyncHandler {
327+
request, channel, source: input.source, session,
328+
});
321329
input.give_task(session, task)
322330
};
323331
Handler::new(CallbackHandler { callback })

src/impulse/map.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ where
9292
let Input { session, data: request } = source_mut.take_input::<Request>()?;
9393
let f = source_mut.take::<BlockingMapOnceStorage<F>>().or_broken()?.f;
9494

95-
let response = f.call(BlockingMap { request, streams: streams.clone() });
95+
let response = f.call(BlockingMap { request, streams: streams.clone(), source, session });
9696

9797
Streams::process_buffer(streams, source, session, world, roster)?;
9898

src/lib.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,16 @@ use bevy::prelude::{Entity, In};
126126
/// ```
127127
#[non_exhaustive]
128128
pub struct BlockingService<Request, Streams: StreamPack = ()> {
129+
/// The input data of the request
129130
pub request: Request,
130-
pub provider: Entity,
131+
/// The buffer to hold stream output data until the function is finished
131132
pub streams: Streams::Buffer,
133+
/// The entity providing the service
134+
pub provider: Entity,
135+
/// The node in a workflow or impulse chain that asked for the service
136+
pub source: Entity,
137+
/// The unique session ID for the workflow
138+
pub session: Entity,
132139
}
133140

134141
/// Use this to reduce bracket noise when you need `In<BlockingService<R>>`.
@@ -142,9 +149,18 @@ pub type InBlockingService<Request, Streams = ()> = In<BlockingService<Request,
142149
/// ECS asynchronously while it is polled from inside the task pool.
143150
#[non_exhaustive]
144151
pub struct AsyncService<Request, Streams: StreamPack = ()> {
152+
/// The input data of the request
145153
pub request: Request,
154+
/// The channel that allows querying and syncing with the world while the
155+
/// service runs asynchronously. Use the [`Channel::streams`] method to
156+
/// send stream output data from the service.
146157
pub channel: Channel<Streams>,
158+
/// The entity providing the service
147159
pub provider: Entity,
160+
/// The node in a workflow or impulse chain that asked for the service
161+
pub source: Entity,
162+
/// The unique session ID for the workflow
163+
pub session: Entity,
148164
}
149165

150166
/// Use this to reduce backet noise when you need `In<AsyncService<R, S>>`.
@@ -155,8 +171,14 @@ pub type InAsyncService<Request, Streams = ()> = In<AsyncService<Request, Stream
155171
/// not associated with any entity.
156172
#[non_exhaustive]
157173
pub struct BlockingHandler<Request, Streams: StreamPack = ()> {
174+
/// The input data of the request
158175
pub request: Request,
176+
/// The buffer to hold stream output data until the function is finished
159177
pub streams: Streams::Buffer,
178+
/// The node in a workflow or impulse chain that asked for the callback
179+
pub source: Entity,
180+
/// The unique session ID for the workflow
181+
pub session: Entity,
160182
}
161183

162184
/// Use AsyncHandler to indicate that your system or function is meant to define
@@ -165,8 +187,16 @@ pub struct BlockingHandler<Request, Streams: StreamPack = ()> {
165187
/// will be polled by the async task pool.
166188
#[non_exhaustive]
167189
pub struct AsyncHandler<Request, Streams: StreamPack = ()> {
190+
/// The input data of the request
168191
pub request: Request,
192+
/// The channel that allows querying and syncing with the world while the
193+
/// service runs asynchronously. Use the [`Channel::streams`] method to
194+
/// send stream output data from the service.
169195
pub channel: Channel<Streams>,
196+
/// The node in a workflow or impulse chain that asked for the callback
197+
pub source: Entity,
198+
/// The unique session ID for the workflow
199+
pub session: Entity,
170200
}
171201

172202
/// Use BlockingMap to indicate that your function is meant to define a blocking
@@ -175,8 +205,14 @@ pub struct AsyncHandler<Request, Streams: StreamPack = ()> {
175205
/// implement [`FnOnce`].
176206
#[non_exhaustive]
177207
pub struct BlockingMap<Request, Streams: StreamPack = ()> {
208+
/// The input data of the request
178209
pub request: Request,
210+
/// The buffer to hold stream output data until the function is finished
179211
pub streams: Streams::Buffer,
212+
/// The node in a workflow or impulse chain that asked for the callback
213+
pub source: Entity,
214+
/// The unique session ID for the workflow
215+
pub session: Entity,
180216
}
181217

182218
/// Use AsyncMap to indicate that your function is meant to define an async

src/operation/operate_map.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ where
9393
let mut map = source_mut.get_mut::<BlockingMapStorage<F>>().or_broken()?;
9494
let mut f = map.f.take().or_broken()?;
9595

96-
let response = f.call(BlockingMap { request, streams: streams.clone() });
96+
let response = f.call(BlockingMap { request, streams: streams.clone(), source, session });
9797
map.f = Some(f);
9898

9999
Streams::process_buffer(streams, source, session, world, roster)?;

src/service/async_srv.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ where
221221

222222
let sender = world.get_resource_or_insert_with(|| ChannelQueue::new()).sender.clone();
223223
let channel = InnerChannel::new(source, session, sender.clone()).into_specific(world)?;
224-
let job = service.run(AsyncService { request, channel, provider }, world);
224+
let job = service.run(AsyncService { request, channel, provider, source, session }, world);
225225
service.apply_deferred(world);
226226

227227
if let Some(mut service_storage) = world.get_mut::<AsyncServiceStorage<Request, Streams, Task>>(provider) {

src/service/blocking.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ where
111111
};
112112

113113
let streams = Streams::make_buffer(source, world);
114-
let response = service.run(BlockingService { request, provider, streams: streams.clone() }, world);
114+
let response = service.run(BlockingService {
115+
request, streams: streams.clone(), provider, source, session,
116+
}, world);
115117
service.apply_deferred(world);
116118
Streams::process_buffer(streams, source, session, world, roster)?;
117119

0 commit comments

Comments
 (0)