Skip to content

Commit d64e795

Browse files
committed
Split trait Sender into Sender + CloneSender
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent fb1650d commit d64e795

File tree

9 files changed

+89
-73
lines changed

9 files changed

+89
-73
lines changed

crates/core/tedge_actors/src/builders.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,14 @@
7777
//! using an `impl From<SourceMessage> for SinkMessage`. This flexibility allows an actor to receive
7878
//! messages from several independent sources (see the [fan_in_message_type](crate::fan_in_message_type) macro).
7979
use crate::mpsc;
80+
use crate::CloneSender;
8081
use crate::DynSender;
8182
use crate::LoggingReceiver;
8283
use crate::LoggingSender;
8384
use crate::MappingSender;
8485
use crate::Message;
8586
use crate::NullSender;
8687
use crate::RuntimeRequest;
87-
use crate::Sender;
8888
use crate::SimpleMessageBox;
8989
use std::convert::Infallible;
9090
use std::fmt::Debug;

crates/core/tedge_actors/src/channels.rs

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,38 @@ use futures::SinkExt;
77

88
/// A sender of messages of type `M`
99
///
10-
/// Actors don't access directly the `mpsc::Sender` of their peers,
10+
/// Actors don't get direct access to the `mpsc::Sender` of their peers,
1111
/// but use intermediate senders that adapt the messages when sent.
12-
pub type DynSender<M> = Box<dyn Sender<M>>;
12+
pub type DynSender<M> = Box<dyn CloneSender<M>>;
1313

1414
#[async_trait]
1515
pub trait Sender<M>: 'static + Send + Sync {
1616
/// Send a message to the receiver behind this sender,
1717
/// returning an error if the receiver is no more expecting messages
1818
async fn send(&mut self, message: M) -> Result<(), ChannelError>;
19+
}
1920

21+
pub trait CloneSender<M>: Sender<M> {
2022
/// Clone this sender in order to send messages to the same receiver from another actor
2123
fn sender_clone(&self) -> DynSender<M>;
24+
25+
/// Clone a cast of this sender into a `Box<dyn Sender<M>>`
26+
///
27+
/// This is a workaround for https://github.com/rust-lang/rust/issues/65991
28+
fn sender(&self) -> Box<dyn Sender<M>>;
29+
}
30+
31+
impl<M, S: Clone + Sender<M>> CloneSender<M> for S {
32+
fn sender_clone(&self) -> DynSender<M> {
33+
Box::new(self.clone())
34+
}
35+
36+
fn sender(&self) -> Box<dyn Sender<M>> {
37+
Box::new(self.clone())
38+
}
2239
}
2340

24-
impl<M: Message> Clone for DynSender<M> {
41+
impl<M: 'static> Clone for DynSender<M> {
2542
fn clone(&self) -> Self {
2643
self.sender_clone()
2744
}
@@ -39,10 +56,6 @@ impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::Sender<M> {
3956
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
4057
Ok(SinkExt::send(&mut self, message.into()).await?)
4158
}
42-
43-
fn sender_clone(&self) -> DynSender<N> {
44-
Box::new(self.clone())
45-
}
4659
}
4760

4861
/// An `mpsc::UnboundedSender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
@@ -88,24 +101,17 @@ impl<M: Message, N: Message + Into<M>> Sender<N> for DynSender<M> {
88101
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
89102
Ok(self.as_mut().send(message.into()).await?)
90103
}
91-
92-
fn sender_clone(&self) -> DynSender<N> {
93-
Box::new(self.as_ref().sender_clone())
94-
}
95104
}
96105

97106
/// A sender that discards messages instead of sending them
107+
#[derive(Clone)]
98108
pub struct NullSender;
99109

100110
#[async_trait]
101111
impl<M: Message> Sender<M> for NullSender {
102112
async fn send(&mut self, _message: M) -> Result<(), ChannelError> {
103113
Ok(())
104114
}
105-
106-
fn sender_clone(&self) -> DynSender<M> {
107-
Box::new(NullSender)
108-
}
109115
}
110116

