Skip to content

Commit 9979965

Browse files
committed
feat: tedge connect to c8y mqtt service endpoint
1 parent 40d34b4 commit 9979965

File tree

5 files changed

+228
-4
lines changed

5 files changed

+228
-4
lines changed

crates/common/tedge_config/src/tedge_toml/models/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use strum::Display;
2424

2525
pub const HTTPS_PORT: u16 = 443;
2626
pub const MQTT_TLS_PORT: u16 = 8883;
27+
pub const MQTT_SVC_TLS_PORT: u16 = 9883;
2728

2829
pub use self::apt_config::*;
2930
pub use self::auto::*;

crates/common/tedge_config/src/tedge_toml/tedge_config.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use super::models::SoftwareManagementApiFlag;
2020
use super::models::TemplatesSet;
2121
use super::models::TopicPrefix;
2222
use super::models::HTTPS_PORT;
23+
use super::models::MQTT_SVC_TLS_PORT;
2324
use super::models::MQTT_TLS_PORT;
2425
use super::tedge_config_location::TEdgeConfigLocation;
2526
use crate::models::AbsolutePath;
@@ -226,6 +227,10 @@ define_tedge_config! {
226227
#[tedge_config(reader(private))]
227228
url: ConnectUrl,
228229

230+
/// Cumulocity tenant ID
231+
#[tedge_config(example = "t12345678")]
232+
tenant_id: String,
233+
229234
/// The path where Cumulocity root certificate(s) are stored
230235
#[tedge_config(note = "The value can be a directory path as well as the path of the certificate file.")]
231236
#[tedge_config(example = "/etc/tedge/c8y-trusted-root-certificates.pem", default(function = "default_root_cert_path"))]
@@ -433,6 +438,23 @@ define_tedge_config! {
433438
#[tedge_config(example = "60m", default(from_str = "60m"))]
434439
interval: SecondsOrHumanTime,
435440
},
441+
442+
mqtt_service: {
443+
/// Wheather to connect to the MQTT service endpoint or not
444+
#[tedge_config(example = "true", default(value = false))]
445+
enabled: bool,
446+
447+
/// MQTT service endpoint for the Cumulocity tenant, with optional port.
448+
#[tedge_config(example = "mqtt.your-tenant.cumulocity.com:1234")]
449+
#[tedge_config(default(from_optional_key = "c8y.url"))]
450+
url: HostPort<MQTT_SVC_TLS_PORT>,
451+
452+
/// The topic prefix that will be used for the Cumulocity MQTT service endpoint connection.
453+
/// For instance, if set to "c8y-mqtt", then messages published to `c8y-mqtt/xyz`
454+
/// will be forwarded to the MQTT service endpoint on the `xyz` topic
455+
#[tedge_config(example = "c8y-mqtt", default(function = "c8y_mqtt_service_topic_prefix"))]
456+
topic_prefix: TopicPrefix,
457+
}
436458
},
437459

438460
#[tedge_config(deprecated_name = "azure")] // for 0.1.0 compatibility
@@ -1114,6 +1136,10 @@ fn c8y_topic_prefix() -> TopicPrefix {
11141136
TopicPrefix::try_new("c8y").unwrap()
11151137
}
11161138

1139+
fn c8y_mqtt_service_topic_prefix() -> TopicPrefix {
1140+
TopicPrefix::try_new("c8y-mqtt").unwrap()
1141+
}
1142+
11171143
fn az_topic_prefix() -> TopicPrefix {
11181144
TopicPrefix::try_new("az").unwrap()
11191145
}

crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ impl_append_remove_for_single_value!(
4040
ConnectUrl,
4141
HostPort<HTTPS_PORT>,
4242
HostPort<MQTT_TLS_PORT>,
43+
HostPort<MQTT_SVC_TLS_PORT>,
4344
bool,
4445
IpAddr,
4546
u16,

crates/core/tedge/src/bridge/c8y.rs

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use super::config::ProxyWrapper;
22
use super::BridgeConfig;
33
use crate::bridge::config::BridgeLocation;
4+
use crate::ConfigError;
5+
use c8y_api::http_proxy::read_c8y_credentials;
46
use camino::Utf8PathBuf;
57
use std::borrow::Cow;
68
use std::process::Command;
@@ -13,8 +15,10 @@ use tedge_config::models::AutoFlag;
1315
use tedge_config::models::HostPort;
1416
use tedge_config::models::TemplatesSet;
1517
use tedge_config::models::TopicPrefix;
18+
use tedge_config::models::MQTT_SVC_TLS_PORT;
1619
use tedge_config::models::MQTT_TLS_PORT;
1720
use tedge_config::tedge_toml::ProfileName;
21+
use tedge_config::TEdgeConfig;
1822
use which::which;
1923

2024
#[derive(Debug)]
@@ -201,6 +205,171 @@ impl From<BridgeConfigC8yParams> for BridgeConfig {
201205
}
202206
}
203207

