From 40d34b4b3106925ea46e42ea22bc698ea853ad48 Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Wed, 25 Jun 2025 17:34:46 +0000 Subject: [PATCH 01/12] spec: c8y mqtt service connection spec --- .../0007-c8y-mqtt-service-connect.md | 171 ++++++++++++++++++ 1 file changed, 171 insertions(+) create mode 100644 design/decisions/0007-c8y-mqtt-service-connect.md diff --git a/design/decisions/0007-c8y-mqtt-service-connect.md b/design/decisions/0007-c8y-mqtt-service-connect.md new file mode 100644 index 00000000000..9db560a4578 --- /dev/null +++ b/design/decisions/0007-c8y-mqtt-service-connect.md @@ -0,0 +1,171 @@ +# Connect to C8Y MQTT service endpoint + +* Date: __2024-06-23__ +* Status: __New__ + +## Requirement + +Cumulocity has introduced a new [MQTT service endpoint](https://cumulocity.com/docs/device-integration/mqtt-service) +that allows users to send free-form messages (any arbitrary topic and payload) to Cumulocity. +It does not replace the existing core MQTT endpoint and hence a separate connection is required. +To support the same, `tedge connect` must be enhanced to connect to this new endpoint as well, +in addition to the existing connection to the core endpoint. + +## MQTT service specifications/limitations + +- The mqtt service endpoint is exposed at the same tenant URL via port 9883 (with TLS). +- Both username/password based and cert based authentication is supported. +- Only connections with clean session enabled are supported. Persistent connection attempts are rejected. +- Subscribing to wildcard topics (`#` and `+`) and system topics (starting with `$SYS`) are not supported. +- Publishing/subscribing to legacy C8Y topics (SmartREST and JSON over MQTT) are not supported either. +- QoS 2 is not supported. +- Retained flag is accepted, but ignored. +- There are limits to the number of topics that can be created + and the number of messages that can be queued per topic. + Publishes are rejected when the queue is full. +- Each published message has a time-to-live, after which it is cleared from the queue. + The subscribed clients are not notified of expired messages. + +## Solutions + +To establish a bridge connection to the mqtt service endpoint, the following info must be provided in the connect request: + +* host: Same as legacy mqtt endpoint +* port: 9883 by default +* auth: + * for username/password based authentication + * username: `/` + * password + * for cert based authentication + * device cert path + * device key path + * server root ca path + * username: `` +* bridge topics: `/#` + +:::note +Even for cert based auth, the `username` must be provided with the tenant id. +::: + +The following are the solution approaches that are considered: + +### Approach 1: Connect to mqtt service along with core mqtt connection + +The mqtt service bridge connection is established along with `tedge connect c8y` command. + +:::note +For the builtin bridge functionality, whether to spawn a different mapper, +or establish a secondary connection from the existing mapper itself is to be decided. +The same applies for the trivial mapping rule that adds the timestamp to the payload. +::: + +#### Configuration + +The mqtt service connection related configs are defined under the `[c8y]` table in `tedge.toml` : + +| Config | Description | Default | +| ------ | ----------- | ------- | +| `c8y.mqtt_service.enabled` | Whether the mqtt server connection must be established or not | `false` | +| `c8y.mqtt_service.url` | The config URL for the MQTT service | `:9883` | +| `c8y.mqtt_service.topic_prefix` | topic prefix used to bridge mqtt topics | `c8y-mqtt` | + +To support connecting exclusively to the mqtt service, excluding the core mqtt connection, +a config flag (`c8y.core_mqtt.enabled`) to enable/disable the core mqtt connection can also be introduced. + + +To connect to the mqtt service, it must be enabled before `tedge connect c8y` is executed: + +```sh +tedge config set c8y.mqtt_service.enabled "true" +``` + +:::note +The tenant id which must be passed in the username is retrieved using the `https:///tenant/currentTenant` REST API. +An alternative is to take this as an additional `c8y.tenant_id` configuration. +::: + +#### Connection + +The command to connect to C8Y remains the same: + +```sh +tedge connect c8y +``` + +The connection to the mqtt service is established only if it is enabled. + +**Pros** + +- Makes the mqtt service connection seamless for existing users, with minimal configuration. +- Reuse most of the existing configurations like `c8y.url`, `c8y.device.key_path`, `c8y.device.cert_path` etc. +- Users can choose **not** to connect to the new endpoint, by keeping it disabled. +- In future, if the core mqtt connection is replaced with the new mqtt service, + we can just switch what is enabled and disabled by default. + +**Cons*** + +- The mqtt service connection is tied to the direct `c8y` connection, to retrieve the tenant id + (although this dependency can be removed if tenant id is accepted via config). + +### Approach 2: Connect to mqtt service as an independent cloud endpoint + +Do not establish both the core mqtt connection and the mqtt service connection together with `tedge connect c8y` command, +but treat the mqtt service as an independent/generic MQTT broker endpoint like connecting to AWS or AZ endpoints. + +#### Configuration + +As an independent cloud endpoint, the following config values must be provided: + +```sh +tedge config set remote.url example.cumulocity.com:9883 +tedge config set remote.username t123456 +tedge config set remote.bridge.topic_prefix c8y-mqtt +``` + +The following setting can also be configured, though they all have default values: + +| Config | Description | Default | +| ------ | ----------- | ------- | +| `remote.device.cert_path` | The device cert path used for the authentication | Same as `device.cert_path` | +| `remote.device.key_path` | The device cert path used for the authentication | Same as `device.key_path` | + +In addition to the above, pretty much all the configs defined in the `tedge.toml` for `az` and `aws` can be defined +for this generic `remote` endpoint as well, as none of them are cloud specific but applies to any mqtt broker. + +#### Connection + +```sh +tedge connect remote +``` + +**Pros** + +- Clear separation from `tedge connect c8y` allows the second connection to be established and disconnected on demand. +- Existing users are not affected, as the `tedge connect c8y` behavior stays the same. +- Allows customers to connect exclusively to the new mqtt service, if they don't need the core mqtt connection. +- There is no real dependency on Cumulocity as multiple `remote` profiles can be used to connect to any generic broker. + +**Cons** + +- Many of the config parameters defined in the `c8y` profile like the `url`, `cert_path` etc + will have to be duplicated in `remote` profile as well. +- When the device is connected to multiple c8y cloud profiles, + corresponding `remote` profiles must be configured explicitly per c8y instance. + +### Approach 3: Connect to all configured cloud instances with tedge connect + +This is more of an extension of `Approach 2`, where instead of explicitly connecting to each cloud instance/profile, +a `tedge connect` command (without the cloud arg), detects all the cloud profiles that are configured/enabled +and connects to each endpoint one by one. + +Only the cloud instances with a `url` explicitly configured are considered as enabled. +An additional `enabled` flag can be added to each cloud profile, to skip connecting to it, even when it is configured. + +#### Configuration + +TBD + +#### Connection + +TBD + From 99799658283301f86a501fcfba1f35bb1975f45e Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Thu, 3 Jul 2025 08:31:07 +0000 Subject: [PATCH 02/12] feat: tedge connect to c8y mqtt service endpoint --- .../tedge_config/src/tedge_toml/models/mod.rs | 1 + .../src/tedge_toml/tedge_config.rs | 26 +++ .../tedge_toml/tedge_config/append_remove.rs | 1 + crates/core/tedge/src/bridge/c8y.rs | 169 ++++++++++++++++++ crates/core/tedge/src/cli/connect/command.rs | 35 +++- 5 files changed, 228 insertions(+), 4 deletions(-) diff --git a/crates/common/tedge_config/src/tedge_toml/models/mod.rs b/crates/common/tedge_config/src/tedge_toml/models/mod.rs index 1cf84288031..9d0d1a58d4c 100644 --- a/crates/common/tedge_config/src/tedge_toml/models/mod.rs +++ b/crates/common/tedge_config/src/tedge_toml/models/mod.rs @@ -24,6 +24,7 @@ use strum::Display; pub const HTTPS_PORT: u16 = 443; pub const MQTT_TLS_PORT: u16 = 8883; +pub const MQTT_SVC_TLS_PORT: u16 = 9883; pub use self::apt_config::*; pub use self::auto::*; diff --git a/crates/common/tedge_config/src/tedge_toml/tedge_config.rs b/crates/common/tedge_config/src/tedge_toml/tedge_config.rs index 307a665fd82..96c9ae1b345 100644 --- a/crates/common/tedge_config/src/tedge_toml/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_toml/tedge_config.rs @@ -20,6 +20,7 @@ use super::models::SoftwareManagementApiFlag; use super::models::TemplatesSet; use super::models::TopicPrefix; use super::models::HTTPS_PORT; +use super::models::MQTT_SVC_TLS_PORT; use super::models::MQTT_TLS_PORT; use super::tedge_config_location::TEdgeConfigLocation; use crate::models::AbsolutePath; @@ -226,6 +227,10 @@ define_tedge_config! { #[tedge_config(reader(private))] url: ConnectUrl, + /// Cumulocity tenant ID + #[tedge_config(example = "t12345678")] + tenant_id: String, + /// The path where Cumulocity root certificate(s) are stored #[tedge_config(note = "The value can be a directory path as well as the path of the certificate file.")] #[tedge_config(example = "/etc/tedge/c8y-trusted-root-certificates.pem", default(function = "default_root_cert_path"))] @@ -433,6 +438,23 @@ define_tedge_config! { #[tedge_config(example = "60m", default(from_str = "60m"))] interval: SecondsOrHumanTime, }, + + mqtt_service: { + /// Wheather to connect to the MQTT service endpoint or not + #[tedge_config(example = "true", default(value = false))] + enabled: bool, + + /// MQTT service endpoint for the Cumulocity tenant, with optional port. + #[tedge_config(example = "mqtt.your-tenant.cumulocity.com:1234")] + #[tedge_config(default(from_optional_key = "c8y.url"))] + url: HostPort, + + /// The topic prefix that will be used for the Cumulocity MQTT service endpoint connection. + /// For instance, if set to "c8y-mqtt", then messages published to `c8y-mqtt/xyz` + /// will be forwarded to the MQTT service endpoint on the `xyz` topic + #[tedge_config(example = "c8y-mqtt", default(function = "c8y_mqtt_service_topic_prefix"))] + topic_prefix: TopicPrefix, + } }, #[tedge_config(deprecated_name = "azure")] // for 0.1.0 compatibility @@ -1114,6 +1136,10 @@ fn c8y_topic_prefix() -> TopicPrefix { TopicPrefix::try_new("c8y").unwrap() } +fn c8y_mqtt_service_topic_prefix() -> TopicPrefix { + TopicPrefix::try_new("c8y-mqtt").unwrap() +} + fn az_topic_prefix() -> TopicPrefix { TopicPrefix::try_new("az").unwrap() } diff --git a/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs b/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs index 2a21f0c7e82..b92b2d241c9 100644 --- a/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs +++ b/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs @@ -40,6 +40,7 @@ impl_append_remove_for_single_value!( ConnectUrl, HostPort, HostPort, + HostPort, bool, IpAddr, u16, diff --git a/crates/core/tedge/src/bridge/c8y.rs b/crates/core/tedge/src/bridge/c8y.rs index 84beb11fff8..b63d9c52363 100644 --- a/crates/core/tedge/src/bridge/c8y.rs +++ b/crates/core/tedge/src/bridge/c8y.rs @@ -1,6 +1,8 @@ use super::config::ProxyWrapper; use super::BridgeConfig; use crate::bridge::config::BridgeLocation; +use crate::ConfigError; +use c8y_api::http_proxy::read_c8y_credentials; use camino::Utf8PathBuf; use std::borrow::Cow; use std::process::Command; @@ -13,8 +15,10 @@ use tedge_config::models::AutoFlag; use tedge_config::models::HostPort; use tedge_config::models::TemplatesSet; use tedge_config::models::TopicPrefix; +use tedge_config::models::MQTT_SVC_TLS_PORT; use tedge_config::models::MQTT_TLS_PORT; use tedge_config::tedge_toml::ProfileName; +use tedge_config::TEdgeConfig; use which::which; #[derive(Debug)] @@ -201,6 +205,171 @@ impl From for BridgeConfig { } } +#[derive(Debug)] +pub struct BridgeConfigC8yMqttServiceParams { + pub mqtt_host: HostPort, + pub config_file: Cow<'static, str>, + pub tenant_id: String, + pub remote_clientid: String, + pub remote_username: Option, + pub remote_password: Option, + pub bridge_root_cert_path: Utf8PathBuf, + pub bridge_certfile: Utf8PathBuf, + pub bridge_keyfile: Utf8PathBuf, + pub include_local_clean_session: AutoFlag, + pub bridge_location: BridgeLocation, + pub topic_prefix: TopicPrefix, + pub profile_name: Option, + pub mqtt_schema: MqttSchema, + pub keepalive_interval: Duration, +} + +impl TryFrom<(&TEdgeConfig, Option<&ProfileName>)> for BridgeConfigC8yMqttServiceParams { + type Error = ConfigError; + + fn try_from(value: (&TEdgeConfig, Option<&ProfileName>)) -> Result { + let (config, profile) = value; + + let bridge_location = match config.mqtt.bridge.built_in { + true => BridgeLocation::BuiltIn, + false => BridgeLocation::Mosquitto, + }; + let mqtt_schema = MqttSchema::with_root(config.mqtt.topic_root.clone()); + let c8y_config = config.c8y.try_get(profile)?; + + let (remote_username, remote_password) = + match c8y_config.auth_method.to_type(&c8y_config.credentials_path) { + AuthType::Certificate => (None, None), + AuthType::Basic => { + let (username, password) = read_c8y_credentials(&c8y_config.credentials_path)?; + (Some(username), Some(password)) + } + }; + + let config_file = if let Some(profile_name) = &profile { + format!("c8y-mqtt-svc@{profile_name}-bridge.conf") + } else { + "c8y-mqtt-svc-bridge.conf".to_string() + }; + let params = BridgeConfigC8yMqttServiceParams { + mqtt_host: c8y_config.mqtt_service.url.or_config_not_set()?.clone(), + config_file: config_file.into(), + tenant_id: c8y_config.tenant_id.or_config_not_set()?.clone(), + bridge_root_cert_path: c8y_config.root_cert_path.clone().into(), + remote_clientid: c8y_config.device.id()?.clone(), + remote_username, + remote_password, + bridge_certfile: c8y_config.device.cert_path.clone().into(), + bridge_keyfile: c8y_config.device.key_path.clone().into(), + include_local_clean_session: c8y_config.bridge.include.local_cleansession.clone(), + bridge_location, + topic_prefix: c8y_config.mqtt_service.topic_prefix.clone(), + profile_name: profile.cloned(), + mqtt_schema, + keepalive_interval: c8y_config.bridge.keepalive_interval.duration(), + }; + + Ok(params) + } +} + +impl From for BridgeConfig { + fn from(params: BridgeConfigC8yMqttServiceParams) -> Self { + let BridgeConfigC8yMqttServiceParams { + mqtt_host, + config_file, + bridge_root_cert_path, + tenant_id, + mut remote_username, + remote_password, + remote_clientid, + bridge_certfile, + bridge_keyfile, + include_local_clean_session, + bridge_location, + topic_prefix, + profile_name, + mqtt_schema, + keepalive_interval, + } = params; + + let address = mqtt_host + .to_string() + .parse::>() + .expect("MQTT service address must be in the expected format"); + + let topics: Vec = vec![ + // Outgoing + format!(r#"# out 1 {topic_prefix}/"#), + // Incoming + format!(r#"$debug/$error in 1 {topic_prefix}/"#), + ]; + + let auth_type = if remote_password.is_some() { + AuthType::Basic + } else { + AuthType::Certificate + }; + + // When cert based auth is used, provide the tenant id as the username + if auth_type == AuthType::Certificate { + remote_username = Some(tenant_id.clone()); + } + + let (include_local_clean_session, mosquitto_version) = match include_local_clean_session { + AutoFlag::True => (true, None), + AutoFlag::False => (false, None), + AutoFlag::Auto => is_mosquitto_version_above_2(), + }; + + let service_name = format!("mosquitto-{topic_prefix}-bridge"); + let health = mqtt_schema.topic_for( + &EntityTopicId::default_main_service(&service_name).unwrap(), + &Channel::Health, + ); + + Self { + cloud_name: "c8y-mqtt".into(), + config_file, + connection: if let Some(profile) = &profile_name { + format!("edge_to_c8y_mqtt_service@{profile}") + } else { + "edge_to_c8y_mqtt_service".into() + }, + address, + remote_username, + remote_password, + bridge_root_cert_path, + remote_clientid, + local_clientid: if let Some(profile) = &profile_name { + format!("CumulocityMqttService@{profile}") + } else { + "CumulocityMqttService".into() + }, + bridge_certfile, + bridge_keyfile, + use_mapper: true, + use_agent: true, + try_private: false, + start_type: "automatic".into(), + clean_session: true, + include_local_clean_session, + local_clean_session: false, + notifications: true, + notifications_local_only: true, + notification_topic: health.name, + bridge_attempt_unsubscribe: false, + topics, + bridge_location, + connection_check_attempts: 3, + auth_type, + mosquitto_version, + keepalive_interval, + proxy: None, + } + } +} + /// Return whether or not mosquitto version is >= 2.0.0 /// /// As mosquitto doesn't provide a `--version` flag, this is a guess. diff --git a/crates/core/tedge/src/cli/connect/command.rs b/crates/core/tedge/src/cli/connect/command.rs index 0f97acf0656..f12ba78e2f2 100644 --- a/crates/core/tedge/src/cli/connect/command.rs +++ b/crates/core/tedge/src/cli/connect/command.rs @@ -2,6 +2,7 @@ use crate::bridge::aws::BridgeConfigAwsParams; #[cfg(feature = "azure")] use crate::bridge::azure::BridgeConfigAzureParams; +use crate::bridge::c8y::BridgeConfigC8yMqttServiceParams; #[cfg(feature = "c8y")] use crate::bridge::c8y::BridgeConfigC8yParams; use crate::bridge::BridgeConfig; @@ -746,6 +747,10 @@ pub fn bridge_config( } }; + if c8y_config.mqtt_service.enabled && remote_password.is_none() { + // If the MQTT service connection is enabled and cert based auth is used, then tenant_id must be set + c8y_config.tenant_id.or_config_not_set()?; + } let params = BridgeConfigC8yParams { mqtt_host: c8y_config.mqtt.or_config_not_set()?.clone(), config_file: cloud.bridge_config_filename(), @@ -841,10 +846,19 @@ impl ConnectCommand { } if bridge_config.bridge_location == BridgeLocation::Mosquitto { - if let Err(err) = write_bridge_config_to_file(tedge_config, bridge_config).await { - // We want to preserve previous errors and therefore discard result of this function. - let _ = clean_up(tedge_config, bridge_config); - return Err(err.into()); + write_mosquitto_bridge_config_file(tedge_config, bridge_config).await?; + + if let Cloud::C8y(profile_name) = &self.cloud { + let c8y_config = tedge_config.c8y.try_get(profile_name.as_deref())?; + if c8y_config.mqtt_service.enabled { + let config_params = BridgeConfigC8yMqttServiceParams::try_from(( + tedge_config, + profile_name.as_deref(), + ))?; + let mqtt_svc_bridge_config = BridgeConfig::from(config_params); + write_mosquitto_bridge_config_file(tedge_config, &mqtt_svc_bridge_config) + .await?; + } } } else { use_built_in_bridge(tedge_config, bridge_config).await?; @@ -1040,6 +1054,19 @@ async fn write_generic_mosquitto_config_to_file( Ok(()) } +async fn write_mosquitto_bridge_config_file( + tedge_config: &TEdgeConfig, + bridge_config: &BridgeConfig, +) -> Result<(), ConnectError> { + if let Err(err) = write_bridge_config_to_file(tedge_config, bridge_config).await { + // We want to preserve previous errors and therefore discard result of this function. + // let _ = clean_up(tedge_config, bridge_config); + return Err(err); + } + + Ok(()) +} + async fn write_bridge_config_to_file( config: &TEdgeConfig, bridge_config: &BridgeConfig, From 4a980a07211fa2c1a2a4fb51dda4c52c69edc22c Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Thu, 3 Jul 2025 12:28:14 +0000 Subject: [PATCH 03/12] fix basic auth based connection to mqtt service --- crates/core/tedge/src/bridge/c8y.rs | 138 +++++++++++++++++++++++++++- 1 file changed, 135 insertions(+), 3 deletions(-) diff --git a/crates/core/tedge/src/bridge/c8y.rs b/crates/core/tedge/src/bridge/c8y.rs index b63d9c52363..df4932abdfb 100644 --- a/crates/core/tedge/src/bridge/c8y.rs +++ b/crates/core/tedge/src/bridge/c8y.rs @@ -209,7 +209,7 @@ impl From for BridgeConfig { pub struct BridgeConfigC8yMqttServiceParams { pub mqtt_host: HostPort, pub config_file: Cow<'static, str>, - pub tenant_id: String, + pub tenant_id: Option, pub remote_clientid: String, pub remote_username: Option, pub remote_password: Option, @@ -251,10 +251,17 @@ impl TryFrom<(&TEdgeConfig, Option<&ProfileName>)> for BridgeConfigC8yMqttServic } else { "c8y-mqtt-svc-bridge.conf".to_string() }; + + let tenant_id = if remote_password.is_some() { + None + } else { + Some(c8y_config.tenant_id.or_config_not_set()?.clone()) + }; + let params = BridgeConfigC8yMqttServiceParams { mqtt_host: c8y_config.mqtt_service.url.or_config_not_set()?.clone(), config_file: config_file.into(), - tenant_id: c8y_config.tenant_id.or_config_not_set()?.clone(), + tenant_id, bridge_root_cert_path: c8y_config.root_cert_path.clone().into(), remote_clientid: c8y_config.device.id()?.clone(), remote_username, @@ -313,7 +320,7 @@ impl From for BridgeConfig { // When cert based auth is used, provide the tenant id as the username if auth_type == AuthType::Certificate { - remote_username = Some(tenant_id.clone()); + remote_username = tenant_id; } let (include_local_clean_session, mosquitto_version) = match include_local_clean_session { @@ -405,6 +412,7 @@ pub fn is_mosquitto_version_above_2() -> (bool, Option) { #[cfg(test)] mod tests { use super::*; + use std::convert::TryFrom; #[test] fn test_bridge_config_from_c8y_params() -> anyhow::Result<()> { @@ -611,4 +619,128 @@ mod tests { Ok(()) } + + #[test] + fn test_bridge_config_from_c8y_mqtt_service_params_certificate_auth() -> anyhow::Result<()> { + let params = BridgeConfigC8yMqttServiceParams { + mqtt_host: HostPort::::try_from("test.test.io").unwrap(), + config_file: "c8y-mqtt-svc-bridge.conf".into(), + tenant_id: Some("t12345678".into()), + remote_clientid: "alpha".into(), + remote_username: None, + remote_password: None, + bridge_root_cert_path: Utf8PathBuf::from("./test_root.pem"), + bridge_certfile: "./test-certificate.pem".into(), + bridge_keyfile: "./test-private-key.pem".into(), + include_local_clean_session: AutoFlag::False, + bridge_location: BridgeLocation::Mosquitto, + topic_prefix: "c8y-mqtt".try_into().unwrap(), + profile_name: None, + mqtt_schema: MqttSchema::with_root("te".into()), + keepalive_interval: Duration::from_secs(45), + }; + + let bridge = BridgeConfig::from(params); + + let expected = BridgeConfig { + cloud_name: "c8y-mqtt".into(), + config_file: "c8y-mqtt-svc-bridge.conf".into(), + connection: "edge_to_c8y_mqtt_service".into(), + address: HostPort::::try_from("test.test.io:9883")?, + remote_username: Some("t12345678".into()), + remote_password: None, + remote_clientid: "alpha".into(), + local_clientid: "CumulocityMqttService".into(), + bridge_root_cert_path: Utf8PathBuf::from("./test_root.pem"), + bridge_certfile: "./test-certificate.pem".into(), + bridge_keyfile: "./test-private-key.pem".into(), + use_mapper: true, + use_agent: true, + topics: vec![ + "# out 1 c8y-mqtt/".into(), + "$debug/$error in 1 c8y-mqtt/".into(), + ], + try_private: false, + start_type: "automatic".into(), + clean_session: true, + include_local_clean_session: false, + local_clean_session: false, + notifications: true, + notifications_local_only: true, + notification_topic: "te/device/main/service/mosquitto-c8y-mqtt-bridge/status/health" + .into(), + bridge_attempt_unsubscribe: false, + bridge_location: BridgeLocation::Mosquitto, + connection_check_attempts: 3, + auth_type: AuthType::Certificate, + mosquitto_version: None, + keepalive_interval: Duration::from_secs(45), + proxy: None, + }; + + assert_eq!(bridge, expected); + Ok(()) + } + + #[test] + fn test_bridge_config_from_c8y_mqtt_service_params_basic_auth() -> anyhow::Result<()> { + let params = BridgeConfigC8yMqttServiceParams { + mqtt_host: HostPort::::try_from("test.test.io")?, + config_file: "c8y-mqtt-svc-bridge.conf".into(), + tenant_id: None, + remote_clientid: "alpha".into(), + remote_username: Some("octocat".into()), + remote_password: Some("abcd1234".into()), + bridge_root_cert_path: Utf8PathBuf::from("./test_root.pem"), + bridge_certfile: "./test-certificate.pem".into(), + bridge_keyfile: "./test-private-key.pem".into(), + include_local_clean_session: AutoFlag::False, + bridge_location: BridgeLocation::Mosquitto, + topic_prefix: "c8y-mqtt".try_into().unwrap(), + profile_name: None, + mqtt_schema: MqttSchema::with_root("te".into()), + keepalive_interval: Duration::from_secs(45), + }; + + let bridge = BridgeConfig::from(params); + + let expected = BridgeConfig { + cloud_name: "c8y-mqtt".into(), + config_file: "c8y-mqtt-svc-bridge.conf".into(), + connection: "edge_to_c8y_mqtt_service".into(), + address: HostPort::::try_from("test.test.io:9883")?, + remote_username: Some("octocat".into()), + remote_password: Some("abcd1234".into()), + remote_clientid: "alpha".into(), + local_clientid: "CumulocityMqttService".into(), + bridge_root_cert_path: Utf8PathBuf::from("./test_root.pem"), + bridge_certfile: "./test-certificate.pem".into(), + bridge_keyfile: "./test-private-key.pem".into(), + use_mapper: true, + use_agent: true, + topics: vec![ + "# out 1 c8y-mqtt/".into(), + "$debug/$error in 1 c8y-mqtt/".into(), + ], + try_private: false, + start_type: "automatic".into(), + clean_session: true, + include_local_clean_session: false, + local_clean_session: false, + notifications: true, + notifications_local_only: true, + notification_topic: "te/device/main/service/mosquitto-c8y-mqtt-bridge/status/health" + .into(), + bridge_attempt_unsubscribe: false, + bridge_location: BridgeLocation::Mosquitto, + connection_check_attempts: 3, + auth_type: AuthType::Basic, + mosquitto_version: None, + keepalive_interval: Duration::from_secs(45), + proxy: None, + }; + + assert_eq!(bridge, expected); + Ok(()) + } } From 8675e0f93c3e1613d78709bfe97c074af054c1a3 Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Thu, 3 Jul 2025 15:44:11 +0000 Subject: [PATCH 04/12] configurable remote sub topics --- .../src/tedge_toml/tedge_config.rs | 5 +++++ crates/core/tedge/src/bridge/c8y.rs | 20 ++++++++++++++----- .../0007-c8y-mqtt-service-connect.md | 1 + 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/crates/common/tedge_config/src/tedge_toml/tedge_config.rs b/crates/common/tedge_config/src/tedge_toml/tedge_config.rs index 96c9ae1b345..03bf54eddff 100644 --- a/crates/common/tedge_config/src/tedge_toml/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_toml/tedge_config.rs @@ -454,6 +454,11 @@ define_tedge_config! { /// will be forwarded to the MQTT service endpoint on the `xyz` topic #[tedge_config(example = "c8y-mqtt", default(function = "c8y_mqtt_service_topic_prefix"))] topic_prefix: TopicPrefix, + + /// Set of MQTT topics the bridge should subscribe to on the Cumulocity MQTT service endpoint + #[tedge_config(example = "incoming/topic,another/topic,test/topic")] + #[tedge_config(default(value = "$demo/$error"))] + topics: TemplatesSet, } }, diff --git a/crates/core/tedge/src/bridge/c8y.rs b/crates/core/tedge/src/bridge/c8y.rs index df4932abdfb..07cc9d15b10 100644 --- a/crates/core/tedge/src/bridge/c8y.rs +++ b/crates/core/tedge/src/bridge/c8y.rs @@ -222,6 +222,7 @@ pub struct BridgeConfigC8yMqttServiceParams { pub profile_name: Option, pub mqtt_schema: MqttSchema, pub keepalive_interval: Duration, + pub sub_topics: TemplatesSet, } impl TryFrom<(&TEdgeConfig, Option<&ProfileName>)> for BridgeConfigC8yMqttServiceParams { @@ -274,6 +275,7 @@ impl TryFrom<(&TEdgeConfig, Option<&ProfileName>)> for BridgeConfigC8yMqttServic profile_name: profile.cloned(), mqtt_schema, keepalive_interval: c8y_config.bridge.keepalive_interval.duration(), + sub_topics: c8y_config.mqtt_service.topics.clone(), }; Ok(params) @@ -298,6 +300,7 @@ impl From for BridgeConfig { profile_name, mqtt_schema, keepalive_interval, + sub_topics, } = params; let address = mqtt_host @@ -305,13 +308,16 @@ impl From for BridgeConfig { .parse::>() .expect("MQTT service address must be in the expected format"); - let topics: Vec = vec![ + let mut topics: Vec = vec![ // Outgoing format!(r#"# out 1 {topic_prefix}/"#), - // Incoming - format!(r#"$debug/$error in 1 {topic_prefix}/"#), ]; + // Topics to subscribe to + for topic in sub_topics.0.iter() { + topics.push(format!(r#"{topic} in 1 {topic_prefix}/"#)); + } + let auth_type = if remote_password.is_some() { AuthType::Basic } else { @@ -638,6 +644,7 @@ mod tests { profile_name: None, mqtt_schema: MqttSchema::with_root("te".into()), keepalive_interval: Duration::from_secs(45), + sub_topics: TemplatesSet::try_from(vec!["test/topic", "demo/topic"])?, }; let bridge = BridgeConfig::from(params); @@ -658,7 +665,8 @@ mod tests { use_agent: true, topics: vec![ "# out 1 c8y-mqtt/".into(), - "$debug/$error in 1 c8y-mqtt/".into(), + "test/topic in 1 c8y-mqtt/".into(), + "demo/topic in 1 c8y-mqtt/".into(), ], try_private: false, start_type: "automatic".into(), @@ -700,6 +708,7 @@ mod tests { profile_name: None, mqtt_schema: MqttSchema::with_root("te".into()), keepalive_interval: Duration::from_secs(45), + sub_topics: TemplatesSet::try_from(vec!["test/topic", "demo/topic"])?, }; let bridge = BridgeConfig::from(params); @@ -720,7 +729,8 @@ mod tests { use_agent: true, topics: vec![ "# out 1 c8y-mqtt/".into(), - "$debug/$error in 1 c8y-mqtt/".into(), + "test/topic in 1 c8y-mqtt/".into(), + "demo/topic in 1 c8y-mqtt/".into(), ], try_private: false, start_type: "automatic".into(), diff --git a/design/decisions/0007-c8y-mqtt-service-connect.md b/design/decisions/0007-c8y-mqtt-service-connect.md index 9db560a4578..168e945fd2a 100644 --- a/design/decisions/0007-c8y-mqtt-service-connect.md +++ b/design/decisions/0007-c8y-mqtt-service-connect.md @@ -68,6 +68,7 @@ The mqtt service connection related configs are defined under the `[c8y]` table | `c8y.mqtt_service.enabled` | Whether the mqtt server connection must be established or not | `false` | | `c8y.mqtt_service.url` | The config URL for the MQTT service | `:9883` | | `c8y.mqtt_service.topic_prefix` | topic prefix used to bridge mqtt topics | `c8y-mqtt` | +| `c8y.mqtt_service.topics` | topics to subscribe to on the mqtt service | `$debug/$error` | To support connecting exclusively to the mqtt service, excluding the core mqtt connection, a config flag (`c8y.core_mqtt.enabled`) to enable/disable the core mqtt connection can also be introduced. From 64467ab6a039386d35d90b621309f5eeff9dcb50 Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Fri, 4 Jul 2025 13:32:44 +0000 Subject: [PATCH 05/12] System test skeleton --- .../tedge_connet_mqtt_service.robot | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 tests/RobotFramework/tests/tedge_connect/tedge_connet_mqtt_service.robot diff --git a/tests/RobotFramework/tests/tedge_connect/tedge_connet_mqtt_service.robot b/tests/RobotFramework/tests/tedge_connect/tedge_connet_mqtt_service.robot new file mode 100644 index 00000000000..e2352f37141 --- /dev/null +++ b/tests/RobotFramework/tests/tedge_connect/tedge_connet_mqtt_service.robot @@ -0,0 +1,41 @@ +*** Settings *** +Resource ../../resources/common.resource +Library DateTime +Library Cumulocity +Library ThinEdgeIO + +Test Teardown Get Logs + + +*** Test Cases *** +Connect to Cumulocity MQTT Service endpoint + ${DEVICE_SN}= Setup connect=${False} + Execute Command tedge config set c8y.tenant_id t37070943 + Execute Command tedge config set c8y.mqtt_service.enabled true + Execute Command tedge config set c8y.mqtt_service.topics 'sub/topic,demo/topic' + Execute Command tedge connect c8y + + # TODO: Subscribing to test/topic from another client + Execute Command tedge mqtt pub c8y-mqtt/test/topic '"hello"' + # TODO: Validate message received on test/topic on the other client + + Sleep 1s + +Connect to Cumulocity MQTT Service endpoint basic auth + ${DEVICE_SN}= Setup register=${False} + + Execute Command tedge config set device.id ${DEVICE_SN} + Execute Command tedge config set c8y.url "${C8Y_CONFIG.host}" + Execute Command tedge config set c8y.mqtt_service.enabled true + + Execute Command + ... cmd=printf '[c8y]\nusername = "%s"\npassword = "%s"\n' '${C8Y_CONFIG.username}' '${C8Y_CONFIG.password}' > /etc/tedge/credentials.toml + Execute Command tedge config set c8y.auth_method basic + + Execute Command tedge connect c8y + + # TODO: Subscribing to test/topic from another client + Execute Command tedge mqtt pub c8y-mqtt/test/topic '"hello"' + # TODO: Validate message received on test/topic on the other client + + Sleep 1s From 813f335198ebf26dd9020e3081d437f4d12b7a2b Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Fri, 4 Jul 2025 13:35:54 +0000 Subject: [PATCH 06/12] fixup! feat: tedge connect to c8y mqtt service endpoint --- crates/core/tedge/src/cli/connect/command.rs | 2 +- design/decisions/0007-c8y-mqtt-service-connect.md | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/core/tedge/src/cli/connect/command.rs b/crates/core/tedge/src/cli/connect/command.rs index f12ba78e2f2..726e898a0f8 100644 --- a/crates/core/tedge/src/cli/connect/command.rs +++ b/crates/core/tedge/src/cli/connect/command.rs @@ -1060,7 +1060,7 @@ async fn write_mosquitto_bridge_config_file( ) -> Result<(), ConnectError> { if let Err(err) = write_bridge_config_to_file(tedge_config, bridge_config).await { // We want to preserve previous errors and therefore discard result of this function. - // let _ = clean_up(tedge_config, bridge_config); + let _ = clean_up(tedge_config, bridge_config); return Err(err); } diff --git a/design/decisions/0007-c8y-mqtt-service-connect.md b/design/decisions/0007-c8y-mqtt-service-connect.md index 168e945fd2a..71947525235 100644 --- a/design/decisions/0007-c8y-mqtt-service-connect.md +++ b/design/decisions/0007-c8y-mqtt-service-connect.md @@ -32,6 +32,7 @@ To establish a bridge connection to the mqtt service endpoint, the following inf * host: Same as legacy mqtt endpoint * port: 9883 by default +* client id: device id (but need not be the device id itself) * auth: * for username/password based authentication * username: `/` From 86654d9b42f9215d1190ff6660b1c1c2c2387911 Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Fri, 4 Jul 2025 14:06:09 +0000 Subject: [PATCH 07/12] refactor core mqtt bridge config definition --- crates/core/tedge_mapper/src/c8y/mapper.rs | 356 +++++++++++---------- 1 file changed, 184 insertions(+), 172 deletions(-) diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index f7b6fe0f4a4..ee5038214dc 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -16,6 +16,7 @@ use std::borrow::Cow; use tedge_api::entity::EntityExternalId; use tedge_api::mqtt_topics::EntityTopicId; use tedge_config::tedge_toml::ProfileName; +use tedge_config::tedge_toml::TEdgeConfigReaderC8y; use tedge_config::TEdgeConfig; use tedge_downloader_ext::DownloaderActor; use tedge_file_system_ext::FsWatchActorBuilder; @@ -25,6 +26,7 @@ use tedge_mqtt_bridge::rumqttc::Transport; use tedge_mqtt_bridge::use_credentials; use tedge_mqtt_bridge::BridgeConfig; use tedge_mqtt_bridge::MqttBridgeActorBuilder; +use tedge_mqtt_bridge::MqttOptions; use tedge_mqtt_bridge::QoS; use tedge_mqtt_ext::MqttActorBuilder; use tedge_timer_ext::TimerActor; @@ -53,179 +55,12 @@ impl TEdgeComponent for CumulocityMapper { let c8y_mapper_config = C8yMapperConfig::from_tedge_config(cfg_dir, &tedge_config, c8y_profile)?; if tedge_config.mqtt.bridge.built_in { - let smartrest_1_topics = c8y_config - .smartrest1 - .templates - .0 - .iter() - .map(|id| Cow::Owned(format!("s/dl/{id}"))); - - let smartrest_2_topics = c8y_config - .smartrest - .templates - .0 - .iter() - .map(|id| Cow::Owned(format!("s/dc/{id}"))); - - let use_certificate = c8y_config - .auth_method - .is_certificate(&c8y_config.credentials_path); - let cloud_topics = [ - ("s/dt", true), - ("s/ds", true), - ("s/dat", use_certificate), - ("s/e", true), - ("devicecontrol/notifications", true), - ("error", true), - ] - .into_iter() - .filter_map(|(topic, active)| { - if active { - Some(Cow::Borrowed(topic)) - } else { - None - } - }) - .chain(smartrest_1_topics) - .chain(smartrest_2_topics); - - let mut tc = BridgeConfig::new(); - let local_prefix = format!("{}/", c8y_config.bridge.topic_prefix.as_str()); - - for topic in cloud_topics { - tc.forward_from_remote(topic, local_prefix.clone(), "")?; - } - - // Templates - tc.forward_from_local("s/ut/#", local_prefix.clone(), "")?; - - // Static templates - tc.forward_from_local("s/us/#", local_prefix.clone(), "")?; - tc.forward_from_local("t/us/#", local_prefix.clone(), "")?; - tc.forward_from_local("q/us/#", local_prefix.clone(), "")?; - tc.forward_from_local("c/us/#", local_prefix.clone(), "")?; - - // SmartREST1 - if !use_certificate { - tc.forward_from_local("s/ul/#", local_prefix.clone(), "")?; - tc.forward_from_local("t/ul/#", local_prefix.clone(), "")?; - tc.forward_from_local("q/ul/#", local_prefix.clone(), "")?; - tc.forward_from_local("c/ul/#", local_prefix.clone(), "")?; - } - - // SmartREST2 - tc.forward_from_local("s/uc/#", local_prefix.clone(), "")?; - tc.forward_from_local("t/uc/#", local_prefix.clone(), "")?; - tc.forward_from_local("q/uc/#", local_prefix.clone(), "")?; - tc.forward_from_local("c/uc/#", local_prefix.clone(), "")?; - - // c8y JSON - tc.forward_from_local( - "inventory/managedObjects/update/#", - local_prefix.clone(), - "", + let (tc, cloud_config) = builtin_bridge_config( + &tedge_config, + c8y_config, + &c8y_mapper_config, + &c8y_mapper_name, )?; - tc.forward_from_local( - "measurement/measurements/create/#", - local_prefix.clone(), - "", - )?; - tc.forward_from_local( - "measurement/measurements/createBulk/#", - local_prefix.clone(), - "", - )?; - tc.forward_from_local("event/events/create/#", local_prefix.clone(), "")?; - tc.forward_from_local("event/events/createBulk/#", local_prefix.clone(), "")?; - tc.forward_from_local("alarm/alarms/create/#", local_prefix.clone(), "")?; - tc.forward_from_local("alarm/alarms/createBulk/#", local_prefix.clone(), "")?; - - // JWT token - if use_certificate { - tc.forward_from_local("s/uat", local_prefix.clone(), "")?; - } - - let c8y = c8y_config.mqtt.or_config_not_set()?; - let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new( - c8y_config.device.id()?, - c8y.host().to_string(), - c8y.port().into(), - ); - // Cumulocity tells us not to not set clean session to false, so don't - // https://cumulocity.com/docs/device-integration/mqtt/#mqtt-clean-session - cloud_config.set_clean_session(true); - - if use_certificate { - let tls_config = tedge_config - .mqtt_client_config_rustls(c8y_config) - .context("Failed to create MQTT TLS config")?; - cloud_config.set_transport(Transport::tls_with_config(tls_config.into())); - } else { - // TODO(marcel): integrate credentials auth into MqttAuthConfig? - let (username, password) = read_c8y_credentials(&c8y_config.credentials_path)?; - use_credentials( - &mut cloud_config, - &c8y_config.root_cert_path, - username, - password, - )?; - } - - let main_device_xid: EntityExternalId = c8y_config.device.id()?.into(); - let service_type = &tedge_config.service.ty; - let service_type = if service_type.is_empty() { - "service".to_string() - } else { - service_type.to_string() - }; - - // FIXME: this will not work if `mqtt.device_topic_id` is not in default scheme - - // there is one mapper instance per cloud per thin-edge instance, perhaps we should use some - // predefined topic id instead of trying to derive it from current device? - let entity_topic_id: EntityTopicId = tedge_config - .mqtt - .device_topic_id - .clone() - .parse() - .context("Invalid device_topic_id")?; - - let mapper_service_topic_id = entity_topic_id - .default_service_for_device(&c8y_mapper_name) - .context("Can't derive service name if device topic id not in default scheme")?; - - let mapper_service_external_id = CumulocityConverter::map_to_c8y_external_id( - &mapper_service_topic_id, - &main_device_xid, - ); - - let last_will_message_mapper = - c8y_api::smartrest::inventory::service_creation_message_payload( - mapper_service_external_id.as_ref(), - &c8y_mapper_name, - service_type.as_str(), - "down", - )? - .into_inner(); - let last_will_message_bridge = - c8y_api::smartrest::inventory::service_creation_message_payload( - mapper_service_external_id.as_ref(), - &c8y_mapper_config.bridge_service_name, - service_type.as_str(), - "down", - )? - .into_inner(); - - cloud_config.set_last_will(LastWill { - topic: "s/us".into(), - qos: QoS::AtLeastOnce, - message: format!("{last_will_message_bridge}\n{last_will_message_mapper}").into(), - retain: false, - }); - cloud_config.set_keep_alive(c8y_config.bridge.keepalive_interval.duration()); - - configure_proxy(&tedge_config, &mut cloud_config)?; - runtime .spawn( MqttBridgeActorBuilder::new( @@ -360,3 +195,180 @@ pub fn service_monitor_client_config( .with_last_will_message(last_will_message); Ok(mqtt_config) } + +fn builtin_bridge_config( + tedge_config: &TEdgeConfig, + c8y_config: &TEdgeConfigReaderC8y, + c8y_mapper_config: &C8yMapperConfig, + c8y_mapper_name: &str, +) -> Result<(BridgeConfig, MqttOptions), anyhow::Error> { + let smartrest_1_topics = c8y_config + .smartrest1 + .templates + .0 + .iter() + .map(|id| Cow::Owned(format!("s/dl/{id}"))); + + let smartrest_2_topics = c8y_config + .smartrest + .templates + .0 + .iter() + .map(|id| Cow::Owned(format!("s/dc/{id}"))); + + let use_certificate = c8y_config + .auth_method + .is_certificate(&c8y_config.credentials_path); + let cloud_topics = [ + ("s/dt", true), + ("s/ds", true), + ("s/dat", use_certificate), + ("s/e", true), + ("devicecontrol/notifications", true), + ("error", true), + ] + .into_iter() + .filter_map(|(topic, active)| { + if active { + Some(Cow::Borrowed(topic)) + } else { + None + } + }) + .chain(smartrest_1_topics) + .chain(smartrest_2_topics); + + let mut tc = BridgeConfig::new(); + let local_prefix = format!("{}/", c8y_config.bridge.topic_prefix.as_str()); + + for topic in cloud_topics { + tc.forward_from_remote(topic, local_prefix.clone(), "")?; + } + + // Templates + tc.forward_from_local("s/ut/#", local_prefix.clone(), "")?; + + // Static templates + tc.forward_from_local("s/us/#", local_prefix.clone(), "")?; + tc.forward_from_local("t/us/#", local_prefix.clone(), "")?; + tc.forward_from_local("q/us/#", local_prefix.clone(), "")?; + tc.forward_from_local("c/us/#", local_prefix.clone(), "")?; + + // SmartREST1 + if !use_certificate { + tc.forward_from_local("s/ul/#", local_prefix.clone(), "")?; + tc.forward_from_local("t/ul/#", local_prefix.clone(), "")?; + tc.forward_from_local("q/ul/#", local_prefix.clone(), "")?; + tc.forward_from_local("c/ul/#", local_prefix.clone(), "")?; + } + + // SmartREST2 + tc.forward_from_local("s/uc/#", local_prefix.clone(), "")?; + tc.forward_from_local("t/uc/#", local_prefix.clone(), "")?; + tc.forward_from_local("q/uc/#", local_prefix.clone(), "")?; + tc.forward_from_local("c/uc/#", local_prefix.clone(), "")?; + + // c8y JSON + tc.forward_from_local( + "inventory/managedObjects/update/#", + local_prefix.clone(), + "", + )?; + tc.forward_from_local( + "measurement/measurements/create/#", + local_prefix.clone(), + "", + )?; + tc.forward_from_local( + "measurement/measurements/createBulk/#", + local_prefix.clone(), + "", + )?; + tc.forward_from_local("event/events/create/#", local_prefix.clone(), "")?; + tc.forward_from_local("event/events/createBulk/#", local_prefix.clone(), "")?; + tc.forward_from_local("alarm/alarms/create/#", local_prefix.clone(), "")?; + tc.forward_from_local("alarm/alarms/createBulk/#", local_prefix.clone(), "")?; + + // JWT token + if use_certificate { + tc.forward_from_local("s/uat", local_prefix.clone(), "")?; + } + + let c8y = c8y_config.mqtt.or_config_not_set()?; + let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new( + c8y_config.device.id()?, + c8y.host().to_string(), + c8y.port().into(), + ); + // Cumulocity tells us not to not set clean session to false, so don't + // https://cumulocity.com/docs/device-integration/mqtt/#mqtt-clean-session + cloud_config.set_clean_session(true); + + if use_certificate { + let tls_config = tedge_config + .mqtt_client_config_rustls(c8y_config) + .context("Failed to create MQTT TLS config")?; + cloud_config.set_transport(Transport::tls_with_config(tls_config.into())); + } else { + // TODO(marcel): integrate credentials auth into MqttAuthConfig? + let (username, password) = read_c8y_credentials(&c8y_config.credentials_path)?; + use_credentials( + &mut cloud_config, + &c8y_config.root_cert_path, + username, + password, + )?; + } + + let main_device_xid: EntityExternalId = c8y_config.device.id()?.into(); + let service_type = &tedge_config.service.ty; + let service_type = if service_type.is_empty() { + "service".to_string() + } else { + service_type.to_string() + }; + + // FIXME: this will not work if `mqtt.device_topic_id` is not in default scheme + + // there is one mapper instance per cloud per thin-edge instance, perhaps we should use some + // predefined topic id instead of trying to derive it from current device? + let entity_topic_id: EntityTopicId = tedge_config + .mqtt + .device_topic_id + .clone() + .parse() + .context("Invalid device_topic_id")?; + + let mapper_service_topic_id = entity_topic_id + .default_service_for_device(c8y_mapper_name) + .context("Can't derive service name if device topic id not in default scheme")?; + + let mapper_service_external_id = + CumulocityConverter::map_to_c8y_external_id(&mapper_service_topic_id, &main_device_xid); + + let last_will_message_mapper = c8y_api::smartrest::inventory::service_creation_message_payload( + mapper_service_external_id.as_ref(), + c8y_mapper_name, + service_type.as_str(), + "down", + )? + .into_inner(); + let last_will_message_bridge = c8y_api::smartrest::inventory::service_creation_message_payload( + mapper_service_external_id.as_ref(), + &c8y_mapper_config.bridge_service_name, + service_type.as_str(), + "down", + )? + .into_inner(); + + cloud_config.set_last_will(LastWill { + topic: "s/us".into(), + qos: QoS::AtLeastOnce, + message: format!("{last_will_message_bridge}\n{last_will_message_mapper}").into(), + retain: false, + }); + cloud_config.set_keep_alive(c8y_config.bridge.keepalive_interval.duration()); + + configure_proxy(tedge_config, &mut cloud_config)?; + Ok((tc, cloud_config)) +} From 6c8700bcb5622836e51c5e2145baaf6e2ebf3777 Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Fri, 4 Jul 2025 19:13:38 +0000 Subject: [PATCH 08/12] feat: builtin bridge support for mqtt service endpoint --- .../src/tedge_toml/tedge_config.rs | 2 +- crates/core/tedge_mapper/src/c8y/mapper.rs | 93 ++++++++++++++++++- .../bridge_config/builtin_bridge.robot | 2 +- .../tedge_connet_mqtt_service.robot | 14 +++ 4 files changed, 107 insertions(+), 4 deletions(-) diff --git a/crates/common/tedge_config/src/tedge_toml/tedge_config.rs b/crates/common/tedge_config/src/tedge_toml/tedge_config.rs index 03bf54eddff..1e5889fce60 100644 --- a/crates/common/tedge_config/src/tedge_toml/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_toml/tedge_config.rs @@ -445,7 +445,7 @@ define_tedge_config! { enabled: bool, /// MQTT service endpoint for the Cumulocity tenant, with optional port. - #[tedge_config(example = "mqtt.your-tenant.cumulocity.com:1234")] + #[tedge_config(example = "mqtt.your-tenant.cumulocity.com:9883")] #[tedge_config(default(from_optional_key = "c8y.url"))] url: HostPort, diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index ee5038214dc..967c69e2ef5 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -12,9 +12,12 @@ use c8y_mapper_ext::compatibility_adapter::OldAgentAdapter; use c8y_mapper_ext::config::C8yMapperConfig; use c8y_mapper_ext::converter::CumulocityConverter; use mqtt_channel::Config; +use mqtt_channel::Topic; use std::borrow::Cow; use tedge_api::entity::EntityExternalId; use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::service_health_topic; use tedge_config::tedge_toml::ProfileName; use tedge_config::tedge_toml::TEdgeConfigReaderC8y; use tedge_config::TEdgeConfig; @@ -55,7 +58,7 @@ impl TEdgeComponent for CumulocityMapper { let c8y_mapper_config = C8yMapperConfig::from_tedge_config(cfg_dir, &tedge_config, c8y_profile)?; if tedge_config.mqtt.bridge.built_in { - let (tc, cloud_config) = builtin_bridge_config( + let (tc, cloud_config) = core_mqtt_bridge_config( &tedge_config, c8y_config, &c8y_mapper_config, @@ -73,6 +76,24 @@ impl TEdgeComponent for CumulocityMapper { .await, ) .await?; + + if c8y_config.mqtt_service.enabled { + let (bridge_config, bridge_service_name, bridge_health_topic, cloud_config) = + mqtt_service_bridge_config(&tedge_config, c8y_config)?; + + runtime + .spawn( + MqttBridgeActorBuilder::new( + &tedge_config, + &bridge_service_name, + &bridge_health_topic, + bridge_config, + cloud_config, + ) + .await, + ) + .await?; + } } else if tedge_config.proxy.address.or_none().is_some() { warn!("`proxy.address` is configured without the built-in bridge enabled. The bridge MQTT connection to the cloud will {} communicate via the configured proxy.", "not".bold()) } @@ -196,7 +217,7 @@ pub fn service_monitor_client_config( Ok(mqtt_config) } -fn builtin_bridge_config( +fn core_mqtt_bridge_config( tedge_config: &TEdgeConfig, c8y_config: &TEdgeConfigReaderC8y, c8y_mapper_config: &C8yMapperConfig, @@ -372,3 +393,71 @@ fn builtin_bridge_config( configure_proxy(tedge_config, &mut cloud_config)?; Ok((tc, cloud_config)) } + +fn mqtt_service_bridge_config( + tedge_config: &TEdgeConfig, + c8y_config: &TEdgeConfigReaderC8y, +) -> Result<(BridgeConfig, String, Topic, MqttOptions), anyhow::Error> { + let use_certificate = c8y_config + .auth_method + .is_certificate(&c8y_config.credentials_path); + let sub_topics = c8y_config.mqtt_service.topics.clone(); + let cloud_topics = sub_topics.0.into_iter().map(Cow::Owned); + + let mut tc = BridgeConfig::new(); + let local_prefix = format!("{}/", c8y_config.mqtt_service.topic_prefix.as_str()); + + for topic in cloud_topics { + tc.forward_from_remote(topic, local_prefix.clone(), "")?; + } + + // Templates + tc.forward_from_local("#", local_prefix.clone(), "")?; + + let c8y = c8y_config.mqtt_service.url.or_config_not_set()?; + let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new( + c8y_config.device.id()?, + c8y.host().to_string(), + c8y.port().into(), + ); + // Cumulocity tells us not to not set clean session to false, so don't + // https://cumulocity.com/docs/device-integration/mqtt/#mqtt-clean-session + cloud_config.set_clean_session(true); + + if use_certificate { + let tls_config = tedge_config + .mqtt_client_config_rustls(c8y_config) + .context("Failed to create MQTT TLS config")?; + cloud_config.set_transport(Transport::tls_with_config(tls_config.into())); + + // When cert based auth is used, provide the tenant id as the username + let username = c8y_config.tenant_id.or_config_not_set()?; + cloud_config.set_credentials(username, ""); + } else { + // TODO(marcel): integrate credentials auth into MqttAuthConfig? + let (username, password) = read_c8y_credentials(&c8y_config.credentials_path)?; + use_credentials( + &mut cloud_config, + &c8y_config.root_cert_path, + username, + password, + )?; + } + + let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone()); + let bridge_service_name = format!( + "tedge-mapper-bridge-{}", + c8y_config.mqtt_service.topic_prefix.as_str() + ); + let device_topic_id: EntityTopicId = tedge_config + .mqtt + .device_topic_id + .clone() + .parse() + .context("Invalid device_topic_id")?; + + let bridge_health_topic = + service_health_topic(&mqtt_schema, &device_topic_id, &bridge_service_name); + + Ok((tc, bridge_service_name, bridge_health_topic, cloud_config)) +} diff --git a/tests/RobotFramework/tests/cumulocity/bridge_config/builtin_bridge.robot b/tests/RobotFramework/tests/cumulocity/bridge_config/builtin_bridge.robot index 13d68d91150..a379271aff1 100644 --- a/tests/RobotFramework/tests/cumulocity/bridge_config/builtin_bridge.robot +++ b/tests/RobotFramework/tests/cumulocity/bridge_config/builtin_bridge.robot @@ -10,7 +10,7 @@ Test Teardown Get Logs *** Test Cases *** Connection test [Documentation] Repeatedly test the cloud connection - FOR ${attempt} IN RANGE 0 10 1 + FOR ${attempt} IN RANGE 0 1 1 ${output}= Execute Command tedge connect c8y --test timeout=10 Should Not Contain ${output} connection check failed END diff --git a/tests/RobotFramework/tests/tedge_connect/tedge_connet_mqtt_service.robot b/tests/RobotFramework/tests/tedge_connect/tedge_connet_mqtt_service.robot index e2352f37141..4ec9343c95a 100644 --- a/tests/RobotFramework/tests/tedge_connect/tedge_connet_mqtt_service.robot +++ b/tests/RobotFramework/tests/tedge_connect/tedge_connet_mqtt_service.robot @@ -39,3 +39,17 @@ Connect to Cumulocity MQTT Service endpoint basic auth # TODO: Validate message received on test/topic on the other client Sleep 1s + +Connect to Cumulocity MQTT Service endpoint builtin bridge + ${DEVICE_SN}= Setup connect=${False} + Execute Command tedge config set mqtt.bridge.built_in true + Execute Command tedge config set c8y.tenant_id t37070943 + Execute Command tedge config set c8y.mqtt_service.enabled true + Execute Command tedge config set c8y.mqtt_service.topics 'sub/topic,demo/topic' + Execute Command tedge connect c8y + + # TODO: Subscribing to test/topic from another client + Execute Command tedge mqtt pub c8y-mqtt/test/topic '"hello"' + # TODO: Validate message received on test/topic on the other client + + Sleep 1s From 0a066063dd68d2231091cce5eeea7bec14ccae47 Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Wed, 9 Jul 2025 21:06:12 +0000 Subject: [PATCH 09/12] fixup! feat: builtin bridge support for mqtt service endpoint --- .../tests/cumulocity/bridge_config/builtin_bridge.robot | 2 +- .../tests/tedge_connect/tedge_connet_mqtt_service.robot | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/RobotFramework/tests/cumulocity/bridge_config/builtin_bridge.robot b/tests/RobotFramework/tests/cumulocity/bridge_config/builtin_bridge.robot index a379271aff1..13d68d91150 100644 --- a/tests/RobotFramework/tests/cumulocity/bridge_config/builtin_bridge.robot +++ b/tests/RobotFramework/tests/cumulocity/bridge_config/builtin_bridge.robot @@ -10,7 +10,7 @@ Test Teardown Get Logs *** Test Cases *** Connection test [Documentation] Repeatedly test the cloud connection - FOR ${attempt} IN RANGE 0 1 1 + FOR ${attempt} IN RANGE 0 10 1 ${output}= Execute Command tedge connect c8y --test timeout=10 Should Not Contain ${output} connection check failed END diff --git a/tests/RobotFramework/tests/tedge_connect/tedge_connet_mqtt_service.robot b/tests/RobotFramework/tests/tedge_connect/tedge_connet_mqtt_service.robot index 4ec9343c95a..375d9230069 100644 --- a/tests/RobotFramework/tests/tedge_connect/tedge_connet_mqtt_service.robot +++ b/tests/RobotFramework/tests/tedge_connect/tedge_connet_mqtt_service.robot @@ -45,7 +45,7 @@ Connect to Cumulocity MQTT Service endpoint builtin bridge Execute Command tedge config set mqtt.bridge.built_in true Execute Command tedge config set c8y.tenant_id t37070943 Execute Command tedge config set c8y.mqtt_service.enabled true - Execute Command tedge config set c8y.mqtt_service.topics 'sub/topic,demo/topic' + Execute Command tedge config set c8y.mqtt_service.topics demo/topic Execute Command tedge connect c8y # TODO: Subscribing to test/topic from another client From 015ee3db5afa211849a314ba61e1b7e47511caf8 Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Tue, 15 Jul 2025 13:59:09 +0000 Subject: [PATCH 10/12] fixup! feat: tedge connect to c8y mqtt service endpoint --- .../tedge_config/src/tedge_toml/tedge_config.rs | 8 +++++++- crates/core/tedge/src/cli/connect/command.rs | 11 ++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/crates/common/tedge_config/src/tedge_toml/tedge_config.rs b/crates/common/tedge_config/src/tedge_toml/tedge_config.rs index 1e5889fce60..94fc5fd696e 100644 --- a/crates/common/tedge_config/src/tedge_toml/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_toml/tedge_config.rs @@ -446,7 +446,7 @@ define_tedge_config! { /// MQTT service endpoint for the Cumulocity tenant, with optional port. #[tedge_config(example = "mqtt.your-tenant.cumulocity.com:9883")] - #[tedge_config(default(from_optional_key = "c8y.url"))] + #[tedge_config(default(from_optional_key = "c8y.mqtt"))] url: HostPort, /// The topic prefix that will be used for the Cumulocity MQTT service endpoint connection. @@ -1145,6 +1145,12 @@ fn c8y_mqtt_service_topic_prefix() -> TopicPrefix { TopicPrefix::try_new("c8y-mqtt").unwrap() } +impl From> for HostPort { + fn from(value: HostPort) -> Self { + HostPort::try_from(value.host().to_string()).expect("Source hostname must have been valid") + } +} + fn az_topic_prefix() -> TopicPrefix { TopicPrefix::try_new("az").unwrap() } diff --git a/crates/core/tedge/src/cli/connect/command.rs b/crates/core/tedge/src/cli/connect/command.rs index 726e898a0f8..d5ed44ac320 100644 --- a/crates/core/tedge/src/cli/connect/command.rs +++ b/crates/core/tedge/src/cli/connect/command.rs @@ -846,7 +846,9 @@ impl ConnectCommand { } if bridge_config.bridge_location == BridgeLocation::Mosquitto { - write_mosquitto_bridge_config_file(tedge_config, bridge_config).await?; + let spinner = Spinner::start("Creating mosquitto bridge"); + let res = write_mosquitto_bridge_config_file(tedge_config, bridge_config).await; + spinner.finish(res)?; if let Cloud::C8y(profile_name) = &self.cloud { let c8y_config = tedge_config.c8y.try_get(profile_name.as_deref())?; @@ -856,8 +858,11 @@ impl ConnectCommand { profile_name.as_deref(), ))?; let mqtt_svc_bridge_config = BridgeConfig::from(config_params); - write_mosquitto_bridge_config_file(tedge_config, &mqtt_svc_bridge_config) - .await?; + let spinner = Spinner::start("Creating mosquitto bridge to MQTT service"); + let res = + write_mosquitto_bridge_config_file(tedge_config, &mqtt_svc_bridge_config) + .await; + spinner.finish(res)?; } } } else { From b25610ab1c3556b6be04801683d1185b4245c637 Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Tue, 15 Jul 2025 13:59:43 +0000 Subject: [PATCH 11/12] fixup! feat: builtin bridge support for mqtt service endpoint --- crates/core/tedge_mapper/src/c8y/mapper.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 967c69e2ef5..4623e344db9 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -408,7 +408,7 @@ fn mqtt_service_bridge_config( let local_prefix = format!("{}/", c8y_config.mqtt_service.topic_prefix.as_str()); for topic in cloud_topics { - tc.forward_from_remote(topic, local_prefix.clone(), "")?; + tc.forward_bidirectionally(topic, local_prefix.clone(), "")?; } // Templates From 7a4975c7d5d2589df73b47ce2750ed4c3e287500 Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Tue, 15 Jul 2025 14:08:27 +0000 Subject: [PATCH 12/12] doc: tedge connect c8y mqtt service documentation --- docs/src/operate/c8y/connect-mqtt-service.md | 111 +++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 docs/src/operate/c8y/connect-mqtt-service.md diff --git a/docs/src/operate/c8y/connect-mqtt-service.md b/docs/src/operate/c8y/connect-mqtt-service.md new file mode 100644 index 00000000000..cfdc8b300e5 --- /dev/null +++ b/docs/src/operate/c8y/connect-mqtt-service.md @@ -0,0 +1,111 @@ +--- +title: 🚧 Connecting to Cumulocity MQTT Service +tags: [Operate, Cloud, Connection, Cumulocity] +description: Connecting %%te%% to Cumulocity +sidebar_position: 1 +draft: true +--- + +import UserContext from '@site/src/components/UserContext'; +import UserContextForm from '@site/src/components/UserContextForm'; +import BrowserWindow from '@site/src/components/BrowserWindow'; + +:::tip +#### User Context {#user-context} + +You can customize the documentation and commands shown on this page by providing relevant settings which will be reflected in the instructions. It makes it even easier to explore and use %%te%%. + + + +The user context will be persisted in your web browser's local storage. +::: + +## MQTT Service + +Cumulocity supports a generic [MQTT service endpoint](https://cumulocity.com/docs/device-integration/mqtt-service) +that allows users to send free-form messages (any arbitrary topic and payload) to it. +It does not replace the existing core MQTT endpoint and hence needs to be connected to, separately. +The following sections cover steps required to establish a connection to the Cumulocity MQTT service endpoint. + +:::caution +Cumulocity MQTT service is still a work-in-progress and the external interfaces are bound to change in future. +The %%te%% interfaces might also change accordingly. +::: + +### Configure the device + +Most of the configurations used to connect to the MQTT service endpoint of Cumulocity are same as +the ones used to connect to the core MQTT endpoint. + +1. Configure Cumulocity tenant URL: + + + + ```sh + sudo tedge config set c8y.url $C8Y_URL + ``` + + + +1. If you're using device certificates to connect to Cumulocity, set the tenant id (skip this when using username/password): + + + + ```sh + sudo tedge config set c8y.tenant_id $C8Y_TENANT_ID + ``` + + + +1. Enable connection to MQTT service endpoint: + + ```sh + sudo tedge config set c8y.mqtt_service.enabled true + ``` + +1. Provide a topic to subscribe to (only a single topic is allowed at the moment): + + ```sh + sudo tedge config set c8y.mqtt_service.topics demo/topic + ``` + +### Configure the cloud to trust the device certificate + +Follow the steps [here](./connect.md#making-the-cloud-trust-the-device) to make Cumulocity trust the device certificate. + +### Connect the device + +The device is connected to MQTT service endpoint along with the core MQTT connection to Cumulocity: + +```sh +sudo tedge connect c8y +``` + +This step establishes the bridge connection to both the core endpoint as well as the mqtt service endpoints simultaneously. + +### Validate the connection + +Once connected, all messages published to `c8y-mqtt/#` topics are forwarded to the MQTT service endpoint. + +To validate the same, connect a different MQTT client directly to the MQTT service endpoint (`mosquitto_sub` in this example) +and subscribe to the desired topic: + + + +```sh +mosquitto_sub -d -v -h $C8Y_URL -p 2883 -i test-client-123 -u $C8Y_TENANT_ID/$C8Y_USERNAME -P $C8Y_PASSWORD -t sub/topic +``` + + + +:::note +The test client must use a client id that is different from the device id. +::: + +Once subscribed, publish to the same topic from the %%te%% device: + +```sh +tedge mqtt pub c8y-mqtt/test/topic "hello world" +``` + +Now, validate that the same message is received by the test client. \ No newline at end of file