Skip to content

Commit b2f0d0d

Browse files
Merge pull request #2600 from didier-wenzek/refactor/actor-request-handler
refactor: Simplify actor request handlers
2 parents b16996a + 0bd66fa commit b2f0d0d

File tree

46 files changed

+1637
-1345
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1637
-1345
lines changed

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/tedge_actors/src/actors.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ pub mod tests {
7878
async fn running_an_actor_without_a_runtime() {
7979
let mut box_builder = SimpleMessageBoxBuilder::new("test", 16);
8080
let mut client_message_box = box_builder.new_client_box(NoConfig);
81+
let mut runtime_box = box_builder.get_signal_sender();
8182
let actor_message_box = box_builder.build();
8283
let actor = Echo {
8384
messages: actor_message_box,
@@ -93,7 +94,10 @@ pub mod tests {
9394
assert_eq!(client_message_box.recv().await, Some("World".to_string()));
9495

9596
// When there is no more input message senders
96-
client_message_box.close_sender();
97+
runtime_box
98+
.send(RuntimeRequest::Shutdown)
99+
.await
100+
.expect("fail to shutdown");
97101

98102
// The actor stops
99103
actor_task
@@ -116,7 +120,7 @@ pub mod tests {
116120
let actor_task = spawn(async move { actor.run().await });
117121

118122
spawn(async move {
119-
let mut sender: DynSender<&str> = adapt(&input_sender);
123+
let mut sender: DynSender<&str> = input_sender.sender_clone();
120124
sender.send("Do this").await.expect("sent");
121125
sender.send("Do nothing").await.expect("sent");
122126
sender.send("Do that and this").await.expect("sent");
@@ -186,8 +190,8 @@ pub mod tests {
186190
impl SpecificMessageBox {
187191
fn new_box(capacity: usize, output: DynSender<DoMsg>) -> (DynSender<String>, Self) {
188192
let (sender, input) = mpsc::channel(capacity);
189-
let peer_1 = adapt(&output);
190-
let peer_2 = adapt(&output);
193+
let peer_1 = output.sender_clone();
194+
let peer_2 = output.sender_clone();
191195
let message_box = SpecificMessageBox {
192196
input,
193197
peer_1,

crates/core/tedge_actors/src/builders.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,14 @@
7777
//! using an `impl From<SourceMessage> for SinkMessage`. This flexibility allows an actor to receive
7878
//! messages from several independent sources (see the [fan_in_message_type](crate::fan_in_message_type) macro).
7979
use crate::mpsc;
80+
use crate::CloneSender;
8081
use crate::DynSender;
8182
use crate::LoggingReceiver;
8283
use crate::LoggingSender;
8384
use crate::MappingSender;
8485
use crate::Message;
8586
use crate::NullSender;
8687
use crate::RuntimeRequest;
87-
use crate::Sender;
8888
use crate::SimpleMessageBox;
8989
use std::convert::Infallible;
9090
use std::fmt::Debug;
@@ -130,7 +130,7 @@ pub trait MessageSink<M: Message, Config> {
130130
N: Message,
131131
M: From<N>,
132132
{
133-
source.register_peer(self.get_config(), crate::adapt(&self.get_sender()))
133+
source.register_peer(self.get_config(), self.get_sender().sender_clone())
134134
}
135135

136136
/// Add a source of messages to the actor under construction, the messages being translated on the fly.

crates/core/tedge_actors/src/channels.rs

Lines changed: 64 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -3,128 +3,118 @@ use crate::ChannelError;
33
use crate::Message;
44
use async_trait::async_trait;
55
use futures::channel::mpsc;
6+
use futures::channel::oneshot;
67
use futures::SinkExt;
78

89
/// A sender of messages of type `M`
910
///
10-
/// Actors don't access directly the `mpsc::Sender` of their peers,
11+
/// Actors don't get direct access to the `mpsc::Sender` of their peers,
1112
/// but use intermediate senders that adapt the messages when sent.
12-
pub type DynSender<M> = Box<dyn Sender<M>>;
13+
pub type DynSender<M> = Box<dyn CloneSender<M>>;
1314

1415
#[async_trait]
1516
pub trait Sender<M>: 'static + Send + Sync {
1617
/// Send a message to the receiver behind this sender,
1718
/// returning an error if the receiver is no more expecting messages
1819
async fn send(&mut self, message: M) -> Result<(), ChannelError>;
20+
}
1921

22+
pub trait CloneSender<M>: Sender<M> {
2023
/// Clone this sender in order to send messages to the same receiver from another actor
2124
fn sender_clone(&self) -> DynSender<M>;
2225

23-
/// Closes this channel from the sender side, preventing any new messages.
24-
fn close_sender(&mut self);
26+
/// Clone a cast of this sender into a `Box<dyn Sender<M>>`
27+
///
28+
/// This is a workaround for https://github.com/rust-lang/rust/issues/65991
29+
fn sender(&self) -> Box<dyn Sender<M>>;
2530
}
2631

27-
impl<M: Message> Clone for DynSender<M> {
28-
fn clone(&self) -> Self {
29-
self.sender_clone()
32+
impl<M, S: Clone + Sender<M>> CloneSender<M> for S {
33+
fn sender_clone(&self) -> DynSender<M> {
34+
Box::new(self.clone())
35+
}
36+
37+
fn sender(&self) -> Box<dyn Sender<M>> {
38+
Box::new(self.clone())
3039
}
3140
}
3241

33-
/// An `mpsc::Sender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
34-
impl<M: Message, N: Message + Into<M>> From<mpsc::Sender<M>> for DynSender<N> {
35-
fn from(sender: mpsc::Sender<M>) -> Self {
42+
impl<M, S: Clone + Sender<M>> From<S> for DynSender<M> {
43+
fn from(sender: S) -> Self {
3644
Box::new(sender)
3745
}
3846
}
3947

48+
/// A `DynSender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
4049
#[async_trait]
41-
impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::Sender<M> {
50+
impl<M: Message, N: Message + Into<M>> Sender<N> for DynSender<M> {
4251
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
43-
Ok(SinkExt::send(&mut self, message.into()).await?)
52+
Ok(self.as_mut().send(message.into()).await?)
53+
}
54+
}
55+
56+
#[async_trait]
57+
impl<M: Message, N: Message + Into<M>> Sender<N> for Box<dyn Sender<M>> {
58+
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
59+
Ok(self.as_mut().send(message.into()).await?)
4460
}
61+
}
4562

63+
#[async_trait]
64+
impl<M: Message, N: Message + Into<M>> CloneSender<N> for DynSender<M> {
4665
fn sender_clone(&self) -> DynSender<N> {
47-
Box::new(self.clone())
66+
Box::new(self.as_ref().sender_clone())
4867
}
4968

50-
fn close_sender(&mut self) {
51-
self.close_channel();
69+
fn sender(&self) -> Box<dyn Sender<N>> {
70+
Box::new(self.as_ref().sender())
5271
}
5372
}
5473

55-
/// An `mpsc::UnboundedSender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
56-
impl<M: Message, N: Message + Into<M>> From<mpsc::UnboundedSender<M>> for DynSender<N> {
57-
fn from(sender: mpsc::UnboundedSender<M>) -> Self {
58-
Box::new(sender)
74+
/// An `mpsc::Sender<M>` is a `DynSender<M>`
75+
#[async_trait]
76+
impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::Sender<M> {
77+
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
78+
Ok(SinkExt::send(&mut self, message.into()).await?)
5979
}
6080
}
6181

82+
/// An `mpsc::UnboundedSender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
6283
#[async_trait]
6384
impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::UnboundedSender<M> {
6485
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
6586
Ok(SinkExt::send(&mut self, message.into()).await?)
6687
}
67-
68-
fn sender_clone(&self) -> DynSender<N> {
69-
Box::new(self.clone())
70-
}
71-
72-
fn close_sender(&mut self) {
73-
self.close_channel();
74-
}
7588
}
7689

77-
/// Make a `DynSender<N>` from a `DynSender<M>`
78-
///
79-
/// This is a workaround to the fact the compiler rejects a From implementation:
90+
/// A `oneshot::Sender<M>` is a `Sender<N>` provided `N` implements `Into<M>`
8091
///
81-
/// ```shell
92+
/// There is one caveat. The `oneshot::Sender::send()` method consumes the sender,
93+
/// hence the one shot sender is wrapped inside an `Option`.
8294
///
83-
/// impl<M: Message, N: Message + Into<M>> From<DynSender<M>> for DynSender<N> {
84-
/// | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
85-
/// |
86-
/// = note: conflicting implementation in crate `core`:
87-
/// - impl<T> From<T> for T;
88-
/// ```
89-
pub fn adapt<M: Message, N: Message + Into<M>>(sender: &DynSender<M>) -> DynSender<N> {
90-
Box::new(sender.clone())
91-
}
92-
95+
/// Such a [Sender] can only be used once:
96+
/// - it cannot be cloned
97+
/// - any message sent after a first one will be silently ignored
98+
/// - a message sent while the receiver has been drop will also be silently ignored
9399
#[async_trait]
94-
impl<M: Message, N: Message + Into<M>> Sender<N> for DynSender<M> {
100+
impl<M: Message, N: Message + Into<M>> Sender<N> for Option<oneshot::Sender<M>> {
95101
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
96-
Ok(self.as_mut().send(message.into()).await?)
97-
}
98-
99-
fn sender_clone(&self) -> DynSender<N> {
100-
Box::new(self.as_ref().sender_clone())
101-
}
102-
103-
fn close_sender(&mut self) {
104-
self.as_mut().close_sender()
102+
if let Some(sender) = self.take() {
103+
let _ = sender.send(message.into());
104+
}
105+
Ok(())
105106
}
106107
}
107108

108109
/// A sender that discards messages instead of sending them
110+
#[derive(Clone)]
109111
pub struct NullSender;
110112

111113
#[async_trait]
112114
impl<M: Message> Sender<M> for NullSender {
113115
async fn send(&mut self, _message: M) -> Result<(), ChannelError> {
114116
Ok(())
115117
}
116-
117-
fn sender_clone(&self) -> DynSender<M> {
118-
Box::new(NullSender)
119-
}
120-
121-
fn close_sender(&mut self) {}
122-
}
123-
124-
impl<M: Message> From<NullSender> for DynSender<M> {
125-
fn from(sender: NullSender) -> Self {
126-
Box::new(sender)
127-
}
128118
}
129119

130120
/// A sender that transforms the messages on the fly
@@ -133,6 +123,15 @@ pub struct MappingSender<F, M> {
133123
cast: std::sync::Arc<F>,
134124
}
135125

126+
impl<F, M: 'static> Clone for MappingSender<F, M> {
127+
fn clone(&self) -> Self {
128+
MappingSender {
129+
inner: self.inner.sender_clone(),
130+
cast: self.cast.clone(),
131+
}
132+
}
133+
}
134+
136135
impl<F, M> MappingSender<F, M> {
137136
pub fn new(inner: DynSender<M>, cast: F) -> Self {
138137
MappingSender {
@@ -157,30 +156,6 @@ where
157156
}
158157
Ok(())
159158
}
160-
161-
fn sender_clone(&self) -> DynSender<M> {
162-
Box::new(MappingSender {
163-
inner: self.inner.sender_clone(),
164-
cast: self.cast.clone(),
165-
})
166-
}
167-
168-
fn close_sender(&mut self) {
169-
self.inner.as_mut().close_sender()
170-
}
171-
}
172-
173-
impl<M, N, NS, F> From<MappingSender<F, N>> for DynSender<M>
174-
where
175-
M: Message,
176-
N: Message,
177-
NS: Iterator<Item = N> + Send,
178-
F: Fn(M) -> NS,
179-
F: 'static + Sync + Send,
180-
{
181-
fn from(value: MappingSender<F, N>) -> Self {
182-
Box::new(value)
183-
}
184159
}
185160

186161
#[cfg(test)]
@@ -233,8 +208,8 @@ mod tests {
233208
impl From<DynSender<Msg>> for Peers {
234209
fn from(recipient: DynSender<Msg>) -> Self {
235210
Peers {
236-
peer_1: adapt(&recipient),
237-
peer_2: adapt(&recipient),
211+
peer_1: recipient.sender_clone(),
212+
peer_2: recipient.sender_clone(),
238213
}
239214
}
240215
}

0 commit comments

Comments
 (0)