208+
#[derive(Debug)]
209+
pub struct BridgeConfigC8yMqttServiceParams {
210+
pub mqtt_host: HostPort<MQTT_SVC_TLS_PORT>,
211+
pub config_file: Cow<'static, str>,
212+
pub tenant_id: String,
213+
pub remote_clientid: String,
214+
pub remote_username: Option<String>,
215+
pub remote_password: Option<String>,
216+
pub bridge_root_cert_path: Utf8PathBuf,
217+
pub bridge_certfile: Utf8PathBuf,
218+
pub bridge_keyfile: Utf8PathBuf,
219+
pub include_local_clean_session: AutoFlag,
220+
pub bridge_location: BridgeLocation,
221+
pub topic_prefix: TopicPrefix,
222+
pub profile_name: Option<ProfileName>,
223+
pub mqtt_schema: MqttSchema,
224+
pub keepalive_interval: Duration,
225+
}
226+
227+
impl TryFrom<(&TEdgeConfig, Option<&ProfileName>)> for BridgeConfigC8yMqttServiceParams {
228+
type Error = ConfigError;
229+
230+
fn try_from(value: (&TEdgeConfig, Option<&ProfileName>)) -> Result<Self, Self::Error> {
231+
let (config, profile) = value;
232+
233+
let bridge_location = match config.mqtt.bridge.built_in {
234+
true => BridgeLocation::BuiltIn,
235+
false => BridgeLocation::Mosquitto,
236+
};
237+
let mqtt_schema = MqttSchema::with_root(config.mqtt.topic_root.clone());
238+
let c8y_config = config.c8y.try_get(profile)?;
239+
240+
let (remote_username, remote_password) =
241+
match c8y_config.auth_method.to_type(&c8y_config.credentials_path) {
242+
AuthType::Certificate => (None, None),
243+
AuthType::Basic => {
244+
let (username, password) = read_c8y_credentials(&c8y_config.credentials_path)?;
245+
(Some(username), Some(password))
246+
}
247+
};
248+
249+
let config_file = if let Some(profile_name) = &profile {
250+
format!("c8y-mqtt-svc@{profile_name}-bridge.conf")
251+
} else {
252+
"c8y-mqtt-svc-bridge.conf".to_string()
253+
};
254+
let params = BridgeConfigC8yMqttServiceParams {
255+
mqtt_host: c8y_config.mqtt_service.url.or_config_not_set()?.clone(),
256+
config_file: config_file.into(),
257+
tenant_id: c8y_config.tenant_id.or_config_not_set()?.clone(),
258+
bridge_root_cert_path: c8y_config.root_cert_path.clone().into(),
259+
remote_clientid: c8y_config.device.id()?.clone(),
260+
remote_username,
261+
remote_password,
262+
bridge_certfile: c8y_config.device.cert_path.clone().into(),
263+
bridge_keyfile: c8y_config.device.key_path.clone().into(),
264+
include_local_clean_session: c8y_config.bridge.include.local_cleansession.clone(),
265+
bridge_location,
266+
topic_prefix: c8y_config.mqtt_service.topic_prefix.clone(),
267+
profile_name: profile.cloned(),
268+
mqtt_schema,
269+
keepalive_interval: c8y_config.bridge.keepalive_interval.duration(),
270+
};
271+
272+
Ok(params)
273+
}
274+
}
275+
276+
impl From<BridgeConfigC8yMqttServiceParams> for BridgeConfig {
277+
fn from(params: BridgeConfigC8yMqttServiceParams) -> Self {
278+
let BridgeConfigC8yMqttServiceParams {
279+
mqtt_host,
280+
config_file,
281+
bridge_root_cert_path,
282+
tenant_id,
283+
mut remote_username,
284+
remote_password,
285+
remote_clientid,
286+
bridge_certfile,
287+
bridge_keyfile,
288+
include_local_clean_session,
289+
bridge_location,
290+
topic_prefix,
291+
profile_name,
292+
mqtt_schema,
293+
keepalive_interval,
294+
} = params;
295+
296+
let address = mqtt_host
297+
.to_string()
298+
.parse::<HostPort<MQTT_TLS_PORT>>()
299+
.expect("MQTT service address must be in the expected format");
300+
301+
let topics: Vec<String> = vec![
302+
// Outgoing
303+
format!(r#"# out 1 {topic_prefix}/"#),
304+
// Incoming
305+
format!(r#"$debug/$error in 1 {topic_prefix}/"#),
306+
];
307+
308+
let auth_type = if remote_password.is_some() {
309+
AuthType::Basic
310+
} else {
311+
AuthType::Certificate
312+
};
313+
314+
// When cert based auth is used, provide the tenant id as the username
315+
if auth_type == AuthType::Certificate {
316+
remote_username = Some(tenant_id.clone());
317+
}
318+
319+
let (include_local_clean_session, mosquitto_version) = match include_local_clean_session {
320+
AutoFlag::True => (true, None),
321+
AutoFlag::False => (false, None),
322+
AutoFlag::Auto => is_mosquitto_version_above_2(),
323+
};
324+
325+
let service_name = format!("mosquitto-{topic_prefix}-bridge");
326+
let health = mqtt_schema.topic_for(
327+
&EntityTopicId::default_main_service(&service_name).unwrap(),
328+
&Channel::Health,
329+
);
330+
331+
Self {
332+
cloud_name: "c8y-mqtt".into(),
333+
config_file,
334+
connection: if let Some(profile) = &profile_name {
335+
format!("edge_to_c8y_mqtt_service@{profile}")
336+
} else {
337+
"edge_to_c8y_mqtt_service".into()
338+
},
339+
address,
340+
remote_username,
341+
remote_password,
342+
bridge_root_cert_path,
343+
remote_clientid,
344+
local_clientid: if let Some(profile) = &profile_name {
345+
format!("CumulocityMqttService@{profile}")
346+
} else {
347+
"CumulocityMqttService".into()
348+
},
349+
bridge_certfile,
350+
bridge_keyfile,
351+
use_mapper: true,
352+
use_agent: true,
353+
try_private: false,
354+
start_type: "automatic".into(),
355+
clean_session: true,
356+
include_local_clean_session,
357+
local_clean_session: false,
358+
notifications: true,
359+
notifications_local_only: true,
360+
notification_topic: health.name,
361+
bridge_attempt_unsubscribe: false,
362+
topics,
363+
bridge_location,
364+
connection_check_attempts: 3,
365+
auth_type,
366+
mosquitto_version,
367+
keepalive_interval,
368+
proxy: None,
369+
}
370+
}
371+
}
372+
204373
/// Return whether or not mosquitto version is >= 2.0.0
205374
///
206375
/// As mosquitto doesn't provide a `--version` flag, this is a guess.

crates/core/tedge/src/cli/connect/command.rs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
use crate::bridge::aws::BridgeConfigAwsParams;
33
#[cfg(feature = "azure")]
44
use crate::bridge::azure::BridgeConfigAzureParams;
5+
use crate::bridge::c8y::BridgeConfigC8yMqttServiceParams;
56
#[cfg(feature = "c8y")]
67
use crate::bridge::c8y::BridgeConfigC8yParams;
78
use crate::bridge::BridgeConfig;
@@ -746,6 +747,10 @@ pub fn bridge_config(
746747
}
747748
};
748749

750+
if c8y_config.mqtt_service.enabled && remote_password.is_none() {
751+
// If the MQTT service connection is enabled and cert based auth is used, then tenant_id must be set
752+
c8y_config.tenant_id.or_config_not_set()?;
753+
}
749754
let params = BridgeConfigC8yParams {
750755
mqtt_host: c8y_config.mqtt.or_config_not_set()?.clone(),
751756
config_file: cloud.bridge_config_filename(),
@@ -841,10 +846,19 @@ impl ConnectCommand {
841846
}
842847

843848
if bridge_config.bridge_location == BridgeLocation::Mosquitto {
844-
if let Err(err) = write_bridge_config_to_file(tedge_config, bridge_config).await {
845-
// We want to preserve previous errors and therefore discard result of this function.
846-
let _ = clean_up(tedge_config, bridge_config);
847-
return Err(err.into());
849+
write_mosquitto_bridge_config_file(tedge_config, bridge_config).await?;
850+
851+
if let Cloud::C8y(profile_name) = &self.cloud {
852+
let c8y_config = tedge_config.c8y.try_get(profile_name.as_deref())?;
853+
if c8y_config.mqtt_service.enabled {
854+
let config_params = BridgeConfigC8yMqttServiceParams::try_from((
855+
tedge_config,
856+
profile_name.as_deref(),
857+
))?;
858+
let mqtt_svc_bridge_config = BridgeConfig::from(config_params);
859+
write_mosquitto_bridge_config_file(tedge_config, &mqtt_svc_bridge_config)
860+
.await?;
861+
}
848862
}
849863
} else {
850864
use_built_in_bridge(tedge_config, bridge_config).await?;
@@ -1040,6 +1054,19 @@ async fn write_generic_mosquitto_config_to_file(
10401054
Ok(())
10411055
}
10421056

1057+
async fn write_mosquitto_bridge_config_file(
1058+
tedge_config: &TEdgeConfig,
1059+
bridge_config: &BridgeConfig,
1060+
) -> Result<(), ConnectError> {
1061+
if let Err(err) = write_bridge_config_to_file(tedge_config, bridge_config).await {
1062+
// We want to preserve previous errors and therefore discard result of this function.
1063+
// let _ = clean_up(tedge_config, bridge_config);
1064+
return Err(err);
1065+
}
1066+
1067+
Ok(())
1068+
}
1069+
10431070
async fn write_bridge_config_to_file(
10441071
config: &TEdgeConfig,
10451072
bridge_config: &BridgeConfig,

0 commit comments

Comments
 (0)