Skip to content

Commit 8391207

Browse files
authored
Merge pull request #3383 from albinsuresh/feat/3339/entity-store-query-api-filtering
feat: Query parameter support for entity store query API
2 parents 93a3bb8 + 2a4a658 commit 8391207

File tree

10 files changed

+1098
-188
lines changed

10 files changed

+1098
-188
lines changed

crates/core/tedge_agent/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ repository = { workspace = true }
1212
[dependencies]
1313
anyhow = { workspace = true }
1414
async-trait = { workspace = true }
15-
axum = { workspace = true }
15+
axum = { workspace = true, features = ["macros"] }
1616
axum-server = { workspace = true }
1717
axum_tls = { workspace = true }
1818
camino = { workspace = true }

crates/core/tedge_agent/src/entity_manager/server.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use tedge_actors::Server;
66
use tedge_api::entity::EntityMetadata;
77
use tedge_api::entity_store;
88
use tedge_api::entity_store::EntityRegistrationMessage;
9+
use tedge_api::entity_store::ListFilters;
910
use tedge_api::mqtt_topics::Channel;
1011
use tedge_api::mqtt_topics::EntityTopicId;
1112
use tedge_api::mqtt_topics::MqttSchema;
@@ -21,7 +22,7 @@ pub enum EntityStoreRequest {
2122
Get(EntityTopicId),
2223
Create(EntityRegistrationMessage),
2324
Delete(EntityTopicId),
24-
List(Option<EntityTopicId>),
25+
List(ListFilters),
2526
MqttMessage(MqttMessage),
2627
}
2728

@@ -30,7 +31,7 @@ pub enum EntityStoreResponse {
3031
Get(Option<EntityMetadata>),
3132
Create(Result<Vec<RegisteredEntityData>, entity_store::Error>),
3233
Delete(Vec<EntityTopicId>),
33-
List(Result<Vec<EntityMetadata>, entity_store::Error>),
34+
List(Vec<EntityMetadata>),
3435
Ok,
3536
}
3637

