From 994726dd816d9a69559e06b19c26fc658faa381f Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Tue, 1 Mar 2022 19:07:21 +0100 Subject: [PATCH 1/5] ListDocuments::into_stream --- sdk/data_cosmos/examples/database_00.rs | 6 +- sdk/data_cosmos/examples/document_00.rs | 6 +- .../examples/document_entries_00.rs | 20 +-- .../examples/document_entries_01.rs | 7 +- sdk/data_cosmos/examples/readme.rs | 8 +- .../examples/remove_all_documents.rs | 2 +- .../examples/user_permission_token.rs | 11 +- .../src/clients/collection_client.rs | 4 +- .../list_documents.rs} | 127 +++++++++++++----- sdk/data_cosmos/src/operations/mod.rs | 2 + .../src/operations/replace_document.rs | 2 +- .../operations/replace_stored_procedure.rs | 2 +- .../src/requests/list_documents_builder.rs | 126 ----------------- sdk/data_cosmos/src/requests/mod.rs | 2 - sdk/data_cosmos/src/resources/document/mod.rs | 14 +- sdk/data_cosmos/src/responses/mod.rs | 4 - sdk/data_cosmos/tests/cosmos_document.rs | 17 ++- .../tests/permission_token_usage.rs | 5 +- 18 files changed, 155 insertions(+), 210 deletions(-) rename sdk/data_cosmos/src/{responses/list_documents_response.rs => operations/list_documents.rs} (61%) delete mode 100644 sdk/data_cosmos/src/requests/list_documents_builder.rs diff --git a/sdk/data_cosmos/examples/database_00.rs b/sdk/data_cosmos/examples/database_00.rs index 5935b39cea..8e5ad54f03 100644 --- a/sdk/data_cosmos/examples/database_00.rs +++ b/sdk/data_cosmos/examples/database_00.rs @@ -80,8 +80,10 @@ async fn main() -> Result<(), Box> { let documents = collection_client .list_documents() - .execute::() - .await?; + .into_stream::() + .next() + .await + .unwrap()?; println!("\ndocuments as json == {:?}", documents); } } diff --git a/sdk/data_cosmos/examples/document_00.rs b/sdk/data_cosmos/examples/document_00.rs index 894fcaa17c..f16fc1c418 100644 --- a/sdk/data_cosmos/examples/document_00.rs +++ b/sdk/data_cosmos/examples/document_00.rs @@ -145,8 +145,10 @@ async fn main() -> Result<(), Box> { println!("Listing documents..."); let list_documents_response = collection_client .list_documents() - .execute::() - .await?; + .into_stream::() + .next() + .await + .unwrap()?; println!( "list_documents_response contains {} documents", list_documents_response.documents.len() diff --git a/sdk/data_cosmos/examples/document_entries_00.rs b/sdk/data_cosmos/examples/document_entries_00.rs index d4b4063afb..6e7f60e8d8 100644 --- a/sdk/data_cosmos/examples/document_entries_00.rs +++ b/sdk/data_cosmos/examples/document_entries_00.rs @@ -58,12 +58,13 @@ async fn main() -> Result<(), Box> { println!("Created 5 documents."); // Let's get 3 entries at a time. - let response = client + let mut paged = client .list_documents() .consistency_level(response.unwrap()) .max_item_count(3i32) - .execute::() - .await?; + .into_stream::(); + + let response = paged.next().await.unwrap()?; assert_eq!(response.documents.len(), 3); println!("response == {:#?}", response); @@ -72,16 +73,7 @@ async fn main() -> Result<(), Box> { // continuation_token must be present assert!(response.continuation_token.is_some()); - let session_token = &response; - let ct = response.continuation_token.clone().unwrap(); - println!("ct == {}", ct); - - let response = client - .list_documents() - .consistency_level(session_token) - .continuation(ct.as_str()) - .execute::() - .await?; + let response = paged.next().await.unwrap()?; assert_eq!(response.documents.len(), 2); println!("response == {:#?}", response); @@ -101,7 +93,7 @@ async fn main() -> Result<(), Box> { .list_documents() .consistency_level(session_token.clone()) .max_item_count(3); - let mut stream = Box::pin(stream.stream::()); + let mut stream = stream.into_stream::(); // TODO: As soon as the streaming functionality is completed // in Rust substitute this while let Some... into // for each (or whatever the Rust team picks). diff --git a/sdk/data_cosmos/examples/document_entries_01.rs b/sdk/data_cosmos/examples/document_entries_01.rs index 74aadefc96..f61cc905c4 100644 --- a/sdk/data_cosmos/examples/document_entries_01.rs +++ b/sdk/data_cosmos/examples/document_entries_01.rs @@ -1,4 +1,5 @@ use azure_data_cosmos::prelude::*; +use futures::StreamExt; use serde::{Deserialize, Serialize}; use std::error::Error; @@ -82,8 +83,10 @@ async fn main() -> Result<(), Box> { let list_documents_response = client .list_documents() .consistency_level(&get_document_response) - .execute::() - .await?; + .into_stream::() + .next() + .await + .unwrap()?; println!("list_documents_response == {:#?}", list_documents_response); let query_documents_response = client diff --git a/sdk/data_cosmos/examples/readme.rs b/sdk/data_cosmos/examples/readme.rs index 6dc7a977c6..318b6f67ff 100644 --- a/sdk/data_cosmos/examples/readme.rs +++ b/sdk/data_cosmos/examples/readme.rs @@ -99,7 +99,7 @@ async fn main() -> Result<(), Box> { .list_documents() .consistency_level(session_token.clone()) .max_item_count(3); - let mut stream = Box::pin(stream.stream::()); + let mut stream = stream.into_stream::(); // TODO: As soon as the streaming functionality is stabilized // in Rust we can substitute this while let Some... into // for each (or whatever the Rust team picks). @@ -158,8 +158,10 @@ async fn main() -> Result<(), Box> { let list_documents_response = collection_client .list_documents() .consistency_level(session_token) - .execute::() // you can use this if you don't know/care about the return type! - .await?; + .into_stream::() // you can use this if you don't know/care about the return type! + .next() + .await + .unwrap()?; assert_eq!(list_documents_response.documents.len(), 4); Ok(()) diff --git a/sdk/data_cosmos/examples/remove_all_documents.rs b/sdk/data_cosmos/examples/remove_all_documents.rs index 31b357d1f1..3fd3925321 100644 --- a/sdk/data_cosmos/examples/remove_all_documents.rs +++ b/sdk/data_cosmos/examples/remove_all_documents.rs @@ -36,7 +36,7 @@ async fn main() -> Result<(), Box> { let mut documents = Vec::new(); let stream = client.list_documents(); - let mut stream = Box::pin(stream.stream::()); + let mut stream = stream.into_stream::(); while let Some(res) = stream.next().await { for doc in res?.documents { documents.push(doc); diff --git a/sdk/data_cosmos/examples/user_permission_token.rs b/sdk/data_cosmos/examples/user_permission_token.rs index e78f783a50..8331847042 100644 --- a/sdk/data_cosmos/examples/user_permission_token.rs +++ b/sdk/data_cosmos/examples/user_permission_token.rs @@ -1,4 +1,5 @@ use azure_data_cosmos::prelude::*; +use futures::StreamExt; use std::error::Error; #[tokio::main] @@ -42,9 +43,10 @@ async fn main() -> Result<(), Box> { // test list documents let list_documents_response = collection_client .list_documents() - .execute::() + .into_stream::() + .next() .await - .unwrap(); + .unwrap()?; println!( "list_documents_response got {} document(s).", list_documents_response.documents.len() @@ -86,9 +88,10 @@ async fn main() -> Result<(), Box> { .into_database_client(database_name.clone()) .into_collection_client(collection_name.clone()) .list_documents() - .execute::() + .into_stream::() + .next() .await - .unwrap(); + .unwrap()?; println!( "second list_documents_response got {} document(s).", list_documents_response.documents.len() diff --git a/sdk/data_cosmos/src/clients/collection_client.rs b/sdk/data_cosmos/src/clients/collection_client.rs index 61ecdfe792..4299da7524 100644 --- a/sdk/data_cosmos/src/clients/collection_client.rs +++ b/sdk/data_cosmos/src/clients/collection_client.rs @@ -60,8 +60,8 @@ impl CollectionClient { } /// list documents in a collection - pub fn list_documents(&self) -> requests::ListDocumentsBuilder<'_, '_> { - requests::ListDocumentsBuilder::new(self) + pub fn list_documents(&self) -> ListDocumentsBuilder { + ListDocumentsBuilder::new(self.clone()) } /// create a document in a collection diff --git a/sdk/data_cosmos/src/responses/list_documents_response.rs b/sdk/data_cosmos/src/operations/list_documents.rs similarity index 61% rename from sdk/data_cosmos/src/responses/list_documents_response.rs rename to sdk/data_cosmos/src/operations/list_documents.rs index c04a034888..6d7956bc54 100644 --- a/sdk/data_cosmos/src/responses/list_documents_response.rs +++ b/sdk/data_cosmos/src/operations/list_documents.rs @@ -1,15 +1,91 @@ use crate::headers::from_headers::*; +use crate::prelude::*; use crate::resources::document::{Document, DocumentAttributes}; +use crate::resources::ResourceType; use crate::ResourceQuota; - use azure_core::headers::{ - continuation_token_from_headers_optional, item_count_from_headers, session_token_from_headers, + self, continuation_token_from_headers_optional, item_count_from_headers, + session_token_from_headers, }; -use azure_core::SessionToken; +use azure_core::{collect_pinned_stream, Response, SessionToken}; +use azure_core::{prelude::*, Pageable}; use chrono::{DateTime, Utc}; -use http::response::Response; use serde::de::DeserializeOwned; +#[derive(Debug, Clone)] +pub struct ListDocumentsBuilder { + client: CollectionClient, + if_match_condition: Option, + consistency_level: Option, + max_item_count: MaxItemCount, + a_im: ChangeFeed, + partition_range_id: Option, + context: Context, +} + +impl ListDocumentsBuilder { + pub(crate) fn new(client: CollectionClient) -> Self { + Self { + client, + if_match_condition: None, + consistency_level: None, + max_item_count: MaxItemCount::new(-1), + a_im: ChangeFeed::None, + partition_range_id: None, + context: Context::new(), + } + } + + setters! { + consistency_level: ConsistencyLevel => Some(consistency_level), + max_item_count: i32 => MaxItemCount::new(max_item_count), + a_im: ChangeFeed, + if_match_condition: IfMatchCondition => Some(if_match_condition), + partition_range_id: String => Some(PartitionRangeId::new(partition_range_id)), + } + + pub fn into_stream(self) -> ListDocuments { + let make_request = move |continuation: Option| { + let this = self.clone(); + let ctx = self.context.clone(); + async move { + let mut req = this.client.cosmos_client().prepare_request_pipeline( + &format!( + "dbs/{}/colls/{}/docs", + this.client.database_client().database_name(), + this.client.collection_name() + ), + http::Method::GET, + ); + + azure_core::headers::add_optional_header2(&this.if_match_condition, &mut req)?; + azure_core::headers::add_optional_header2(&this.consistency_level, &mut req)?; + azure_core::headers::add_mandatory_header2(&this.max_item_count, &mut req)?; + azure_core::headers::add_mandatory_header2(&this.a_im, &mut req)?; + azure_core::headers::add_optional_header2(&this.partition_range_id, &mut req)?; + + if let Some(c) = continuation { + let h = http::HeaderValue::from_str(c.as_str()) + .map_err(azure_core::HttpHeaderError::InvalidHeaderValue)?; + req.headers_mut().append(headers::CONTINUATION, h); + } + + let response = this + .client + .pipeline() + .send(ctx.clone().insert(ResourceType::Documents), &mut req) + .await?; + + ListDocumentsResponse::try_from(response).await + } + }; + + Pageable::new(make_request) + } +} + +pub type ListDocuments = Pageable, crate::Error>; + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ListDocumentsResponseAttributes { #[serde(rename = "_rid")] @@ -57,45 +133,22 @@ pub struct ListDocumentsResponseEntities { pub entities: Vec, } -impl std::convert::TryFrom<&[u8]> for ListDocumentsResponseAttributes { - type Error = crate::Error; - fn try_from(body: &[u8]) -> Result { - Ok(serde_json::from_slice(body)?) - } -} - -impl std::convert::TryFrom<&[u8]> for ListDocumentsResponseEntities -where - T: DeserializeOwned, -{ - type Error = crate::Error; - - fn try_from(body: &[u8]) -> Result { - Ok(serde_json::from_slice(body)?) - } -} - -impl std::convert::TryFrom> for ListDocumentsResponse +impl ListDocumentsResponse where T: DeserializeOwned, { - type Error = crate::Error; - - fn try_from(response: Response) -> Result { - let headers = response.headers(); - let body: &[u8] = response.body(); - - debug!("headers == {:#?}", headers); - debug!("body == {:#?}", std::str::from_utf8(body)); + pub(crate) async fn try_from(response: Response) -> crate::Result { + let (_status_code, headers, pinned_stream) = response.deconstruct(); + let body: bytes::Bytes = collect_pinned_stream(pinned_stream).await?; + let headers = &headers; // we will proceed in three steps: // 1- Deserialize the result as DocumentAttributes. The extra field will be ignored. // 2- Deserialize the result a type T. The extra fields will be ignored. // 3- Zip 1 and 2 in the resulting structure. // There is a lot of data movement here, let's hope the compiler is smarter than me :) - let document_attributes = ListDocumentsResponseAttributes::try_from(body)?; - debug!("document_attributes == {:?}", document_attributes); - let entries = ListDocumentsResponseEntities::try_from(body)?; + let document_attributes: ListDocumentsResponseAttributes = serde_json::from_slice(&body)?; + let entries: ListDocumentsResponseEntities = serde_json::from_slice(&body)?; let documents = document_attributes .documents @@ -139,6 +192,12 @@ where } } +impl Continuable for ListDocumentsResponse { + fn continuation(&self) -> Option { + self.continuation_token.clone() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/sdk/data_cosmos/src/operations/mod.rs b/sdk/data_cosmos/src/operations/mod.rs index bd94f33894..a06669cbb9 100644 --- a/sdk/data_cosmos/src/operations/mod.rs +++ b/sdk/data_cosmos/src/operations/mod.rs @@ -20,6 +20,7 @@ mod get_permission; mod get_user; mod list_collections; mod list_databases; +mod list_documents; mod list_users; mod replace_collection; mod replace_document; @@ -45,6 +46,7 @@ pub use get_permission::*; pub use get_user::*; pub use list_collections::*; pub use list_databases::*; +pub use list_documents::*; pub use list_users::*; pub use replace_collection::*; pub use replace_document::*; diff --git a/sdk/data_cosmos/src/operations/replace_document.rs b/sdk/data_cosmos/src/operations/replace_document.rs index 7882b0b453..92d7e0fe6c 100644 --- a/sdk/data_cosmos/src/operations/replace_document.rs +++ b/sdk/data_cosmos/src/operations/replace_document.rs @@ -94,7 +94,7 @@ pub type ReplaceDocument = futures::future::BoxFuture<'static, crate::Result>; #[cfg(feature = "into_future")] -impl std::future::IntoFuture for ReplaceDocumentBuilder { +impl std::future::IntoFuture for ReplaceDocumentBuilder { type Future = ReplaceDocument; type Output = ::Output; fn into_future(self) -> Self::Future { diff --git a/sdk/data_cosmos/src/operations/replace_stored_procedure.rs b/sdk/data_cosmos/src/operations/replace_stored_procedure.rs index 57dffccb4e..568e55c857 100644 --- a/sdk/data_cosmos/src/operations/replace_stored_procedure.rs +++ b/sdk/data_cosmos/src/operations/replace_stored_procedure.rs @@ -26,7 +26,7 @@ impl ReplaceStoredProcedureBuilder { consistency_level: ConsistencyLevel => Some(consistency_level), } - pub async fn into_future(self) -> ReplaceStoredProcedure { + pub fn into_future(self) -> ReplaceStoredProcedure { Box::pin(async move { let mut req = self .client diff --git a/sdk/data_cosmos/src/requests/list_documents_builder.rs b/sdk/data_cosmos/src/requests/list_documents_builder.rs deleted file mode 100644 index d0b527a5c1..0000000000 --- a/sdk/data_cosmos/src/requests/list_documents_builder.rs +++ /dev/null @@ -1,126 +0,0 @@ -use crate::prelude::*; -use crate::resources::ResourceType; -use crate::responses::ListDocumentsResponse; -use azure_core::prelude::*; -use futures::stream::{unfold, Stream}; -use http::StatusCode; -use serde::de::DeserializeOwned; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct ListDocumentsBuilder<'a, 'b> { - collection_client: &'a CollectionClient, - if_match_condition: Option, - user_agent: Option>, - activity_id: Option>, - consistency_level: Option, - continuation: Option>, - max_item_count: MaxItemCount, - a_im: ChangeFeed, - partition_range_id: Option>, -} - -impl<'a, 'b> ListDocumentsBuilder<'a, 'b> { - pub(crate) fn new(collection_client: &'a CollectionClient) -> Self { - Self { - collection_client, - if_match_condition: None, - user_agent: None, - activity_id: None, - consistency_level: None, - continuation: None, - max_item_count: MaxItemCount::new(-1), - a_im: ChangeFeed::None, - partition_range_id: None, - } - } - - setters! { - user_agent: &'b str => Some(UserAgent::new(user_agent)), - activity_id: &'b str => Some(ActivityId::new(activity_id)), - consistency_level: ConsistencyLevel => Some(consistency_level), - continuation: &'b str => Some(Continuation::new(continuation)), - max_item_count: i32 => MaxItemCount::new(max_item_count), - a_im: ChangeFeed, - if_match_condition: IfMatchCondition => Some(if_match_condition), - partition_range_id: &'b str => Some(PartitionRangeId::new(partition_range_id)), - } - - pub async fn execute(&self) -> crate::Result> - where - T: DeserializeOwned, - { - let req = self.collection_client.cosmos_client().prepare_request( - &format!( - "dbs/{}/colls/{}/docs", - self.collection_client.database_client().database_name(), - self.collection_client.collection_name() - ), - http::Method::GET, - ResourceType::Documents, - ); - - // add trait headers - let req = azure_core::headers::add_optional_header(&self.if_match_condition, req); - let req = azure_core::headers::add_optional_header(&self.user_agent, req); - let req = azure_core::headers::add_optional_header(&self.activity_id, req); - let req = azure_core::headers::add_optional_header(&self.consistency_level, req); - let req = azure_core::headers::add_optional_header(&self.continuation, req); - let req = azure_core::headers::add_mandatory_header(&self.max_item_count, req); - let req = azure_core::headers::add_mandatory_header(&self.a_im, req); - let req = azure_core::headers::add_optional_header(&self.partition_range_id, req); - - let req = req.body(azure_core::EMPTY_BODY)?; - - Ok(self - .collection_client - .http_client() - .execute_request_check_status(req, StatusCode::OK) - .await? - .try_into()?) - } - - pub fn stream(&self) -> impl Stream>> + '_ - where - T: DeserializeOwned, - { - #[derive(Debug, Clone, PartialEq)] - enum States { - Init, - Continuation(String), - } - - unfold( - Some(States::Init), - move |continuation_token: Option| { - async move { - debug!("continuation_token == {:?}", &continuation_token); - let response = match continuation_token { - Some(States::Init) => self.execute().await, - Some(States::Continuation(continuation_token)) => { - self.clone() - .continuation(continuation_token.as_str()) - .execute() - .await - } - None => return None, - }; - - // the ? operator does not work in async move (yet?) - // so we have to resort to this boilerplate - let response = match response { - Ok(response) => response, - Err(err) => return Some((Err(err), None)), - }; - - let continuation_token = response - .continuation_token - .as_ref() - .map(|ct| States::Continuation(ct.to_owned())); - - Some((Ok(response), continuation_token)) - } - }, - ) - } -} diff --git a/sdk/data_cosmos/src/requests/mod.rs b/sdk/data_cosmos/src/requests/mod.rs index e5f78cf4c1..3cd167684d 100644 --- a/sdk/data_cosmos/src/requests/mod.rs +++ b/sdk/data_cosmos/src/requests/mod.rs @@ -18,7 +18,6 @@ mod execute_stored_procedure_builder; mod get_attachment_builder; mod get_partition_key_ranges_builder; mod list_attachments_builder; -mod list_documents_builder; mod list_permissions_builder; mod list_stored_procedures_builder; mod list_triggers_builder; @@ -39,7 +38,6 @@ pub use execute_stored_procedure_builder::ExecuteStoredProcedureBuilder; pub use get_attachment_builder::GetAttachmentBuilder; pub use get_partition_key_ranges_builder::GetPartitionKeyRangesBuilder; pub use list_attachments_builder::ListAttachmentsBuilder; -pub use list_documents_builder::ListDocumentsBuilder; pub use list_permissions_builder::ListPermissionsBuilder; pub use list_stored_procedures_builder::ListStoredProceduresBuilder; pub use list_triggers_builder::ListTriggersBuilder; diff --git a/sdk/data_cosmos/src/resources/document/mod.rs b/sdk/data_cosmos/src/resources/document/mod.rs index a5a817dd9f..c425d1bc29 100644 --- a/sdk/data_cosmos/src/resources/document/mod.rs +++ b/sdk/data_cosmos/src/resources/document/mod.rs @@ -245,19 +245,19 @@ impl AddAsHeader for TentativeWritesAllowance { } /// Collections of partition keys grouped by physical partitions -#[derive(Debug, Clone, Copy)] -pub struct PartitionRangeId<'a>(&'a str); +#[derive(Debug, Clone)] +pub struct PartitionRangeId(String); -impl<'a> PartitionRangeId<'a> { +impl PartitionRangeId { /// A new partition range id from a string - pub fn new(id: &'a str) -> Self { + pub fn new(id: String) -> Self { Self(id) } } -impl AddAsHeader for PartitionRangeId<'_> { +impl AddAsHeader for PartitionRangeId { fn add_as_header(&self, builder: Builder) -> Builder { - builder.header(headers::HEADER_DOCUMENTDB_PARTITIONRANGEID, self.0) + builder.header(headers::HEADER_DOCUMENTDB_PARTITIONRANGEID, &self.0) } fn add_as_header2( @@ -266,7 +266,7 @@ impl AddAsHeader for PartitionRangeId<'_> { ) -> Result<(), azure_core::HttpHeaderError> { request.headers_mut().append( headers::HEADER_DOCUMENTDB_PARTITIONRANGEID, - http::header::HeaderValue::from_str(self.0)?, + http::header::HeaderValue::from_str(&self.0)?, ); Ok(()) diff --git a/sdk/data_cosmos/src/responses/mod.rs b/sdk/data_cosmos/src/responses/mod.rs index 75df546d7d..83bb922760 100644 --- a/sdk/data_cosmos/src/responses/mod.rs +++ b/sdk/data_cosmos/src/responses/mod.rs @@ -15,7 +15,6 @@ mod execute_stored_procedure_response; mod get_attachment_response; mod get_partition_key_ranges_response; mod list_attachments_response; -mod list_documents_response; mod list_permissions_response; mod list_stored_procedures_response; mod list_triggers_response; @@ -36,9 +35,6 @@ pub use execute_stored_procedure_response::ExecuteStoredProcedureResponse; pub use get_attachment_response::GetAttachmentResponse; pub use get_partition_key_ranges_response::GetPartitionKeyRangesResponse; pub use list_attachments_response::ListAttachmentsResponse; -pub use list_documents_response::{ - ListDocumentsResponse, ListDocumentsResponseAttributes, ListDocumentsResponseEntities, -}; pub use list_permissions_response::ListPermissionsResponse; pub use list_stored_procedures_response::ListStoredProceduresResponse; pub use list_triggers_response::ListTriggersResponse; diff --git a/sdk/data_cosmos/tests/cosmos_document.rs b/sdk/data_cosmos/tests/cosmos_document.rs index c970225ed7..534f2dcc36 100644 --- a/sdk/data_cosmos/tests/cosmos_document.rs +++ b/sdk/data_cosmos/tests/cosmos_document.rs @@ -6,6 +6,7 @@ mod setup; use azure_core::prelude::*; use azure_data_cosmos::prelude::*; use collection::*; +use futures::StreamExt; #[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] struct MyDocument { @@ -70,9 +71,11 @@ async fn create_and_delete_document() { let documents = collection_client .list_documents() - .execute::() + .into_stream::() + .next() .await .unwrap() + .unwrap() .documents; assert!(documents.len() == 1); @@ -103,9 +106,11 @@ async fn create_and_delete_document() { let documents = collection_client .list_documents() - .execute::() + .into_stream::() + .next() .await .unwrap() + .unwrap() .documents; assert!(documents.len() == 0); @@ -164,9 +169,11 @@ async fn query_documents() { let documents = collection_client .list_documents() - .execute::() + .into_stream::() + .next() .await .unwrap() + .unwrap() .documents; assert!(documents.len() == 1); @@ -240,8 +247,10 @@ async fn replace_document() { let documents = collection_client .list_documents() - .execute::() + .into_stream::() + .next() .await + .unwrap() .unwrap(); assert!(documents.documents.len() == 1); diff --git a/sdk/data_cosmos/tests/permission_token_usage.rs b/sdk/data_cosmos/tests/permission_token_usage.rs index 5abf89fd20..6ee186cdbf 100644 --- a/sdk/data_cosmos/tests/permission_token_usage.rs +++ b/sdk/data_cosmos/tests/permission_token_usage.rs @@ -1,6 +1,7 @@ #![cfg(all(test, feature = "test_e2e"))] use azure_data_cosmos::prelude::*; use collection::*; +use futures::StreamExt; use serde::{Deserialize, Serialize}; mod setup; @@ -83,8 +84,10 @@ async fn permission_token_usage() { .clone() .into_collection_client(COLLECTION_NAME) .list_documents() - .execute::() + .into_stream::() + .next() .await + .unwrap() .unwrap(); let new_collection_client = new_database_client.into_collection_client(COLLECTION_NAME); From 8b22f0cfc0b4853b1bb0ac878e1db34f079912df Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Wed, 2 Mar 2022 10:49:27 +0100 Subject: [PATCH 2/5] ListPermissions::into_stream --- sdk/data_cosmos/examples/permission_00.rs | 7 +- sdk/data_cosmos/src/clients/user_client.rs | 12 +- .../src/operations/list_permissions.rs | 117 ++++++++++++++++++ sdk/data_cosmos/src/operations/mod.rs | 2 + .../src/requests/list_permissions_builder.rs | 109 ---------------- sdk/data_cosmos/src/requests/mod.rs | 2 - .../responses/list_permissions_response.rs | 50 -------- sdk/data_cosmos/src/responses/mod.rs | 1 - sdk/data_cosmos/tests/permission.rs | 17 ++- 9 files changed, 143 insertions(+), 174 deletions(-) create mode 100644 sdk/data_cosmos/src/operations/list_permissions.rs delete mode 100644 sdk/data_cosmos/src/requests/list_permissions_builder.rs diff --git a/sdk/data_cosmos/examples/permission_00.rs b/sdk/data_cosmos/examples/permission_00.rs index e48bbf82a7..54de996b84 100644 --- a/sdk/data_cosmos/examples/permission_00.rs +++ b/sdk/data_cosmos/examples/permission_00.rs @@ -1,4 +1,5 @@ use azure_data_cosmos::prelude::*; +use futures::StreamExt; use std::error::Error; #[tokio::main] @@ -92,8 +93,10 @@ async fn main() -> Result<(), Box> { .consistency_level(ConsistencyLevel::Session( create_permission2_response.session_token, )) - .execute() - .await?; + .into_stream() + .next() + .await + .unwrap()?; println!( "list_permissions_response == {:#?}", list_permissions_response diff --git a/sdk/data_cosmos/src/clients/user_client.rs b/sdk/data_cosmos/src/clients/user_client.rs index dece8fa4e8..4c10978389 100644 --- a/sdk/data_cosmos/src/clients/user_client.rs +++ b/sdk/data_cosmos/src/clients/user_client.rs @@ -1,7 +1,7 @@ use super::*; use crate::prelude::*; -use crate::{requests, ReadonlyString}; -use azure_core::{HttpClient, Pipeline, Request}; +use crate::ReadonlyString; +use azure_core::{Pipeline, Request}; /// A client for Cosmos user resources. #[derive(Debug, Clone)] @@ -57,8 +57,8 @@ impl UserClient { } /// List the user's permissions - pub fn list_permissions(&self) -> requests::ListPermissionsBuilder<'_, '_> { - requests::ListPermissionsBuilder::new(self) + pub fn list_permissions(&self) -> ListPermissionsBuilder { + ListPermissionsBuilder::new(self.clone()) } /// Convert into a [`PermissionClient`] @@ -80,10 +80,6 @@ impl UserClient { ) } - pub(crate) fn http_client(&self) -> &dyn HttpClient { - self.cosmos_client().http_client() - } - pub(crate) fn pipeline(&self) -> &Pipeline { self.cosmos_client().pipeline() } diff --git a/sdk/data_cosmos/src/operations/list_permissions.rs b/sdk/data_cosmos/src/operations/list_permissions.rs new file mode 100644 index 0000000000..627ba744ae --- /dev/null +++ b/sdk/data_cosmos/src/operations/list_permissions.rs @@ -0,0 +1,117 @@ +use crate::headers::from_headers::*; +use crate::prelude::*; +use crate::resources::Permission; +use crate::resources::ResourceType; +use azure_core::collect_pinned_stream; +use azure_core::headers::{ + self, continuation_token_from_headers_optional, session_token_from_headers, +}; +use azure_core::prelude::*; +use azure_core::{Pageable, Response as HttpResponse}; + +#[derive(Debug, Clone)] +pub struct ListPermissionsBuilder { + client: UserClient, + consistency_level: Option, + max_item_count: MaxItemCount, + context: Context, +} + +impl ListPermissionsBuilder { + pub(crate) fn new(client: UserClient) -> Self { + Self { + client, + consistency_level: None, + max_item_count: MaxItemCount::new(-1), + context: Context::new(), + } + } + + setters! { + consistency_level: ConsistencyLevel => Some(consistency_level), + max_item_count: i32 => MaxItemCount::new(max_item_count), + context: Context => context, + } + + pub fn into_stream(self) -> ListPermissions { + let make_request = move |continuation: Option| { + let this = self.clone(); + let ctx = self.context.clone(); + async move { + let mut request = this.client.cosmos_client().prepare_request_pipeline( + &format!( + "dbs/{}/users/{}/permissions", + this.client.database_client().database_name(), + this.client.user_name() + ), + http::Method::GET, + ); + + azure_core::headers::add_optional_header2(&this.consistency_level, &mut request)?; + azure_core::headers::add_mandatory_header2(&this.max_item_count, &mut request)?; + + if let Some(c) = continuation { + let h = http::HeaderValue::from_str(c.as_str()) + .map_err(azure_core::HttpHeaderError::InvalidHeaderValue)?; + request.headers_mut().append(headers::CONTINUATION, h); + } + + let response = this + .client + .pipeline() + .send(ctx.clone().insert(ResourceType::Permissions), &mut request) + .await?; + ListPermissionsResponse::try_from(response).await + } + }; + + Pageable::new(make_request) + } +} + +pub type ListPermissions = Pageable; + +#[derive(Debug, Clone, PartialEq)] +pub struct ListPermissionsResponse { + pub permissions: Vec, + pub charge: f64, + pub activity_id: uuid::Uuid, + pub session_token: String, + pub content_path: String, + pub alt_content_path: String, + pub continuation_token: Option, +} + +impl ListPermissionsResponse { + pub async fn try_from(response: HttpResponse) -> crate::Result { + let (_status_code, headers, pinned_stream) = response.deconstruct(); + let body = collect_pinned_stream(pinned_stream).await?; + + #[derive(Debug, Deserialize)] + struct Response { + _rid: String, + #[serde(rename = "Permissions")] + permissions: Vec, + _count: u32, + } + + let response: Response = serde_json::from_slice(&body)?; + let permissions = response.permissions; + + Ok(Self { + permissions, + charge: request_charge_from_headers(&headers)?, + activity_id: activity_id_from_headers(&headers)?, + session_token: session_token_from_headers(&headers)?, + content_path: content_path_from_headers(&headers)?.to_owned(), + alt_content_path: alt_content_path_from_headers(&headers)?.to_owned(), + continuation_token: continuation_token_from_headers_optional(&headers)?, + }) + } +} + +impl Continuable for ListPermissionsResponse { + fn continuation(&self) -> Option { + self.continuation_token.clone() + } +} diff --git a/sdk/data_cosmos/src/operations/mod.rs b/sdk/data_cosmos/src/operations/mod.rs index a06669cbb9..31ffb369c9 100644 --- a/sdk/data_cosmos/src/operations/mod.rs +++ b/sdk/data_cosmos/src/operations/mod.rs @@ -21,6 +21,7 @@ mod get_user; mod list_collections; mod list_databases; mod list_documents; +mod list_permissions; mod list_users; mod replace_collection; mod replace_document; @@ -47,6 +48,7 @@ pub use get_user::*; pub use list_collections::*; pub use list_databases::*; pub use list_documents::*; +pub use list_permissions::*; pub use list_users::*; pub use replace_collection::*; pub use replace_document::*; diff --git a/sdk/data_cosmos/src/requests/list_permissions_builder.rs b/sdk/data_cosmos/src/requests/list_permissions_builder.rs deleted file mode 100644 index 32e817a315..0000000000 --- a/sdk/data_cosmos/src/requests/list_permissions_builder.rs +++ /dev/null @@ -1,109 +0,0 @@ -use crate::prelude::*; -use crate::resources::ResourceType; -use crate::responses::ListPermissionsResponse; -use azure_core::prelude::*; -use futures::stream::{unfold, Stream}; -use http::StatusCode; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct ListPermissionsBuilder<'a, 'b> { - user_client: &'a UserClient, - user_agent: Option>, - activity_id: Option>, - consistency_level: Option, - continuation: Option>, - max_item_count: MaxItemCount, -} - -impl<'a, 'b> ListPermissionsBuilder<'a, 'b> { - pub(crate) fn new(user_client: &'a UserClient) -> Self { - Self { - user_client, - user_agent: None, - activity_id: None, - consistency_level: None, - continuation: None, - max_item_count: MaxItemCount::new(-1), - } - } - - setters! { - user_agent: &'b str => Some(UserAgent::new(user_agent)), - activity_id: &'b str => Some(ActivityId::new(activity_id)), - consistency_level: ConsistencyLevel => Some(consistency_level), - continuation: &'b str => Some(Continuation::new(continuation)), - max_item_count: i32 => MaxItemCount::new(max_item_count), - } - - pub async fn execute(&self) -> crate::Result { - trace!("ListPermissionsBuilder::execute called"); - - let request = self.user_client.cosmos_client().prepare_request( - &format!( - "dbs/{}/users/{}/permissions", - self.user_client.database_client().database_name(), - self.user_client.user_name() - ), - http::Method::GET, - ResourceType::Permissions, - ); - - let request = azure_core::headers::add_optional_header(&self.user_agent, request); - let request = azure_core::headers::add_optional_header(&self.activity_id, request); - let request = azure_core::headers::add_optional_header(&self.consistency_level, request); - let request = azure_core::headers::add_optional_header(&self.continuation, request); - let request = azure_core::headers::add_mandatory_header(&self.max_item_count, request); - - let request = request.body(azure_core::EMPTY_BODY)?; - debug!("\nrequest == {:#?}", request); - - Ok(self - .user_client - .http_client() - .execute_request_check_status(request, StatusCode::OK) - .await? - .try_into()?) - } - - pub fn stream(&self) -> impl Stream> + '_ { - #[derive(Debug, Clone, PartialEq)] - enum States { - Init, - Continuation(String), - } - - unfold( - Some(States::Init), - move |continuation_token: Option| { - async move { - debug!("continuation_token == {:?}", &continuation_token); - let response = match continuation_token { - Some(States::Init) => self.execute().await, - Some(States::Continuation(continuation_token)) => { - self.clone() - .continuation(continuation_token.as_str()) - .execute() - .await - } - None => return None, - }; - - // the ? operator does not work in async move (yet?) - // so we have to resort to this boilerplate - let response = match response { - Ok(response) => response, - Err(err) => return Some((Err(err), None)), - }; - - let continuation_token = response - .continuation_token - .as_ref() - .map(|ct| States::Continuation(ct.to_owned())); - - Some((Ok(response), continuation_token)) - } - }, - ) - } -} diff --git a/sdk/data_cosmos/src/requests/mod.rs b/sdk/data_cosmos/src/requests/mod.rs index 3cd167684d..b43cc68e45 100644 --- a/sdk/data_cosmos/src/requests/mod.rs +++ b/sdk/data_cosmos/src/requests/mod.rs @@ -18,7 +18,6 @@ mod execute_stored_procedure_builder; mod get_attachment_builder; mod get_partition_key_ranges_builder; mod list_attachments_builder; -mod list_permissions_builder; mod list_stored_procedures_builder; mod list_triggers_builder; mod list_user_defined_functions_builder; @@ -38,7 +37,6 @@ pub use execute_stored_procedure_builder::ExecuteStoredProcedureBuilder; pub use get_attachment_builder::GetAttachmentBuilder; pub use get_partition_key_ranges_builder::GetPartitionKeyRangesBuilder; pub use list_attachments_builder::ListAttachmentsBuilder; -pub use list_permissions_builder::ListPermissionsBuilder; pub use list_stored_procedures_builder::ListStoredProceduresBuilder; pub use list_triggers_builder::ListTriggersBuilder; pub use list_user_defined_functions_builder::ListUserDefinedFunctionsBuilder; diff --git a/sdk/data_cosmos/src/responses/list_permissions_response.rs b/sdk/data_cosmos/src/responses/list_permissions_response.rs index 268b40e4f1..e69de29bb2 100644 --- a/sdk/data_cosmos/src/responses/list_permissions_response.rs +++ b/sdk/data_cosmos/src/responses/list_permissions_response.rs @@ -1,50 +0,0 @@ -use crate::headers::from_headers::*; -use crate::resources::Permission; -use azure_core::headers::{continuation_token_from_headers_optional, session_token_from_headers}; -use http::response::Response; - -#[derive(Debug, Clone, PartialEq)] -pub struct ListPermissionsResponse { - pub permissions: Vec, - pub charge: f64, - pub activity_id: uuid::Uuid, - pub session_token: String, - pub content_path: String, - pub alt_content_path: String, - pub continuation_token: Option, -} - -impl std::convert::TryFrom> for ListPermissionsResponse { - type Error = crate::Error; - - fn try_from(response: Response) -> Result { - let headers = response.headers(); - let body = response.body(); - - #[derive(Debug, Deserialize)] - struct Response { - _rid: String, - #[serde(rename = "Permissions")] - permissions: Vec, - _count: u32, - } - - // first get the Cosmos REST API permission - let response: Response = serde_json::from_slice(body)?; - debug!("response == {:#?}", response); - - // now convert every Cosmos REST API permission - // into the SDK struct - let permissions = response.permissions; - - Ok(Self { - permissions, - charge: request_charge_from_headers(headers)?, - activity_id: activity_id_from_headers(headers)?, - session_token: session_token_from_headers(headers)?, - content_path: content_path_from_headers(headers)?.to_owned(), - alt_content_path: alt_content_path_from_headers(headers)?.to_owned(), - continuation_token: continuation_token_from_headers_optional(headers)?, - }) - } -} diff --git a/sdk/data_cosmos/src/responses/mod.rs b/sdk/data_cosmos/src/responses/mod.rs index 83bb922760..d6a75dbb8d 100644 --- a/sdk/data_cosmos/src/responses/mod.rs +++ b/sdk/data_cosmos/src/responses/mod.rs @@ -35,7 +35,6 @@ pub use execute_stored_procedure_response::ExecuteStoredProcedureResponse; pub use get_attachment_response::GetAttachmentResponse; pub use get_partition_key_ranges_response::GetPartitionKeyRangesResponse; pub use list_attachments_response::ListAttachmentsResponse; -pub use list_permissions_response::ListPermissionsResponse; pub use list_stored_procedures_response::ListStoredProceduresResponse; pub use list_triggers_response::ListTriggersResponse; pub use list_user_defined_functions_response::ListUserDefinedFunctionsResponse; diff --git a/sdk/data_cosmos/tests/permission.rs b/sdk/data_cosmos/tests/permission.rs index b7d2989914..d26e12fa94 100644 --- a/sdk/data_cosmos/tests/permission.rs +++ b/sdk/data_cosmos/tests/permission.rs @@ -1,5 +1,6 @@ #![cfg(all(test, feature = "test_e2e"))] use azure_data_cosmos::prelude::*; +use futures::StreamExt; mod setup; @@ -54,10 +55,22 @@ async fn permissions() { .await .unwrap(); - let list_permissions_response = user1_client.list_permissions().execute().await.unwrap(); + let list_permissions_response = user1_client + .list_permissions() + .into_stream() + .next() + .await + .unwrap() + .unwrap(); assert_eq!(list_permissions_response.permissions.len(), 1); - let list_permissions_response = user2_client.list_permissions().execute().await.unwrap(); + let list_permissions_response = user2_client + .list_permissions() + .into_stream() + .next() + .await + .unwrap() + .unwrap(); assert_eq!(list_permissions_response.permissions.len(), 1); // delete the database From e47a4727e9c36321d002b3fbc262a60bb5e75ba4 Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Wed, 2 Mar 2022 11:00:02 +0100 Subject: [PATCH 3/5] DeleteStoredProcedure::into_future --- sdk/data_cosmos/examples/stored_proc_01.rs | 2 +- .../src/clients/stored_procedure_client.rs | 4 +- .../src/operations/delete_stored_procedure.rs | 87 +++++++++++++++++++ sdk/data_cosmos/src/operations/mod.rs | 2 + .../delete_stored_procedure_builder.rs | 53 ----------- sdk/data_cosmos/src/requests/mod.rs | 2 - .../delete_stored_procedure_response.rs | 32 ------- sdk/data_cosmos/src/responses/mod.rs | 2 - 8 files changed, 92 insertions(+), 92 deletions(-) create mode 100644 sdk/data_cosmos/src/operations/delete_stored_procedure.rs delete mode 100644 sdk/data_cosmos/src/requests/delete_stored_procedure_builder.rs delete mode 100644 sdk/data_cosmos/src/responses/delete_stored_procedure_response.rs diff --git a/sdk/data_cosmos/examples/stored_proc_01.rs b/sdk/data_cosmos/examples/stored_proc_01.rs index 96a2341d20..187e54f18c 100644 --- a/sdk/data_cosmos/examples/stored_proc_01.rs +++ b/sdk/data_cosmos/examples/stored_proc_01.rs @@ -79,7 +79,7 @@ async fn main() -> Result<(), Box> { let delete_stored_procedure_response = stored_procedure_client .delete_stored_procedure() - .execute() + .into_future() .await?; println!( "delete_stored_procedure_response == {:#?}", diff --git a/sdk/data_cosmos/src/clients/stored_procedure_client.rs b/sdk/data_cosmos/src/clients/stored_procedure_client.rs index b6bb79b6d0..6d3f824768 100644 --- a/sdk/data_cosmos/src/clients/stored_procedure_client.rs +++ b/sdk/data_cosmos/src/clients/stored_procedure_client.rs @@ -64,8 +64,8 @@ impl StoredProcedureClient { } /// Delete the stored procedure - pub fn delete_stored_procedure(&self) -> requests::DeleteStoredProcedureBuilder<'_, '_> { - requests::DeleteStoredProcedureBuilder::new(self) + pub fn delete_stored_procedure(&self) -> DeleteStoredProcedureBuilder { + DeleteStoredProcedureBuilder::new(self.clone()) } pub(crate) fn prepare_request_with_stored_procedure_name( diff --git a/sdk/data_cosmos/src/operations/delete_stored_procedure.rs b/sdk/data_cosmos/src/operations/delete_stored_procedure.rs new file mode 100644 index 0000000000..ce8beead86 --- /dev/null +++ b/sdk/data_cosmos/src/operations/delete_stored_procedure.rs @@ -0,0 +1,87 @@ +use crate::headers::from_headers::*; +use crate::prelude::*; +use crate::ResourceQuota; +use azure_core::headers::session_token_from_headers; +use azure_core::prelude::*; +use azure_core::Response as HttpResponse; +use chrono::{DateTime, Utc}; + +#[derive(Debug, Clone)] +pub struct DeleteStoredProcedureBuilder { + client: StoredProcedureClient, + consistency_level: Option, + context: Context, +} + +impl DeleteStoredProcedureBuilder { + pub(crate) fn new(client: StoredProcedureClient) -> Self { + Self { + client, + consistency_level: None, + context: Context::new(), + } + } + + setters! { + consistency_level: ConsistencyLevel => Some(consistency_level), + } + + pub fn into_future(self) -> DeleteStoredProcedure { + Box::pin(async move { + let mut request = self + .client + .prepare_pipeline_with_stored_procedure_name(http::Method::DELETE); + + azure_core::headers::add_optional_header2(&self.consistency_level, &mut request)?; + + let response = self + .client + .pipeline() + .send( + self.context.clone().insert(ResourceType::Permissions), + &mut request, + ) + .await?; + + DeleteStoredProcedureResponse::try_from(response).await + }) + } +} + +/// The future returned by calling `into_future` on the builder. +pub type DeleteStoredProcedure = + futures::future::BoxFuture<'static, crate::Result>; + +#[cfg(feature = "into_future")] +impl std::future::IntoFuture for DeleteStoredProcedureBuilder { + type Future = DeleteStoredProcedure; + type Output = ::Output; + fn into_future(self) -> Self::Future { + Self::into_future(self) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct DeleteStoredProcedureResponse { + pub charge: f64, + pub activity_id: uuid::Uuid, + pub session_token: String, + pub last_change: DateTime, + pub resource_quota: Vec, + pub resource_usage: Vec, +} + +impl DeleteStoredProcedureResponse { + pub async fn try_from(response: HttpResponse) -> crate::Result { + let headers = response.headers(); + + Ok(Self { + charge: request_charge_from_headers(headers)?, + activity_id: activity_id_from_headers(headers)?, + session_token: session_token_from_headers(headers)?, + last_change: last_state_change_from_headers(headers)?, + resource_quota: resource_quota_from_headers(headers)?, + resource_usage: resource_usage_from_headers(headers)?, + }) + } +} diff --git a/sdk/data_cosmos/src/operations/mod.rs b/sdk/data_cosmos/src/operations/mod.rs index 31ffb369c9..4a2bcfe010 100644 --- a/sdk/data_cosmos/src/operations/mod.rs +++ b/sdk/data_cosmos/src/operations/mod.rs @@ -12,6 +12,7 @@ mod delete_collection; mod delete_database; mod delete_document; mod delete_permission; +mod delete_stored_procedure; mod delete_user; mod get_collection; mod get_database; @@ -39,6 +40,7 @@ pub use delete_collection::*; pub use delete_database::*; pub use delete_document::*; pub use delete_permission::*; +pub use delete_stored_procedure::*; pub use delete_user::*; pub use get_collection::*; pub use get_database::*; diff --git a/sdk/data_cosmos/src/requests/delete_stored_procedure_builder.rs b/sdk/data_cosmos/src/requests/delete_stored_procedure_builder.rs deleted file mode 100644 index e79b09cfd5..0000000000 --- a/sdk/data_cosmos/src/requests/delete_stored_procedure_builder.rs +++ /dev/null @@ -1,53 +0,0 @@ -use crate::prelude::*; -use crate::responses::DeleteStoredProcedureResponse; -use azure_core::prelude::*; - -use http::StatusCode; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct DeleteStoredProcedureBuilder<'a, 'b> { - stored_procedure_client: &'a StoredProcedureClient, - user_agent: Option>, - activity_id: Option>, - consistency_level: Option, -} - -impl<'a, 'b> DeleteStoredProcedureBuilder<'a, 'b> { - pub(crate) fn new(stored_procedure_client: &'a StoredProcedureClient) -> Self { - Self { - stored_procedure_client, - user_agent: None, - activity_id: None, - consistency_level: None, - } - } - - setters! { - user_agent: &'b str => Some(UserAgent::new(user_agent)), - activity_id: &'b str => Some(ActivityId::new(activity_id)), - consistency_level: ConsistencyLevel => Some(consistency_level), - } - - pub async fn execute(&self) -> crate::Result { - trace!("DeleteStoredProcedureBuilder::execute called"); - - let request = self - .stored_procedure_client - .prepare_request_with_stored_procedure_name(http::Method::DELETE); - - // add trait headers - let request = azure_core::headers::add_optional_header(&self.user_agent, request); - let request = azure_core::headers::add_optional_header(&self.activity_id, request); - let request = azure_core::headers::add_optional_header(&self.consistency_level, request); - - let request = request.body(azure_core::EMPTY_BODY)?; - - Ok(self - .stored_procedure_client - .http_client() - .execute_request_check_status(request, StatusCode::NO_CONTENT) - .await? - .try_into()?) - } -} diff --git a/sdk/data_cosmos/src/requests/mod.rs b/sdk/data_cosmos/src/requests/mod.rs index b43cc68e45..7da91f5a6a 100644 --- a/sdk/data_cosmos/src/requests/mod.rs +++ b/sdk/data_cosmos/src/requests/mod.rs @@ -11,7 +11,6 @@ mod create_or_replace_user_defined_function_builder; mod create_reference_attachment_builder; mod create_slug_attachment_builder; mod delete_attachment_builder; -mod delete_stored_procedure_builder; mod delete_trigger_builder; mod delete_user_defined_function_builder; mod execute_stored_procedure_builder; @@ -30,7 +29,6 @@ pub use create_or_replace_user_defined_function_builder::CreateOrReplaceUserDefi pub use create_reference_attachment_builder::CreateReferenceAttachmentBuilder; pub use create_slug_attachment_builder::CreateSlugAttachmentBuilder; pub use delete_attachment_builder::DeleteAttachmentBuilder; -pub use delete_stored_procedure_builder::DeleteStoredProcedureBuilder; pub use delete_trigger_builder::DeleteTriggerBuilder; pub use delete_user_defined_function_builder::DeleteUserDefinedFunctionBuilder; pub use execute_stored_procedure_builder::ExecuteStoredProcedureBuilder; diff --git a/sdk/data_cosmos/src/responses/delete_stored_procedure_response.rs b/sdk/data_cosmos/src/responses/delete_stored_procedure_response.rs deleted file mode 100644 index fb1db30d14..0000000000 --- a/sdk/data_cosmos/src/responses/delete_stored_procedure_response.rs +++ /dev/null @@ -1,32 +0,0 @@ -use crate::headers::from_headers::*; -use crate::ResourceQuota; -use azure_core::headers::session_token_from_headers; -use chrono::{DateTime, Utc}; -use http::response::Response; - -#[derive(Debug, Clone, PartialEq)] -pub struct DeleteStoredProcedureResponse { - pub charge: f64, - pub activity_id: uuid::Uuid, - pub session_token: String, - pub last_change: DateTime, - pub resource_quota: Vec, - pub resource_usage: Vec, -} - -impl std::convert::TryFrom> for DeleteStoredProcedureResponse { - type Error = crate::Error; - - fn try_from(response: Response) -> Result { - let headers = response.headers(); - - Ok(Self { - charge: request_charge_from_headers(headers)?, - activity_id: activity_id_from_headers(headers)?, - session_token: session_token_from_headers(headers)?, - last_change: last_state_change_from_headers(headers)?, - resource_quota: resource_quota_from_headers(headers)?, - resource_usage: resource_usage_from_headers(headers)?, - }) - } -} diff --git a/sdk/data_cosmos/src/responses/mod.rs b/sdk/data_cosmos/src/responses/mod.rs index d6a75dbb8d..dc6ef7ec2d 100644 --- a/sdk/data_cosmos/src/responses/mod.rs +++ b/sdk/data_cosmos/src/responses/mod.rs @@ -8,7 +8,6 @@ mod create_slug_attachment_response; mod create_trigger_response; mod create_user_defined_function_response; mod delete_attachment_response; -mod delete_stored_procedure_response; mod delete_trigger_response; mod delete_user_defined_function_response; mod execute_stored_procedure_response; @@ -28,7 +27,6 @@ pub use create_slug_attachment_response::CreateSlugAttachmentResponse; pub use create_trigger_response::CreateTriggerResponse; pub use create_user_defined_function_response::CreateUserDefinedFunctionResponse; pub use delete_attachment_response::DeleteAttachmentResponse; -pub use delete_stored_procedure_response::DeleteStoredProcedureResponse; pub use delete_trigger_response::DeleteTriggerResponse; pub use delete_user_defined_function_response::DeleteUserDefinedFunctionResponse; pub use execute_stored_procedure_response::ExecuteStoredProcedureResponse; From b0ab9cf3e5606eceda0d88af20c912532e1d0ec3 Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Wed, 2 Mar 2022 11:09:22 +0100 Subject: [PATCH 4/5] fmt --- sdk/data_cosmos/src/responses/list_permissions_response.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/data_cosmos/src/responses/list_permissions_response.rs b/sdk/data_cosmos/src/responses/list_permissions_response.rs index e69de29bb2..8b13789179 100644 --- a/sdk/data_cosmos/src/responses/list_permissions_response.rs +++ b/sdk/data_cosmos/src/responses/list_permissions_response.rs @@ -0,0 +1 @@ + From 61fc41162609193e971b98c241a4b28e4b66cc9a Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Wed, 2 Mar 2022 12:21:17 +0100 Subject: [PATCH 5/5] ListAttachments::into_stream --- sdk/data_cosmos/examples/attachments_00.rs | 8 +- .../src/clients/document_client.rs | 12 +- .../src/operations/list_attachments.rs | 169 ++++++++++++++++++ sdk/data_cosmos/src/operations/mod.rs | 2 + .../src/requests/list_attachments_builder.rs | 121 ------------- sdk/data_cosmos/src/requests/mod.rs | 2 - .../responses/list_attachments_response.rs | 91 ---------- .../responses/list_permissions_response.rs | 1 - sdk/data_cosmos/src/responses/mod.rs | 3 - sdk/data_cosmos/tests/attachment_00.rs | 19 +- sdk/data_cosmos/tests/cosmos_collection.rs | 2 +- 11 files changed, 196 insertions(+), 234 deletions(-) create mode 100644 sdk/data_cosmos/src/operations/list_attachments.rs delete mode 100644 sdk/data_cosmos/src/requests/list_attachments_builder.rs delete mode 100644 sdk/data_cosmos/src/responses/list_attachments_response.rs delete mode 100644 sdk/data_cosmos/src/responses/list_permissions_response.rs diff --git a/sdk/data_cosmos/examples/attachments_00.rs b/sdk/data_cosmos/examples/attachments_00.rs index 7251304ef3..8f4a0dc139 100644 --- a/sdk/data_cosmos/examples/attachments_00.rs +++ b/sdk/data_cosmos/examples/attachments_00.rs @@ -1,4 +1,5 @@ use azure_data_cosmos::prelude::*; +use futures::StreamExt; use serde::{Deserialize, Serialize}; use std::error::Error; @@ -62,7 +63,12 @@ async fn main() -> Result<(), Box> { let document_client = client.into_document_client(doc.id.clone(), &doc.id)?; // list attachments - let ret = document_client.list_attachments().execute().await?; + let ret = document_client + .list_attachments() + .into_stream() + .next() + .await + .unwrap()?; println!("list attachments == {:#?}", ret); // reference attachment diff --git a/sdk/data_cosmos/src/clients/document_client.rs b/sdk/data_cosmos/src/clients/document_client.rs index 836b6618ba..d3535c4fd7 100644 --- a/sdk/data_cosmos/src/clients/document_client.rs +++ b/sdk/data_cosmos/src/clients/document_client.rs @@ -1,7 +1,7 @@ use super::{AttachmentClient, CollectionClient, CosmosClient, DatabaseClient}; use crate::operations::*; -use crate::{requests, ReadonlyString}; -use azure_core::{HttpClient, Request}; +use crate::ReadonlyString; +use azure_core::Request; use serde::Serialize; /// A client for Cosmos document resources. @@ -75,8 +75,8 @@ impl DocumentClient { } /// List all attachments for a document - pub fn list_attachments(&self) -> requests::ListAttachmentsBuilder<'_, '_> { - requests::ListAttachmentsBuilder::new(self) + pub fn list_attachments(&self) -> ListAttachmentsBuilder { + ListAttachmentsBuilder::new(self.clone()) } /// Convert into an [`AttachmentClient`] @@ -101,8 +101,4 @@ impl DocumentClient { method, ) } - - pub(crate) fn http_client(&self) -> &dyn HttpClient { - self.cosmos_client().http_client() - } } diff --git a/sdk/data_cosmos/src/operations/list_attachments.rs b/sdk/data_cosmos/src/operations/list_attachments.rs new file mode 100644 index 0000000000..644b6a48b9 --- /dev/null +++ b/sdk/data_cosmos/src/operations/list_attachments.rs @@ -0,0 +1,169 @@ +use crate::headers::from_headers::*; +use crate::prelude::*; +use crate::resources::Attachment; +use crate::resources::ResourceType; +use crate::ResourceQuota; +use azure_core::collect_pinned_stream; +use azure_core::headers; +use azure_core::headers::{ + continuation_token_from_headers_optional, item_count_from_headers, session_token_from_headers, +}; +use azure_core::prelude::*; +use azure_core::{Pageable, Response as HttpResponse, SessionToken}; +use chrono::{DateTime, Utc}; + +#[derive(Debug, Clone)] +pub struct ListAttachmentsBuilder { + client: DocumentClient, + if_match_condition: Option, + consistency_level: Option, + max_item_count: MaxItemCount, + a_im: ChangeFeed, + context: Context, +} + +impl ListAttachmentsBuilder { + pub(crate) fn new(client: DocumentClient) -> Self { + Self { + client, + if_match_condition: None, + consistency_level: None, + max_item_count: MaxItemCount::new(-1), + a_im: ChangeFeed::None, + context: Context::new(), + } + } + + setters! { + consistency_level: ConsistencyLevel => Some(consistency_level), + if_match_condition: IfMatchCondition => Some(if_match_condition), + max_item_count: i32 => MaxItemCount::new(max_item_count), + a_im: ChangeFeed, + context: Context => context, + } + + pub fn into_stream(self) -> ListAttachments { + let make_request = move |continuation: Option| { + let this = self.clone(); + let ctx = self.context.clone(); + async move { + let mut request = this.client.cosmos_client().prepare_request_pipeline( + &format!( + "dbs/{}/colls/{}/docs/{}/attachments", + this.client.database_client().database_name(), + this.client.collection_client().collection_name(), + this.client.document_name() + ), + http::Method::GET, + ); + + azure_core::headers::add_optional_header2(&this.if_match_condition, &mut request)?; + azure_core::headers::add_optional_header2(&this.consistency_level, &mut request)?; + azure_core::headers::add_mandatory_header2(&this.max_item_count, &mut request)?; + azure_core::headers::add_mandatory_header2(&this.a_im, &mut request)?; + crate::cosmos_entity::add_as_partition_key_header_serialized2( + this.client.partition_key_serialized(), + &mut request, + ); + + if let Some(c) = continuation { + let h = http::HeaderValue::from_str(c.as_str()) + .map_err(azure_core::HttpHeaderError::InvalidHeaderValue)?; + request.headers_mut().append(headers::CONTINUATION, h); + } + + let response = this + .client + .cosmos_client() + .pipeline() + .send(ctx.clone().insert(ResourceType::Attachments), &mut request) + .await?; + ListAttachmentsResponse::try_from(response).await + } + }; + + Pageable::new(make_request) + } +} + +pub type ListAttachments = Pageable; + +#[derive(Debug, Clone, Deserialize)] +struct JsonListAttachmentResponse { + #[serde(rename = "_rid")] + pub rid: String, + #[serde(rename = "_count")] + pub count: u64, + #[serde(rename = "Attachments")] + pub attachments: Vec, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct ListAttachmentsResponse { + pub rid: String, + pub count: u64, + pub attachments: Vec, + + pub max_media_storage_usage_mb: u64, + pub media_storage_usage_mb: u64, + pub last_change: DateTime, + pub resource_quota: Vec, + pub resource_usage: Vec, + pub lsn: u64, + pub item_count: u32, + pub alt_content_path: String, + pub content_path: String, + pub global_committed_lsn: u64, + pub number_of_read_regions: u32, + pub transport_request_id: u64, + pub cosmos_llsn: u64, + pub session_token: SessionToken, + pub request_charge: f64, + pub service_version: String, + pub activity_id: uuid::Uuid, + pub gateway_version: String, + pub date: DateTime, + pub continuation_token: Option, +} + +impl ListAttachmentsResponse { + pub async fn try_from(response: HttpResponse) -> crate::Result { + let (_status_code, headers, pinned_stream) = response.deconstruct(); + let body = collect_pinned_stream(pinned_stream).await?; + + let json: JsonListAttachmentResponse = serde_json::from_slice(&body)?; + + Ok(Self { + rid: json.rid, + count: json.count, + attachments: json.attachments, + + max_media_storage_usage_mb: max_media_storage_usage_mb_from_headers(&headers)?, + media_storage_usage_mb: media_storage_usage_mb_from_headers(&headers)?, + last_change: last_state_change_from_headers(&headers)?, + resource_quota: resource_quota_from_headers(&headers)?, + resource_usage: resource_usage_from_headers(&headers)?, + lsn: lsn_from_headers(&headers)?, + item_count: item_count_from_headers(&headers)?, + alt_content_path: alt_content_path_from_headers(&headers)?.to_owned(), + content_path: content_path_from_headers(&headers)?.to_owned(), + global_committed_lsn: global_committed_lsn_from_headers(&headers)?, + number_of_read_regions: number_of_read_regions_from_headers(&headers)?, + transport_request_id: transport_request_id_from_headers(&headers)?, + cosmos_llsn: cosmos_llsn_from_headers(&headers)?, + session_token: session_token_from_headers(&headers)?, + request_charge: request_charge_from_headers(&headers)?, + service_version: service_version_from_headers(&headers)?.to_owned(), + activity_id: activity_id_from_headers(&headers)?, + gateway_version: gateway_version_from_headers(&headers)?.to_owned(), + date: date_from_headers(&headers)?, + continuation_token: continuation_token_from_headers_optional(&headers)?, + }) + } +} + +impl Continuable for ListAttachmentsResponse { + fn continuation(&self) -> Option { + self.continuation_token.clone() + } +} diff --git a/sdk/data_cosmos/src/operations/mod.rs b/sdk/data_cosmos/src/operations/mod.rs index 4a2bcfe010..3b3cd17f40 100644 --- a/sdk/data_cosmos/src/operations/mod.rs +++ b/sdk/data_cosmos/src/operations/mod.rs @@ -19,6 +19,7 @@ mod get_database; mod get_document; mod get_permission; mod get_user; +mod list_attachments; mod list_collections; mod list_databases; mod list_documents; @@ -47,6 +48,7 @@ pub use get_database::*; pub use get_document::*; pub use get_permission::*; pub use get_user::*; +pub use list_attachments::*; pub use list_collections::*; pub use list_databases::*; pub use list_documents::*; diff --git a/sdk/data_cosmos/src/requests/list_attachments_builder.rs b/sdk/data_cosmos/src/requests/list_attachments_builder.rs deleted file mode 100644 index 72f37c3b0d..0000000000 --- a/sdk/data_cosmos/src/requests/list_attachments_builder.rs +++ /dev/null @@ -1,121 +0,0 @@ -use crate::prelude::*; -use crate::resources::ResourceType; -use crate::responses::ListAttachmentsResponse; -use azure_core::prelude::*; -use futures::stream::{unfold, Stream}; -use http::StatusCode; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct ListAttachmentsBuilder<'a, 'b> { - document_client: &'a DocumentClient, - if_match_condition: Option, - user_agent: Option>, - activity_id: Option>, - consistency_level: Option, - continuation: Option>, - max_item_count: MaxItemCount, - a_im: ChangeFeed, -} - -impl<'a, 'b> ListAttachmentsBuilder<'a, 'b> { - pub(crate) fn new(document_client: &'a DocumentClient) -> Self { - Self { - document_client, - if_match_condition: None, - user_agent: None, - activity_id: None, - consistency_level: None, - continuation: None, - max_item_count: MaxItemCount::new(-1), - a_im: ChangeFeed::None, - } - } - - setters! { - user_agent: &'b str => Some(UserAgent::new(user_agent)), - activity_id: &'b str => Some(ActivityId::new(activity_id)), - consistency_level: ConsistencyLevel => Some(consistency_level), - if_match_condition: IfMatchCondition => Some(if_match_condition), - continuation: &'b str => Some(Continuation::new(continuation)), - max_item_count: i32 => MaxItemCount::new(max_item_count), - a_im: ChangeFeed, - } - - pub async fn execute(&self) -> crate::Result { - let mut req = self.document_client.cosmos_client().prepare_request( - &format!( - "dbs/{}/colls/{}/docs/{}/attachments", - self.document_client.database_client().database_name(), - self.document_client.collection_client().collection_name(), - self.document_client.document_name() - ), - http::Method::GET, - ResourceType::Attachments, - ); - - // add trait headers - req = azure_core::headers::add_optional_header(&self.if_match_condition, req); - req = azure_core::headers::add_optional_header(&self.user_agent, req); - req = azure_core::headers::add_optional_header(&self.activity_id, req); - req = azure_core::headers::add_optional_header(&self.consistency_level, req); - req = azure_core::headers::add_optional_header(&self.continuation, req); - req = azure_core::headers::add_mandatory_header(&self.max_item_count, req); - req = azure_core::headers::add_mandatory_header(&self.a_im, req); - - req = crate::cosmos_entity::add_as_partition_key_header_serialized( - self.document_client.partition_key_serialized(), - req, - ); - - let req = req.body(azure_core::EMPTY_BODY)?; - - Ok(self - .document_client - .http_client() - .execute_request_check_status(req, StatusCode::OK) - .await? - .try_into()?) - } - - pub fn stream(&self) -> impl Stream> + '_ { - #[derive(Debug, Clone, PartialEq)] - enum States { - Init, - Continuation(String), - } - - unfold( - Some(States::Init), - move |continuation_token: Option| { - async move { - debug!("continuation_token == {:?}", &continuation_token); - let response = match continuation_token { - Some(States::Init) => self.execute().await, - Some(States::Continuation(continuation_token)) => { - self.clone() - .continuation(continuation_token.as_str()) - .execute() - .await - } - None => return None, - }; - - // the ? operator does not work in async move (yet?) - // so we have to resort to this boilerplate - let response = match response { - Ok(response) => response, - Err(err) => return Some((Err(err), None)), - }; - - let continuation_token = response - .continuation_token - .as_ref() - .map(|ct| States::Continuation(ct.to_owned())); - - Some((Ok(response), continuation_token)) - } - }, - ) - } -} diff --git a/sdk/data_cosmos/src/requests/mod.rs b/sdk/data_cosmos/src/requests/mod.rs index 7da91f5a6a..34d54f14aa 100644 --- a/sdk/data_cosmos/src/requests/mod.rs +++ b/sdk/data_cosmos/src/requests/mod.rs @@ -16,7 +16,6 @@ mod delete_user_defined_function_builder; mod execute_stored_procedure_builder; mod get_attachment_builder; mod get_partition_key_ranges_builder; -mod list_attachments_builder; mod list_stored_procedures_builder; mod list_triggers_builder; mod list_user_defined_functions_builder; @@ -34,7 +33,6 @@ pub use delete_user_defined_function_builder::DeleteUserDefinedFunctionBuilder; pub use execute_stored_procedure_builder::ExecuteStoredProcedureBuilder; pub use get_attachment_builder::GetAttachmentBuilder; pub use get_partition_key_ranges_builder::GetPartitionKeyRangesBuilder; -pub use list_attachments_builder::ListAttachmentsBuilder; pub use list_stored_procedures_builder::ListStoredProceduresBuilder; pub use list_triggers_builder::ListTriggersBuilder; pub use list_user_defined_functions_builder::ListUserDefinedFunctionsBuilder; diff --git a/sdk/data_cosmos/src/responses/list_attachments_response.rs b/sdk/data_cosmos/src/responses/list_attachments_response.rs deleted file mode 100644 index e80018553b..0000000000 --- a/sdk/data_cosmos/src/responses/list_attachments_response.rs +++ /dev/null @@ -1,91 +0,0 @@ -use crate::headers::from_headers::*; -use crate::resources::Attachment; -use crate::ResourceQuota; -use azure_core::headers::{ - continuation_token_from_headers_optional, item_count_from_headers, session_token_from_headers, -}; -use azure_core::SessionToken; -use chrono::{DateTime, Utc}; -use http::response::Response; - -#[derive(Debug, Clone, Deserialize)] -struct JsonListAttachmentResponse { - #[serde(rename = "_rid")] - pub rid: String, - #[serde(rename = "_count")] - pub count: u64, - #[serde(rename = "Attachments")] - pub attachments: Vec, -} - -#[derive(Debug, Clone, PartialEq)] -pub struct ListAttachmentsResponse { - pub rid: String, - pub count: u64, - pub attachments: Vec, - - pub max_media_storage_usage_mb: u64, - pub media_storage_usage_mb: u64, - pub last_change: DateTime, - pub resource_quota: Vec, - pub resource_usage: Vec, - pub lsn: u64, - pub item_count: u32, - pub alt_content_path: String, - pub content_path: String, - pub global_committed_lsn: u64, - pub number_of_read_regions: u32, - pub transport_request_id: u64, - pub cosmos_llsn: u64, - pub session_token: SessionToken, - pub request_charge: f64, - pub service_version: String, - pub activity_id: uuid::Uuid, - pub gateway_version: String, - pub date: DateTime, - pub continuation_token: Option, -} - -impl std::convert::TryFrom> for ListAttachmentsResponse { - type Error = crate::Error; - - fn try_from(response: Response) -> Result { - let headers = response.headers(); - let body = response.body(); - - debug!("headers == {:#?}", headers); - debug!("body == {:#?}", body); - - let json: JsonListAttachmentResponse = serde_json::from_slice(body)?; - - Ok(Self { - rid: json.rid, - count: json.count, - attachments: json.attachments, - - max_media_storage_usage_mb: max_media_storage_usage_mb_from_headers(headers)?, - media_storage_usage_mb: media_storage_usage_mb_from_headers(headers)?, - last_change: last_state_change_from_headers(headers)?, - resource_quota: resource_quota_from_headers(headers)?, - resource_usage: resource_usage_from_headers(headers)?, - lsn: lsn_from_headers(headers)?, - item_count: item_count_from_headers(headers)?, - alt_content_path: alt_content_path_from_headers(headers)?.to_owned(), - content_path: content_path_from_headers(headers)?.to_owned(), - global_committed_lsn: global_committed_lsn_from_headers(headers)?, - number_of_read_regions: number_of_read_regions_from_headers(headers)?, - transport_request_id: transport_request_id_from_headers(headers)?, - cosmos_llsn: cosmos_llsn_from_headers(headers)?, - session_token: session_token_from_headers(headers)?, - request_charge: request_charge_from_headers(headers)?, - service_version: service_version_from_headers(headers)?.to_owned(), - activity_id: activity_id_from_headers(headers)?, - gateway_version: gateway_version_from_headers(headers)?.to_owned(), - date: date_from_headers(headers)?, - continuation_token: continuation_token_from_headers_optional(headers)?, - }) - } -} - -#[cfg(test)] -mod tests {} diff --git a/sdk/data_cosmos/src/responses/list_permissions_response.rs b/sdk/data_cosmos/src/responses/list_permissions_response.rs deleted file mode 100644 index 8b13789179..0000000000 --- a/sdk/data_cosmos/src/responses/list_permissions_response.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/sdk/data_cosmos/src/responses/mod.rs b/sdk/data_cosmos/src/responses/mod.rs index dc6ef7ec2d..66a2de9484 100644 --- a/sdk/data_cosmos/src/responses/mod.rs +++ b/sdk/data_cosmos/src/responses/mod.rs @@ -13,8 +13,6 @@ mod delete_user_defined_function_response; mod execute_stored_procedure_response; mod get_attachment_response; mod get_partition_key_ranges_response; -mod list_attachments_response; -mod list_permissions_response; mod list_stored_procedures_response; mod list_triggers_response; mod list_user_defined_functions_response; @@ -32,7 +30,6 @@ pub use delete_user_defined_function_response::DeleteUserDefinedFunctionResponse pub use execute_stored_procedure_response::ExecuteStoredProcedureResponse; pub use get_attachment_response::GetAttachmentResponse; pub use get_partition_key_ranges_response::GetPartitionKeyRangesResponse; -pub use list_attachments_response::ListAttachmentsResponse; pub use list_stored_procedures_response::ListStoredProceduresResponse; pub use list_triggers_response::ListTriggersResponse; pub use list_user_defined_functions_response::ListUserDefinedFunctionsResponse; diff --git a/sdk/data_cosmos/tests/attachment_00.rs b/sdk/data_cosmos/tests/attachment_00.rs index e9a98589f0..b651eaf1b4 100644 --- a/sdk/data_cosmos/tests/attachment_00.rs +++ b/sdk/data_cosmos/tests/attachment_00.rs @@ -1,5 +1,6 @@ #![cfg(all(test, feature = "test_e2e"))] use azure_data_cosmos::prelude::*; +use futures::StreamExt; use serde::{Deserialize, Serialize}; mod setup; @@ -97,8 +98,10 @@ async fn attachment() -> Result<(), azure_data_cosmos::Error> { let ret = document_client .list_attachments() .consistency_level(session_token.clone()) - .execute() - .await?; + .into_stream() + .next() + .await + .unwrap()?; assert_eq!(0, ret.attachments.len()); // create reference attachment @@ -129,8 +132,10 @@ async fn attachment() -> Result<(), azure_data_cosmos::Error> { let ret = document_client .list_attachments() .consistency_level(&resp) - .execute() - .await?; + .into_stream() + .next() + .await + .unwrap()?; assert_eq!(2, ret.attachments.len()); // get reference attachment, it must have the updated media link @@ -169,8 +174,10 @@ async fn attachment() -> Result<(), azure_data_cosmos::Error> { let ret = document_client .list_attachments() .consistency_level(&resp_delete) - .execute() - .await?; + .into_stream() + .next() + .await + .unwrap()?; assert_eq!(1, ret.attachments.len()); // delete the database diff --git a/sdk/data_cosmos/tests/cosmos_collection.rs b/sdk/data_cosmos/tests/cosmos_collection.rs index 52d523f81e..fb7bb92279 100644 --- a/sdk/data_cosmos/tests/cosmos_collection.rs +++ b/sdk/data_cosmos/tests/cosmos_collection.rs @@ -3,7 +3,7 @@ mod setup; use azure_data_cosmos::prelude::*; use azure_data_cosmos::resources::collection::*; -use futures::stream::StreamExt; +use futures::StreamExt; #[tokio::test] async fn create_and_delete_collection() {