Skip to content

Commit 38d5c1c

Browse files
authored
Merge pull request #3355 from albinsuresh/feat/3339/entity-store-query-api
feat: REST API for querying entity store
2 parents 39b2e5b + 1a2f96f commit 38d5c1c

File tree

7 files changed

+505
-11
lines changed

7 files changed

+505
-11
lines changed

crates/core/tedge_agent/src/entity_manager/server.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub enum EntityStoreRequest {
2121
Get(EntityTopicId),
2222
Create(EntityRegistrationMessage),
2323
Delete(EntityTopicId),
24+
List(Option<EntityTopicId>),
2425
MqttMessage(MqttMessage),
2526
}
2627

@@ -29,6 +30,7 @@ pub enum EntityStoreResponse {
2930
Get(Option<EntityMetadata>),
3031
Create(Result<Vec<RegisteredEntityData>, entity_store::Error>),
3132
Delete(Vec<EntityTopicId>),
33+
List(Result<Vec<EntityMetadata>, entity_store::Error>),
3234
Ok,
3335
}
3436

@@ -80,6 +82,12 @@ impl Server for EntityStoreServer {
8082
let deleted_entities = self.deregister_entity(topic_id).await;
8183
EntityStoreResponse::Delete(deleted_entities)
8284
}
85+
EntityStoreRequest::List(topic_id) => {
86+
let entities = self.entity_store.list_entity_tree(topic_id.as_ref());
87+
EntityStoreResponse::List(
88+
entities.map(|entities| entities.into_iter().cloned().collect()),
89+
)
90+
}
8391
EntityStoreRequest::MqttMessage(mqtt_message) => {
8492
self.process_mqtt_message(mqtt_message).await;
8593
EntityStoreResponse::Ok

crates/core/tedge_agent/src/http_server/entity_store.rs

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ impl IntoResponse for Error {
5454
Error::InvalidEntityTopicId(_) => StatusCode::BAD_REQUEST,
5555
Error::EntityStoreError(err) => match err {
5656
entity_store::Error::EntityAlreadyRegistered(_) => StatusCode::CONFLICT,
57+
entity_store::Error::UnknownEntity(_) => StatusCode::NOT_FOUND,
5758
_ => StatusCode::BAD_REQUEST,
5859
},
5960
Error::EntityNotFound(_) => StatusCode::NOT_FOUND,
@@ -68,7 +69,7 @@ impl IntoResponse for Error {
6869

6970
pub(crate) fn entity_store_router(state: AgentState) -> Router {
7071
Router::new()
71-
.route("/v1/entities", post(register_entity))
72+
.route("/v1/entities", post(register_entity).get(list_entities))
7273
.route(
7374
"/v1/entities/*path",
7475
get(get_entity).delete(deregister_entity),
@@ -138,6 +139,22 @@ async fn deregister_entity(
138139
Ok(Json(deleted))
139140
}
140141

142+
async fn list_entities(
143+
State(state): State<AgentState>,
144+
) -> Result<Json<Vec<EntityMetadata>>, Error> {
145+
let response = state
146+
.entity_store_handle
147+
.clone()
148+
.await_response(EntityStoreRequest::List(None))
149+
.await?;
150+
151+
let EntityStoreResponse::List(entities) = response else {
152+
return Err(Error::InvalidEntityStoreResponse);
153+
};
154+
155+
Ok(Json(entities?))
156+
}
157+
141158
#[cfg(test)]
142159
mod tests {
143160
use super::AgentState;
@@ -153,6 +170,7 @@ mod tests {
153170
use serde_json::json;
154171
use serde_json::Map;
155172
use serde_json::Value;
173+
use std::collections::HashSet;
156174
use tedge_actors::Builder;
157175
use tedge_actors::ClientMessageBox;
158176
use tedge_actors::MessageReceiver;
@@ -447,6 +465,81 @@ mod tests {
447465
assert_eq!(response.status(), StatusCode::OK);
448466
}
449467

468+
#[tokio::test]
469+
async fn entity_list() {
470+
let TestHandle {
471+
mut app,
472+
mut entity_store_box,
473+
} = setup();
474+
475+
// Mock entity store actor response
476+
tokio::spawn(async move {
477+
if let Some(mut req) = entity_store_box.recv().await {
478+
if let EntityStoreRequest::List(_) = req.request {
479+
req.reply_to
480+
.send(EntityStoreResponse::List(Ok(vec![
481+
EntityMetadata::main_device("main".to_string()),
482+
EntityMetadata::child_device("child0".to_string()).unwrap(),
483+
EntityMetadata::child_device("child1".to_string()).unwrap(),
484+
])))
485+
.await
486+
.unwrap();
487+
}
488+
}
489+
});
490+
491+
let req = Request::builder()
492+
.method(Method::GET)
493+
.uri("/v1/entities")
494+
.body(Body::empty())
495+
.expect("request builder");
496+
497+
let response = app.call(req).await.unwrap();
498+
assert_eq!(response.status(), StatusCode::OK);
499+
500+
let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
501+
let entities: Vec<EntityMetadata> = serde_json::from_slice(&body).unwrap();
502+
503+
let entity_set = entities
504+
.iter()
505+
.map(|e| e.topic_id.as_str())
506+
.collect::<HashSet<_>>();
507+
assert!(entity_set.contains("device/main//"));
508+
assert!(entity_set.contains("device/child0//"));
509+
assert!(entity_set.contains("device/child1//"));
510+
}
511+
512+
#[tokio::test]
513+
async fn entity_list_unknown_entity() {
514+
let TestHandle {
515+
mut app,
516+
mut entity_store_box,
517+
} = setup();
518+
519+
// Mock entity store actor response
520+
tokio::spawn(async move {
521+
if let Some(mut req) = entity_store_box.recv().await {
522+
if let EntityStoreRequest::List(_) = req.request {
523+
req.reply_to
524+
.send(EntityStoreResponse::List(Err(
525+
entity_store::Error::UnknownEntity("unknown".to_string()),
526+
)))
527+
.await
528+
.unwrap();
529+
}
530+
}
531+
});
532+
533+
let req = Request::builder()
534+
.method(Method::GET)
535+
.uri("/v1/entities")
536+
.body(Body::empty())
537+
.expect("request builder");
538+
539+
let response = app.call(req).await.unwrap();
540+
assert_eq!(response.status(), StatusCode::NOT_FOUND);
541+
}
542+
450543
struct TestHandle {
451544
app: Router,
452545
entity_store_box: ServerMessageBox<EntityStoreRequest, EntityStoreResponse>,

crates/core/tedge_api/src/entity.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ impl From<&EntityExternalId> for String {
6161
pub struct EntityMetadata {
6262
#[serde(rename = "@topic-id")]
6363
pub topic_id: EntityTopicId,
64-
#[serde(rename = "@parent")]
64+
#[serde(rename = "@parent", skip_serializing_if = "Option::is_none")]
6565
pub parent: Option<EntityTopicId>,
6666
#[serde(rename = "@type")]
6767
pub r#type: EntityType,

crates/core/tedge_api/src/entity_store.rs

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use serde_json::Value as JsonValue;
3333
use std::collections::hash_map::Entry;
3434
use std::collections::HashMap;
3535
use std::collections::HashSet;
36+
use std::collections::VecDeque;
3637
use std::path::Path;
3738

3839
// In the future, root will be read from config
@@ -514,6 +515,14 @@ impl EntityStore {
514515
pub fn cache_early_data_message(&mut self, message: MqttMessage) {
515516
self.pending_entity_store.cache_early_data_message(message)
516517
}
518+
519+
pub fn list_entity_tree<'a>(
520+
&'a self,
521+
root: Option<&'a EntityTopicId>,
522+
) -> Result<Vec<&'a EntityMetadata>, entity_store::Error> {
523+
self.entities
524+
.list_entity_tree(root.unwrap_or_else(|| self.main_device()))
525+
}
517526
}
518527

519528
/// In-memory representation of the entity tree
@@ -650,6 +659,26 @@ impl EntityTree {
650659
removed_entities.push(topic_id.clone())
651660
}
652661
}
662+
663+
pub fn list_entity_tree<'a>(
664+
&'a self,
665+
root: &'a EntityTopicId,
666+
) -> Result<Vec<&'a EntityMetadata>, Error> {
667+
if let Some(metadata) = self.entities.get(root).map(|node| node.metadata()) {
668+
let mut topic_ids = VecDeque::from(vec![root]);
669+
let mut entities = vec![metadata];
670+
671+
while let Some(topic_id) = topic_ids.pop_front() {
672+
let (child_topics, children): (Vec<_>, Vec<_>) =
673+
self.children(topic_id).into_iter().unzip();
674+
topic_ids.extend(child_topics);
675+
entities.extend(children);
676+
}
677+
Ok(entities)
678+
} else {
679+
Err(Error::UnknownEntity(root.to_string()))
680+
}
681+
}
653682
}
654683

655684
/// Represents an error encountered while updating the store.
@@ -1020,6 +1049,124 @@ mod tests {
10201049
assert!(children.iter().any(|&e| e == "device/child2//"));
10211050
}
10221051

1052+
#[test]
1053+
fn list_entity_tree() {
1054+
let temp_dir = tempfile::tempdir().unwrap();
1055+
let mut store = new_entity_store(&temp_dir, true);
1056+
1057+
// Build the entity tree:
1058+
//
1059+
// main
1060+
// |-- child0
1061+
// | |-- child00
1062+
// | | |-- child000
1063+
// |-- child1
1064+
// |-- child2
1065+
// | |-- child20
1066+
// | |-- child21
1067+
// | | |-- child210
1068+
// | | | |-- child2100
1069+
// | | |-- child211
1070+
// | |-- child22
1071+
build_entity_tree(
1072+
&mut store,
1073+
vec![
1074+
("device/child0//", "child-device", None),
1075+
("device/child1//", "child-device", None),
1076+
("device/child2//", "child-device", None),
1077+
("device/child00//", "child-device", Some("device/child0//")),
1078+
("device/child20//", "child-device", Some("device/child2//")),
1079+
("device/child21//", "child-device", Some("device/child2//")),
1080+
("device/child22//", "child-device", Some("device/child2//")),
1081+
(
1082+
"device/child000//",
1083+
"child-device",
1084+
Some("device/child00//"),
1085+
),
1086+
(
1087+
"device/child210//",
1088+
"child-device",
1089+
Some("device/child21//"),
1090+
),
1091+
(
1092+
"device/child211//",
1093+
"child-device",
1094+
Some("device/child21//"),
1095+
),
1096+
(
1097+
"device/child2100//",
1098+
"child-device",
1099+
Some("device/child210//"),
1100+
),
1101+
],
1102+
);
1103+
1104+
// List entity tree from root
1105+
let entities = store
1106+
.list_entity_tree(None)
1107+
.unwrap()
1108+
.iter()
1109+
.map(|e| e.topic_id.as_str())
1110+
.collect::<HashSet<_>>();
1111+
let expected_entities = [
1112+
"device/main//",
1113+
"device/child0//",
1114+
"device/child1//",
1115+
"device/child2//",
1116+
"device/child00//",
1117+
"device/child20//",
1118+
"device/child21//",
1119+
"device/child22//",
1120+
"device/child000//",
1121+
"device/child210//",
1122+
"device/child211//",
1123+
"device/child2100//",
1124+
]
1125+
.into_iter()
1126+
.collect::<HashSet<_>>();
1127+
assert_eq!(entities, expected_entities);
1128+
1129+
// List entity tree from child2
1130+
let child_root = EntityTopicId::from_str("device/child2//").unwrap();
1131+
let entities = store
1132+
.list_entity_tree(Some(&child_root))
1133+
.unwrap()
1134+
.iter()
1135+
.map(|e| e.topic_id.as_str())
1136+
.collect::<HashSet<_>>();
1137+
let expected_entities = [
1138+
"device/child2//",
1139+
"device/child20//",
1140+
"device/child21//",
1141+
"device/child22//",
1142+
"device/child210//",
1143+
"device/child211//",
1144+
"device/child2100//",
1145+
]
1146+
.into_iter()
1147+
.collect::<HashSet<_>>();
1148+
assert_eq!(entities, expected_entities);
1149+
}
1150+
1151+
// Each item in the vector represents (topic_id, type, parent)
1152+
fn build_entity_tree(store: &mut EntityStore, entity_tree: Vec<(&str, &str, Option<&str>)>) {
1153+
for entity in entity_tree {
1154+
let topic_id = EntityTopicId::from_str(entity.0).unwrap();
1155+
let r#type = EntityType::from_str(entity.1).unwrap();
1156+
let parent = entity.2.map(|p| EntityTopicId::from_str(p).unwrap());
1157+
1158+
store
1159+
.update(EntityRegistrationMessage {
1160+
topic_id,
1161+
r#type,
1162+
external_id: None,
1163+
parent,
1164+
other: Map::new(),
1165+
})
1166+
.unwrap();
1167+
}
1168+
}
1169+
10231170
#[test]
10241171
fn lists_services() {
10251172
let temp_dir = tempfile::tempdir().unwrap();

0 commit comments

Comments
 (0)