Skip to content

Commit 9f522a6

Browse files
committed
Add a config parameter to MessageSink::add_input
This is first step toward removing the Source type parameter to MessageSink. Indeed, this type parameter and the get_config method prevents a sink to receive messages from several sources. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent 9e51b3c commit 9f522a6

File tree

7 files changed

+25
-20
lines changed

7 files changed

+25
-20
lines changed

crates/core/tedge_actors/src/builders.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,12 @@ pub trait MessageSink<M: Message, Config> {
109109
fn get_sender(&self) -> DynSender<M>;
110110

111111
/// Add a source of messages to the actor under construction
112-
fn add_input<N>(&mut self, source: &mut impl MessageSource<N, Config>)
112+
fn add_input<N, C>(&mut self, config: C, source: &mut impl MessageSource<N, C>)
113113
where
114114
N: Message,
115115
M: From<N>,
116116
{
117-
source.register_peer(self.get_config(), self.get_sender().sender_clone())
117+
source.register_peer(config, self.get_sender().sender_clone())
118118
}
119119

120120
/// Add a source of messages to the actor under construction, the messages being translated on the fly.
@@ -129,6 +129,7 @@ pub trait MessageSink<M: Message, Config> {
129129
/// # use tedge_actors::MessageReceiver;
130130
/// # use tedge_actors::MessageSink;
131131
/// # use tedge_actors::NoMessage;
132+
/// # use tedge_actors::NoConfig;
132133
/// # use tedge_actors::Sender;
133134
/// # use tedge_actors::SimpleMessageBox;
134135
/// # use tedge_actors::SimpleMessageBoxBuilder;
@@ -142,7 +143,7 @@ pub trait MessageSink<M: Message, Config> {
142143
/// let mut sender_builder = SimpleMessageBoxBuilder::new("Send", 16);
143144
///
144145
/// // Convert the `&str` sent by the source into an iterator of `char` as expected by the receiver.
145-
/// receiver_builder.add_mapped_input(&mut sender_builder, |str: &'static str| str.chars() );
146+
/// receiver_builder.add_mapped_input(NoConfig, &mut sender_builder, |str: &'static str| str.chars() );
146147
///
147148
/// let mut sender: SimpleMessageBox<NoMessage, &'static str>= sender_builder.build();
148149
/// let receiver: SimpleMessageBox<char, NoMessage> = receiver_builder.build();
@@ -161,9 +162,10 @@ pub trait MessageSink<M: Message, Config> {
161162
/// # Ok(())
162163
/// # }
163164
/// ```
164-
fn add_mapped_input<N, MS, MessageMapper>(
165+
fn add_mapped_input<N, C, MS, MessageMapper>(
165166
&mut self,
166-
source: &mut impl MessageSource<N, Config>,
167+
config: C,
168+
source: &mut impl MessageSource<N, C>,
167169
cast: MessageMapper,
168170
) where
169171
N: Message,
@@ -172,7 +174,7 @@ pub trait MessageSink<M: Message, Config> {
172174
MessageMapper: 'static + Send + Sync,
173175
{
174176
let sender = MappingSender::new(self.get_sender(), cast);
175-
source.register_peer(self.get_config(), sender.into())
177+
source.register_peer(config, sender.into())
176178
}
177179
}
178180

crates/core/tedge_actors/src/converter.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ impl<C: Converter> ConvertingActor<C> {
217217
/// TopicFilter("some/mqtt/topics/#"));
218218
///
219219
/// // Connect this actor as a sink of the mqtt actor, to receive input messages from it
220-
/// converter_builder.add_input(&mut mqtt_builder);
220+
/// converter_builder.add_input(converter_builder.get_config(), &mut mqtt_builder);
221221
///
222222
/// // Connect the same mqtt actor as a sink of this actor, to send output messages to it
223223
/// converter_builder.add_sink(&mut mqtt_builder);
@@ -272,13 +272,11 @@ impl<C: Converter, Config> MessageSource<C::Output, NoConfig>
272272
}
273273
}
274274

275-
impl<C: Converter, Config: Clone, SourceConfig> MessageSink<C::Input, SourceConfig>
275+
impl<C: Converter, Config: Clone> MessageSink<C::Input, Config>
276276
for ConvertingActorBuilder<C, Config>
277-
where
278-
SourceConfig: From<Config>,
279277
{
280-
fn get_config(&self) -> SourceConfig {
281-
self.config.clone().into()
278+
fn get_config(&self) -> Config {
279+
self.config.clone()
282280
}
283281

284282
fn get_sender(&self) -> DynSender<C::Input> {

crates/core/tedge_agent/src/agent.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -432,10 +432,14 @@ pub fn create_tedge_to_te_converter(
432432
.try_into()?;
433433

434434
// Tedge to Te converter
435-
let mut tedge_converter_actor =
436-
ConvertingActor::builder("TedgetoTeConverter", tedge_to_te_converter, subscriptions);
435+
let mut tedge_converter_actor: ConvertingActorBuilder<TedgetoTeConverter, TopicFilter> =
436+
ConvertingActor::builder(
437+
"TedgetoTeConverter",
438+
tedge_to_te_converter,
439+
subscriptions.clone(),
440+
);
437441

438-
tedge_converter_actor.add_input(mqtt_actor_builder);
442+
tedge_converter_actor.add_input(subscriptions, mqtt_actor_builder);
439443
tedge_converter_actor.add_sink(mqtt_actor_builder);
440444

441445
Ok(tedge_converter_actor)

crates/core/tedge_mapper/src/aws/mapper.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ impl TEdgeComponent for AwsMapper {
4444
get_topic_filter(&tedge_config),
4545
);
4646

47-
aws_converting_actor.add_input(&mut mqtt_actor);
47+
aws_converting_actor.add_input(aws_converting_actor.get_config(), &mut mqtt_actor);
4848
aws_converting_actor.register_peer(NoConfig, mqtt_actor.get_sender());
4949

5050
runtime.spawn(aws_converting_actor).await?;

crates/core/tedge_mapper/src/az/mapper.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ impl TEdgeComponent for AzureMapper {
3939
);
4040
let mut az_converting_actor =
4141
ConvertingActor::builder("AzConverter", az_converter, get_topic_filter(&tedge_config));
42-
az_converting_actor.add_input(&mut mqtt_actor);
42+
az_converting_actor.add_input(az_converting_actor.get_config(), &mut mqtt_actor);
4343

4444
az_converting_actor.register_peer(NoConfig, mqtt_actor.get_sender());
4545

crates/core/tedge_mapper/src/collectd/mapper.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use mqtt_channel::Topic;
88
use mqtt_channel::TopicFilter;
99
use std::path::Path;
1010
use tedge_actors::MessageSink;
11+
use tedge_actors::NoConfig;
1112
use tedge_config::TEdgeConfig;
1213

1314
const COLLECTD_MAPPER_NAME: &str = "tedge-mapper-collectd";
@@ -47,8 +48,8 @@ impl TEdgeComponent for CollectdMapper {
4748
let mut collectd_actor = CollectdActorBuilder::new(input_topic);
4849

4950
collectd_actor.add_input(&mut mqtt_actor);
50-
batching_actor.add_input(&mut collectd_actor);
51-
mqtt_actor.add_mapped_input(&mut batching_actor, move |batch| {
51+
batching_actor.add_input(NoConfig, &mut collectd_actor);
52+
mqtt_actor.add_mapped_input(NoConfig, &mut batching_actor, move |batch| {
5253
collectd_ext::converter::batch_into_mqtt_messages(&output_topic, batch).into_iter()
5354
});
5455

crates/extensions/c8y_mapper_ext/src/compatibility_adapter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ impl OldAgentAdapter {
4646
OldAgentAdapter,
4747
old_and_new_command_topics(),
4848
);
49-
builder.add_input(mqtt);
49+
builder.add_input(builder.get_config(), mqtt);
5050
builder.add_sink(mqtt);
5151
builder
5252
}

0 commit comments

Comments
 (0)