Skip to content

Commit ce649ba

Browse files
authored
Merge pull request #3365 from rina23q/feature/3153/configurable-mqtt-keepalive-interval-to-cloud-bridge
feat: add MQTT keepalive_interval config to allow custom value per bridge
2 parents 38d5c1c + 2a5a2ec commit ce649ba

File tree

9 files changed

+61
-3
lines changed

9 files changed

+61
-3
lines changed

crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,10 @@ define_tedge_config! {
341341
/// forwarded to Cumulocity on the `s/us` topic
342342
#[tedge_config(example = "c8y", default(function = "c8y_topic_prefix"))]
343343
topic_prefix: TopicPrefix,
344+
345+
/// The amount of time after which the bridge should send a ping if no other traffic has occured
346+
#[tedge_config(example = "60s", default(from_str = "60s"))]
347+
keepalive_interval: SecondsOrHumanTime,
344348
},
345349

346350
entity_store: {
@@ -441,6 +445,10 @@ define_tedge_config! {
441445
/// forwarded to Azure on the `$iothub/twin/GET/#` topic
442446
#[tedge_config(example = "az", default(function = "az_topic_prefix"))]
443447
topic_prefix: TopicPrefix,
448+
449+
/// The amount of time after which the bridge should send a ping if no other traffic has occured
450+
#[tedge_config(example = "60s", default(from_str = "60s"))]
451+
keepalive_interval: SecondsOrHumanTime,
444452
},
445453

446454
/// Set of MQTT topics the Azure IoT mapper should subscribe to
@@ -506,6 +514,11 @@ define_tedge_config! {
506514
/// forwarded to AWS on the `$aws/things/shadow/#` topic
507515
#[tedge_config(example = "aws", default(function = "aws_topic_prefix"))]
508516
topic_prefix: TopicPrefix,
517+
518+
519+
/// The amount of time after which the bridge should send a ping if no other traffic has occured
520+
#[tedge_config(example = "60s", default(from_str = "60s"))]
521+
keepalive_interval: SecondsOrHumanTime,
509522
},
510523

511524
/// Set of MQTT topics the AWS IoT mapper should subscribe to

crates/core/tedge/src/bridge/aws.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use super::BridgeConfig;
22
use crate::bridge::config::BridgeLocation;
33
use camino::Utf8PathBuf;
44
use std::borrow::Cow;
5+
use std::time::Duration;
56
use tedge_api::mqtt_topics::Channel;
67
use tedge_api::mqtt_topics::EntityTopicId;
78
use tedge_api::mqtt_topics::MqttSchema;
@@ -22,6 +23,7 @@ pub struct BridgeConfigAwsParams {
2223
pub topic_prefix: TopicPrefix,
2324
pub profile_name: Option<ProfileName>,
2425
pub mqtt_schema: MqttSchema,
26+
pub keepalive_interval: Duration,
2527
}
2628

2729
impl From<BridgeConfigAwsParams> for BridgeConfig {
@@ -37,6 +39,7 @@ impl From<BridgeConfigAwsParams> for BridgeConfig {
3739
topic_prefix,
3840
profile_name,
3941
mqtt_schema,
42+
keepalive_interval,
4043
} = params;
4144

4245
let user_name = remote_clientid.to_string();
@@ -107,6 +110,7 @@ impl From<BridgeConfigAwsParams> for BridgeConfig {
107110
connection_check_attempts: 5,
108111
auth_method: None,
109112
mosquitto_version: None,
113+
keepalive_interval,
110114
}
111115
}
112116
}
@@ -124,6 +128,7 @@ fn test_bridge_config_from_aws_params() -> anyhow::Result<()> {
124128
topic_prefix: "aws".try_into().unwrap(),
125129
profile_name: None,
126130
mqtt_schema: MqttSchema::with_root("te".into()),
131+
keepalive_interval: Duration::from_secs(60),
127132
};
128133

129134
let bridge = BridgeConfig::from(params);
@@ -162,6 +167,7 @@ fn test_bridge_config_from_aws_params() -> anyhow::Result<()> {
162167
connection_check_attempts: 5,
163168
auth_method: None,
164169
mosquitto_version: None,
170+
keepalive_interval: Duration::from_secs(60),
165171
};
166172

