Skip to content

Commit f54ceeb

Browse files
committed
Simplify the logic to adapt dyn senders
- The `Clone` trait is no more implemented by `DynSender` This was the main reason preventing the implementation of useful `Into` convertions between misc `DynSender`. - The `adapt()` function as been deprecated as this was a workaround the lack of `Into` convertions between misc `DynSender`. - The `sender_clone()` method is to be used both to `clone` and adapt a sender. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent f347e57 commit f54ceeb

File tree

9 files changed

+76
-88
lines changed

9 files changed

+76
-88
lines changed

crates/core/tedge_actors/src/actors.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ pub mod tests {
120120
let actor_task = spawn(async move { actor.run().await });
121121

122122
spawn(async move {
123-
let mut sender: DynSender<&str> = adapt(&input_sender);
123+
let mut sender: DynSender<&str> = input_sender.sender_clone();
124124
sender.send("Do this").await.expect("sent");
125125
sender.send("Do nothing").await.expect("sent");
126126
sender.send("Do that and this").await.expect("sent");
@@ -190,8 +190,8 @@ pub mod tests {
190190
impl SpecificMessageBox {
191191
fn new_box(capacity: usize, output: DynSender<DoMsg>) -> (DynSender<String>, Self) {
192192
let (sender, input) = mpsc::channel(capacity);
193-
let peer_1 = adapt(&output);
194-
let peer_2 = adapt(&output);
193+
let peer_1 = output.sender_clone();
194+
let peer_2 = output.sender_clone();
195195
let message_box = SpecificMessageBox {
196196
input,
197197
peer_1,

crates/core/tedge_actors/src/builders.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ pub trait MessageSink<M: Message, Config> {
130130
N: Message,
131131
M: From<N>,
132132
{
133-
source.register_peer(self.get_config(), crate::adapt(&self.get_sender()))
133+
source.register_peer(self.get_config(), self.get_sender().sender_clone())
134134
}
135135

136136
/// Add a source of messages to the actor under construction, the messages being translated on the fly.

crates/core/tedge_actors/src/channels.rs

Lines changed: 31 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -39,41 +39,55 @@ impl<M, S: Clone + Sender<M>> CloneSender<M> for S {
3939
}
4040
}
4141

42-
impl<M: 'static> Clone for DynSender<M> {
43-
fn clone(&self) -> Self {
44-
self.sender_clone()
42+
impl<M, S: Clone + Sender<M>> From<S> for DynSender<M> {
43+
fn from(sender: S) -> Self {
44+
Box::new(sender)
4545
}
4646
}
4747

48-
/// An `mpsc::Sender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
49-
impl<M: Message, N: Message + Into<M>> From<mpsc::Sender<M>> for DynSender<N> {
50-
fn from(sender: mpsc::Sender<M>) -> Self {
51-
Box::new(sender)
48+
/// A `DynSender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
49+
#[async_trait]
50+
impl<M: Message, N: Message + Into<M>> Sender<N> for DynSender<M> {
51+
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
52+
Ok(self.as_mut().send(message.into()).await?)
5253
}
5354
}
5455

5556
#[async_trait]
56-
impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::Sender<M> {
57+
impl<M: Message, N: Message + Into<M>> Sender<N> for Box<dyn Sender<M>> {
5758
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
58-
Ok(SinkExt::send(&mut self, message.into()).await?)
59+
Ok(self.as_mut().send(message.into()).await?)
5960
}
6061
}
6162

62-
/// An `mpsc::UnboundedSender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
63-
impl<M: Message, N: Message + Into<M>> From<mpsc::UnboundedSender<M>> for DynSender<N> {
64-
fn from(sender: mpsc::UnboundedSender<M>) -> Self {
65-
Box::new(sender)
63+
#[async_trait]
64+
impl<M: Message, N: Message + Into<M>> CloneSender<N> for DynSender<M> {
65+
fn sender_clone(&self) -> DynSender<N> {
66+
Box::new(self.as_ref().sender_clone())
67+
}
68+
69+
fn sender(&self) -> Box<dyn Sender<N>> {
70+
Box::new(self.as_ref().sender())
71+
}
72+
}
73+
74+
/// An `mpsc::Sender<M>` is a `DynSender<M>`
75+
#[async_trait]
76+
impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::Sender<M> {
77+
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
78+
Ok(SinkExt::send(&mut self, message.into()).await?)
6679
}
6780
}
6881

82+
/// An `mpsc::UnboundedSender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
6983
#[async_trait]
7084
impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::UnboundedSender<M> {
7185
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
7286
Ok(SinkExt::send(&mut self, message.into()).await?)
7387
}
7488
}
7589

76-
/// An `oneshot::Sender<M>` is a `Sender<N>` provided `N` implements `Into<M>`
90+
/// A `oneshot::Sender<M>` is a `Sender<N>` provided `N` implements `Into<M>`
7791
///
7892
/// There is one caveat. The `oneshot::Sender::send()` method consumes the sender,
7993
/// hence the one shot sender is wrapped inside an `Option`.
@@ -92,29 +106,6 @@ impl<M: Message, N: Message + Into<M>> Sender<N> for Option<oneshot::Sender<M>>
92106
}
93107
}
94108

95-
/// Make a `DynSender<N>` from a `DynSender<M>`
96-
///
97-
/// This is a workaround to the fact the compiler rejects a From implementation:
98-
///
99-
/// ```shell
100-
///
101-
/// impl<M: Message, N: Message + Into<M>> From<DynSender<M>> for DynSender<N> {
102-
/// | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
103-
/// |
104-
/// = note: conflicting implementation in crate `core`:
105-
/// - impl<T> From<T> for T;
106-
/// ```
107-
pub fn adapt<M: Message, N: Message + Into<M>>(sender: &DynSender<M>) -> DynSender<N> {
108-
Box::new(sender.clone())
109-
}
110-
111-
#[async_trait]
112-
impl<M: Message, N: Message + Into<M>> Sender<N> for DynSender<M> {
113-
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
114-
Ok(self.as_mut().send(message.into()).await?)
115-
}
116-
}
117-
118109
/// A sender that discards messages instead of sending them
119110
#[derive(Clone)]
120111
pub struct NullSender;
@@ -126,12 +117,6 @@ impl<M: Message> Sender<M> for NullSender {
126117
}
127118
}
128119

129-
impl<M: Message> From<NullSender> for DynSender<M> {
130-
fn from(sender: NullSender) -> Self {
131-
Box::new(sender)
132-
}
133-
}
134-
135120
/// A sender that transforms the messages on the fly
136121
pub struct MappingSender<F, M> {
137122
inner: DynSender<M>,
@@ -141,7 +126,7 @@ pub struct MappingSender<F, M> {
141126
impl<F, M: 'static> Clone for MappingSender<F, M> {
142127
fn clone(&self) -> Self {
143128
MappingSender {
144-
inner: self.inner.clone(),
129+
inner: self.inner.sender_clone(),
145130
cast: self.cast.clone(),
146131
}
147132
}
@@ -173,19 +158,6 @@ where
173158
}
174159
}
175160

176-
impl<M, N, NS, F> From<MappingSender<F, N>> for DynSender<M>
177-
where
178-
M: Message,
179-
N: Message,
180-
NS: Iterator<Item = N> + Send,
181-
F: Fn(M) -> NS,
182-
F: 'static + Sync + Send,
183-
{
184-
fn from(value: MappingSender<F, N>) -> Self {
185-
Box::new(value)
186-
}
187-
}
188-
189161
#[cfg(test)]
190162
mod tests {
191163
use super::*;
@@ -236,8 +208,8 @@ mod tests {
236208
impl From<DynSender<Msg>> for Peers {
237209
fn from(recipient: DynSender<Msg>) -> Self {
238210
Peers {
239-
peer_1: adapt(&recipient),
240-
peer_2: adapt(&recipient),
211+
peer_1: recipient.sender_clone(),
212+
peer_2: recipient.sender_clone(),
241213
}
242214
}
243215
}

crates/core/tedge_actors/src/servers/keyed_messages.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub struct SenderVec<M> {
4141
impl<M: 'static> Clone for SenderVec<M> {
4242
fn clone(&self) -> Self {
4343
SenderVec {
44-
senders: self.senders.clone(),
44+
senders: self.senders.iter().map(|s| s.sender_clone()).collect(),
4545
}
4646
}
4747
}

crates/core/tedge_actors/src/servers/requests.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,20 @@ impl<Request: Message, Response: Message> ClientMessageBox<Request, Response> {
4949

5050
/// A [Sender] used by a client to send requests to a server,
5151
/// redirecting the responses to another recipient.
52-
#[derive(Clone)]
5352
pub struct RequestSender<Request: 'static, Response: 'static> {
5453
sender: DynSender<RequestEnvelope<Request, Response>>,
5554
reply_to: DynSender<Response>,
5655
}
5756

57+
impl<Request, Response> Clone for RequestSender<Request, Response> {
58+
fn clone(&self) -> Self {
59+
RequestSender {
60+
sender: self.sender.sender_clone(),
61+
reply_to: self.reply_to.sender_clone(),
62+
}
63+
}
64+
}
65+
5866
#[async_trait]
5967
impl<Request: Message, Response: Message> Sender<Request> for RequestSender<Request, Response> {
6068
async fn send(&mut self, request: Request) -> Result<(), ChannelError> {
@@ -65,13 +73,6 @@ impl<Request: Message, Response: Message> Sender<Request> for RequestSender<Requ
6573
}
6674
}
6775

68-
/* Adding this prevents to derive Clone for RequestSender! Why?
69-
impl<Request: Message, Response: Message> From<RequestSender<Request,Response>> for DynSender<Request> {
70-
fn from(sender: RequestSender<Request,Response>) -> Self {
71-
Box::new(sender)
72-
}
73-
}*/
74-
7576
/// An actor that wraps a request-response server
7677
///
7778
/// Requests are processed in turn, leading either to a response or an error.

crates/core/tedge_agent/src/operation_file_cache/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ use camino::Utf8PathBuf;
1616
use std::collections::HashMap;
1717
use std::path::Path;
1818
use std::sync::Arc;
19-
use tedge_actors::adapt;
2019
use tedge_actors::fan_in_message_type;
2120
use tedge_actors::Actor;
2221
use tedge_actors::Builder;
22+
use tedge_actors::CloneSender;
2323
use tedge_actors::DynSender;
2424
use tedge_actors::LinkError;
2525
use tedge_actors::LoggingReceiver;
@@ -295,11 +295,11 @@ impl FileCacheActorBuilder {
295295
let message_box = SimpleMessageBoxBuilder::new("RestartManager", 10);
296296

297297
let download_sender =
298-
downloader_actor.connect_consumer(NoConfig, adapt(&message_box.get_sender()));
298+
downloader_actor.connect_consumer(NoConfig, message_box.get_sender().sender_clone());
299299

300300
let mqtt_sender = mqtt_actor.connect_consumer(
301301
Self::subscriptions(&mqtt_schema),
302-
adapt(&message_box.get_sender()),
302+
message_box.get_sender().sender_clone(),
303303
);
304304

305305
Self {

crates/extensions/c8y_mapper_ext/src/actor.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ use c8y_http_proxy::messages::C8YRestRequest;
1515
use c8y_http_proxy::messages::C8YRestResult;
1616
use std::path::PathBuf;
1717
use std::time::Duration;
18-
use tedge_actors::adapt;
1918
use tedge_actors::fan_in_message_type;
2019
use tedge_actors::Actor;
2120
use tedge_actors::Builder;
21+
use tedge_actors::CloneSender;
2222
use tedge_actors::DynSender;
2323
use tedge_actors::LoggingSender;
2424
use tedge_actors::MessageReceiver;
@@ -306,14 +306,21 @@ impl C8yMapperBuilder {
306306

307307
let box_builder = SimpleMessageBoxBuilder::new("CumulocityMapper", 16);
308308

309-
let mqtt_publisher =
310-
mqtt.connect_consumer(config.topics.clone(), adapt(&box_builder.get_sender()));
309+
let mqtt_publisher = mqtt.connect_consumer(
310+
config.topics.clone(),
311+
box_builder.get_sender().sender_clone(),
312+
);
311313
let http_proxy = C8YHttpProxy::new("C8yMapper => C8YHttpProxy", http);
312-
let timer_sender = timer.connect_consumer(NoConfig, adapt(&box_builder.get_sender()));
313-
let upload_sender = uploader.connect_consumer(NoConfig, adapt(&box_builder.get_sender()));
314+
let timer_sender =
315+
timer.connect_consumer(NoConfig, box_builder.get_sender().sender_clone());
316+
let upload_sender =
317+
uploader.connect_consumer(NoConfig, box_builder.get_sender().sender_clone());
314318
let download_sender =
315-
downloader.connect_consumer(NoConfig, adapt(&box_builder.get_sender()));
316-
fs_watcher.register_peer(config.ops_dir.clone(), adapt(&box_builder.get_sender()));
319+
downloader.connect_consumer(NoConfig, box_builder.get_sender().sender_clone());
320+
fs_watcher.register_peer(
321+
config.ops_dir.clone(),
322+
box_builder.get_sender().sender_clone(),
323+
);
317324
let auth_proxy = ProxyUrlGenerator::new(
318325
config.auth_proxy_addr.clone(),
319326
config.auth_proxy_port,

crates/extensions/tedge_log_manager/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ pub use actor::*;
99
pub use config::*;
1010
use log_manager::LogPluginConfig;
1111
use std::path::PathBuf;
12-
use tedge_actors::adapt;
1312
use tedge_actors::Builder;
13+
use tedge_actors::CloneSender;
1414
use tedge_actors::DynSender;
1515
use tedge_actors::LinkError;
1616
use tedge_actors::LoggingSender;
@@ -53,15 +53,15 @@ impl LogManagerBuilder {
5353
let box_builder = SimpleMessageBoxBuilder::new("Log Manager", 16);
5454
let mqtt_publisher = mqtt.connect_consumer(
5555
Self::subscriptions(&config),
56-
adapt(&box_builder.get_sender()),
56+
box_builder.get_sender().sender_clone(),
5757
);
5858
fs_notify.register_peer(
5959
LogManagerBuilder::watched_directory(&config),
60-
adapt(&box_builder.get_sender()),
60+
box_builder.get_sender().sender_clone(),
6161
);
6262

6363
let upload_sender =
64-
uploader_actor.connect_consumer(NoConfig, adapt(&box_builder.get_sender()));
64+
uploader_actor.connect_consumer(NoConfig, box_builder.get_sender().sender_clone());
6565

6666
Ok(Self {
6767
config,

crates/extensions/tedge_timer_ext/src/builder.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use async_trait::async_trait;
66
use std::convert::Infallible;
77
use tedge_actors::Builder;
88
use tedge_actors::ChannelError;
9+
use tedge_actors::CloneSender;
910
use tedge_actors::DynSender;
1011
use tedge_actors::Message;
1112
use tedge_actors::NoConfig;
@@ -75,7 +76,7 @@ struct TimeoutSender<T: Message> {
7576
impl<T: Message> Clone for TimeoutSender<T> {
7677
fn clone(&self) -> Self {
7778
TimeoutSender {
78-
inner: self.inner.clone(),
79+
inner: self.inner.sender_clone(),
7980
}
8081
}
8182
}
@@ -94,11 +95,18 @@ impl<T: Message> Sender<Timeout<AnyPayload>> for TimeoutSender<T> {
9495
///
9596
/// This sender receives `SetTimeout<T>` requests from some actor,
9697
/// and translates then forwards these messages to the timer actor expecting`Timeout<AnyPayload>`
97-
#[derive(Clone)]
9898
struct SetTimeoutSender {
9999
inner: DynSender<SetTimeout<AnyPayload>>,
100100
}
101101

102+
impl Clone for SetTimeoutSender {
103+
fn clone(&self) -> Self {
104+
SetTimeoutSender {
105+
inner: self.inner.sender_clone(),
106+
}
107+
}
108+
}
109+
102110
#[async_trait]
103111
impl<T: Message> Sender<SetTimeout<T>> for SetTimeoutSender {
104112
async fn send(&mut self, request: SetTimeout<T>) -> Result<(), ChannelError> {

0 commit comments

Comments
 (0)