Skip to content

Commit 397dac9

Browse files
committed
Apply review suggestions
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
1 parent ce44a03 commit 397dac9

File tree

2 files changed

+47
-18
lines changed
  • crates
    • core/c8y_api/src/smartrest
    • extensions/tedge_mqtt_bridge/src

2 files changed

+47
-18
lines changed

crates/core/c8y_api/src/smartrest/topic.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ pub enum C8yTopic {
1414
SmartRestRequest,
1515
SmartRestResponse,
1616
ChildSmartRestResponse(String),
17-
// OperationTopic(String),
1817
}
1918

2019
impl C8yTopic {
@@ -51,7 +50,7 @@ impl C8yTopic {
5150
Self::SmartRestResponse => format!("{prefix}/{SMARTREST_PUBLISH_TOPIC}"),
5251
Self::ChildSmartRestResponse(child_id) => {
5352
format!("{prefix}/{SMARTREST_PUBLISH_TOPIC}/{child_id}")
54-
} // Self::OperationTopic(name) => name.into(),
53+
}
5554
}
5655
}
5756

crates/extensions/tedge_mqtt_bridge/src/lib.rs

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use tedge_actors::futures::channel::mpsc;
2323
use tedge_actors::Actor;
2424
use tedge_actors::Builder;
2525
use tedge_actors::DynSender;
26+
use tedge_actors::NullSender;
2627
use tedge_actors::RuntimeError;
2728
use tedge_actors::RuntimeRequest;
2829
use tedge_actors::RuntimeRequestSink;
@@ -40,9 +41,7 @@ pub use mqtt_channel::Topic;
4041
pub use mqtt_channel::TopicFilter;
4142
use tedge_config::TEdgeConfig;
4243

43-
pub struct MqttBridgeActorBuilder {
44-
signal_sender: mpsc::Sender<RuntimeRequest>,
45-
}
44+
pub struct MqttBridgeActorBuilder {}
4645

