Skip to content

Commit 3d85d66

Browse files
authored
Merge pull request #3292 from albinsuresh/refactor/simplify-smartrest-publish-topics
refactor: Simplify SmartREST publish topics
2 parents fb7ff67 + 4b8213a commit 3d85d66

File tree

8 files changed

+98
-72
lines changed

8 files changed

+98
-72
lines changed

crates/core/c8y_api/src/smartrest/inventory.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
// smartrest messages are sent. There should be one comprehensive API for
1010
// generating them.
1111

12-
use crate::smartrest::topic::publish_topic_from_ancestors;
12+
use crate::smartrest::topic::publish_topic_from_parent;
1313
use crate::smartrest::topic::C8yTopic;
1414
use mqtt_channel::MqttMessage;
1515
use std::time::Duration;
@@ -29,7 +29,8 @@ pub fn child_device_creation_message(
2929
child_id: &str,
3030
device_name: Option<&str>,
3131
device_type: Option<&str>,
32-
ancestors: &[String],
32+
parent: Option<&str>,
33+
main_device_id: &str,
3334
prefix: &TopicPrefix,
3435
) -> Result<MqttMessage, InvalidValueError> {
3536
if child_id.is_empty() {
@@ -60,7 +61,7 @@ pub fn child_device_creation_message(
6061
.expect("child_id, device_name, device_type should not increase payload size over the limit");
6162

6263
Ok(MqttMessage::new(
63-
&publish_topic_from_ancestors(ancestors, prefix),
64+
&publish_topic_from_parent(parent, main_device_id, prefix),
6465
payload.into_inner(),
6566
))
6667
}
@@ -73,11 +74,12 @@ pub fn service_creation_message(
7374
service_name: &str,
7475
service_type: &str,
7576
service_status: &str,
76-
ancestors: &[String],
77+
parent: Option<&str>,
78+
main_device_id: &str,
7779
prefix: &TopicPrefix,
7880
) -> Result<MqttMessage, InvalidValueError> {
7981
Ok(MqttMessage::new(
80-
&publish_topic_from_ancestors(ancestors, prefix),
82+
&publish_topic_from_parent(parent, main_device_id, prefix),
8183
service_creation_message_payload(service_id, service_name, service_type, service_status)?
8284
.into_inner(),
8385
))

crates/core/c8y_api/src/smartrest/topic.rs

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::json_c8y::C8yAlarm;
22
use mqtt_channel::MqttError;
33
use mqtt_channel::Topic;
44
use mqtt_channel::TopicFilter;
5-
use tedge_api::entity_store::EntityMetadata;
5+
use tedge_api::entity_store::EntityExternalId;
66
use tedge_api::entity_store::EntityType;
77
use tedge_config::TopicPrefix;
88

@@ -19,13 +19,14 @@ pub enum C8yTopic {
1919
impl C8yTopic {
2020
/// Return the c8y SmartRest response topic for the given entity
2121
pub fn smartrest_response_topic(
22-
entity: &EntityMetadata,
22+
external_id: &EntityExternalId,
23+
entity_type: &EntityType,
2324
prefix: &TopicPrefix,
2425
) -> Option<Topic> {
25-
match entity.r#type {
26+
match entity_type {
2627
EntityType::MainDevice => Some(C8yTopic::upstream_topic(prefix)),
2728
EntityType::ChildDevice | EntityType::Service => {
28-
Self::ChildSmartRestResponse(entity.external_id.clone().into())
29+
Self::ChildSmartRestResponse(external_id.clone().into())
2930
.to_topic(prefix)
3031
.ok()
3132
}
@@ -77,28 +78,30 @@ impl From<&C8yAlarm> for C8yTopic {
7778
}
7879
}
7980

80-
/// Generates the SmartREST topic to publish to, for a given managed object
81-
/// from the list of external IDs of itself and all its parents.
82-
///
83-
/// The parents are appended in the reverse order,
84-
/// starting from the main device at the end of the list.
85-
/// The main device itself is represented by the root topic c8y/s/us,
86-
/// with the rest of the children appended to it at each topic level.
81+
/// Generates the SmartREST topic to publish to, from the external ID of its parent.
82+
/// If the parent is the main device, the topic would be `<prefix>/s/us`.
83+
/// For all other parent devices, the target topic would be `<prefix>/s/us/<parent-xid>`.
84+
/// For the main device with no parent, and the topic would be `<prefix>/s/us` in that case as well.
8785
///
8886
/// # Examples
8987
///
90-
/// - `["main"]` -> `c8y/s/us`
91-
/// - `["child1", "main"]` -> `c8y/s/us/child1`
92-
/// - `["child2", "child1", "main"]` -> `c8y/s/us/child1/child2`
93-
pub fn publish_topic_from_ancestors(ancestors: &[impl AsRef<str>], prefix: &TopicPrefix) -> Topic {
94-
let mut target_topic = format!("{prefix}/{SMARTREST_PUBLISH_TOPIC}");
95-
for ancestor in ancestors.iter().rev().skip(1) {
96-
// Skipping the last ancestor as it is the main device represented by the root topic itself
97-
target_topic.push('/');
98-
target_topic.push_str(ancestor.as_ref());
88+
/// - `(Some("main"), "main", "c8y")` -> `c8y/s/us`
89+
/// - `[Some("child1"), "main", "c8y"]` -> `c8y/s/us/child1`
90+
/// - `[Some("service1"), "main", "c8y"]` -> `c8y/s/us/service1`
91+
/// - `(None, "main", "c8y")` -> `c8y/s/us`
92+
pub fn publish_topic_from_parent(
93+
parent_xid: Option<&str>,
94+
main_device_xid: &str,
95+
prefix: &TopicPrefix,
96+
) -> Topic {
97+
if let Some(parent) = parent_xid {
98+
if parent != main_device_xid {
99+
return C8yTopic::ChildSmartRestResponse(parent.to_string())
100+
.to_topic(prefix)
101+
.unwrap();
102+
}
99103
}
100-
101-
Topic::new_unchecked(&target_topic)
104+
C8yTopic::upstream_topic(prefix)
102105
}
103106

104107
#[cfg(test)]
@@ -135,13 +138,12 @@ mod tests {
135138
)
136139
}
137140

138-
#[test_case(& ["main"], "c8y2/s/us")]
139-
#[test_case(& ["foo"], "c8y2/s/us")]
140-
#[test_case(& ["child1", "main"], "c8y2/s/us/child1")]
141-
#[test_case(& ["child3", "child2", "child1", "main"], "c8y2/s/us/child1/child2/child3")]
142-
fn topic_from_ancestors(ancestors: &[&str], topic: &str) {
141+
#[test_case(None, "main-device", "c8y2/s/us")]
142+
#[test_case(Some("child01"), "main-device", "c8y2/s/us/child01")]
143+
#[test_case(Some("main-device"), "main-device", "c8y2/s/us")]
144+
fn topic_from_parent(parent_xid: Option<&str>, main_device_xid: &str, topic: &str) {
143145
let nested_child_topic =
144-
publish_topic_from_ancestors(ancestors, &"c8y2".try_into().unwrap());
146+
publish_topic_from_parent(parent_xid, main_device_xid, &"c8y2".try_into().unwrap());
145147
assert_eq!(nested_child_topic, Topic::new_unchecked(topic));
146148
}
147149
}

crates/core/tedge_api/src/entity_store.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,25 @@ impl EntityStore {
354354
self.get(&self.main_device).unwrap().external_id.clone()
355355
}
356356

357+
/// Returns the external id of the parent of the given entity.
358+
/// Returns None for the main device, that doesn't have any parents.
359+
pub fn parent_external_id(
360+
&self,
361+
entity_tid: &EntityTopicId,
362+
) -> Result<Option<&EntityExternalId>, Error> {
363+
let entity = self.try_get(entity_tid)?;
364+
let parent_xid = entity.parent.as_ref().map(|tid| {
365+
&self
366+
.try_get(tid)
367+
.expect(
368+
"for every registered entity, its parent is also guaranteed to be registered",
369+
)
370+
.external_id
371+
});
372+
373+
Ok(parent_xid)
374+
}
375+
357376
/// Returns an ordered list of ancestors of the given entity
358377
/// starting from the immediate parent all the way till the root main device.
359378
/// The last parent in the list for any entity would always be the main device.

crates/core/tedge_mapper/src/c8y/mapper.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,8 @@ pub fn service_monitor_client_config(
336336
c8y_mapper_name,
337337
service_type.as_str(),
338338
"down",
339-
&[],
339+
None,
340+
main_device_xid.as_ref(),
340341
prefix,
341342
)?;
342343

crates/extensions/c8y_mapper_ext/src/converter.rs

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ use c8y_api::smartrest::smartrest_serializer::succeed_operation_with_id;
4343
use c8y_api::smartrest::smartrest_serializer::succeed_operation_with_name;
4444
use c8y_api::smartrest::smartrest_serializer::EmbeddedCsv;
4545
use c8y_api::smartrest::smartrest_serializer::TextOrCsv;
46-
use c8y_api::smartrest::topic::publish_topic_from_ancestors;
4746
use c8y_api::smartrest::topic::C8yTopic;
4847
use c8y_http_proxy::handle::C8YHttpProxy;
4948
use c8y_http_proxy::messages::CreateEvent;
@@ -372,31 +371,29 @@ impl CumulocityConverter {
372371
let display_type = input.other.get("type").and_then(|v| v.as_str());
373372

374373
let entity_topic_id = &input.topic_id;
375-
let external_id = self
376-
.entity_store
377-
.try_get(entity_topic_id)
378-
.map(|e| &e.external_id)?;
374+
let entity = self.entity_store.try_get(entity_topic_id)?;
375+
let external_id = &entity.external_id;
379376
match input.r#type {
380377
EntityType::MainDevice => {
381378
self.entity_store.update(input.clone())?;
382379
Ok(vec![])
383380
}
384381
EntityType::ChildDevice => {
385-
let ancestors_external_ids =
386-
self.entity_store.ancestors_external_ids(entity_topic_id)?;
382+
let parent_xid = self.entity_store.parent_external_id(entity_topic_id)?;
383+
387384
let child_creation_message = child_device_creation_message(
388385
external_id.as_ref(),
389386
display_name,
390387
display_type,
391-
&ancestors_external_ids,
388+
parent_xid.map(|xid| xid.as_ref()),
389+
&self.device_name,
392390
&self.config.bridge_config.c8y_prefix,
393391
)
394392
.context("Could not create device creation message")?;
395393
Ok(vec![child_creation_message])
396394
}
397395
EntityType::Service => {
398-
let ancestors_external_ids =
399-
self.entity_store.ancestors_external_ids(entity_topic_id)?;
396+
let parent_xid = self.entity_store.parent_external_id(entity_topic_id)?;
400397

401398
let service_creation_message = service_creation_message(
402399
external_id.as_ref(),
@@ -407,7 +404,8 @@ impl CumulocityConverter {
407404
}),
408405
display_type.unwrap_or(&self.service_type),
409406
"up",
410-
&ancestors_external_ids,
407+
parent_xid.map(|xid| xid.as_ref()),
408+
&self.device_name,
411409
&self.config.bridge_config.c8y_prefix,
412410
)
413411
.context("Could not create service creation message")?;
@@ -423,14 +421,13 @@ impl CumulocityConverter {
423421
entity_topic_id: &EntityTopicId,
424422
) -> Result<Topic, ConversionError> {
425423
let entity = self.entity_store.try_get(entity_topic_id)?;
426-
427-
let mut ancestors_external_ids =
428-
self.entity_store.ancestors_external_ids(entity_topic_id)?;
429-
ancestors_external_ids.insert(0, entity.external_id.as_ref().into());
430-
Ok(publish_topic_from_ancestors(
431-
&ancestors_external_ids,
424+
let topic = C8yTopic::smartrest_response_topic(
425+
&entity.external_id,
426+
&entity.r#type,
432427
&self.config.bridge_config.c8y_prefix,
433-
))
428+
)
429+
.expect("Topic must have been valid as the external id is pre-validated");
430+
Ok(topic)
434431
}
435432

436433
/// Generates external ID of the given entity.
@@ -610,19 +607,17 @@ impl CumulocityConverter {
610607

611608
pub async fn process_health_status_message(
612609
&mut self,
613-
entity: &EntityTopicId,
610+
entity_tid: &EntityTopicId,
614611
message: &MqttMessage,
615612
) -> Result<Vec<MqttMessage>, ConversionError> {
616-
let entity_metadata = self
617-
.entity_store
618-
.get(entity)
619-
.expect("entity was registered");
613+
let entity = self.entity_store.try_get(entity_tid)?;
614+
let parent_xid = self.entity_store.parent_external_id(entity_tid)?;
620615

621-
let ancestors_external_ids = self.entity_store.ancestors_external_ids(entity)?;
622616
Ok(convert_health_status_message(
623617
&self.config.mqtt_schema,
624-
entity_metadata,
625-
&ancestors_external_ids,
618+
entity,
619+
parent_xid,
620+
&self.entity_store.main_device_external_id(),
626621
message,
627622
&self.config.bridge_config.c8y_prefix,
628623
))

crates/extensions/c8y_mapper_ext/src/service_monitor.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use c8y_api::smartrest;
2+
use tedge_api::entity_store::EntityExternalId;
23
use tedge_api::entity_store::EntityMetadata;
34
use tedge_api::entity_store::EntityType;
45
use tedge_api::mqtt_topics::MqttSchema;
@@ -26,7 +27,8 @@ pub fn is_c8y_bridge_established(
2627
pub fn convert_health_status_message(
2728
mqtt_schema: &MqttSchema,
2829
entity: &EntityMetadata,
29-
ancestors_external_ids: &[String],
30+
parent_xid: Option<&EntityExternalId>,
31+
main_device_xid: &EntityExternalId,
3032
message: &MqttMessage,
3133
prefix: &TopicPrefix,
3234
) -> Vec<MqttMessage> {
@@ -56,7 +58,8 @@ pub fn convert_health_status_message(
5658
display_name,
5759
display_type,
5860
&status.to_string(),
59-
ancestors_external_ids,
61+
parent_xid.map(|v| v.as_ref()),
62+
main_device_xid.as_ref(),
6063
prefix,
6164
) else {
6265
error!("Can't create 102 for service status update");
@@ -174,7 +177,7 @@ mod tests {
174177
"service-monitoring-mosquitto-bridge-unknown-status"
175178
)]
176179
fn translate_health_status_to_c8y_service_monitoring_message(
177-
device_name: &str,
180+
main_device_id: &str,
178181
health_topic: &str,
179182
health_payload: &str,
180183
c8y_monitor_topic: &str,
@@ -193,7 +196,7 @@ mod tests {
193196

194197
let temp_dir = tempfile::tempdir().unwrap();
195198
let main_device_registration =
196-
EntityRegistrationMessage::main_device(device_name.to_string());
199+
EntityRegistrationMessage::main_device(main_device_id.to_string());
197200
let mut entity_store = EntityStore::with_main_device_and_default_service_type(
198201
MqttSchema::default(),
199202
main_device_registration,
@@ -220,14 +223,18 @@ mod tests {
220223
entity_store.update(entity_registration).unwrap();
221224

222225
let entity = entity_store.get(&entity_topic_id).unwrap();
223-
let ancestors_external_ids = entity_store
224-
.ancestors_external_ids(&entity_topic_id)
225-
.unwrap();
226+
let parent = entity
227+
.parent
228+
.as_ref()
229+
.filter(|tid| *tid != "device/main//")
230+
.map(|tid| &entity_store.try_get(tid).unwrap().external_id);
231+
dbg!(&parent);
226232

227233
let msg = convert_health_status_message(
228234
&mqtt_schema,
229235
entity,
230-
&ancestors_external_ids,
236+
parent,
237+
&main_device_id.into(),
231238
&health_message,
232239
&"c8y".try_into().unwrap(),
233240
);

crates/extensions/c8y_mapper_ext/src/tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ async fn child_device_registration_mapping() {
176176
assert_received_contains_str(
177177
&mut mqtt,
178178
[(
179-
"c8y/s/us/test-device:device:child1/test-device:device:child2",
179+
"c8y/s/us/test-device:device:child2",
180180
"101,child3,child3,thin-edge.io-child",
181181
)],
182182
)
@@ -326,7 +326,7 @@ async fn service_registration_mapping() {
326326
assert_received_contains_str(
327327
&mut mqtt,
328328
[(
329-
"c8y/s/us/test-device:device:child1/test-device:device:child2",
329+
"c8y/s/us/test-device:device:child2",
330330
"102,test-device:device:child2:service:collectd,systemd,Collectd,up",
331331
)],
332332
)

crates/extensions/tedge_mqtt_ext/src/test_helpers.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ pub fn assert_message_contains_str(message: &MqttMessage, expected: (&str, &str)
4242
let expected_payload = expected.1;
4343
assert!(
4444
TopicFilter::new_unchecked(expected_topic).accept(message),
45-
"\nReceived unexpected message: {:?} \nExpected: {expected_payload:?}",
45+
"\nReceived unexpected message: {:?} \nExpected message with topic: {expected_topic}, payload: {expected_payload}",
4646
message
4747
);
4848
let payload = message.payload_str().expect("non UTF-8 payload");
4949
assert!(
5050
payload.contains(expected_payload),
51-
"Payload assertion failed.\n Actual: {payload:?} \nExpected: {expected_payload:?}",
51+
"Payload assertion failed.\n Actual: {payload:?} \nExpected message with topic: {expected_topic}, payload: {expected_payload}",
5252
)
5353
}
5454

0 commit comments

Comments
 (0)