diff --git a/crates/common/tedge_config/src/tedge_toml/models/mod.rs b/crates/common/tedge_config/src/tedge_toml/models/mod.rs index 1cf84288031..9d0d1a58d4c 100644 --- a/crates/common/tedge_config/src/tedge_toml/models/mod.rs +++ b/crates/common/tedge_config/src/tedge_toml/models/mod.rs @@ -24,6 +24,7 @@ use strum::Display; pub const HTTPS_PORT: u16 = 443; pub const MQTT_TLS_PORT: u16 = 8883; +pub const MQTT_SVC_TLS_PORT: u16 = 9883; pub use self::apt_config::*; pub use self::auto::*; diff --git a/crates/common/tedge_config/src/tedge_toml/tedge_config.rs b/crates/common/tedge_config/src/tedge_toml/tedge_config.rs index 307a665fd82..94fc5fd696e 100644 --- a/crates/common/tedge_config/src/tedge_toml/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_toml/tedge_config.rs @@ -20,6 +20,7 @@ use super::models::SoftwareManagementApiFlag; use super::models::TemplatesSet; use super::models::TopicPrefix; use super::models::HTTPS_PORT; +use super::models::MQTT_SVC_TLS_PORT; use super::models::MQTT_TLS_PORT; use super::tedge_config_location::TEdgeConfigLocation; use crate::models::AbsolutePath; @@ -226,6 +227,10 @@ define_tedge_config! { #[tedge_config(reader(private))] url: ConnectUrl, + /// Cumulocity tenant ID + #[tedge_config(example = "t12345678")] + tenant_id: String, + /// The path where Cumulocity root certificate(s) are stored #[tedge_config(note = "The value can be a directory path as well as the path of the certificate file.")] #[tedge_config(example = "/etc/tedge/c8y-trusted-root-certificates.pem", default(function = "default_root_cert_path"))] @@ -433,6 +438,28 @@ define_tedge_config! { #[tedge_config(example = "60m", default(from_str = "60m"))] interval: SecondsOrHumanTime, }, + + mqtt_service: { + /// Wheather to connect to the MQTT service endpoint or not + #[tedge_config(example = "true", default(value = false))] + enabled: bool, + + /// MQTT service endpoint for the Cumulocity tenant, with optional port. + #[tedge_config(example = "mqtt.your-tenant.cumulocity.com:9883")] + #[tedge_config(default(from_optional_key = "c8y.mqtt"))] + url: HostPort, + + /// The topic prefix that will be used for the Cumulocity MQTT service endpoint connection. + /// For instance, if set to "c8y-mqtt", then messages published to `c8y-mqtt/xyz` + /// will be forwarded to the MQTT service endpoint on the `xyz` topic + #[tedge_config(example = "c8y-mqtt", default(function = "c8y_mqtt_service_topic_prefix"))] + topic_prefix: TopicPrefix, + + /// Set of MQTT topics the bridge should subscribe to on the Cumulocity MQTT service endpoint + #[tedge_config(example = "incoming/topic,another/topic,test/topic")] + #[tedge_config(default(value = "$demo/$error"))] + topics: TemplatesSet, + } }, #[tedge_config(deprecated_name = "azure")] // for 0.1.0 compatibility @@ -1114,6 +1141,16 @@ fn c8y_topic_prefix() -> TopicPrefix { TopicPrefix::try_new("c8y").unwrap() } +fn c8y_mqtt_service_topic_prefix() -> TopicPrefix { + TopicPrefix::try_new("c8y-mqtt").unwrap() +} + +impl From> for HostPort { + fn from(value: HostPort) -> Self { + HostPort::try_from(value.host().to_string()).expect("Source hostname must have been valid") + } +} + fn az_topic_prefix() -> TopicPrefix { TopicPrefix::try_new("az").unwrap() } diff --git a/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs b/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs index 2a21f0c7e82..b92b2d241c9 100644 --- a/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs +++ b/crates/common/tedge_config/src/tedge_toml/tedge_config/append_remove.rs @@ -40,6 +40,7 @@ impl_append_remove_for_single_value!( ConnectUrl, HostPort, HostPort, + HostPort, bool, IpAddr, u16, diff --git a/crates/core/tedge/src/bridge/c8y.rs b/crates/core/tedge/src/bridge/c8y.rs index 84beb11fff8..07cc9d15b10 100644 --- a/crates/core/tedge/src/bridge/c8y.rs +++ b/crates/core/tedge/src/bridge/c8y.rs @@ -1,6 +1,8 @@ use super::config::ProxyWrapper; use super::BridgeConfig; use crate::bridge::config::BridgeLocation; +use crate::ConfigError; +use c8y_api::http_proxy::read_c8y_credentials; use camino::Utf8PathBuf; use std::borrow::Cow; use std::process::Command; @@ -13,8 +15,10 @@ use tedge_config::models::AutoFlag; use tedge_config::models::HostPort; use tedge_config::models::TemplatesSet; use tedge_config::models::TopicPrefix; +use tedge_config::models::MQTT_SVC_TLS_PORT; use tedge_config::models::MQTT_TLS_PORT; use tedge_config::tedge_toml::ProfileName; +use tedge_config::TEdgeConfig; use which::which; #[derive(Debug)] @@ -201,6 +205,184 @@ impl From for BridgeConfig { } } +#[derive(Debug)] +pub struct BridgeConfigC8yMqttServiceParams { + pub mqtt_host: HostPort, + pub config_file: Cow<'static, str>, + pub tenant_id: Option, + pub remote_clientid: String, + pub remote_username: Option, + pub remote_password: Option, + pub bridge_root_cert_path: Utf8PathBuf, + pub bridge_certfile: Utf8PathBuf, + pub bridge_keyfile: Utf8PathBuf, + pub include_local_clean_session: AutoFlag, + pub bridge_location: BridgeLocation, + pub topic_prefix: TopicPrefix, + pub profile_name: Option, + pub mqtt_schema: MqttSchema, + pub keepalive_interval: Duration, + pub sub_topics: TemplatesSet, +} + +impl TryFrom<(&TEdgeConfig, Option<&ProfileName>)> for BridgeConfigC8yMqttServiceParams { + type Error = ConfigError; + + fn try_from(value: (&TEdgeConfig, Option<&ProfileName>)) -> Result { + let (config, profile) = value; + + let bridge_location = match config.mqtt.bridge.built_in { + true => BridgeLocation::BuiltIn, + false => BridgeLocation::Mosquitto, + }; + let mqtt_schema = MqttSchema::with_root(config.mqtt.topic_root.clone()); + let c8y_config = config.c8y.try_get(profile)?; + + let (remote_username, remote_password) = + match c8y_config.auth_method.to_type(&c8y_config.credentials_path) { + AuthType::Certificate => (None, None), + AuthType::Basic => { + let (username, password) = read_c8y_credentials(&c8y_config.credentials_path)?; + (Some(username), Some(password)) + } + }; + + let config_file = if let Some(profile_name) = &profile { + format!("c8y-mqtt-svc@{profile_name}-bridge.conf") + } else { + "c8y-mqtt-svc-bridge.conf".to_string() + }; + + let tenant_id = if remote_password.is_some() { + None + } else { + Some(c8y_config.tenant_id.or_config_not_set()?.clone()) + }; + + let params = BridgeConfigC8yMqttServiceParams { + mqtt_host: c8y_config.mqtt_service.url.or_config_not_set()?.clone(), + config_file: config_file.into(), + tenant_id, + bridge_root_cert_path: c8y_config.root_cert_path.clone().into(), + remote_clientid: c8y_config.device.id()?.clone(), + remote_username, + remote_password, + bridge_certfile: c8y_config.device.cert_path.clone().into(), + bridge_keyfile: c8y_config.device.key_path.clone().into(), + include_local_clean_session: c8y_config.bridge.include.local_cleansession.clone(), + bridge_location, + topic_prefix: c8y_config.mqtt_service.topic_prefix.clone(), + profile_name: profile.cloned(), + mqtt_schema, + keepalive_interval: c8y_config.bridge.keepalive_interval.duration(), + sub_topics: c8y_config.mqtt_service.topics.clone(), + }; + + Ok(params) + } +} + +impl From for BridgeConfig { + fn from(params: BridgeConfigC8yMqttServiceParams) -> Self { + let BridgeConfigC8yMqttServiceParams { + mqtt_host, + config_file, + bridge_root_cert_path, + tenant_id, + mut remote_username, + remote_password, + remote_clientid, + bridge_certfile, + bridge_keyfile, + include_local_clean_session, + bridge_location, + topic_prefix, + profile_name, + mqtt_schema, + keepalive_interval, + sub_topics, + } = params; + + let address = mqtt_host + .to_string() + .parse::>() + .expect("MQTT service address must be in the expected format"); + + let mut topics: Vec = vec![ + // Outgoing + format!(r#"# out 1 {topic_prefix}/"#), + ]; + + // Topics to subscribe to + for topic in sub_topics.0.iter() { + topics.push(format!(r#"{topic} in 1 {topic_prefix}/"#)); + } + + let auth_type = if remote_password.is_some() { + AuthType::Basic + } else { + AuthType::Certificate + }; + + // When cert based auth is used, provide the tenant id as the username + if auth_type == AuthType::Certificate { + remote_username = tenant_id; + } + + let (include_local_clean_session, mosquitto_version) = match include_local_clean_session { + AutoFlag::True => (true, None), + AutoFlag::False => (false, None), + AutoFlag::Auto => is_mosquitto_version_above_2(), + }; + + let service_name = format!("mosquitto-{topic_prefix}-bridge"); + let health = mqtt_schema.topic_for( + &EntityTopicId::default_main_service(&service_name).unwrap(), + &Channel::Health, + ); + + Self { + cloud_name: "c8y-mqtt".into(), + config_file, + connection: if let Some(profile) = &profile_name { + format!("edge_to_c8y_mqtt_service@{profile}") + } else { + "edge_to_c8y_mqtt_service".into() + }, + address, + remote_username, + remote_password, + bridge_root_cert_path, + remote_clientid, + local_clientid: if let Some(profile) = &profile_name { + format!("CumulocityMqttService@{profile}") + } else { + "CumulocityMqttService".into() + }, + bridge_certfile, + bridge_keyfile, + use_mapper: true, + use_agent: true, + try_private: false, + start_type: "automatic".into(), + clean_session: true, + include_local_clean_session, + local_clean_session: false, + notifications: true, + notifications_local_only: true, + notification_topic: health.name, + bridge_attempt_unsubscribe: false, + topics, + bridge_location, + connection_check_attempts: 3, + auth_type, + mosquitto_version, + keepalive_interval, + proxy: None, + } + } +} + /// Return whether or not mosquitto version is >= 2.0.0 /// /// As mosquitto doesn't provide a `--version` flag, this is a guess. @@ -236,6 +418,7 @@ pub fn is_mosquitto_version_above_2() -> (bool, Option) { #[cfg(test)] mod tests { use super::*; + use std::convert::TryFrom; #[test] fn test_bridge_config_from_c8y_params() -> anyhow::Result<()> { @@ -442,4 +625,132 @@ mod tests { Ok(()) } + + #[test] + fn test_bridge_config_from_c8y_mqtt_service_params_certificate_auth() -> anyhow::Result<()> { + let params = BridgeConfigC8yMqttServiceParams { + mqtt_host: HostPort::::try_from("test.test.io").unwrap(), + config_file: "c8y-mqtt-svc-bridge.conf".into(), + tenant_id: Some("t12345678".into()), + remote_clientid: "alpha".into(), + remote_username: None, + remote_password: None, + bridge_root_cert_path: Utf8PathBuf::from("./test_root.pem"), + bridge_certfile: "./test-certificate.pem".into(), + bridge_keyfile: "./test-private-key.pem".into(), + include_local_clean_session: AutoFlag::False, + bridge_location: BridgeLocation::Mosquitto, + topic_prefix: "c8y-mqtt".try_into().unwrap(), + profile_name: None, + mqtt_schema: MqttSchema::with_root("te".into()), + keepalive_interval: Duration::from_secs(45), + sub_topics: TemplatesSet::try_from(vec!["test/topic", "demo/topic"])?, + }; + + let bridge = BridgeConfig::from(params); + + let expected = BridgeConfig { + cloud_name: "c8y-mqtt".into(), + config_file: "c8y-mqtt-svc-bridge.conf".into(), + connection: "edge_to_c8y_mqtt_service".into(), + address: HostPort::::try_from("test.test.io:9883")?, + remote_username: Some("t12345678".into()), + remote_password: None, + remote_clientid: "alpha".into(), + local_clientid: "CumulocityMqttService".into(), + bridge_root_cert_path: Utf8PathBuf::from("./test_root.pem"), + bridge_certfile: "./test-certificate.pem".into(), + bridge_keyfile: "./test-private-key.pem".into(), + use_mapper: true, + use_agent: true, + topics: vec![ + "# out 1 c8y-mqtt/".into(), + "test/topic in 1 c8y-mqtt/".into(), + "demo/topic in 1 c8y-mqtt/".into(), + ], + try_private: false, + start_type: "automatic".into(), + clean_session: true, + include_local_clean_session: false, + local_clean_session: false, + notifications: true, + notifications_local_only: true, + notification_topic: "te/device/main/service/mosquitto-c8y-mqtt-bridge/status/health" + .into(), + bridge_attempt_unsubscribe: false, + bridge_location: BridgeLocation::Mosquitto, + connection_check_attempts: 3, + auth_type: AuthType::Certificate, + mosquitto_version: None, + keepalive_interval: Duration::from_secs(45), + proxy: None, + }; + + assert_eq!(bridge, expected); + Ok(()) + } + + #[test] + fn test_bridge_config_from_c8y_mqtt_service_params_basic_auth() -> anyhow::Result<()> { + let params = BridgeConfigC8yMqttServiceParams { + mqtt_host: HostPort::::try_from("test.test.io")?, + config_file: "c8y-mqtt-svc-bridge.conf".into(), + tenant_id: None, + remote_clientid: "alpha".into(), + remote_username: Some("octocat".into()), + remote_password: Some("abcd1234".into()), + bridge_root_cert_path: Utf8PathBuf::from("./test_root.pem"), + bridge_certfile: "./test-certificate.pem".into(), + bridge_keyfile: "./test-private-key.pem".into(), + include_local_clean_session: AutoFlag::False, + bridge_location: BridgeLocation::Mosquitto, + topic_prefix: "c8y-mqtt".try_into().unwrap(), + profile_name: None, + mqtt_schema: MqttSchema::with_root("te".into()), + keepalive_interval: Duration::from_secs(45), + sub_topics: TemplatesSet::try_from(vec!["test/topic", "demo/topic"])?, + }; + + let bridge = BridgeConfig::from(params); + + let expected = BridgeConfig { + cloud_name: "c8y-mqtt".into(), + config_file: "c8y-mqtt-svc-bridge.conf".into(), + connection: "edge_to_c8y_mqtt_service".into(), + address: HostPort::::try_from("test.test.io:9883")?, + remote_username: Some("octocat".into()), + remote_password: Some("abcd1234".into()), + remote_clientid: "alpha".into(), + local_clientid: "CumulocityMqttService".into(), + bridge_root_cert_path: Utf8PathBuf::from("./test_root.pem"), + bridge_certfile: "./test-certificate.pem".into(), + bridge_keyfile: "./test-private-key.pem".into(), + use_mapper: true, + use_agent: true, + topics: vec![ + "# out 1 c8y-mqtt/".into(), + "test/topic in 1 c8y-mqtt/".into(), + "demo/topic in 1 c8y-mqtt/".into(), + ], + try_private: false, + start_type: "automatic".into(), + clean_session: true, + include_local_clean_session: false, + local_clean_session: false, + notifications: true, + notifications_local_only: true, + notification_topic: "te/device/main/service/mosquitto-c8y-mqtt-bridge/status/health" + .into(), + bridge_attempt_unsubscribe: false, + bridge_location: BridgeLocation::Mosquitto, + connection_check_attempts: 3, + auth_type: AuthType::Basic, + mosquitto_version: None, + keepalive_interval: Duration::from_secs(45), + proxy: None, + }; + + assert_eq!(bridge, expected); + Ok(()) + } } diff --git a/crates/core/tedge/src/cli/connect/command.rs b/crates/core/tedge/src/cli/connect/command.rs index 0f97acf0656..d5ed44ac320 100644 --- a/crates/core/tedge/src/cli/connect/command.rs +++ b/crates/core/tedge/src/cli/connect/command.rs @@ -2,6 +2,7 @@ use crate::bridge::aws::BridgeConfigAwsParams; #[cfg(feature = "azure")] use crate::bridge::azure::BridgeConfigAzureParams; +use crate::bridge::c8y::BridgeConfigC8yMqttServiceParams; #[cfg(feature = "c8y")] use crate::bridge::c8y::BridgeConfigC8yParams; use crate::bridge::BridgeConfig; @@ -746,6 +747,10 @@ pub fn bridge_config( } }; + if c8y_config.mqtt_service.enabled && remote_password.is_none() { + // If the MQTT service connection is enabled and cert based auth is used, then tenant_id must be set + c8y_config.tenant_id.or_config_not_set()?; + } let params = BridgeConfigC8yParams { mqtt_host: c8y_config.mqtt.or_config_not_set()?.clone(), config_file: cloud.bridge_config_filename(), @@ -841,10 +846,24 @@ impl ConnectCommand { } if bridge_config.bridge_location == BridgeLocation::Mosquitto { - if let Err(err) = write_bridge_config_to_file(tedge_config, bridge_config).await { - // We want to preserve previous errors and therefore discard result of this function. - let _ = clean_up(tedge_config, bridge_config); - return Err(err.into()); + let spinner = Spinner::start("Creating mosquitto bridge"); + let res = write_mosquitto_bridge_config_file(tedge_config, bridge_config).await; + spinner.finish(res)?; + + if let Cloud::C8y(profile_name) = &self.cloud { + let c8y_config = tedge_config.c8y.try_get(profile_name.as_deref())?; + if c8y_config.mqtt_service.enabled { + let config_params = BridgeConfigC8yMqttServiceParams::try_from(( + tedge_config, + profile_name.as_deref(), + ))?; + let mqtt_svc_bridge_config = BridgeConfig::from(config_params); + let spinner = Spinner::start("Creating mosquitto bridge to MQTT service"); + let res = + write_mosquitto_bridge_config_file(tedge_config, &mqtt_svc_bridge_config) + .await; + spinner.finish(res)?; + } } } else { use_built_in_bridge(tedge_config, bridge_config).await?; @@ -1040,6 +1059,19 @@ async fn write_generic_mosquitto_config_to_file( Ok(()) } +async fn write_mosquitto_bridge_config_file( + tedge_config: &TEdgeConfig, + bridge_config: &BridgeConfig, +) -> Result<(), ConnectError> { + if let Err(err) = write_bridge_config_to_file(tedge_config, bridge_config).await { + // We want to preserve previous errors and therefore discard result of this function. + let _ = clean_up(tedge_config, bridge_config); + return Err(err); + } + + Ok(()) +} + async fn write_bridge_config_to_file( config: &TEdgeConfig, bridge_config: &BridgeConfig, diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index f7b6fe0f4a4..4623e344db9 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -12,10 +12,14 @@ use c8y_mapper_ext::compatibility_adapter::OldAgentAdapter; use c8y_mapper_ext::config::C8yMapperConfig; use c8y_mapper_ext::converter::CumulocityConverter; use mqtt_channel::Config; +use mqtt_channel::Topic; use std::borrow::Cow; use tedge_api::entity::EntityExternalId; use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::service_health_topic; use tedge_config::tedge_toml::ProfileName; +use tedge_config::tedge_toml::TEdgeConfigReaderC8y; use tedge_config::TEdgeConfig; use tedge_downloader_ext::DownloaderActor; use tedge_file_system_ext::FsWatchActorBuilder; @@ -25,6 +29,7 @@ use tedge_mqtt_bridge::rumqttc::Transport; use tedge_mqtt_bridge::use_credentials; use tedge_mqtt_bridge::BridgeConfig; use tedge_mqtt_bridge::MqttBridgeActorBuilder; +use tedge_mqtt_bridge::MqttOptions; use tedge_mqtt_bridge::QoS; use tedge_mqtt_ext::MqttActorBuilder; use tedge_timer_ext::TimerActor; @@ -53,179 +58,12 @@ impl TEdgeComponent for CumulocityMapper { let c8y_mapper_config = C8yMapperConfig::from_tedge_config(cfg_dir, &tedge_config, c8y_profile)?; if tedge_config.mqtt.bridge.built_in { - let smartrest_1_topics = c8y_config - .smartrest1 - .templates - .0 - .iter() - .map(|id| Cow::Owned(format!("s/dl/{id}"))); - - let smartrest_2_topics = c8y_config - .smartrest - .templates - .0 - .iter() - .map(|id| Cow::Owned(format!("s/dc/{id}"))); - - let use_certificate = c8y_config - .auth_method - .is_certificate(&c8y_config.credentials_path); - let cloud_topics = [ - ("s/dt", true), - ("s/ds", true), - ("s/dat", use_certificate), - ("s/e", true), - ("devicecontrol/notifications", true), - ("error", true), - ] - .into_iter() - .filter_map(|(topic, active)| { - if active { - Some(Cow::Borrowed(topic)) - } else { - None - } - }) - .chain(smartrest_1_topics) - .chain(smartrest_2_topics); - - let mut tc = BridgeConfig::new(); - let local_prefix = format!("{}/", c8y_config.bridge.topic_prefix.as_str()); - - for topic in cloud_topics { - tc.forward_from_remote(topic, local_prefix.clone(), "")?; - } - - // Templates - tc.forward_from_local("s/ut/#", local_prefix.clone(), "")?; - - // Static templates - tc.forward_from_local("s/us/#", local_prefix.clone(), "")?; - tc.forward_from_local("t/us/#", local_prefix.clone(), "")?; - tc.forward_from_local("q/us/#", local_prefix.clone(), "")?; - tc.forward_from_local("c/us/#", local_prefix.clone(), "")?; - - // SmartREST1 - if !use_certificate { - tc.forward_from_local("s/ul/#", local_prefix.clone(), "")?; - tc.forward_from_local("t/ul/#", local_prefix.clone(), "")?; - tc.forward_from_local("q/ul/#", local_prefix.clone(), "")?; - tc.forward_from_local("c/ul/#", local_prefix.clone(), "")?; - } - - // SmartREST2 - tc.forward_from_local("s/uc/#", local_prefix.clone(), "")?; - tc.forward_from_local("t/uc/#", local_prefix.clone(), "")?; - tc.forward_from_local("q/uc/#", local_prefix.clone(), "")?; - tc.forward_from_local("c/uc/#", local_prefix.clone(), "")?; - - // c8y JSON - tc.forward_from_local( - "inventory/managedObjects/update/#", - local_prefix.clone(), - "", - )?; - tc.forward_from_local( - "measurement/measurements/create/#", - local_prefix.clone(), - "", - )?; - tc.forward_from_local( - "measurement/measurements/createBulk/#", - local_prefix.clone(), - "", + let (tc, cloud_config) = core_mqtt_bridge_config( + &tedge_config, + c8y_config, + &c8y_mapper_config, + &c8y_mapper_name, )?; - tc.forward_from_local("event/events/create/#", local_prefix.clone(), "")?; - tc.forward_from_local("event/events/createBulk/#", local_prefix.clone(), "")?; - tc.forward_from_local("alarm/alarms/create/#", local_prefix.clone(), "")?; - tc.forward_from_local("alarm/alarms/createBulk/#", local_prefix.clone(), "")?; - - // JWT token - if use_certificate { - tc.forward_from_local("s/uat", local_prefix.clone(), "")?; - } - - let c8y = c8y_config.mqtt.or_config_not_set()?; - let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new( - c8y_config.device.id()?, - c8y.host().to_string(), - c8y.port().into(), - ); - // Cumulocity tells us not to not set clean session to false, so don't - // https://cumulocity.com/docs/device-integration/mqtt/#mqtt-clean-session - cloud_config.set_clean_session(true); - - if use_certificate { - let tls_config = tedge_config - .mqtt_client_config_rustls(c8y_config) - .context("Failed to create MQTT TLS config")?; - cloud_config.set_transport(Transport::tls_with_config(tls_config.into())); - } else { - // TODO(marcel): integrate credentials auth into MqttAuthConfig? - let (username, password) = read_c8y_credentials(&c8y_config.credentials_path)?; - use_credentials( - &mut cloud_config, - &c8y_config.root_cert_path, - username, - password, - )?; - } - - let main_device_xid: EntityExternalId = c8y_config.device.id()?.into(); - let service_type = &tedge_config.service.ty; - let service_type = if service_type.is_empty() { - "service".to_string() - } else { - service_type.to_string() - }; - - // FIXME: this will not work if `mqtt.device_topic_id` is not in default scheme - - // there is one mapper instance per cloud per thin-edge instance, perhaps we should use some - // predefined topic id instead of trying to derive it from current device? - let entity_topic_id: EntityTopicId = tedge_config - .mqtt - .device_topic_id - .clone() - .parse() - .context("Invalid device_topic_id")?; - - let mapper_service_topic_id = entity_topic_id - .default_service_for_device(&c8y_mapper_name) - .context("Can't derive service name if device topic id not in default scheme")?; - - let mapper_service_external_id = CumulocityConverter::map_to_c8y_external_id( - &mapper_service_topic_id, - &main_device_xid, - ); - - let last_will_message_mapper = - c8y_api::smartrest::inventory::service_creation_message_payload( - mapper_service_external_id.as_ref(), - &c8y_mapper_name, - service_type.as_str(), - "down", - )? - .into_inner(); - let last_will_message_bridge = - c8y_api::smartrest::inventory::service_creation_message_payload( - mapper_service_external_id.as_ref(), - &c8y_mapper_config.bridge_service_name, - service_type.as_str(), - "down", - )? - .into_inner(); - - cloud_config.set_last_will(LastWill { - topic: "s/us".into(), - qos: QoS::AtLeastOnce, - message: format!("{last_will_message_bridge}\n{last_will_message_mapper}").into(), - retain: false, - }); - cloud_config.set_keep_alive(c8y_config.bridge.keepalive_interval.duration()); - - configure_proxy(&tedge_config, &mut cloud_config)?; - runtime .spawn( MqttBridgeActorBuilder::new( @@ -238,6 +76,24 @@ impl TEdgeComponent for CumulocityMapper { .await, ) .await?; + + if c8y_config.mqtt_service.enabled { + let (bridge_config, bridge_service_name, bridge_health_topic, cloud_config) = + mqtt_service_bridge_config(&tedge_config, c8y_config)?; + + runtime + .spawn( + MqttBridgeActorBuilder::new( + &tedge_config, + &bridge_service_name, + &bridge_health_topic, + bridge_config, + cloud_config, + ) + .await, + ) + .await?; + } } else if tedge_config.proxy.address.or_none().is_some() { warn!("`proxy.address` is configured without the built-in bridge enabled. The bridge MQTT connection to the cloud will {} communicate via the configured proxy.", "not".bold()) } @@ -360,3 +216,248 @@ pub fn service_monitor_client_config( .with_last_will_message(last_will_message); Ok(mqtt_config) } + +fn core_mqtt_bridge_config( + tedge_config: &TEdgeConfig, + c8y_config: &TEdgeConfigReaderC8y, + c8y_mapper_config: &C8yMapperConfig, + c8y_mapper_name: &str, +) -> Result<(BridgeConfig, MqttOptions), anyhow::Error> { + let smartrest_1_topics = c8y_config + .smartrest1 + .templates + .0 + .iter() + .map(|id| Cow::Owned(format!("s/dl/{id}"))); + + let smartrest_2_topics = c8y_config + .smartrest + .templates + .0 + .iter() + .map(|id| Cow::Owned(format!("s/dc/{id}"))); + + let use_certificate = c8y_config + .auth_method + .is_certificate(&c8y_config.credentials_path); + let cloud_topics = [ + ("s/dt", true), + ("s/ds", true), + ("s/dat", use_certificate), + ("s/e", true), + ("devicecontrol/notifications", true), + ("error", true), + ] + .into_iter() + .filter_map(|(topic, active)| { + if active { + Some(Cow::Borrowed(topic)) + } else { + None + } + }) + .chain(smartrest_1_topics) + .chain(smartrest_2_topics); + + let mut tc = BridgeConfig::new(); + let local_prefix = format!("{}/", c8y_config.bridge.topic_prefix.as_str()); + + for topic in cloud_topics { + tc.forward_from_remote(topic, local_prefix.clone(), "")?; + } + + // Templates + tc.forward_from_local("s/ut/#", local_prefix.clone(), "")?; + + // Static templates + tc.forward_from_local("s/us/#", local_prefix.clone(), "")?; + tc.forward_from_local("t/us/#", local_prefix.clone(), "")?; + tc.forward_from_local("q/us/#", local_prefix.clone(), "")?; + tc.forward_from_local("c/us/#", local_prefix.clone(), "")?; + + // SmartREST1 + if !use_certificate { + tc.forward_from_local("s/ul/#", local_prefix.clone(), "")?; + tc.forward_from_local("t/ul/#", local_prefix.clone(), "")?; + tc.forward_from_local("q/ul/#", local_prefix.clone(), "")?; + tc.forward_from_local("c/ul/#", local_prefix.clone(), "")?; + } + + // SmartREST2 + tc.forward_from_local("s/uc/#", local_prefix.clone(), "")?; + tc.forward_from_local("t/uc/#", local_prefix.clone(), "")?; + tc.forward_from_local("q/uc/#", local_prefix.clone(), "")?; + tc.forward_from_local("c/uc/#", local_prefix.clone(), "")?; + + // c8y JSON + tc.forward_from_local( + "inventory/managedObjects/update/#", + local_prefix.clone(), + "", + )?; + tc.forward_from_local( + "measurement/measurements/create/#", + local_prefix.clone(), + "", + )?; + tc.forward_from_local( + "measurement/measurements/createBulk/#", + local_prefix.clone(), + "", + )?; + tc.forward_from_local("event/events/create/#", local_prefix.clone(), "")?; + tc.forward_from_local("event/events/createBulk/#", local_prefix.clone(), "")?; + tc.forward_from_local("alarm/alarms/create/#", local_prefix.clone(), "")?; + tc.forward_from_local("alarm/alarms/createBulk/#", local_prefix.clone(), "")?; + + // JWT token + if use_certificate { + tc.forward_from_local("s/uat", local_prefix.clone(), "")?; + } + + let c8y = c8y_config.mqtt.or_config_not_set()?; + let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new( + c8y_config.device.id()?, + c8y.host().to_string(), + c8y.port().into(), + ); + // Cumulocity tells us not to not set clean session to false, so don't + // https://cumulocity.com/docs/device-integration/mqtt/#mqtt-clean-session + cloud_config.set_clean_session(true); + + if use_certificate { + let tls_config = tedge_config + .mqtt_client_config_rustls(c8y_config) + .context("Failed to create MQTT TLS config")?; + cloud_config.set_transport(Transport::tls_with_config(tls_config.into())); + } else { + // TODO(marcel): integrate credentials auth into MqttAuthConfig? + let (username, password) = read_c8y_credentials(&c8y_config.credentials_path)?; + use_credentials( + &mut cloud_config, + &c8y_config.root_cert_path, + username, + password, + )?; + } + + let main_device_xid: EntityExternalId = c8y_config.device.id()?.into(); + let service_type = &tedge_config.service.ty; + let service_type = if service_type.is_empty() { + "service".to_string() + } else { + service_type.to_string() + }; + + // FIXME: this will not work if `mqtt.device_topic_id` is not in default scheme + + // there is one mapper instance per cloud per thin-edge instance, perhaps we should use some + // predefined topic id instead of trying to derive it from current device? + let entity_topic_id: EntityTopicId = tedge_config + .mqtt + .device_topic_id + .clone() + .parse() + .context("Invalid device_topic_id")?; + + let mapper_service_topic_id = entity_topic_id + .default_service_for_device(c8y_mapper_name) + .context("Can't derive service name if device topic id not in default scheme")?; + + let mapper_service_external_id = + CumulocityConverter::map_to_c8y_external_id(&mapper_service_topic_id, &main_device_xid); + + let last_will_message_mapper = c8y_api::smartrest::inventory::service_creation_message_payload( + mapper_service_external_id.as_ref(), + c8y_mapper_name, + service_type.as_str(), + "down", + )? + .into_inner(); + let last_will_message_bridge = c8y_api::smartrest::inventory::service_creation_message_payload( + mapper_service_external_id.as_ref(), + &c8y_mapper_config.bridge_service_name, + service_type.as_str(), + "down", + )? + .into_inner(); + + cloud_config.set_last_will(LastWill { + topic: "s/us".into(), + qos: QoS::AtLeastOnce, + message: format!("{last_will_message_bridge}\n{last_will_message_mapper}").into(), + retain: false, + }); + cloud_config.set_keep_alive(c8y_config.bridge.keepalive_interval.duration()); + + configure_proxy(tedge_config, &mut cloud_config)?; + Ok((tc, cloud_config)) +} + +fn mqtt_service_bridge_config( + tedge_config: &TEdgeConfig, + c8y_config: &TEdgeConfigReaderC8y, +) -> Result<(BridgeConfig, String, Topic, MqttOptions), anyhow::Error> { + let use_certificate = c8y_config + .auth_method + .is_certificate(&c8y_config.credentials_path); + let sub_topics = c8y_config.mqtt_service.topics.clone(); + let cloud_topics = sub_topics.0.into_iter().map(Cow::Owned); + + let mut tc = BridgeConfig::new(); + let local_prefix = format!("{}/", c8y_config.mqtt_service.topic_prefix.as_str()); + + for topic in cloud_topics { + tc.forward_bidirectionally(topic, local_prefix.clone(), "")?; + } + + // Templates + tc.forward_from_local("#", local_prefix.clone(), "")?; + + let c8y = c8y_config.mqtt_service.url.or_config_not_set()?; + let mut cloud_config = tedge_mqtt_bridge::MqttOptions::new( + c8y_config.device.id()?, + c8y.host().to_string(), + c8y.port().into(), + ); + // Cumulocity tells us not to not set clean session to false, so don't + // https://cumulocity.com/docs/device-integration/mqtt/#mqtt-clean-session + cloud_config.set_clean_session(true); + + if use_certificate { + let tls_config = tedge_config + .mqtt_client_config_rustls(c8y_config) + .context("Failed to create MQTT TLS config")?; + cloud_config.set_transport(Transport::tls_with_config(tls_config.into())); + + // When cert based auth is used, provide the tenant id as the username + let username = c8y_config.tenant_id.or_config_not_set()?; + cloud_config.set_credentials(username, ""); + } else { + // TODO(marcel): integrate credentials auth into MqttAuthConfig? + let (username, password) = read_c8y_credentials(&c8y_config.credentials_path)?; + use_credentials( + &mut cloud_config, + &c8y_config.root_cert_path, + username, + password, + )?; + } + + let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.clone()); + let bridge_service_name = format!( + "tedge-mapper-bridge-{}", + c8y_config.mqtt_service.topic_prefix.as_str() + ); + let device_topic_id: EntityTopicId = tedge_config + .mqtt + .device_topic_id + .clone() + .parse() + .context("Invalid device_topic_id")?; + + let bridge_health_topic = + service_health_topic(&mqtt_schema, &device_topic_id, &bridge_service_name); + + Ok((tc, bridge_service_name, bridge_health_topic, cloud_config)) +} diff --git a/design/decisions/0007-c8y-mqtt-service-connect.md b/design/decisions/0007-c8y-mqtt-service-connect.md new file mode 100644 index 00000000000..71947525235 --- /dev/null +++ b/design/decisions/0007-c8y-mqtt-service-connect.md @@ -0,0 +1,173 @@ +# Connect to C8Y MQTT service endpoint + +* Date: __2024-06-23__ +* Status: __New__ + +## Requirement + +Cumulocity has introduced a new [MQTT service endpoint](https://cumulocity.com/docs/device-integration/mqtt-service) +that allows users to send free-form messages (any arbitrary topic and payload) to Cumulocity. +It does not replace the existing core MQTT endpoint and hence a separate connection is required. +To support the same, `tedge connect` must be enhanced to connect to this new endpoint as well, +in addition to the existing connection to the core endpoint. + +## MQTT service specifications/limitations + +- The mqtt service endpoint is exposed at the same tenant URL via port 9883 (with TLS). +- Both username/password based and cert based authentication is supported. +- Only connections with clean session enabled are supported. Persistent connection attempts are rejected. +- Subscribing to wildcard topics (`#` and `+`) and system topics (starting with `$SYS`) are not supported. +- Publishing/subscribing to legacy C8Y topics (SmartREST and JSON over MQTT) are not supported either. +- QoS 2 is not supported. +- Retained flag is accepted, but ignored. +- There are limits to the number of topics that can be created + and the number of messages that can be queued per topic. + Publishes are rejected when the queue is full. +- Each published message has a time-to-live, after which it is cleared from the queue. + The subscribed clients are not notified of expired messages. + +## Solutions + +To establish a bridge connection to the mqtt service endpoint, the following info must be provided in the connect request: + +* host: Same as legacy mqtt endpoint +* port: 9883 by default +* client id: device id (but need not be the device id itself) +* auth: + * for username/password based authentication + * username: `/` + * password + * for cert based authentication + * device cert path + * device key path + * server root ca path + * username: `` +* bridge topics: `/#` + +:::note +Even for cert based auth, the `username` must be provided with the tenant id. +::: + +The following are the solution approaches that are considered: + +### Approach 1: Connect to mqtt service along with core mqtt connection + +The mqtt service bridge connection is established along with `tedge connect c8y` command. + +:::note +For the builtin bridge functionality, whether to spawn a different mapper, +or establish a secondary connection from the existing mapper itself is to be decided. +The same applies for the trivial mapping rule that adds the timestamp to the payload. +::: + +#### Configuration + +The mqtt service connection related configs are defined under the `[c8y]` table in `tedge.toml` : + +| Config | Description | Default | +| ------ | ----------- | ------- | +| `c8y.mqtt_service.enabled` | Whether the mqtt server connection must be established or not | `false` | +| `c8y.mqtt_service.url` | The config URL for the MQTT service | `:9883` | +| `c8y.mqtt_service.topic_prefix` | topic prefix used to bridge mqtt topics | `c8y-mqtt` | +| `c8y.mqtt_service.topics` | topics to subscribe to on the mqtt service | `$debug/$error` | + +To support connecting exclusively to the mqtt service, excluding the core mqtt connection, +a config flag (`c8y.core_mqtt.enabled`) to enable/disable the core mqtt connection can also be introduced. + + +To connect to the mqtt service, it must be enabled before `tedge connect c8y` is executed: + +```sh +tedge config set c8y.mqtt_service.enabled "true" +``` + +:::note +The tenant id which must be passed in the username is retrieved using the `https:///tenant/currentTenant` REST API. +An alternative is to take this as an additional `c8y.tenant_id` configuration. +::: + +#### Connection + +The command to connect to C8Y remains the same: + +```sh +tedge connect c8y +``` + +The connection to the mqtt service is established only if it is enabled. + +**Pros** + +- Makes the mqtt service connection seamless for existing users, with minimal configuration. +- Reuse most of the existing configurations like `c8y.url`, `c8y.device.key_path`, `c8y.device.cert_path` etc. +- Users can choose **not** to connect to the new endpoint, by keeping it disabled. +- In future, if the core mqtt connection is replaced with the new mqtt service, + we can just switch what is enabled and disabled by default. + +**Cons*** + +- The mqtt service connection is tied to the direct `c8y` connection, to retrieve the tenant id + (although this dependency can be removed if tenant id is accepted via config). + +### Approach 2: Connect to mqtt service as an independent cloud endpoint + +Do not establish both the core mqtt connection and the mqtt service connection together with `tedge connect c8y` command, +but treat the mqtt service as an independent/generic MQTT broker endpoint like connecting to AWS or AZ endpoints. + +#### Configuration + +As an independent cloud endpoint, the following config values must be provided: + +```sh +tedge config set remote.url example.cumulocity.com:9883 +tedge config set remote.username t123456 +tedge config set remote.bridge.topic_prefix c8y-mqtt +``` + +The following setting can also be configured, though they all have default values: + +| Config | Description | Default | +| ------ | ----------- | ------- | +| `remote.device.cert_path` | The device cert path used for the authentication | Same as `device.cert_path` | +| `remote.device.key_path` | The device cert path used for the authentication | Same as `device.key_path` | + +In addition to the above, pretty much all the configs defined in the `tedge.toml` for `az` and `aws` can be defined +for this generic `remote` endpoint as well, as none of them are cloud specific but applies to any mqtt broker. + +#### Connection + +```sh +tedge connect remote +``` + +**Pros** + +- Clear separation from `tedge connect c8y` allows the second connection to be established and disconnected on demand. +- Existing users are not affected, as the `tedge connect c8y` behavior stays the same. +- Allows customers to connect exclusively to the new mqtt service, if they don't need the core mqtt connection. +- There is no real dependency on Cumulocity as multiple `remote` profiles can be used to connect to any generic broker. + +**Cons** + +- Many of the config parameters defined in the `c8y` profile like the `url`, `cert_path` etc + will have to be duplicated in `remote` profile as well. +- When the device is connected to multiple c8y cloud profiles, + corresponding `remote` profiles must be configured explicitly per c8y instance. + +### Approach 3: Connect to all configured cloud instances with tedge connect + +This is more of an extension of `Approach 2`, where instead of explicitly connecting to each cloud instance/profile, +a `tedge connect` command (without the cloud arg), detects all the cloud profiles that are configured/enabled +and connects to each endpoint one by one. + +Only the cloud instances with a `url` explicitly configured are considered as enabled. +An additional `enabled` flag can be added to each cloud profile, to skip connecting to it, even when it is configured. + +#### Configuration + +TBD + +#### Connection + +TBD + diff --git a/docs/src/operate/c8y/connect-mqtt-service.md b/docs/src/operate/c8y/connect-mqtt-service.md new file mode 100644 index 00000000000..cfdc8b300e5 --- /dev/null +++ b/docs/src/operate/c8y/connect-mqtt-service.md @@ -0,0 +1,111 @@ +--- +title: 🚧 Connecting to Cumulocity MQTT Service +tags: [Operate, Cloud, Connection, Cumulocity] +description: Connecting %%te%% to Cumulocity +sidebar_position: 1 +draft: true +--- + +import UserContext from '@site/src/components/UserContext'; +import UserContextForm from '@site/src/components/UserContextForm'; +import BrowserWindow from '@site/src/components/BrowserWindow'; + +:::tip +#### User Context {#user-context} + +You can customize the documentation and commands shown on this page by providing relevant settings which will be reflected in the instructions. It makes it even easier to explore and use %%te%%. + + + +The user context will be persisted in your web browser's local storage. +::: + +## MQTT Service + +Cumulocity supports a generic [MQTT service endpoint](https://cumulocity.com/docs/device-integration/mqtt-service) +that allows users to send free-form messages (any arbitrary topic and payload) to it. +It does not replace the existing core MQTT endpoint and hence needs to be connected to, separately. +The following sections cover steps required to establish a connection to the Cumulocity MQTT service endpoint. + +:::caution +Cumulocity MQTT service is still a work-in-progress and the external interfaces are bound to change in future. +The %%te%% interfaces might also change accordingly. +::: + +### Configure the device + +Most of the configurations used to connect to the MQTT service endpoint of Cumulocity are same as +the ones used to connect to the core MQTT endpoint. + +1. Configure Cumulocity tenant URL: + + + + ```sh + sudo tedge config set c8y.url $C8Y_URL + ``` + + + +1. If you're using device certificates to connect to Cumulocity, set the tenant id (skip this when using username/password): + + + + ```sh + sudo tedge config set c8y.tenant_id $C8Y_TENANT_ID + ``` + + + +1. Enable connection to MQTT service endpoint: + + ```sh + sudo tedge config set c8y.mqtt_service.enabled true + ``` + +1. Provide a topic to subscribe to (only a single topic is allowed at the moment): + + ```sh + sudo tedge config set c8y.mqtt_service.topics demo/topic + ``` + +### Configure the cloud to trust the device certificate + +Follow the steps [here](./connect.md#making-the-cloud-trust-the-device) to make Cumulocity trust the device certificate. + +### Connect the device + +The device is connected to MQTT service endpoint along with the core MQTT connection to Cumulocity: + +```sh +sudo tedge connect c8y +``` + +This step establishes the bridge connection to both the core endpoint as well as the mqtt service endpoints simultaneously. + +### Validate the connection + +Once connected, all messages published to `c8y-mqtt/#` topics are forwarded to the MQTT service endpoint. + +To validate the same, connect a different MQTT client directly to the MQTT service endpoint (`mosquitto_sub` in this example) +and subscribe to the desired topic: + + + +```sh +mosquitto_sub -d -v -h $C8Y_URL -p 2883 -i test-client-123 -u $C8Y_TENANT_ID/$C8Y_USERNAME -P $C8Y_PASSWORD -t sub/topic +``` + + + +:::note +The test client must use a client id that is different from the device id. +::: + +Once subscribed, publish to the same topic from the %%te%% device: + +```sh +tedge mqtt pub c8y-mqtt/test/topic "hello world" +``` + +Now, validate that the same message is received by the test client. \ No newline at end of file diff --git a/tests/RobotFramework/tests/tedge_connect/tedge_connet_mqtt_service.robot b/tests/RobotFramework/tests/tedge_connect/tedge_connet_mqtt_service.robot new file mode 100644 index 00000000000..375d9230069 --- /dev/null +++ b/tests/RobotFramework/tests/tedge_connect/tedge_connet_mqtt_service.robot @@ -0,0 +1,55 @@ +*** Settings *** +Resource ../../resources/common.resource +Library DateTime +Library Cumulocity +Library ThinEdgeIO + +Test Teardown Get Logs + + +*** Test Cases *** +Connect to Cumulocity MQTT Service endpoint + ${DEVICE_SN}= Setup connect=${False} + Execute Command tedge config set c8y.tenant_id t37070943 + Execute Command tedge config set c8y.mqtt_service.enabled true + Execute Command tedge config set c8y.mqtt_service.topics 'sub/topic,demo/topic' + Execute Command tedge connect c8y + + # TODO: Subscribing to test/topic from another client + Execute Command tedge mqtt pub c8y-mqtt/test/topic '"hello"' + # TODO: Validate message received on test/topic on the other client + + Sleep 1s + +Connect to Cumulocity MQTT Service endpoint basic auth + ${DEVICE_SN}= Setup register=${False} + + Execute Command tedge config set device.id ${DEVICE_SN} + Execute Command tedge config set c8y.url "${C8Y_CONFIG.host}" + Execute Command tedge config set c8y.mqtt_service.enabled true + + Execute Command + ... cmd=printf '[c8y]\nusername = "%s"\npassword = "%s"\n' '${C8Y_CONFIG.username}' '${C8Y_CONFIG.password}' > /etc/tedge/credentials.toml + Execute Command tedge config set c8y.auth_method basic + + Execute Command tedge connect c8y + + # TODO: Subscribing to test/topic from another client + Execute Command tedge mqtt pub c8y-mqtt/test/topic '"hello"' + # TODO: Validate message received on test/topic on the other client + + Sleep 1s + +Connect to Cumulocity MQTT Service endpoint builtin bridge + ${DEVICE_SN}= Setup connect=${False} + Execute Command tedge config set mqtt.bridge.built_in true + Execute Command tedge config set c8y.tenant_id t37070943 + Execute Command tedge config set c8y.mqtt_service.enabled true + Execute Command tedge config set c8y.mqtt_service.topics demo/topic + Execute Command tedge connect c8y + + # TODO: Subscribing to test/topic from another client + Execute Command tedge mqtt pub c8y-mqtt/test/topic '"hello"' + # TODO: Validate message received on test/topic on the other client + + Sleep 1s