@@ -92,11 +93,9 @@ impl Server for EntityStoreServer {
9293
let deleted_entities = self.deregister_entity(topic_id).await;
9394
EntityStoreResponse::Delete(deleted_entities)
9495
}
95-
EntityStoreRequest::List(topic_id) => {
96-
let entities = self.entity_store.list_entity_tree(topic_id.as_ref());
97-
EntityStoreResponse::List(
98-
entities.map(|entities| entities.into_iter().cloned().collect()),
99-
)
96+
EntityStoreRequest::List(filters) => {
97+
let entities = self.entity_store.list_entity_tree(filters);
98+
EntityStoreResponse::List(entities.into_iter().cloned().collect())
10099
}
101100
EntityStoreRequest::MqttMessage(mqtt_message) => {
102101
self.process_mqtt_message(mqtt_message).await;

crates/core/tedge_agent/src/http_server/entity_store.rs

Lines changed: 227 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use super::server::AgentState;
1212
use crate::entity_manager::server::EntityStoreRequest;
1313
use crate::entity_manager::server::EntityStoreResponse;
1414
use axum::extract::Path;
15+
use axum::extract::Query;
1516
use axum::extract::State;
1617
use axum::response::IntoResponse;
1718
use axum::response::Response;
@@ -20,14 +21,72 @@ use axum::routing::post;
2021
use axum::Json;
2122
use axum::Router;
2223
use hyper::StatusCode;
24+
use serde::Deserialize;
2325
use serde_json::json;
2426
use std::str::FromStr;
2527
use tedge_api::entity::EntityMetadata;
28+
use tedge_api::entity::InvalidEntityType;
2629
use tedge_api::entity_store;
2730
use tedge_api::entity_store::EntityRegistrationMessage;
31+
use tedge_api::entity_store::ListFilters;
2832
use tedge_api::mqtt_topics::EntityTopicId;
2933
use tedge_api::mqtt_topics::TopicIdError;
3034

35+
#[derive(Debug, Default, Deserialize)]
36+
pub struct ListParams {
37+
#[serde(default)]
38+
root: Option<String>,
39+
#[serde(default)]
40+
parent: Option<String>,
41+
#[serde(default)]
42+
r#type: Option<String>,
43+
}
44+
45+
#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)]
46+
pub enum InputValidationError {
47+
#[error(transparent)]
48+
InvalidEntityType(#[from] InvalidEntityType),
49+
#[error(transparent)]
50+
InvalidEntityTopic(#[from] TopicIdError),
51+
#[error("The provided parameters: {0} and {1} are mutually exclusive. Use either one.")]
52+
IncompatibleParams(String, String),
53+
}
54+
55+
impl TryFrom<ListParams> for ListFilters {
56+
type Error = InputValidationError;
57+
58+
fn try_from(params: ListParams) -> Result<Self, Self::Error> {
59+
let root = params
60+
.root
61+
.filter(|v| !v.is_empty())
62+
.map(|val| val.parse())
63+
.transpose()?;
64+
let parent = params
65+
.parent
66+
.filter(|v| !v.is_empty())
67+
.map(|val| val.parse())
68+
.transpose()?;
69+
let r#type = params
70+
.r#type
71+
.filter(|v| !v.is_empty())
72+
.map(|val| val.parse())
73+
.transpose()?;
74+
75+
if root.is_some() && parent.is_some() {
76+
return Err(InputValidationError::IncompatibleParams(
77+
"root".to_string(),
78+
"parent".to_string(),
79+
));
80+
}
81+
82+
Ok(Self {
83+
root,
84+
parent,
85+
r#type,
86+
})
87+
}
88+
}
89+
3190
#[derive(thiserror::Error, Debug)]
3291
enum Error {
3392
#[error(transparent)]
@@ -46,6 +105,9 @@ enum Error {
46105

47106
#[error("Received unexpected response from entity store")]
48107
InvalidEntityStoreResponse,
108+
109+
#[error(transparent)]
110+
InvalidInput(#[from] InputValidationError),
49111
}
50112

51113
impl IntoResponse for Error {
@@ -60,10 +122,11 @@ impl IntoResponse for Error {
60122
Error::EntityNotFound(_) => StatusCode::NOT_FOUND,
61123
Error::ChannelError(_) => StatusCode::INTERNAL_SERVER_ERROR,
62124
Error::InvalidEntityStoreResponse => StatusCode::INTERNAL_SERVER_ERROR,
125+
Error::InvalidInput(_) => StatusCode::BAD_REQUEST,
63126
};
64127
let error_message = self.to_string();
65128

66-
(status_code, error_message).into_response()
129+
(status_code, Json(json!({ "error": error_message }))).into_response()
67130
}
68131
}
69132

@@ -141,18 +204,20 @@ async fn deregister_entity(
141204

142205
async fn list_entities(
143206
State(state): State<AgentState>,
207+
Query(params): Query<ListParams>,
144208
) -> Result<Json<Vec<EntityMetadata>>, Error> {
209+
let filters = params.try_into()?;
145210
let response = state
146211
.entity_store_handle
147212
.clone()
148-
.await_response(EntityStoreRequest::List(None))
213+
.await_response(EntityStoreRequest::List(filters))
149214
.await?;
150215

151216
let EntityStoreResponse::List(entities) = response else {
152217
return Err(Error::InvalidEntityStoreResponse);
153218
};
154219

155-
Ok(Json(entities?))
220+
Ok(Json(entities))
156221
}
157222

158223
#[cfg(test)]
@@ -254,6 +319,13 @@ mod tests {
254319

255320
let response = app.call(req).await.unwrap();
256321
assert_eq!(response.status(), StatusCode::NOT_FOUND);
322+
323+
let body = response.into_body().collect().await.unwrap().to_bytes();
324+
let entity: Value = serde_json::from_slice(&body).unwrap();
325+
assert_json_eq!(
326+
entity,
327+
json!( {"error":"Entity not found with topic id: device/test-child//"})
328+
);
257329
}
258330

259331
#[tokio::test]
@@ -345,6 +417,13 @@ mod tests {
345417

346418
let response = app.call(req).await.unwrap();
347419
assert_eq!(response.status(), StatusCode::CONFLICT);
420+
421+
let body = response.into_body().collect().await.unwrap().to_bytes();
422+
let entity: Value = serde_json::from_slice(&body).unwrap();
423+
assert_json_eq!(
424+
entity,
425+
json!( {"error":"An entity with topic id: device/test-child// is already registered"})
426+
);
348427
}
349428

350429
#[tokio::test]
@@ -391,6 +470,13 @@ mod tests {
391470

392471
let response = app.call(req).await.unwrap();
393472
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
473+
474+
let body = response.into_body().collect().await.unwrap().to_bytes();
475+
let entity: Value = serde_json::from_slice(&body).unwrap();
476+
assert_json_eq!(
477+
entity,
478+
json!( {"error":"Specified parent \"test-child\" does not exist in the store"})
479+
);
394480
}
395481

396482
#[tokio::test]
@@ -478,11 +564,11 @@ mod tests {
478564
if let Some(mut req) = entity_store_box.recv().await {
479565
if let EntityStoreRequest::List(_) = req.request {
480566
req.reply_to
481-
.send(EntityStoreResponse::List(Ok(vec![
567+
.send(EntityStoreResponse::List(vec![
482568
EntityMetadata::main_device(),
483569
EntityMetadata::child_device("child0".to_string()).unwrap(),
484570
EntityMetadata::child_device("child1".to_string()).unwrap(),
485-
])))
571+
]))
486572
.await
487573
.unwrap();
488574
}
@@ -522,9 +608,7 @@ mod tests {
522608
if let Some(mut req) = entity_store_box.recv().await {
523609
if let EntityStoreRequest::List(_) = req.request {
524610
req.reply_to
525-
.send(EntityStoreResponse::List(Err(
526-
entity_store::Error::UnknownEntity("unknown".to_string()),
527-
)))
611+
.send(EntityStoreResponse::List(vec![]))
528612
.await
529613
.unwrap();
530614
}
@@ -538,7 +622,141 @@ mod tests {
538622
.expect("request builder");
539623

540624
let response = app.call(req).await.unwrap();
541-
assert_eq!(response.status(), StatusCode::NOT_FOUND);
625+
assert_eq!(response.status(), StatusCode::OK);
626+
627+
let body = response.into_body().collect().await.unwrap().to_bytes();
628+
let entities: Vec<EntityMetadata> = serde_json::from_slice(&body).unwrap();
629+
assert!(entities.is_empty());
630+
}
631+
632+
#[tokio::test]
633+
async fn entity_list_query_parameters() {
634+
let TestHandle {
635+
mut app,
636+
mut entity_store_box,
637+
} = setup();
638+
639+
// Mock entity store actor response
640+
tokio::spawn(async move {
641+
if let Some(mut req) = entity_store_box.recv().await {
642+
if let EntityStoreRequest::List(_) = req.request {
643+
req.reply_to
644+
.send(EntityStoreResponse::List(vec![
645+
EntityMetadata::child_device("child00".to_string()).unwrap(),
646+
EntityMetadata::child_device("child01".to_string()).unwrap(),
647+
]))
648+
.await
649+
.unwrap();
650+
}
651+
}
652+
});
653+
654+
let req = Request::builder()
655+
.method(Method::GET)
656+
.uri("/v1/entities?parent=device/child0//&type=child-device")
657+
.body(Body::empty())
658+
.expect("request builder");
659+
660+
let response = app.call(req).await.unwrap();
661+
assert_eq!(response.status(), StatusCode::OK);
662+
663+
let body = response.into_body().collect().await.unwrap().to_bytes();
664+
let entities: Vec<EntityMetadata> = serde_json::from_slice(&body).unwrap();
665+
666+
let entity_set = entities
667+
.iter()
668+
.map(|e| e.topic_id.as_str())
669+
.collect::<HashSet<_>>();
670+
assert!(entity_set.contains("device/child00//"));
671+
assert!(entity_set.contains("device/child01//"));
672+
}
673+
674+
#[tokio::test]
675+
async fn entity_list_empty_query_param() {
676+
let TestHandle {
677+
mut app,
678+
mut entity_store_box,
679+
} = setup();
680+
// Mock entity store actor response
681+
tokio::spawn(async move {
682+
while let Some(mut req) = entity_store_box.recv().await {
683+
if let EntityStoreRequest::List(_) = req.request {
684+
req.reply_to
685+
.send(EntityStoreResponse::List(vec![]))
686+
.await
687+
.unwrap();
688+
}
689+
}
690+
});
691+
692+
for param in ["root=", "parent=", "type="].into_iter() {
693+
let uri = format!("/v1/entities?{}", param);
694+
let req = Request::builder()
695+
.method(Method::GET)
696+
.uri(uri)
697+
.body(Body::empty())
698+
.expect("request builder");
699+
700+
let response = app.call(req).await.unwrap();
701+
assert_eq!(response.status(), StatusCode::OK);
702+
}
703+
704+
let req = Request::builder()
705+
.method(Method::GET)
706+
.uri("/v1/entities?root=&parent=&type=")
707+
.body(Body::empty())
708+
.expect("request builder");
709+
710+
let response = app.call(req).await.unwrap();
711+
assert_eq!(response.status(), StatusCode::OK);
712+
}
713+
714+
#[tokio::test]
715+
async fn entity_list_bad_query_param() {
716+
let TestHandle {
717+
mut app,
718+
entity_store_box: _, // Not used
719+
} = setup();
720+
721+
let req = Request::builder()
722+
.method(Method::GET)
723+
.uri("/v1/entities?parent=an/invalid/topic/id/")
724+
.body(Body::empty())
725+
.expect("request builder");
726+
727+
let response = app.call(req).await.unwrap();
728+
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
729+
730+
let body = response.into_body().collect().await.unwrap().to_bytes();
731+
let entity: Value = serde_json::from_slice(&body).unwrap();
732+
assert_json_eq!(
733+
entity,
734+
json!( {"error":"An entity topic identifier has at most 4 segments"})
735+
);
736+
}
737+
738+
#[tokio::test]
739+
async fn entity_list_bad_query_parameter_combination() {
740+
let TestHandle {
741+
mut app,
742+
entity_store_box: _, // Not used
743+
} = setup();
744+
745+
let req = Request::builder()
746+
.method(Method::GET)
747+
.uri("/v1/entities?root=device/some/topic/id&parent=device/another/topic/id")
748+
.body(Body::empty())
749+
.expect("request builder");
750+
751+
let response = app.call(req).await.unwrap();
752+
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
753+
754+
let body = response.into_body().collect().await.unwrap().to_bytes();
755+
let entity: Value = serde_json::from_slice(&body).unwrap();
756+
assert_json_eq!(
757+
entity,
758+
json!( {"error":"The provided parameters: root and parent are mutually exclusive. Use either one."})
759+
);
542760
}
543761

544762
struct TestHandle {

crates/core/tedge_api/src/entity.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ impl Display for EntityType {
136136
}
137137
}
138138

139-
#[derive(Debug, Error)]
139+
#[derive(Debug, Error, PartialEq, Eq, Clone)]
140140
#[error("Invalid entity type: {0}")]
141141
pub struct InvalidEntityType(String);
142142

0 commit comments

Comments
 (0)