4746
impl MqttBridgeActorBuilder {
4847
pub async fn new(
@@ -56,7 +55,6 @@ impl MqttBridgeActorBuilder {
5655
tedge_config.device.cert_path.clone().into(),
5756
)
5857
.unwrap();
59-
let (signal_sender, _signal_receiver) = mpsc::channel(10);
6058

6159
let prefix = tedge_config.c8y.bridge.topic_prefix.clone();
6260
let mut local_config = MqttOptions::new(
@@ -67,7 +65,6 @@ impl MqttBridgeActorBuilder {
6765
let health_topic = main_device_health_topic(&service_name);
6866
// TODO cope with secured mosquitto
6967
local_config.set_manual_acks(true);
70-
// TODO const for payload
7168
local_config.set_last_will(LastWill::new(
7269
&health_topic,
7370
C8Y_BRIDGE_DOWN_PAYLOAD,
@@ -110,16 +107,18 @@ impl MqttBridgeActorBuilder {
110107
move |topic| topic.strip_prefix(&topic_prefix).unwrap().into(),
111108
msgs_local,
112109
None,
110+
"local",
113111
));
114112
tokio::spawn(half_bridge(
115113
cloud_event_loop,
116114
local_client,
117115
move |topic| format!("{prefix}/{topic}").into(),
118116
msgs_cloud,
119117
Some(health_topic),
118+
"cloud",
120119
));
121120

122-
Self { signal_sender }
121+
Self {}
123122
}
124123

125124
pub(crate) fn build_actor(self) -> MqttBridgeActor {
@@ -157,23 +156,53 @@ impl<'a, T> BidirectionalChannelHalf<T> {
157156
}
158157
}
159158

159+
/// Forward messages received from `recv_event_loop` to `target`
160+
///
161+
/// The result of running this function constitutes half the MQTT bridge, hence the name.
162+
/// Each half has two main responsibilities, one is to take messages received on the event
163+
/// loop and forward them to the target client, the other is to communicate with the corresponding
164+
/// half of the bridge to ensure published messages get acknowledged only when they have been
165+
/// fully processed.
166+
///
167+
/// # Message flow
168+
/// Messages in the bridge go through a few states
169+
/// 1. Received from the sending broker
170+
/// 2. Forwarded to the receiving broker
171+
/// 3. The receiving broker sends an acknowledgement
172+
/// 4. The original forwarded message is acknowledged now we know it is fully processed
173+
///
174+
/// Since the function is processing one [EventLoop], messages can be sent to the receiving broker,
175+
/// but we cannot receive acknowledgements from that broker, therefore a communication link must
176+
/// be established between the bridge halves. This link is the argument `corresponding_bridge_half`,
177+
/// which can both send and receive messages from the other bridge loop.
178+
///
179+
/// When a message is forwarded, the [Publish] is forwarded from this loop to the corresponding loop.
180+
/// This allows the loop to store the message along with its packet ID when the forwarded message is
181+
/// published. When an acknowledgement is received for the forwarded message, the packet id is used
182+
/// to retrieve the original [Publish], which is then passed to [AsyncClient::ack] to complete the
183+
/// final step of the message flow.
184+
///
185+
/// The channel sends [Option<Publish>] rather than [Publish] to allow the bridge to send entirely
186+
/// novel messages, and not just forwarded ones, as attaching packet IDs relies on pairing every
187+
/// [Outgoing] publish notification with a message sent by the relevant client. This means the
188+
///
189+
/// # Health topics
160190
async fn half_bridge<F: for<'a> Fn(&'a str) -> Cow<'a, str>>(
161191
mut recv_event_loop: EventLoop,
162192
target: AsyncClient,
163193
transform_topic: F,
164194
mut corresponding_bridge_half: BidirectionalChannelHalf<Option<Publish>>,
165195
health_topic: Option<String>,
196+
name: &'static str,
166197
) {
167198
let mut forward_pkid_to_received_msg = HashMap::new();
168199
let mut last_err = Some("dummy error".into());
169200

170201
loop {
171202
let notification = match recv_event_loop.poll().await {
172-
// TODO notify if this is us recovering from an error
173203
Ok(notification) => {
174204
if last_err.as_ref().is_some() {
175-
// TODO clarify whether this is cloud/local
176-
info!("MQTT bridge connected");
205+
info!("MQTT bridge connected to {name} broker");
177206
last_err = None;
178207
if let Some(health_topic) = &health_topic {
179208
target
@@ -188,8 +217,7 @@ async fn half_bridge<F: for<'a> Fn(&'a str) -> Cow<'a, str>>(
188217
Err(err) => {
189218
let err = err.to_string();
190219
if last_err.as_ref() != Some(&err) {
191-
// TODO clarify whether this is cloud/local
192-
error!("MQTT bridge connection error: {err}");
220+
error!("MQTT bridge failed to connect to {name} broker: {err}");
193221
last_err = Some(err);
194222
if let Some(health_topic) = &health_topic {
195223
target
@@ -219,22 +247,24 @@ async fn half_bridge<F: for<'a> Fn(&'a str) -> Cow<'a, str>>(
219247
)
220248
.await
221249
.unwrap();
222-
corresponding_bridge_half.send(Some(publish)).await.unwrap();
250+
let publish = (publish.qos > QoS::AtMostOnce).then_some(publish);
251+
corresponding_bridge_half.send(publish).await.unwrap();
223252
}
224253
// Forwarding acks from event loop to target
225254
Event::Incoming(
226255
Incoming::PubAck(PubAck { pkid: ack_pkid })
227256
| Incoming::PubRec(PubRec { pkid: ack_pkid }),
228257
) => {
229-
if let Some(msg) = forward_pkid_to_received_msg.remove(&ack_pkid).unwrap() {
258+
if let Some(msg) = forward_pkid_to_received_msg.remove(&ack_pkid) {
230259
target.ack(&msg).await.unwrap();
231260
}
232261
}
233262
Event::Outgoing(Outgoing::Publish(pkid)) => {
234263
match corresponding_bridge_half.recv().await {
235-
Some(optional_msg) => {
236-
forward_pkid_to_received_msg.insert(pkid, optional_msg);
264+
Some(Some(msg)) => {
265+
forward_pkid_to_received_msg.insert(pkid, msg);
237266
}
267+
Some(None) => {}
238268
None => break,
239269
}
240270
}
@@ -257,7 +287,7 @@ impl Builder<MqttBridgeActor> for MqttBridgeActorBuilder {
257287

258288
impl RuntimeRequestSink for MqttBridgeActorBuilder {
259289
fn get_signal_sender(&self) -> DynSender<RuntimeRequest> {
260-
Box::new(self.signal_sender.clone())
290+
NullSender.into()
261291
}
262292
}
263293

0 commit comments

Comments
 (0)