167173
assert_eq!(bridge, expected);
@@ -182,6 +188,7 @@ fn test_bridge_config_aws_custom_topic_prefix() -> anyhow::Result<()> {
182188
topic_prefix: "aws-custom".try_into().unwrap(),
183189
profile_name: Some("profile".parse().unwrap()),
184190
mqtt_schema: MqttSchema::with_root("te".into()),
191+
keepalive_interval: Duration::from_secs(60),
185192
};
186193

187194
let bridge = BridgeConfig::from(params);
@@ -222,6 +229,7 @@ fn test_bridge_config_aws_custom_topic_prefix() -> anyhow::Result<()> {
222229
connection_check_attempts: 5,
223230
auth_method: None,
224231
mosquitto_version: None,
232+
keepalive_interval: Duration::from_secs(60),
225233
};
226234

227235
assert_eq!(bridge, expected);

crates/core/tedge/src/bridge/azure.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use super::BridgeConfig;
22
use crate::bridge::config::BridgeLocation;
33
use camino::Utf8PathBuf;
44
use std::borrow::Cow;
5+
use std::time::Duration;
56
use tedge_api::mqtt_topics::Channel;
67
use tedge_api::mqtt_topics::EntityTopicId;
78
use tedge_api::mqtt_topics::MqttSchema;
@@ -22,6 +23,7 @@ pub struct BridgeConfigAzureParams {
2223
pub topic_prefix: TopicPrefix,
2324
pub profile_name: Option<ProfileName>,
2425
pub mqtt_schema: MqttSchema,
26+
pub keepalive_interval: Duration,
2527
}
2628

2729
impl From<BridgeConfigAzureParams> for BridgeConfig {
@@ -37,6 +39,7 @@ impl From<BridgeConfigAzureParams> for BridgeConfig {
3739
topic_prefix,
3840
profile_name,
3941
mqtt_schema,
42+
keepalive_interval,
4043
} = params;
4144

4245
let address = mqtt_host.clone();
@@ -103,6 +106,7 @@ impl From<BridgeConfigAzureParams> for BridgeConfig {
103106
connection_check_attempts: 1,
104107
auth_method: None,
105108
mosquitto_version: None,
109+
keepalive_interval,
106110
}
107111
}
108112
}
@@ -122,6 +126,7 @@ fn test_bridge_config_from_azure_params() -> anyhow::Result<()> {
122126
topic_prefix: "az".try_into().unwrap(),
123127
profile_name: None,
124128
mqtt_schema: MqttSchema::with_root("te".into()),
129+
keepalive_interval: Duration::from_secs(60),
125130
};
126131

127132
let bridge = BridgeConfig::from(params);
@@ -162,6 +167,7 @@ fn test_bridge_config_from_azure_params() -> anyhow::Result<()> {
162167
connection_check_attempts: 1,
163168
auth_method: None,
164169
mosquitto_version: None,
170+
keepalive_interval: Duration::from_secs(60),
165171
};
166172

167173
assert_eq!(bridge, expected);
@@ -184,6 +190,7 @@ fn test_azure_bridge_config_with_custom_prefix() -> anyhow::Result<()> {
184190
topic_prefix: "az-custom".try_into().unwrap(),
185191
profile_name: Some("profile".parse().unwrap()),
186192
mqtt_schema: MqttSchema::with_root("te".into()),
193+
keepalive_interval: Duration::from_secs(60),
187194
};
188195

189196
let bridge = BridgeConfig::from(params);
@@ -225,6 +232,7 @@ fn test_azure_bridge_config_with_custom_prefix() -> anyhow::Result<()> {
225232
connection_check_attempts: 1,
226233
auth_method: None,
227234
mosquitto_version: None,
235+
keepalive_interval: Duration::from_secs(60),
228236
};
229237

230238
assert_eq!(bridge, expected);

crates/core/tedge/src/bridge/c8y.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::bridge::config::BridgeLocation;
33
use camino::Utf8PathBuf;
44
use std::borrow::Cow;
55
use std::process::Command;
6+
use std::time::Duration;
67
use tedge_api::mqtt_topics::Channel;
78
use tedge_api::mqtt_topics::EntityTopicId;
89
use tedge_api::mqtt_topics::MqttSchema;
@@ -32,6 +33,7 @@ pub struct BridgeConfigC8yParams {
3233
pub topic_prefix: TopicPrefix,
3334
pub profile_name: Option<ProfileName>,
3435
pub mqtt_schema: MqttSchema,
36+
pub keepalive_interval: Duration,
3537
}
3638

