Skip to content

Commit aba1aca

Browse files
committed
Deprecate MessageSource::register_peer
The purpose of the `register_peer` and `connect_sink` methods was exactly the same: to connect a source to a sink. The only difference was on their arguments, the sink being respectively passed as a sender or as a sink. Simply adding an `impl<M> MessageSink<M> for DynSender<M>`, makes the two methods fully equivalent. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent 27fb29a commit aba1aca

File tree

26 files changed

+103
-106
lines changed

26 files changed

+103
-106
lines changed

crates/common/batcher/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ impl<B: Batchable> MessageSink<BatchDriverInput<B>> for BatchingActorBuilder<B>
7474
}
7575

7676
impl<B: Batchable> MessageSource<BatchDriverOutput<B>, NoConfig> for BatchingActorBuilder<B> {
77-
fn register_peer(&mut self, config: NoConfig, sender: DynSender<BatchDriverOutput<B>>) {
78-
self.message_box.register_peer(config, sender)
77+
fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink<BatchDriverOutput<B>>) {
78+
self.message_box.connect_sink(config, peer)
7979
}
8080
}
8181

crates/core/tedge_actors/src/builders.rs

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ pub struct NoConfig;
9595
/// The [Builder] of an [Actor](crate::Actor) must implement this trait
9696
/// for every message type that actor can receive from its peers.
9797
///
98-
/// An actor whose builder is a `MessageSink<M` can be connected to any other actor
98+
/// An actor whose builder is a `MessageSink<M>` can be connected to any other actor
9999
/// whose builder is a `MessageSource<Into<M>, C>` so that the sink can receive messages from that source.
100100
pub trait MessageSink<M: Message> {
101101
/// Return the sender that can be used by peers to send messages to this actor
@@ -105,12 +105,12 @@ pub trait MessageSink<M: Message> {
105105
///
106106
/// A sink might be interested only in a subset of the messages emitted by the source.
107107
/// This subset is defined by the config parameter.
108-
fn connect_source<N, C>(&mut self, config: C, source: &mut impl MessageSource<N, C>)
108+
fn connect_source<N, C>(&self, config: C, source: &mut impl MessageSource<N, C>)
109109
where
110110
N: Message,
111111
M: From<N>,
112112
{
113-
source.register_peer(config, self.get_sender().sender_clone())
113+
source.connect_sink(config, &self.get_sender())
114114
}
115115

116116
/// Add a source of messages to the actor under construction, the messages being translated on the fly.
@@ -159,7 +159,7 @@ pub trait MessageSink<M: Message> {
159159
/// # }
160160
/// ```
161161
fn connect_mapped_source<N, C, MS, MessageMapper>(
162-
&mut self,
162+
&self,
163163
config: C,
164164
source: &mut impl MessageSource<N, C>,
165165
cast: MessageMapper,
@@ -169,23 +169,30 @@ pub trait MessageSink<M: Message> {
169169
MessageMapper: Fn(N) -> MS,
170170
MessageMapper: 'static + Send + Sync,
171171
{
172-
let sender = MappingSender::new(self.get_sender(), cast);
173-
source.register_peer(config, sender.into())
172+
let sender: DynSender<N> = MappingSender::new(self.get_sender(), cast).into();
173+
source.connect_sink(config, &sender)
174+
}
175+
}
176+
177+
/// A [DynSender] can be used as a [MessageSink],
178+
/// provided the messages expected by the sender can be built from those sent to the sink.
179+
impl<N: Message, M: Message + From<N>> MessageSink<N> for DynSender<M> {
180+
fn get_sender(&self) -> DynSender<N> {
181+
self.sender_clone()
174182
}
175183
}
176184

177185
/// The [Builder] of an [Actor](crate::Actor) must implement this trait
178186
/// for every message type that actor can send to its peers.
179187
///
180-
/// To receive messages from a `MessageSource<M, C>`, the peer must be a `MessageSink<From<M>`.
188+
/// To receive messages from a `MessageSource<M, C>`, the peer must be a `MessageSink<From<M>>`.
181189
pub trait MessageSource<M: Message, Config> {
182-
/// The message will be sent to the peer using the provided `sender`
183-
fn register_peer(&mut self, config: Config, sender: DynSender<M>);
184-
185-
/// Connect a peer actor that will consume the message produced by this actor
186-
fn connect_sink(&mut self, config: Config, peer: &impl MessageSink<M>) {
187-
self.register_peer(config, peer.get_sender());
188-
}
190+
/// Connect a peer actor that will consume the messages produced by this actor
191+
///
192+
/// The messages will be sent to the peer using its sender, the `peer.get_sender()`.
193+
/// A peer can subscribe to a subset of the messages produced by this source.
194+
/// This subset of messages expected by the peer is defined by the `config` parameter.
195+
fn connect_sink(&mut self, config: Config, peer: &impl MessageSink<M>);
189196
}
190197

191198
/// The [Builder] of an [Actor](crate::Actor) must implement this trait
@@ -290,8 +297,8 @@ pub trait RuntimeRequestSink {
290297
/// # }
291298
/// # }
292299
/// # impl MessageSource<MyActorOutput, NoConfig> for MyActorBuilder {
293-
/// # fn register_peer(&mut self, config: NoConfig, sender: DynSender<MyActorOutput>) {
294-
/// # self.messages.register_peer(config, sender)
300+
/// # fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink<MyActorOutput>) {
301+
/// # self.messages.connect_sink(config, peer)
295302
/// # }
296303
/// # }
297304
/// # impl MessageSink<MyActorInput> for MyActorBuilder {
@@ -330,8 +337,8 @@ pub trait RuntimeRequestSink {
330337
/// // Connect a test box to an actor under test
331338
/// let mut my_actor_builder = MyActorBuilder::new(MyActorConfig::default());
332339
/// let mut test_box_builder = SimpleMessageBoxBuilder::new("Test box", 16);
333-
/// my_actor_builder.register_peer(NoConfig, test_box_builder.get_sender());
334-
/// test_box_builder.register_peer(NoConfig, my_actor_builder.get_sender());
340+
/// my_actor_builder.connect_sink(NoConfig, &test_box_builder);
341+
/// my_actor_builder.connect_source(NoConfig, &mut test_box_builder);
335342
///
336343
/// // Build the test box and run the actor
337344
/// let mut test_box = test_box_builder.build();
@@ -379,8 +386,8 @@ impl<I: Message, O: Message> SimpleMessageBoxBuilder<I, O> {
379386
config: Config,
380387
service: &mut (impl MessageSink<O> + MessageSource<I, Config>),
381388
) {
382-
service.register_peer(config, self.input_sender.sender_clone());
383-
self.register_peer(NoConfig, service.get_sender());
389+
service.connect_sink(config, self);
390+
self.connect_sink(NoConfig, service);
384391
}
385392

386393
/// Connect this client message box to the service message box
@@ -399,8 +406,8 @@ impl<I: Message, O: Message> SimpleMessageBoxBuilder<I, O> {
399406

400407
/// A `SimpleMessageBoxBuilder<Input,Output>` is a [MessageSource] of `Output` messages ignoring the config.
401408
impl<I: Message, O: Message, C> MessageSource<O, C> for SimpleMessageBoxBuilder<I, O> {
402-
fn register_peer(&mut self, _config: C, sender: DynSender<O>) {
403-
self.output_sender = sender;
409+
fn connect_sink(&mut self, _config: C, peer: &impl MessageSink<O>) {
410+
self.output_sender = peer.get_sender();
404411
}
405412
}
406413

crates/core/tedge_actors/src/converter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,8 +259,8 @@ impl<C: Converter> Builder<ConvertingActor<C>> for ConvertingActorBuilder<C> {
259259
}
260260

261261
impl<C: Converter> MessageSource<C::Output, NoConfig> for ConvertingActorBuilder<C> {
262-
fn register_peer(&mut self, config: NoConfig, sender: DynSender<C::Output>) {
263-
self.message_box.register_peer(config, sender)
262+
fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink<C::Output>) {
263+
self.message_box.connect_sink(config, peer)
264264
}
265265
}
266266

crates/core/tedge_actors/src/examples/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,13 @@
6161
//! // - receives as input, the output messages sent from the server message box
6262
//! // - sends output messages to the server message box as its input.
6363
//! let request_sender = server_box_builder.add_requester(player_1_box_builder.get_sender());
64-
//! player_1_box_builder.register_peer(NoConfig, request_sender);
64+
//! player_1_box_builder.connect_sink(NoConfig, &request_sender);
6565
//!
6666
//! // It matters that the builder of the server box is a `ServerMessageBoxBuilder`:
6767
//! // as this builder accepts multiple client actors to connect to the same server.
6868
//! let mut player_2_box_builder = SimpleMessageBoxBuilder::new("Player 2", 1);
6969
//! let request_sender = server_box_builder.add_requester(player_2_box_builder.get_sender());
70-
//! player_2_box_builder.register_peer(NoConfig, request_sender);
70+
//! player_2_box_builder.connect_sink(NoConfig, &request_sender);
7171
//!
7272
//! // One can then build the message boxes
7373
//! let server_box: ServerMessageBox<Operation,Update> = server_box_builder.build();

crates/core/tedge_actors/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@
213213
//! ///
214214
//! /// The source registers the new consumer and its sender (i.e. where to send response),
215215
//! /// possibly using the configuration `config` to filter messages.
216-
//! fn register_peer(&mut self, config: SomeConfig, sender: DynSender<SomeMessage>) {
216+
//! fn connect_sink(&mut self, config: SomeConfig, peer: &impl MessageSink<SomeMessage>) {
217217
//! todo!()
218218
//! }
219219
//! }
@@ -231,7 +231,7 @@
231231
//! // can then be connected to each other.
232232
//! let mut producer = SomeActorBuilder::default();
233233
//! let mut consumer = SomeOtherActorBuilder::default();
234-
//! producer.register_peer(SomeConfig, consumer.get_sender());
234+
//! producer.connect_sink(SomeConfig, &consumer);
235235
//! ```
236236
//!
237237
//! ## Running actors

crates/core/tedge_actors/src/test_helpers.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -520,8 +520,9 @@ impl<I: MessagePlus, O: MessagePlus> Probe<I, O> {
520520
source: &mut impl MessageSource<I, C>,
521521
sink: &mut impl MessageSink<O>,
522522
) {
523+
let input_interceptor: DynSender<I> = self.input_interceptor.clone().into();
523524
self.output_forwarder = sink.get_sender();
524-
source.register_peer(config, self.input_interceptor.clone().into());
525+
source.connect_sink(config, &input_interceptor);
525526
}
526527

527528
/// Connect this probe to a service provider
@@ -638,15 +639,16 @@ where
638639
&'a mut self,
639640
probe: &'a mut Probe<Response, Request>,
640641
) -> &'a mut Probe<Response, Request> {
642+
let output_interceptor: DynSender<Request> = probe.output_interceptor.clone().into();
641643
probe.input_forwarder = self.get_sender();
642-
self.register_peer(NoConfig, probe.output_interceptor.sender_clone());
644+
self.connect_sink(NoConfig, &output_interceptor);
643645
probe
644646
}
645647
}
646648

647649
impl<I: MessagePlus, O: MessagePlus> MessageSource<O, NoConfig> for Probe<I, O> {
648-
fn register_peer(&mut self, _config: NoConfig, sender: DynSender<O>) {
649-
self.output_forwarder = sender;
650+
fn connect_sink(&mut self, _config: NoConfig, peer: &impl MessageSink<O>) {
651+
self.output_forwarder = peer.get_sender();
650652
}
651653
}
652654

@@ -666,11 +668,11 @@ impl<I: Message, O: Message> ServiceProviderExt<I, O> for DynSender<RequestEnvel
666668
let name = "client-box";
667669
let capacity = 16;
668670
let mut client_box = SimpleMessageBoxBuilder::new(name, capacity);
669-
let request_sender = RequestSender {
671+
let request_sender = Box::new(RequestSender {
670672
sender: self.sender_clone(),
671673
reply_to: client_box.get_sender(),
672-
};
673-
client_box.register_peer(NoConfig, request_sender.into());
674+
});
675+
client_box.connect_sink(NoConfig, &request_sender.sender_clone());
674676
client_box.build()
675677
}
676678
}
@@ -686,8 +688,8 @@ impl<I: Message, O: Message> ServiceProviderExt<I, O> for SimpleMessageBoxBuilde
686688
let name = "client-box";
687689
let capacity = 16;
688690
let mut client_box = SimpleMessageBoxBuilder::new(name, capacity);
689-
self.register_peer(NoConfig, client_box.get_sender());
690-
client_box.register_peer(NoConfig, self.get_sender());
691+
self.connect_sink(NoConfig, &client_box);
692+
self.connect_source(NoConfig, &mut client_box);
691693
client_box.build()
692694
}
693695
}

crates/core/tedge_agent/src/operation_file_cache/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -298,10 +298,7 @@ impl FileCacheActorBuilder {
298298
downloader_actor.add_requester(message_box.get_sender().sender_clone());
299299

300300
let mqtt_sender = mqtt_actor.get_sender();
301-
mqtt_actor.register_peer(
302-
Self::subscriptions(&mqtt_schema),
303-
message_box.get_sender().sender_clone(),
304-
);
301+
mqtt_actor.connect_sink(Self::subscriptions(&mqtt_schema), &message_box.get_sender());
305302

306303
Self {
307304
message_box,

crates/core/tedge_agent/src/restart_manager/builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ impl MessageSink<RestartCommand> for RestartManagerBuilder {
3434
}
3535

3636
impl MessageSource<RestartCommand, NoConfig> for RestartManagerBuilder {
37-
fn register_peer(&mut self, config: NoConfig, sender: DynSender<RestartCommand>) {
38-
self.message_box.register_peer(config, sender)
37+
fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink<RestartCommand>) {
38+
self.message_box.connect_sink(config, peer)
3939
}
4040
}
4141

crates/core/tedge_agent/src/restart_manager/tests.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use tedge_actors::Actor;
88
use tedge_actors::Builder;
99
use tedge_actors::DynError;
1010
use tedge_actors::MessageReceiver;
11-
use tedge_actors::MessageSink;
1211
use tedge_actors::MessageSource;
1312
use tedge_actors::NoConfig;
1413
use tedge_actors::Sender;
@@ -120,8 +119,8 @@ async fn spawn_restart_manager(
120119
};
121120

122121
let mut restart_actor_builder = RestartManagerBuilder::new(config);
123-
converter_builder.register_peer(NoConfig, restart_actor_builder.get_sender());
124-
restart_actor_builder.register_peer(NoConfig, converter_builder.get_sender());
122+
converter_builder.connect_sink(NoConfig, &restart_actor_builder);
123+
restart_actor_builder.connect_sink(NoConfig, &converter_builder);
125124

126125
let converter_box = converter_builder.build().with_timeout(TEST_TIMEOUT_MS);
127126

crates/core/tedge_agent/src/software_manager/builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ impl MessageSink<SoftwareCommand> for SoftwareManagerBuilder {
3434
}
3535

3636
impl MessageSource<SoftwareCommand, NoConfig> for SoftwareManagerBuilder {
37-
fn register_peer(&mut self, config: NoConfig, sender: DynSender<SoftwareCommand>) {
38-
self.message_box.register_peer(config, sender)
37+
fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink<SoftwareCommand>) {
38+
self.message_box.connect_sink(config, peer)
3939
}
4040
}
4141

0 commit comments

Comments
 (0)