111117
impl<M: Message> From<NullSender> for DynSender<M> {
@@ -120,6 +126,15 @@ pub struct MappingSender<F, M> {
120126
cast: std::sync::Arc<F>,
121127
}
122128

129+
impl<F, M: 'static> Clone for MappingSender<F, M> {
130+
fn clone(&self) -> Self {
131+
MappingSender {
132+
inner: self.inner.clone(),
133+
cast: self.cast.clone(),
134+
}
135+
}
136+
}
137+
123138
impl<F, M> MappingSender<F, M> {
124139
pub fn new(inner: DynSender<M>, cast: F) -> Self {
125140
MappingSender {
@@ -144,13 +159,6 @@ where
144159
}
145160
Ok(())
146161
}
147-
148-
fn sender_clone(&self) -> DynSender<M> {
149-
Box::new(MappingSender {
150-
inner: self.inner.sender_clone(),
151-
cast: self.cast.clone(),
152-
})
153-
}
154162
}
155163

156164
impl<M, N, NS, F> From<MappingSender<F, N>> for DynSender<M>

crates/core/tedge_actors/src/message_boxes.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
//!
9292
use crate::channels::Sender;
9393
use crate::ChannelError;
94+
use crate::CloneSender;
9495
use crate::DynSender;
9596
use crate::Message;
9697
use crate::RuntimeRequest;
@@ -218,34 +219,27 @@ pub struct LoggingSender<Output> {
218219
sender: DynSender<Output>,
219220
}
220221

221-
impl<Output> LoggingSender<Output> {
222-
pub fn new(name: String, sender: DynSender<Output>) -> Self {
223-
Self { name, sender }
224-
}
225-
}
226-
227222
impl<Output: 'static> Clone for LoggingSender<Output> {
228223
fn clone(&self) -> Self {
229-
Self {
224+
LoggingSender {
230225
name: self.name.clone(),
231226
sender: self.sender.sender_clone(),
232227
}
233228
}
234229
}
235230

231+
impl<Output> LoggingSender<Output> {
232+
pub fn new(name: String, sender: DynSender<Output>) -> Self {
233+
Self { name, sender }
234+
}
235+
}
236+
236237
#[async_trait]
237-
impl<Output: Debug + Send + Sync + 'static> Sender<Output> for LoggingSender<Output> {
238+
impl<Output: Message + Debug> Sender<Output> for LoggingSender<Output> {
238239
async fn send(&mut self, message: Output) -> Result<(), ChannelError> {
239240
log_message_sent(&self.name, &message);
240241
self.sender.send(message).await
241242
}
242-
243-
fn sender_clone(&self) -> DynSender<Output> {
244-
Box::new(LoggingSender {
245-
name: self.name.clone(),
246-
sender: self.sender.clone(),
247-
})
248-
}
249243
}
250244