3739
impl From<BridgeConfigC8yParams> for BridgeConfig {
@@ -52,6 +54,7 @@ impl From<BridgeConfigC8yParams> for BridgeConfig {
5254
topic_prefix,
5355
profile_name,
5456
mqtt_schema,
57+
keepalive_interval,
5558
} = params;
5659

5760
let mut topics: Vec<String> = vec![
@@ -190,6 +193,7 @@ impl From<BridgeConfigC8yParams> for BridgeConfig {
190193
connection_check_attempts: 1,
191194
auth_method: Some(auth_method),
192195
mosquitto_version,
196+
keepalive_interval,
193197
}
194198
}
195199
}
@@ -249,6 +253,7 @@ mod tests {
249253
topic_prefix: "c8y".try_into().unwrap(),
250254
profile_name: None,
251255
mqtt_schema: MqttSchema::with_root("te".into()),
256+
keepalive_interval: Duration::from_secs(60),
252257
};
253258

254259
let bridge = BridgeConfig::from(params);
@@ -319,6 +324,7 @@ mod tests {
319324
connection_check_attempts: 1,
320325
auth_method: Some(AuthMethod::Certificate),
321326
mosquitto_version: None,
327+
keepalive_interval: Duration::from_secs(60),
322328
};
323329

324330
assert_eq!(bridge, expected);
@@ -344,6 +350,7 @@ mod tests {
344350
topic_prefix: "c8y".try_into().unwrap(),
345351
profile_name: Some("profile".parse().unwrap()),
346352
mqtt_schema: MqttSchema::with_root("te".into()),
353+
keepalive_interval: Duration::from_secs(60),
347354
};
348355

349356
let bridge = BridgeConfig::from(params);
@@ -421,6 +428,7 @@ mod tests {
421428
connection_check_attempts: 1,
422429
auth_method: Some(AuthMethod::Basic),
423430
mosquitto_version: None,
431+
keepalive_interval: Duration::from_secs(60),
424432
};
425433

426434
assert_eq!(bridge, expected);

crates/core/tedge/src/bridge/config.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use core::fmt;
2-
use std::borrow::Cow;
3-
41
use anyhow::anyhow;
52
use camino::Utf8PathBuf;
3+
use core::fmt;
4+
use std::borrow::Cow;
5+
use std::time::Duration;
66
use tedge_config::auth_method::AuthMethod;
77
use tedge_config::HostPort;
88
use tedge_config::TEdgeConfigLocation;
@@ -43,6 +43,7 @@ pub struct BridgeConfig {
4343
pub connection_check_attempts: i32,
4444
pub auth_method: Option<AuthMethod>,
4545
pub mosquitto_version: Option<String>,
46+
pub keepalive_interval: Duration,
4647
}
4748

4849
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
@@ -106,6 +107,11 @@ impl BridgeConfig {
106107
"bridge_attempt_unsubscribe {}",
107108
self.bridge_attempt_unsubscribe
108109
)?;
110+
writeln!(
111+
writer,
112+
"keepalive_interval {}",
113+
self.keepalive_interval.as_secs()
114+
)?;
109115

110116
writeln!(writer, "\n### Topics",)?;
111117
for topic in &self.topics {
@@ -212,6 +218,7 @@ mod test {
212218
connection_check_attempts: 1,
213219
auth_method: None,
214220
mosquitto_version: None,
221+
keepalive_interval: Duration::from_secs(60),
215222
};
216223

217224
let mut serialized_config = Vec::<u8>::new();
@@ -239,6 +246,7 @@ notifications false
239246
notifications_local_only false
240247
notification_topic test_topic
241248
bridge_attempt_unsubscribe false
249+
keepalive_interval 60
242250
243251
### Topics
244252
"#,
@@ -282,6 +290,7 @@ bridge_attempt_unsubscribe false
282290
connection_check_attempts: 1,
283291
auth_method: None,
284292
mosquitto_version: None,
293+
keepalive_interval: Duration::from_secs(60),
285294
};
286295
let mut serialized_config = Vec::<u8>::new();
287296
bridge.serialize(&mut serialized_config)?;
@@ -308,6 +317,7 @@ notifications false
308317
notifications_local_only false
309318
notification_topic test_topic
310319
bridge_attempt_unsubscribe false
320+
keepalive_interval 60
311321
312322
### Topics
313323
"#,
@@ -354,6 +364,7 @@ bridge_attempt_unsubscribe false
354364
connection_check_attempts: 1,
355365
auth_method: None,
356366
mosquitto_version: None,
367+
keepalive_interval: Duration::from_secs(60),
357368
};
358369

