@@ -159,10 +159,9 @@ impl<'a, T> BidirectionalChannelHalf<T> {
159
159
/// Forward messages received from `recv_event_loop` to `target`
160
160
///
161
161
/// The result of running this function constitutes half the MQTT bridge, hence the name.
162
- /// Each half has two main responsibilities, one is to take messages received on the event
163
- /// loop and forward them to the target client, the other is to communicate with the corresponding
164
- /// half of the bridge to ensure published messages get acknowledged only when they have been
165
- /// fully processed.
162
+ /// Each half has two main responsibilities, one is to take messages received on the event loop and
163
+ /// forward them to the target client, the other is to communicate with the companion half of the
164
+ /// bridge to ensure published messages get acknowledged only when they have been fully processed.
166
165
///
167
166
/// # Message flow
168
167
/// Messages in the bridge go through a few states
@@ -184,14 +183,25 @@ impl<'a, T> BidirectionalChannelHalf<T> {
184
183
///
185
184
/// The channel sends [Option<Publish>] rather than [Publish] to allow the bridge to send entirely
186
185
/// novel messages, and not just forwarded ones, as attaching packet IDs relies on pairing every
187
- /// [Outgoing] publish notification with a message sent by the relevant client. This means the
186
+ /// [Outgoing] publish notification with a message sent by the relevant client. So, when a QoS 1
187
+ /// message is forwarded, this will be accompanied by sending `Some(message)` to the channel,
188
+ /// allowing the original message to be acknowledged once an acknowledgement is received for the
189
+ /// forwarded message. When publishing a health message, this will be accompanied by sending `None`
190
+ /// to the channel, telling the bridge to ignore the associated packet ID as this didn't arise from
191
+ /// a forwarded message that itself requires acknowledgement.
188
192
///
189
193
/// # Health topics
194
+ /// The bridge will publish health information to `health_topic` (if supplied) on `target` to enable
195
+ /// other components to establish bridge health. This is intended to be used the half with cloud
196
+ /// event loop, so the status of this connection will be relayed to a relevant `te` topic like its
197
+ /// mosquitto-based predecessor. The payload is either `1` (healthy) or `0` (unhealthy). When the
198
+ /// connection is created, the last-will message is set to send the `0` payload when the connection
199
+ /// is dropped.
190
200
async fn half_bridge < F : for < ' a > Fn ( & ' a str ) -> Cow < ' a , str > > (
191
201
mut recv_event_loop : EventLoop ,
192
202
target : AsyncClient ,
193
203
transform_topic : F ,
194
- mut corresponding_bridge_half : BidirectionalChannelHalf < Option < Publish > > ,
204
+ mut companion_bridge_half : BidirectionalChannelHalf < Option < Publish > > ,
195
205
health_topic : Option < String > ,
196
206
name : & ' static str ,
197
207
) {
@@ -209,7 +219,7 @@ async fn half_bridge<F: for<'a> Fn(&'a str) -> Cow<'a, str>>(
209
219
. publish ( health_topic, QoS :: AtLeastOnce , true , C8Y_BRIDGE_UP_PAYLOAD )
210
220
. await
211
221
. unwrap ( ) ;
212
- corresponding_bridge_half . send ( None ) . await . unwrap ( ) ;
222
+ companion_bridge_half . send ( None ) . await . unwrap ( ) ;
213
223
}
214
224
}
215
225
notification
@@ -229,7 +239,7 @@ async fn half_bridge<F: for<'a> Fn(&'a str) -> Cow<'a, str>>(
229
239
)
230
240
. await
231
241
. unwrap ( ) ;
232
- corresponding_bridge_half . send ( None ) . await . unwrap ( ) ;
242
+ companion_bridge_half . send ( None ) . await . unwrap ( ) ;
233
243
}
234
244
}
235
245
continue ;
@@ -248,7 +258,7 @@ async fn half_bridge<F: for<'a> Fn(&'a str) -> Cow<'a, str>>(
248
258
. await
249
259
. unwrap ( ) ;
250
260
let publish = ( publish. qos > QoS :: AtMostOnce ) . then_some ( publish) ;
251
- corresponding_bridge_half . send ( publish) . await . unwrap ( ) ;
261
+ companion_bridge_half . send ( publish) . await . unwrap ( ) ;
252
262
}
253
263
// Forwarding acks from event loop to target
254
264
Event :: Incoming (
@@ -259,15 +269,13 @@ async fn half_bridge<F: for<'a> Fn(&'a str) -> Cow<'a, str>>(
259
269
target. ack ( & msg) . await . unwrap ( ) ;
260
270
}
261
271
}
262
- Event :: Outgoing ( Outgoing :: Publish ( pkid) ) => {
263
- match corresponding_bridge_half. recv ( ) . await {
264
- Some ( Some ( msg) ) => {
265
- forward_pkid_to_received_msg. insert ( pkid, msg) ;
266
- }
267
- Some ( None ) => { }
268
- None => break ,
272
+ Event :: Outgoing ( Outgoing :: Publish ( pkid) ) => match companion_bridge_half. recv ( ) . await {
273
+ Some ( Some ( msg) ) => {
274
+ forward_pkid_to_received_msg. insert ( pkid, msg) ;
269
275
}
270
- }
276
+ Some ( None ) => { }
277
+ None => break ,
278
+ } ,
271
279
_ => { }
272
280
}
273
281
}
0 commit comments