Skip to content

Commit 5b99e49

Browse files
authored
Merge pull request #2716 from jarhodes314/2592-mapper-bridge
Add support for built-in bridge to tedge-mapper-c8y
2 parents dc8fcb5 + 9203378 commit 5b99e49

File tree

44 files changed

+1409
-221
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1409
-221
lines changed

Cargo.lock

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ tedge_file_system_ext = { path = "crates/extensions/tedge_file_system_ext" }
144144
tedge_health_ext = { path = "crates/extensions/tedge_health_ext" }
145145
tedge_http_ext = { path = "crates/extensions/tedge_http_ext" }
146146
tedge_log_manager = { path = "crates/extensions/tedge_log_manager" }
147+
tedge_mqtt_bridge = { path = "crates/extensions/tedge_mqtt_bridge" }
147148
tedge_mqtt_ext = { path = "crates/extensions/tedge_mqtt_ext" }
148149
tedge_script_ext = { path = "crates/extensions/tedge_script_ext" }
149150
tedge_signal_ext = { path = "crates/extensions/tedge_signal_ext" }

crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,20 @@ use camino::Utf8PathBuf;
1414
use certificate::CertificateError;
1515
use certificate::PemCertificate;
1616
use doku::Document;
17+
use doku::Type;
1718
use once_cell::sync::Lazy;
19+
use serde::Deserialize;
1820
use std::borrow::Cow;
21+
use std::convert::Infallible;
22+
use std::fmt;
23+
use std::fmt::Formatter;
1924
use std::io::Read;
2025
use std::net::IpAddr;
2126
use std::net::Ipv4Addr;
2227
use std::num::NonZeroU16;
28+
use std::ops::Deref;
2329
use std::path::PathBuf;
30+
use std::str::FromStr;
2431
use std::sync::Arc;
2532
use tedge_config_macros::all_or_nothing;
2633
use tedge_config_macros::define_tedge_config;
@@ -444,7 +451,19 @@ define_tedge_config! {
444451
#[tedge_config(note = "If set to 'auto', this cleans the local session accordingly the detected version of mosquitto.")]
445452
#[tedge_config(example = "auto", default(variable = "AutoFlag::Auto"))]
446453
local_cleansession: AutoFlag,
447-
}
454+
},
455+
456+
#[tedge_config(default(value = false))]
457+
#[doku(skip)] // Hide the configuration in `tedge config list --doc`
458+
built_in: bool,
459+
460+
// TODO validation
461+
/// The topic prefix that will be used for the mapper bridge MQTT topic. For instance,
462+
/// if this is set to "c8y", then messages published to `c8y/s/us` will be
463+
/// forwarded by to Cumulocity on the `s/us` topic
464+
#[tedge_config(example = "c8y", default(value = "c8y"))]
465+
#[doku(skip)] // Hide the configuration in `tedge config list --doc`
466+
topic_prefix: TopicPrefix,
448467
},
449468