359370
let mut buffer = Vec::new();
@@ -383,6 +394,7 @@ bridge_attempt_unsubscribe false
383394
expected.insert("notifications_local_only false");
384395
expected.insert("notification_topic test_topic");
385396
expected.insert("bridge_attempt_unsubscribe false");
397+
expected.insert("keepalive_interval 60");
386398

387399
expected.insert("topic messages/events/ out 1 az/ devices/alpha/");
388400
expected.insert("topic messages/devicebound/# in 1 az/ devices/alpha/");
@@ -426,6 +438,7 @@ bridge_attempt_unsubscribe false
426438
connection_check_attempts: 1,
427439
auth_method: None,
428440
mosquitto_version: None,
441+
keepalive_interval: Duration::from_secs(60),
429442
};
430443

431444
let mut buffer = Vec::new();
@@ -454,6 +467,7 @@ bridge_attempt_unsubscribe false
454467
expected.insert("notifications_local_only false");
455468
expected.insert("notification_topic test_topic");
456469
expected.insert("bridge_attempt_unsubscribe false");
470+
expected.insert("keepalive_interval 60");
457471
expected.insert(r#"topic inventory/managedObjects/update/# out 2 c8y/ """#);
458472
expected.insert(r#"topic measurement/measurements/create out 2 c8y/ """#);
459473
assert_eq!(config_set, expected);
@@ -545,6 +559,7 @@ bridge_attempt_unsubscribe false
545559
connection_check_attempts: 1,
546560
auth_method: None,
547561
mosquitto_version: None,
562+
keepalive_interval: Duration::from_secs(60),
548563
}
549564
}
550565
}

crates/core/tedge/src/cli/connect/command.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,7 @@ pub fn bridge_config(
465465
topic_prefix: az_config.bridge.topic_prefix.clone(),
466466
profile_name: profile.clone().map(Cow::into_owned),
467467
mqtt_schema,
468+
keepalive_interval: az_config.bridge.keepalive_interval.duration(),
468469
};
469470

470471
Ok(BridgeConfig::from(params))
@@ -490,6 +491,7 @@ pub fn bridge_config(
490491
topic_prefix: aws_config.bridge.topic_prefix.clone(),
491492
profile_name: profile.clone().map(Cow::into_owned),
492493
mqtt_schema,
494+
keepalive_interval: aws_config.bridge.keepalive_interval.duration(),
493495
};
494496

495497
Ok(BridgeConfig::from(params))
@@ -529,6 +531,7 @@ pub fn bridge_config(
529531
topic_prefix: c8y_config.bridge.topic_prefix.clone(),
530532
profile_name: profile.clone().map(Cow::into_owned),
531533
mqtt_schema,
534+
keepalive_interval: c8y_config.bridge.keepalive_interval.duration(),
532535
};
533536

534537
Ok(BridgeConfig::from(params))

crates/core/tedge_mapper/src/aws/mapper.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ impl TEdgeComponent for AwsMapper {
5151
8883,
5252
);
5353
cloud_config.set_clean_session(false);
54+
cloud_config.set_keep_alive(aws_config.bridge.keepalive_interval.duration());
5455
use_key_and_cert(&mut cloud_config, aws_config)?;
5556

5657
let bridge_name = format!("tedge-mapper-bridge-{prefix}");

crates/core/tedge_mapper/src/az/mapper.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ impl TEdgeComponent for AzureMapper {
5959
),
6060
"",
6161
);
62+
cloud_config.set_keep_alive(az_config.bridge.keepalive_interval.duration());
6263
use_key_and_cert(&mut cloud_config, az_config)?;
6364

6465
let built_in_bridge_name = format!("tedge-mapper-bridge-{prefix}");

crates/core/tedge_mapper/src/c8y/mapper.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ impl TEdgeComponent for CumulocityMapper {
215215
message: format!("{last_will_message_bridge}\n{last_will_message_mapper}").into(),
216216
retain: false,
217217
});
218+
cloud_config.set_keep_alive(c8y_config.bridge.keepalive_interval.duration());
218219

219220
runtime
220221
.spawn(

0 commit comments

Comments
 (0)