251245
pub fn log_message_sent<I: Debug>(target: &str, message: I) {
@@ -331,7 +325,7 @@ impl<Input: Send + Debug> MessageReceiver<Input> for UnboundedLoggingReceiver<In
331325
/// - Log sent messages when pushed into the target box
332326
///
333327
/// Such a box is connected to peer actors using a [SimpleMessageBoxBuilder](crate::SimpleMessageBoxBuilder).
334-
pub struct SimpleMessageBox<Input: Debug, Output> {
328+
pub struct SimpleMessageBox<Input: Debug, Output: Debug> {
335329
input_receiver: LoggingReceiver<Input>,
336330
output_sender: LoggingSender<Output>,
337331
}
@@ -390,9 +384,15 @@ impl<Input: Message, Output: Message> Sender<Output> for SimpleMessageBox<Input,
390384
async fn send(&mut self, message: Output) -> Result<(), ChannelError> {
391385
self.output_sender.send(message).await
392386
}
387+
}
393388

389+
impl<Input: Message, Output: Message> CloneSender<Output> for SimpleMessageBox<Input, Output> {
394390
fn sender_clone(&self) -> DynSender<Output> {
395-
self.output_sender.sender_clone()
391+
CloneSender::sender_clone(&self.output_sender)
392+
}
393+
394+
fn sender(&self) -> Box<dyn Sender<Output>> {
395+
CloneSender::sender(&self.output_sender)
396396
}
397397
}
398398

crates/core/tedge_actors/src/servers/builders.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::mpsc;
22
use crate::Actor;
33
use crate::Builder;
44
use crate::ClientId;
5+
use crate::CloneSender;
56
use crate::ConcurrentServerActor;
67
use crate::ConcurrentServerMessageBox;
78
use crate::DynSender;
@@ -13,7 +14,6 @@ use crate::NoConfig;
1314
use crate::RuntimeError;
1415
use crate::RuntimeRequest;
1516
use crate::RuntimeRequestSink;
16-
use crate::Sender;
1717
use crate::SenderVec;
1818
use crate::Server;
1919
use crate::ServerActor;

crates/core/tedge_actors/src/servers/keyed_messages.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,20 @@ use crate::Sender;
66
use async_trait::async_trait;
77

88
/// A sender that adds a key to messages on the fly
9-
pub struct KeyedSender<K: Message + Clone, M: Message> {
9+
pub struct KeyedSender<K, M> {
1010
key: K,
1111
sender: mpsc::Sender<(K, M)>,
1212
}
1313

14+
impl<K: Clone, M> Clone for KeyedSender<K, M> {
15+
fn clone(&self) -> Self {
16+
KeyedSender {
17+
key: self.key.clone(),
18+
sender: self.sender.clone(),
19+
}
20+
}
21+
}
22+
1423
impl<K: Message + Clone, M: Message> KeyedSender<K, M> {
1524
pub fn new_sender(key: K, sender: mpsc::Sender<(K, M)>) -> DynSender<M> {
1625
Box::new(KeyedSender { key, sender })
@@ -22,20 +31,21 @@ impl<K: Message + Clone, M: Message> Sender<M> for KeyedSender<K, M> {
2231
async fn send(&mut self, message: M) -> Result<(), ChannelError> {
2332
self.sender.send((self.key.clone(), message)).await
2433
}
25-
26-
fn sender_clone(&self) -> DynSender<M> {
27-
Box::new(KeyedSender {
28-
key: self.key.clone(),
29-
sender: self.sender.clone(),
30-
})
31-
}
3234
}
3335

3436
/// A vector of senders addressed using a sender id attached to each message
35-
pub struct SenderVec<M: Message> {
37+
pub struct SenderVec<M> {
3638
senders: Vec<DynSender<M>>,
3739
}
3840

41+
impl<M: 'static> Clone for SenderVec<M> {
42+
fn clone(&self) -> Self {
43+
SenderVec {
44+
senders: self.senders.clone(),
45+
}
46+
}
47+
}
48+
3949
impl<M: Message> SenderVec<M> {
4050
pub fn new_sender(senders: Vec<DynSender<M>>) -> DynSender<(usize, M)> {
4151
Box::new(SenderVec { senders })
@@ -51,9 +61,4 @@ impl<M: Message> Sender<(usize, M)> for SenderVec<M> {
5161
}
5262
Ok(())
5363
}
54-
55-
fn sender_clone(&self) -> DynSender<(usize, M)> {
56-
let senders = self.senders.iter().map(|r| r.sender_clone()).collect();
57-
Box::new(SenderVec { senders })
58-
}
5964
}

crates/core/tedge_actors/src/servers/message_boxes.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub type ServerMessageBox<Request, Response> =
2222
pub type ClientId = usize;
2323

2424
/// A message box for services that handles requests concurrently
25-
pub struct ConcurrentServerMessageBox<Request: Debug, Response> {
25+
pub struct ConcurrentServerMessageBox<Request: Debug, Response: Debug> {
2626
/// Max concurrent requests
2727
max_concurrency: usize,
2828

@@ -119,7 +119,7 @@ impl<Request: Message, Response: Message> ConcurrentServerMessageBox<Request, Re
119119
/// and synchronously wait for its response using the `await_response` function.
120120
///
121121
/// Note that this message box sends requests and receive responses.
122-
pub struct ClientMessageBox<Request, Response: Debug> {
122+
pub struct ClientMessageBox<Request: Message, Response: Message + Debug> {
123123
messages: SimpleMessageBox<Response, Request>,
124124
}
125125

crates/core/tedge_actors/src/test_helpers.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,15 @@ pub struct TimedMessageBox<T> {
318318
inner: T,
319319
}
320320

321+
impl<T: Clone> Clone for TimedMessageBox<T> {
322+
fn clone(&self) -> Self {
323+
TimedMessageBox {
324+
timeout: self.timeout,
325+
inner: self.inner.clone(),
326+
}
327+
}
328+
}
329+
321330
#[async_trait]
322331
impl<T, M> MessageReceiver<M> for TimedMessageBox<T>
323332
where
@@ -358,10 +367,6 @@ where
358367
async fn send(&mut self, message: M) -> Result<(), ChannelError> {
359368
self.inner.send(message).await
360369
}
361-
362-
fn sender_clone(&self) -> DynSender<M> {
363-
self.inner.sender_clone()
364-
}
365370
}
366371

367372
impl<T> AsRef<T> for TimedMessageBox<T> {

crates/extensions/c8y_mapper_ext/src/converter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1613,6 +1613,7 @@ pub(crate) mod tests {
16131613
use serde_json::Value;
16141614
use std::str::FromStr;
16151615
use tedge_actors::Builder;
1616+
use tedge_actors::CloneSender;
16161617
use tedge_actors::LoggingSender;
16171618
use tedge_actors::MessageReceiver;
16181619
use tedge_actors::Sender;

crates/extensions/tedge_timer_ext/src/builder.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,14 @@ struct TimeoutSender<T: Message> {
7272
inner: DynSender<Timeout<T>>,
7373
}
7474

75+
impl<T: Message> Clone for TimeoutSender<T> {
76+
fn clone(&self) -> Self {
77+
TimeoutSender {
78+
inner: self.inner.clone(),
79+
}
80+
}
81+
}
82+
7583
#[async_trait]
7684
impl<T: Message> Sender<Timeout<AnyPayload>> for TimeoutSender<T> {
7785
async fn send(&mut self, message: Timeout<AnyPayload>) -> Result<(), ChannelError> {
@@ -80,18 +88,13 @@ impl<T: Message> Sender<Timeout<AnyPayload>> for TimeoutSender<T> {
8088
}
8189
Ok(())
8290
}
83-
84-
fn sender_clone(&self) -> DynSender<Timeout<AnyPayload>> {
85-
Box::new(TimeoutSender {
86-
inner: self.inner.sender_clone(),
87-
})
88-
}
8991
}
9092

9193
/// A Sender that translates timeout requests on the wire
9294
///
9395
/// This sender receives `SetTimeout<T>` requests from some actor,
9496
/// and translates then forwards these messages to the timer actor expecting`Timeout<AnyPayload>`
97+
#[derive(Clone)]
9598
struct SetTimeoutSender {
9699
inner: DynSender<SetTimeout<AnyPayload>>,
97100
}
@@ -103,10 +106,4 @@ impl<T: Message> Sender<SetTimeout<T>> for SetTimeoutSender {
103106
let event: AnyPayload = Box::new(request.event);
104107
self.inner.send(SetTimeout { duration, event }).await
105108
}
106-
107-
fn sender_clone(&self) -> DynSender<SetTimeout<T>> {
108-
Box::new(SetTimeoutSender {
109-
inner: self.inner.sender_clone(),
110-
})
111-
}
112109
}

0 commit comments

Comments
 (0)