450469
entity_store: {
@@ -798,6 +817,80 @@ define_tedge_config! {
798817

799818
}
800819

820+
impl ReadableKey {
821+
// This is designed to be a simple way of controlling whether values appear in the output of
822+
// `tedge config list`. Ideally this would be integrated into [define_tedge_config], see
823+
// https://github.com/thin-edge/thin-edge.io/issues/2767 for more detail on that.
824+
// Currently this accompanies `#[doku(skip)]` on the relevant configurations, which hides
825+
// them in `tedge config list --doc`. The configurations are hidden to avoid unfinished
826+
// features from being discovered.
827+
pub fn is_printable_value(self, value: &str) -> bool {
828+
match self {
829+
Self::C8yBridgeBuiltIn => value != "false",
830+
Self::C8yBridgeTopicPrefix => value != "c8y",
831+
_ => true,
832+
}
833+
}
834+
}
835+
836+
// TODO doc comment
837+
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Deserialize, serde::Serialize)]
838+
#[serde(from = "String", into = "Arc<str>")]
839+
pub struct TopicPrefix(Arc<str>);
840+
841+
impl Document for TopicPrefix {
842+
fn ty() -> Type {
843+
String::ty()
844+
}
845+
}
846+
847+
// TODO actual validation
848+
// TODO make sure we don't allow c8y-internal either, or az, or aws as those are all used
849+
impl From<String> for TopicPrefix {
850+
fn from(value: String) -> Self {
851+
Self(value.into())
852+
}
853+
}
854+
855+
impl From<&str> for TopicPrefix {
856+
fn from(value: &str) -> Self {
857+
Self(value.into())
858+
}
859+
}
860+
861+
impl FromStr for TopicPrefix {
862+
type Err = Infallible;
863+
fn from_str(s: &str) -> Result<Self, Self::Err> {
864+
Ok(s.into())
865+
}
866+
}
867+
868+
impl From<TopicPrefix> for Arc<str> {
869+
fn from(value: TopicPrefix) -> Self {
870+
value.0
871+
}
872+
}
873+
874+
// TODO is deref actually right here
875+
impl Deref for TopicPrefix {
876+
type Target = str;
877+
fn deref(&self) -> &Self::Target {
878+
&self.0
879+
}
880+
}
881+
882+
impl TopicPrefix {
883+
pub fn as_str(&self) -> &str {
884+
&self.0
885+
}
886+
}
887+
888+
impl fmt::Display for TopicPrefix {
889+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
890+
self.0.fmt(f)
891+
}
892+
}
893+
801894
fn default_http_bind_address(dto: &TEdgeConfigDto) -> IpAddr {
802895
let external_address = dto.mqtt.external.bind.address;
803896
external_address
@@ -817,7 +910,8 @@ fn device_id(reader: &TEdgeConfigReader) -> Result<String, ReadError> {
817910
fn cert_error_into_config_error(key: &'static str, err: CertificateError) -> ReadError {
818911
match &err {
819912
CertificateError::IoError(io_err) => match io_err.kind() {
820-
std::io::ErrorKind::NotFound => ReadError::ReadOnlyNotFound { key,
913+
std::io::ErrorKind::NotFound => ReadError::ReadOnlyNotFound {
914+
key,
821915
message: concat!(
822916
"The device id is read from the device certificate.\n",
823917
"To set 'device.id' to some <id>, you can use `tedge cert create --device-id <id>`.",

crates/core/c8y_api/src/http_proxy.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::collections::HashMap;
1010
use std::time::Duration;
1111
use tedge_config::mqtt_config::MqttConfigBuildError;
1212
use tedge_config::TEdgeConfig;
13+
use tedge_config::TopicPrefix;
1314
use tracing::error;
1415
use tracing::info;
1516

@@ -112,33 +113,41 @@ impl C8yEndPoint {
112113

113114
pub struct C8yMqttJwtTokenRetriever {
114115
mqtt_config: mqtt_channel::Config,
116+
topic_prefix: TopicPrefix,
115117
}
116118

117119
impl C8yMqttJwtTokenRetriever {
118120
pub fn from_tedge_config(tedge_config: &TEdgeConfig) -> Result<Self, MqttConfigBuildError> {
119121
let mqtt_config = tedge_config.mqtt_config()?;
120122

121-
Ok(Self::new(mqtt_config))
123+
Ok(Self::new(
124+
mqtt_config,
125+
tedge_config.c8y.bridge.topic_prefix.clone(),
126+
))
122127
}
123128

124-
pub fn new(mqtt_config: mqtt_channel::Config) -> Self {
125-
let topic = TopicFilter::new_unchecked("c8y/s/dat");
129+
pub fn new(mqtt_config: mqtt_channel::Config, topic_prefix: TopicPrefix) -> Self {
130+
let topic = TopicFilter::new_unchecked(&format!("{topic_prefix}/s/dat"));
126131
let mqtt_config = mqtt_config
127132
.with_no_session() // Ignore any already published tokens, possibly stale.
128133
.with_subscriptions(topic);
129134

130-
C8yMqttJwtTokenRetriever { mqtt_config }
135+
C8yMqttJwtTokenRetriever {
136+
mqtt_config,
137+
topic_prefix,
138+
}
131139
}
132140

133141
pub async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, JwtError> {
134142
let mut mqtt_con = Connection::new(&self.mqtt_config).await?;
143+
let pub_topic = format!("{}/s/uat", self.topic_prefix);
135144

136145
tokio::time::sleep(Duration::from_millis(20)).await;
137146
for _ in 0..3 {
138147
mqtt_con
139148
.published
140149
.publish(
141-
mqtt_channel::Message::new(&Topic::new_unchecked("c8y/s/uat"), "".to_string())
150+
mqtt_channel::Message::new(&Topic::new_unchecked(&pub_topic), "".to_string())
142151
.with_qos(mqtt_channel::QoS::AtMostOnce),
143152
)
144153
.await?;
@@ -184,7 +193,6 @@ pub enum JwtError {
184193

185194
#[cfg(test)]
186195
mod tests {
187-
188196
use super::*;
189197
use test_case::test_case;
190198

crates/core/c8y_api/src/json_c8y_deserializer.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,22 @@ use tedge_api::mqtt_topics::EntityTopicId;
77
use tedge_api::SoftwareModule;
88
use tedge_api::SoftwareModuleUpdate;
99
use tedge_api::SoftwareUpdateCommand;
10+
use tedge_config::TopicPrefix;
1011
use time::OffsetDateTime;
1112

1213
pub struct C8yDeviceControlTopic;
1314

1415
impl C8yDeviceControlTopic {
15-
pub fn topic() -> Topic {
16-
Topic::new_unchecked(Self::name())
16+
pub fn topic(prefix: &TopicPrefix) -> Topic {
17+
Topic::new_unchecked(&Self::name(prefix))
1718
}
1819

19-
pub fn accept(topic: &Topic) -> bool {
20-
topic.name.starts_with(Self::name())
20+
pub fn accept(topic: &Topic, prefix: &TopicPrefix) -> bool {
21+
topic.name.starts_with(&Self::name(prefix))
2122
}
2223

23-
pub fn name() -> &'static str {
24-
"c8y/devicecontrol/notifications"
24+
pub fn name(prefix: &TopicPrefix) -> String {
25+
format!("{prefix}/devicecontrol/notifications")
2526
}
2627
}
2728

@@ -422,10 +423,15 @@ pub trait C8yDeviceControlOperationHelper {
422423
}
423424

424425
impl C8yDeviceControlOperationHelper for C8yRestart {}
426+
425427
impl C8yDeviceControlOperationHelper for C8ySoftwareUpdate {}
428+
426429
impl C8yDeviceControlOperationHelper for C8yLogfileRequest {}
430+
427431
impl C8yDeviceControlOperationHelper for C8yUploadConfigFile {}
432+
428433
impl C8yDeviceControlOperationHelper for C8yDownloadConfigFile {}
434+
429435
impl C8yDeviceControlOperationHelper for C8yFirmware {}
430436

431437
#[derive(thiserror::Error, Debug)]

crates/core/c8y_api/src/smartrest/inventory.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use crate::smartrest::csv::fields_to_csv_string;
1313
use crate::smartrest::topic::publish_topic_from_ancestors;
1414
use mqtt_channel::Message;
15+
use tedge_config::TopicPrefix;
1516

1617
use super::message::sanitize_for_smartrest;
1718

@@ -23,6 +24,7 @@ pub fn child_device_creation_message(
2324
device_name: Option<&str>,
2425
device_type: Option<&str>,
2526
ancestors: &[String],
27+
prefix: &TopicPrefix,
2628
) -> Result<Message, InvalidValueError> {
2729
if child_id.is_empty() {
2830
return Err(InvalidValueError {
@@ -44,7 +46,7 @@ pub fn child_device_creation_message(
4446
}
4547

4648
Ok(Message::new(
47-
&publish_topic_from_ancestors(ancestors),
49+
&publish_topic_from_ancestors(ancestors, prefix),
4850
// XXX: if any arguments contain commas, output will be wrong
4951
format!(
5052
"101,{},{},{}",
@@ -64,6 +66,7 @@ pub fn service_creation_message(
6466
service_type: &str,
6567
service_status: &str,
6668
ancestors: &[String],
69+
prefix: &TopicPrefix,
6770
) -> Result<Message, InvalidValueError> {
6871
// TODO: most of this noise can be eliminated by implementing `Serialize`/`Deserialize` for smartrest format
6972
if service_id.is_empty() {
@@ -92,7 +95,7 @@ pub fn service_creation_message(
9295
}
9396

9497
Ok(Message::new(
95-
&publish_topic_from_ancestors(ancestors),
98+
&publish_topic_from_ancestors(ancestors, prefix),
9699
fields_to_csv_string(&[
97100
"102",
98101
service_id,
@@ -116,8 +119,9 @@ pub fn service_creation_message(
116119
pub fn service_status_update_message(
117120
external_ids: &[impl AsRef<str>],
118121
service_status: &str,
122+
prefix: &TopicPrefix,
119123
) -> Message {
120-
let topic = publish_topic_from_ancestors(external_ids);
124+
let topic = publish_topic_from_ancestors(external_ids, prefix);
121125

122126
let service_status =
123127
sanitize_for_smartrest(service_status, super::message::MAX_PAYLOAD_LIMIT_IN_BYTES);

crates/core/c8y_api/src/smartrest/smartrest_serializer.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use serde::ser::SerializeSeq;
77
use serde::Deserialize;
88
use serde::Serialize;
99
use serde::Serializer;
10+
use tedge_config::TopicPrefix;
1011
use tracing::warn;
1112

1213
pub type SmartRest = String;
@@ -204,20 +205,20 @@ where
204205

205206
/// Helper to generate a SmartREST operation status message
206207
pub trait OperationStatusMessage {
207-
fn executing() -> Message {
208-
Self::create_message(Self::status_executing())
208+
fn executing(prefix: &TopicPrefix) -> Message {
209+
Self::create_message(Self::status_executing(), prefix)
209210
}
210211

211-
fn successful(parameter: Option<&str>) -> Message {
212-
Self::create_message(Self::status_successful(parameter))
212+
fn successful(parameter: Option<&str>, prefix: &TopicPrefix) -> Message {
213+
Self::create_message(Self::status_successful(parameter), prefix)
213214
}
214215

215-
fn failed(failure_reason: &str) -> Message {
216-
Self::create_message(Self::status_failed(failure_reason))
216+
fn failed(failure_reason: &str, prefix: &TopicPrefix) -> Message {
217+
Self::create_message(Self::status_failed(failure_reason), prefix)
217218
}
218219

219-
fn create_message(payload: SmartRest) -> Message {
220-
let topic = C8yTopic::SmartRestResponse.to_topic().unwrap(); // never fail
220+
fn create_message(payload: SmartRest, prefix: &TopicPrefix) -> Message {
221+
let topic = C8yTopic::SmartRestResponse.to_topic(prefix).unwrap(); // never fail
221222
Message::new(&topic, payload)
222223
}
223224

0 commit comments

Comments
 (0)