Skip to content

Commit a5671dc

Browse files
Merge pull request #2780 from didier-wenzek/refactor/improve-message-source-sink-naming
refactor: improve message source and sink traits
2 parents 457780b + 53585d0 commit a5671dc

File tree

35 files changed

+168
-192
lines changed

35 files changed

+168
-192
lines changed

crates/common/batcher/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ impl<B: Batchable> MessageSink<BatchDriverInput<B>> for BatchingActorBuilder<B>
7474
}
7575

7676
impl<B: Batchable> MessageSource<BatchDriverOutput<B>, NoConfig> for BatchingActorBuilder<B> {
77-
fn register_peer(&mut self, config: NoConfig, sender: DynSender<BatchDriverOutput<B>>) {
78-
self.message_box.register_peer(config, sender)
77+
fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink<BatchDriverOutput<B>>) {
78+
self.message_box.connect_sink(config, peer)
7979
}
8080
}
8181

crates/core/tedge_actors/src/builders.rs

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,17 @@
4747
//! (possibly several receivers to handle message priorities among inputs),
4848
//! and has to give clones of the associated Sender (or Senders) to its peers.
4949
//! - The first responsibility of a builder is to create a channel per receiver of the actor
50-
//! under construction. The receiver will be given to the actor on build.
50+
//! under construction. The receiver will be given to the actor when built.
5151
//! The sender is owned by the builder to be cloned and given to any peer that needs to send data
5252
//! to the actor under construction.
53-
//! - The second responsibility of the builder is to collect a Sender for each peer the actor
53+
//! - The second responsibility of the builder is to collect a sender for each peer the actor
5454
//! under construction needs to send messages to. This is the mirror of the previous responsibility:
5555
//! each builder gives to the others clones of its senders and collects senders from others.
5656
//! - This is why all the actor building traits
5757
//! ([MessageSource], [MessageSink] and [RuntimeRequestSink])
58-
//! are related to exchanges of Sender. A sink gives to a source a sender attached to its receiver.
59-
//! - To be precise, the actor builders exchange [DynSender] and not [Sender]. The difference is that
60-
//! a [DynSender] can transform the messages sent by the source to adapt them to the sink expectations,
58+
//! are related to exchanges of Sender. A sink gives to a source a sender attached to its own receiver.
59+
//! - To be precise, the actor builders exchange [DynSender] and not [Sender](futures::channel::mpsc::Sender).
60+
//! The difference is that a [DynSender] can transform the messages sent by the source to adapt them to the sink expectations,
6161
//! using an `impl From<SourceMessage> for SinkMessage`. This flexibility allows an actor to receive
6262
//! messages from several independent sources (see the [fan_in_message_type](crate::fan_in_message_type) macro).
6363
use crate::mpsc;
@@ -95,8 +95,8 @@ pub struct NoConfig;
9595
/// The [Builder] of an [Actor](crate::Actor) must implement this trait
9696
/// for every message type that actor can receive from its peers.
9797
///
98-
/// An actor whose builder is a `MessageSink<M, C>` can be connected to any other actor
99-
/// whose builder is a `MessageSource<M, C>` so that the sink can receive messages from that source.
98+
/// An actor whose builder is a `MessageSink<M>` can be connected to any other actor
99+
/// whose builder is a `MessageSource<Into<M>, C>` so that the sink can receive messages from that source.
100100
pub trait MessageSink<M: Message> {
101101
/// Return the sender that can be used by peers to send messages to this actor
102102
fn get_sender(&self) -> DynSender<M>;
@@ -105,12 +105,12 @@ pub trait MessageSink<M: Message> {
105105
///
106106
/// A sink might be interested only in a subset of the messages emitted by the source.
107107
/// This subset is defined by the config parameter.
108-
fn add_input<N, C>(&mut self, config: C, source: &mut impl MessageSource<N, C>)
108+
fn connect_source<N, C>(&self, config: C, source: &mut impl MessageSource<N, C>)
109109
where
110110
N: Message,
111111
M: From<N>,
112112
{
113-
source.register_peer(config, self.get_sender().sender_clone())
113+
source.connect_sink(config, &self.get_sender())
114114
}
115115

116116
/// Add a source of messages to the actor under construction, the messages being translated on the fly.
@@ -139,7 +139,7 @@ pub trait MessageSink<M: Message> {
139139
/// let mut sender_builder = SimpleMessageBoxBuilder::new("Send", 16);
140140
///
141141
/// // Convert the `&str` sent by the source into an iterator of `char` as expected by the receiver.
142-
/// receiver_builder.add_mapped_input(NoConfig, &mut sender_builder, |str: &'static str| str.chars() );
142+
/// receiver_builder.connect_mapped_source(NoConfig, &mut sender_builder, |str: &'static str| str.chars() );
143143
///
144144
/// let mut sender: SimpleMessageBox<NoMessage, &'static str>= sender_builder.build();
145145
/// let receiver: SimpleMessageBox<char, NoMessage> = receiver_builder.build();
@@ -158,8 +158,8 @@ pub trait MessageSink<M: Message> {
158158
/// # Ok(())
159159
/// # }
160160
/// ```
161-
fn add_mapped_input<N, C, MS, MessageMapper>(
162-
&mut self,
161+
fn connect_mapped_source<N, C, MS, MessageMapper>(
162+
&self,
163163
config: C,
164164
source: &mut impl MessageSource<N, C>,
165165
cast: MessageMapper,
@@ -169,23 +169,30 @@ pub trait MessageSink<M: Message> {
169169
MessageMapper: Fn(N) -> MS,
170170
MessageMapper: 'static + Send + Sync,
171171
{
172-
let sender = MappingSender::new(self.get_sender(), cast);
173-
source.register_peer(config, sender.into())
172+
let sender: DynSender<N> = MappingSender::new(self.get_sender(), cast).into();
173+
source.connect_sink(config, &sender)
174+
}
175+
}
176+
177+
/// A [DynSender] can be used as a [MessageSink],
178+
/// provided the messages expected by the sender can be built from those sent to the sink.
179+
impl<N: Message, M: Message + From<N>> MessageSink<N> for DynSender<M> {
180+
fn get_sender(&self) -> DynSender<N> {
181+
self.sender_clone()
174182
}
175183
}
176184

177185
/// The [Builder] of an [Actor](crate::Actor) must implement this trait
178186
/// for every message type that actor can send to its peers.
179187
///
180-
/// To receive messages from a `MessageSource<M, C>`, the peer must be a `MessageSink<M>`.
188+
/// To receive messages from a `MessageSource<M, C>`, the peer must be a `MessageSink<From<M>>`.
181189
pub trait MessageSource<M: Message, Config> {
182-
/// The message will be sent to the peer using the provided `sender`
183-
fn register_peer(&mut self, config: Config, sender: DynSender<M>);
184-
185-
/// Connect a peer actor that will consume the message produced by this actor
186-
fn add_sink(&mut self, config: Config, peer: &impl MessageSink<M>) {
187-
self.register_peer(config, peer.get_sender());
188-
}
190+
/// Connect a peer actor that will consume the messages produced by this actor
191+
///
192+
/// The messages will be sent to the peer using its sender, the `peer.get_sender()`.
193+
/// A peer can subscribe to a subset of the messages produced by this source.
194+
/// This subset of messages expected by the peer is defined by the `config` parameter.
195+
fn connect_sink(&mut self, config: Config, peer: &impl MessageSink<M>);
189196
}
190197

191198
/// The [Builder] of an [Actor](crate::Actor) must implement this trait
@@ -290,8 +297,8 @@ pub trait RuntimeRequestSink {
290297
/// # }
291298
/// # }
292299
/// # impl MessageSource<MyActorOutput, NoConfig> for MyActorBuilder {
293-
/// # fn register_peer(&mut self, config: NoConfig, sender: DynSender<MyActorOutput>) {
294-
/// # self.messages.register_peer(config, sender)
300+
/// # fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink<MyActorOutput>) {
301+
/// # self.messages.connect_sink(config, peer)
295302
/// # }
296303
/// # }
297304
/// # impl MessageSink<MyActorInput> for MyActorBuilder {
@@ -330,8 +337,8 @@ pub trait RuntimeRequestSink {
330337
/// // Connect a test box to an actor under test
331338
/// let mut my_actor_builder = MyActorBuilder::new(MyActorConfig::default());
332339
/// let mut test_box_builder = SimpleMessageBoxBuilder::new("Test box", 16);
333-
/// my_actor_builder.register_peer(NoConfig, test_box_builder.get_sender());
334-
/// test_box_builder.register_peer(NoConfig, my_actor_builder.get_sender());
340+
/// my_actor_builder.connect_sink(NoConfig, &test_box_builder);
341+
/// my_actor_builder.connect_source(NoConfig, &mut test_box_builder);
335342
///
336343
/// // Build the test box and run the actor
337344
/// let mut test_box = test_box_builder.build();
@@ -379,8 +386,8 @@ impl<I: Message, O: Message> SimpleMessageBoxBuilder<I, O> {
379386
config: Config,
380387
service: &mut (impl MessageSink<O> + MessageSource<I, Config>),
381388
) {
382-
service.register_peer(config, self.input_sender.sender_clone());
383-
self.register_peer(NoConfig, service.get_sender());
389+
service.connect_sink(config, self);
390+
self.connect_sink(NoConfig, service);
384391
}
385392

386393
/// Connect this client message box to the service message box
@@ -399,8 +406,8 @@ impl<I: Message, O: Message> SimpleMessageBoxBuilder<I, O> {
399406

400407
/// A `SimpleMessageBoxBuilder<Input,Output>` is a [MessageSource] of `Output` messages ignoring the config.
401408
impl<I: Message, O: Message, C> MessageSource<O, C> for SimpleMessageBoxBuilder<I, O> {
402-
fn register_peer(&mut self, _config: C, sender: DynSender<O>) {
403-
self.output_sender = sender;
409+
fn connect_sink(&mut self, _config: C, peer: &impl MessageSink<O>) {
410+
self.output_sender = peer.get_sender();
404411
}
405412
}
406413

crates/core/tedge_actors/src/channels.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ pub trait CloneSender<M>: Sender<M> {
2525

2626
/// Clone a cast of this sender into a `Box<dyn Sender<M>>`
2727
///
28-
/// This is a workaround for https://github.com/rust-lang/rust/issues/65991
28+
/// This is a workaround for <https://github.com/rust-lang/rust/issues/65991>
2929
fn sender(&self) -> Box<dyn Sender<M>>;
3030
}
3131

crates/core/tedge_actors/src/converter.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,10 @@ impl<C: Converter> ConvertingActor<C> {
213213
/// );
214214
///
215215
/// // Connect this actor as a sink of the mqtt actor, to receive input messages from it
216-
/// converter_builder.add_input(TopicFilter("some/mqtt/topics/#"), &mut mqtt_builder);
216+
/// converter_builder.connect_source(TopicFilter("some/mqtt/topics/#"), &mut mqtt_builder);
217217
///
218218
/// // Connect the same mqtt actor as a sink of this actor, to send output messages to it
219-
/// converter_builder.add_sink(NoConfig, &mut mqtt_builder);
219+
/// converter_builder.connect_sink(NoConfig, &mut mqtt_builder);
220220
///
221221
/// // Finally build the actor
222222
/// converter_builder.build()
@@ -259,8 +259,8 @@ impl<C: Converter> Builder<ConvertingActor<C>> for ConvertingActorBuilder<C> {
259259
}
260260

261261
impl<C: Converter> MessageSource<C::Output, NoConfig> for ConvertingActorBuilder<C> {
262-
fn register_peer(&mut self, config: NoConfig, sender: DynSender<C::Output>) {
263-
self.message_box.register_peer(config, sender)
262+
fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink<C::Output>) {
263+
self.message_box.connect_sink(config, peer)
264264
}
265265
}
266266

crates/core/tedge_actors/src/examples/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
//! to establish appropriate connections between the actor message boxes.
4444
//!
4545
//! ```
46-
//! # use tedge_actors::{Actor, Builder, ChannelError, MessageReceiver, Sender, NoConfig, ServerActor, ServerMessageBox, ServerMessageBoxBuilder, SimpleMessageBox, SimpleMessageBoxBuilder, Service, MessageSink};
46+
//! # use tedge_actors::{Actor, Builder, ChannelError, MessageReceiver, Sender, NoConfig, ServerActor, ServerMessageBox, ServerMessageBoxBuilder, SimpleMessageBox, SimpleMessageBoxBuilder, Service, MessageSink, MessageSource};
4747
//! # use crate::tedge_actors::examples::calculator_server::*;
4848
//! # #[tokio::main]
4949
//! # async fn main_test() -> Result<(),ChannelError> {
@@ -60,12 +60,14 @@
6060
//! // Connecting the two boxes, so the box built by the `player_box_builder`:
6161
//! // - receives as input, the output messages sent from the server message box
6262
//! // - sends output messages to the server message box as its input.
63-
//! server_box_builder.add_client(&mut player_1_box_builder);
63+
//! let request_sender = server_box_builder.connect_client(player_1_box_builder.get_sender());
64+
//! player_1_box_builder.connect_sink(NoConfig, &request_sender);
6465
//!
6566
//! // It matters that the builder of the server box is a `ServerMessageBoxBuilder`:
6667
//! // as this builder accepts multiple client actors to connect to the same server.
6768
//! let mut player_2_box_builder = SimpleMessageBoxBuilder::new("Player 2", 1);
68-
//! server_box_builder.add_client(&mut player_2_box_builder);
69+
//! let request_sender = server_box_builder.connect_client(player_2_box_builder.get_sender());
70+
//! player_2_box_builder.connect_sink(NoConfig, &request_sender);
6971
//!
7072
//! // One can then build the message boxes
7173
//! let server_box: ServerMessageBox<Operation,Update> = server_box_builder.build();

crates/core/tedge_actors/src/lib.rs

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//!
55
//! Actors are processing units that interact using asynchronous messages.
66
//!
7-
//! The behavior of an [Actor](crate::Actor) is defined by:
7+
//! The behavior of an [Actor] is defined by:
88
//! - a state owned and freely updated by the actor,
99
//! - a [message box](crate::message_boxes) connected to peer actors,
1010
//! - input [messages](crate::Message) that the actor receives from its peers and processes in turn,
@@ -87,9 +87,9 @@
8787
//! ```
8888
//!
8989
//! This crate provides specific `Actor` implementations:
90-
//! - The [ServerActor](crate::ServerActor) wraps a [Server](crate::Server),
90+
//! - The [ServerActor] wraps a [Server],
9191
//! to implement a request-response communication pattern with a set of connected client actors.
92-
//! - The [ConvertingActor](crate::ConvertingActor) wraps a [Converter](crate::Converter),
92+
//! - The [ConvertingActor] wraps a [Converter],
9393
//! that translates each input message into a sequence of output messages.
9494
//!
9595
//! ## Testing an actor
@@ -104,9 +104,8 @@
104104
//! [Actor and message box builders](crate::builders) are provided to address these specificities
105105
//! with a generic approach without exposing the internal structure of the actors.
106106
//!
107-
//! To test the `Calculator` example we need first to create its box using a
108-
//! [SimpleMessageBoxBuilder](crate::SimpleMessageBoxBuilder),
109-
//! as this actor expects a [SimpleMessageBox](crate::SimpleMessageBox).
107+
//! To test the `Calculator` example we need first to create its box using a [SimpleMessageBoxBuilder],
108+
//! as this actor expects a [SimpleMessageBox].
110109
//! And then, to create a test box connected to the actor message box,
111110
//! we use the [ServiceProviderExt](crate::test_helpers::ServiceProviderExt) test helper extension
112111
//! and the [new_client_box](crate::test_helpers::ServiceProviderExt::new_client_box) method.
@@ -153,7 +152,7 @@
153152
//! # }
154153
//! ```
155154
//!
156-
//! See the [test_helpers](crate::test_helpers) module for various ways
155+
//! See the [test_helpers] module for various ways
157156
//! to observe and interact with running actors.
158157
//!
159158
//! - The primary tool to interact with an actor under test is the [SimpleMessageBoxBuilder],
@@ -172,26 +171,20 @@
172171
//! that define the services provided and consumed by the actors under construction.
173172
//!
174173
//! The connection builder traits work by pairs.
175-
//! A [MessageSink](crate::MessageSink) connects to a [MessageSource](crate::MessageSource),
174+
//! A [MessageSink] connects to a [MessageSource],
176175
//! so the messages sent by the latter will be received by the former.
177176
//!
178177
//! These traits define the types of the messages sent and received.
179178
//! - A sink that excepts message of type `M` can only be connected to a source of messages
180179
//! that can be converted into `M` values.
181-
//! - Similarly a service is defined by two types of messages, the requests received by the service
182-
//! and the responses sent by the service. To use a service, a consumer will have to send messages
183-
//! that can be converted into the service request type and be ready to receive messages converted from
184-
//! the service response type.
185180
//! - Note, that no contract is enforced beyond the type-compatibility of the messages sent between the actors.
186181
//! A consumer of an HTTP service needs to known that a request must be sent before any response can be received;
187182
//! while a consumer of an MQTT service can expect to receive messages without sending a single one.
188183
//!
189184
//! The connection builder traits also define a configuration type.
190-
//! - The semantics of this type is defined by the message source or the service provider.
191-
//! It can be used to filter the values sent to a given sink
192-
//! or to restrict the scope of the service provided to a given service consumer.
193-
//! - The configuration values are provided by the message sinks and the service consumers
194-
//! to specify the context of their connection to a source or a service.
185+
//! - The semantics of this type is defined by the message source and is used to filter the values sent to a sink.
186+
//! - The actual configuration values are provided by the actor sinks when connecting the source
187+
//! to specify the subset of messages their are interested into.
195188
//!
196189
//! Note that these traits are implemented by the actor builders, not by the actors themselves.
197190
//!
@@ -213,7 +206,7 @@
213206
//! ///
214207
//! /// The source registers the new consumer and its sender (i.e. where to send response),
215208
//! /// possibly using the configuration `config` to filter messages.
216-
//! fn register_peer(&mut self, config: SomeConfig, sender: DynSender<SomeMessage>) {
209+
//! fn connect_sink(&mut self, config: SomeConfig, peer: &impl MessageSink<SomeMessage>) {
217210
//! todo!()
218211
//! }
219212
//! }
@@ -231,7 +224,7 @@
231224
//! // can then be connected to each other.
232225
//! let mut producer = SomeActorBuilder::default();
233226
//! let mut consumer = SomeOtherActorBuilder::default();
234-
//! producer.register_peer(SomeConfig, consumer.get_sender());
227+
//! producer.connect_sink(SomeConfig, &consumer);
235228
//! ```
236229
//!
237230
//! ## Running actors

crates/core/tedge_actors/src/message_boxes.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
//! Conceptually, a message box is a receiver of input messages combined with a sender of output messages.
88
//! * The receiver is connected to the senders of peer actors;
99
//! and reciprocally the sender is connected to receivers of peer actors.
10-
//! * The receivers are [mpsc::Receiver](crate::mpsc::Receiver) that collect messages from several sources,
10+
//! * The receivers are [mpsc::Receiver] that collect messages from several sources,
1111
//! and deliver the messages to the actor in the order they have been received.
12-
//! * The senders are [DynSender](crate::DynSender) that adapt the messages sent to match constraints of the receivers.
12+
//! * The senders are [DynSender] that adapt the messages sent to match constraints of the receivers.
1313
//!
14-
//! A [SimpleMessageBox](crate::SimpleMessageBox) implements exactly this conceptual view:
14+
//! A [SimpleMessageBox] implements exactly this conceptual view:
1515
//!
1616
//! ```ascii
1717
//! input_senders: DynSender<Input> ...
@@ -79,7 +79,7 @@
7979
//!
8080
//! This crates provides several built-in message box implementations:
8181
//!
82-
//! - [SimpleMessageBox](crate::SimpleMessageBox) for actors that simply process messages in turn,
82+
//! - [SimpleMessageBox] for actors that simply process messages in turn,
8383
//! - [ServerMessageBox](crate::ServerMessageBox) for server actors that deliver a request-response service,
8484
//! - [ConcurrentServerMessageBox](crate::ConcurrentServerMessageBox) for server actors that process requests concurrently,
8585
//! - [ClientMessageBox](crate::ClientMessageBox) for client actors that use a request-response service from a server actor,

crates/core/tedge_actors/src/servers/builders.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use crate::Builder;
44
use crate::CloneSender;
55
use crate::ConcurrentServerActor;
66
use crate::ConcurrentServerMessageBox;
7-
use crate::DynRequestSender;
87
use crate::DynSender;
98
use crate::LoggingReceiver;
109
use crate::Message;
@@ -52,7 +51,7 @@ impl<Request: Message, Response: Message> ServerMessageBoxBuilder<Request, Respo
5251
}
5352

5453
/// Return a sender for the requests
55-
pub fn request_sender(&self) -> DynRequestSender<Request, Response> {
54+
pub fn request_sender(&self) -> DynSender<RequestEnvelope<Request, Response>> {
5655
self.request_sender.sender_clone()
5756
}
5857

@@ -134,7 +133,7 @@ impl<S: Server, K> ServerActorBuilder<S, K> {
134133
}
135134

136135
/// Return a sender for the requests
137-
pub fn request_sender(&self) -> DynRequestSender<S::Request, S::Response> {
136+
pub fn request_sender(&self) -> DynSender<RequestEnvelope<S::Request, S::Response>> {
138137
self.box_builder.request_sender()
139138
}
140139
}

0 commit comments

Comments
 (0)