diff --git a/sdk/data_cosmos/examples/document_entries_01.rs b/sdk/data_cosmos/examples/document_entries_01.rs index f61cc905c4..f76270f0a4 100644 --- a/sdk/data_cosmos/examples/document_entries_01.rs +++ b/sdk/data_cosmos/examples/document_entries_01.rs @@ -90,11 +90,13 @@ async fn main() -> Result<(), Box> { println!("list_documents_response == {:#?}", list_documents_response); let query_documents_response = client - .query_documents() + .query_documents("SELECT * FROM c WHERE c.a_number = 600") .consistency_level(&list_documents_response) .query_cross_partition(true) - .execute::("SELECT * FROM c WHERE c.a_number = 600") - .await?; + .into_stream::() + .next() + .await + .unwrap()?; println!( "query_documents_response == {:#?}", query_documents_response diff --git a/sdk/data_cosmos/examples/query_document_00.rs b/sdk/data_cosmos/examples/query_document_00.rs index 4de20b0e97..d05c1689d5 100644 --- a/sdk/data_cosmos/examples/query_document_00.rs +++ b/sdk/data_cosmos/examples/query_document_00.rs @@ -1,5 +1,5 @@ use azure_data_cosmos::prelude::*; -use azure_data_cosmos::responses::QueryDocumentsResponse; +use futures::StreamExt; use serde::{Deserialize, Serialize}; use std::error::Error; @@ -46,23 +46,27 @@ async fn main() -> Result<(), Box> { let client = client.into_database_client(database_name); let client = client.into_collection_client(collection_name); - let query_obj = Query::new(&query); + let query_obj = Query::new(query); let respo: QueryDocumentsResponse = client - .query_documents() + .query_documents(query_obj.clone()) .query_cross_partition(true) .max_item_count(3i32) - .execute(&query_obj) - .await?; + .into_stream() + .next() + .await + .unwrap()?; println!("as json == {:?}", respo); let respo: QueryDocumentsResponse = client - .query_documents() + .query_documents(query_obj) .query_cross_partition(true) .parallelize_cross_partition_query(true) .max_item_count(2) - .execute(&query_obj) - .await?; + .into_stream() + .next() + .await + .unwrap()?; println!("as items == {:?}", respo); //let ret = client diff --git a/sdk/data_cosmos/examples/readme.rs b/sdk/data_cosmos/examples/readme.rs index 318b6f67ff..9c9561f166 100644 --- a/sdk/data_cosmos/examples/readme.rs +++ b/sdk/data_cosmos/examples/readme.rs @@ -113,11 +113,13 @@ async fn main() -> Result<(), Box> { // TASK 3 println!("\nQuerying documents"); let query_documents_response = collection_client - .query_documents() + .query_documents("SELECT * FROM A WHERE A.a_number < 600") .query_cross_partition(true) // this will perform a cross partition query! notice how simple it is! .consistency_level(session_token) - .execute::("SELECT * FROM A WHERE A.a_number < 600") // there are other ways to construct a query, this is the simplest. - .await? + .into_stream::() // there are other ways to construct a query, this is the simplest. + .next() + .await + .unwrap()? .into_documents() // queries can return Documents or Raw json (ie without etag, _rid, etc...). Since our query return docs we convert with this function. .unwrap(); // we know in advance that the conversion to Document will not fail since we SELECT'ed * FROM table diff --git a/sdk/data_cosmos/examples/stored_proc_01.rs b/sdk/data_cosmos/examples/stored_proc_01.rs index 187e54f18c..ac3a54c2db 100644 --- a/sdk/data_cosmos/examples/stored_proc_01.rs +++ b/sdk/data_cosmos/examples/stored_proc_01.rs @@ -6,6 +6,7 @@ /// response.setBody("Hello, " + personToGreet); /// } use azure_data_cosmos::prelude::*; +use futures::StreamExt; use std::error::Error; #[tokio::main] @@ -46,8 +47,12 @@ async fn main() -> Result<(), Box> { .clone() .into_stored_procedure_client(stored_procedure_name); - let list_stored_procedures_response = - collection_client.list_stored_procedures().execute().await?; + let list_stored_procedures_response = collection_client + .list_stored_procedures() + .into_stream() + .next() + .await + .unwrap()?; println!( "list_stored_procedures_response == {:#?}", list_stored_procedures_response diff --git a/sdk/data_cosmos/examples/user_defined_function_00.rs b/sdk/data_cosmos/examples/user_defined_function_00.rs index 26788d4d12..b90ff2bb04 100644 --- a/sdk/data_cosmos/examples/user_defined_function_00.rs +++ b/sdk/data_cosmos/examples/user_defined_function_00.rs @@ -51,7 +51,7 @@ async fn main() -> Result<(), Box> { .list_user_defined_functions() .max_item_count(3) .consistency_level(&ret); - let mut stream = Box::pin(stream.stream()); + let mut stream = stream.into_stream(); while let Some(ret) = stream.next().await { let ret = ret.unwrap(); println!( @@ -68,11 +68,13 @@ async fn main() -> Result<(), Box> { println!("Replace response object:\n{:#?}", ret); let ret = collection_client - .query_documents() + .query_documents("SELECT udf.test15(100)") .consistency_level(&ret) .max_item_count(2i32) - .execute::("SELECT udf.test15(100)") - .await? + .into_stream::() + .next() + .await + .unwrap()? .into_raw(); println!("Query response object:\n{:#?}", ret); diff --git a/sdk/data_cosmos/src/clients/collection_client.rs b/sdk/data_cosmos/src/clients/collection_client.rs index 4299da7524..1a1cba3452 100644 --- a/sdk/data_cosmos/src/clients/collection_client.rs +++ b/sdk/data_cosmos/src/clients/collection_client.rs @@ -3,6 +3,7 @@ use crate::clients::*; use crate::operations::*; use crate::requests; use crate::resources::collection::PartitionKey; +use crate::resources::document::Query; use crate::CosmosEntity; use crate::ReadonlyString; use azure_core::{HttpClient, Pipeline, Request}; @@ -73,18 +74,18 @@ impl CollectionClient { } /// query documents in a collection - pub fn query_documents(&self) -> requests::QueryDocumentsBuilder<'_, '_> { - requests::QueryDocumentsBuilder::new(self) + pub fn query_documents>(&self, query: Q) -> QueryDocumentsBuilder { + QueryDocumentsBuilder::new(self.clone(), query.into()) } /// list stored procedures in a collection - pub fn list_stored_procedures(&self) -> requests::ListStoredProceduresBuilder<'_, '_> { - requests::ListStoredProceduresBuilder::new(self) + pub fn list_stored_procedures(&self) -> ListStoredProceduresBuilder { + ListStoredProceduresBuilder::new(self.clone()) } /// list user defined functions in a collection - pub fn list_user_defined_functions(&self) -> requests::ListUserDefinedFunctionsBuilder<'_, '_> { - requests::ListUserDefinedFunctionsBuilder::new(self) + pub fn list_user_defined_functions(&self) -> ListUserDefinedFunctionsBuilder { + ListUserDefinedFunctionsBuilder::new(self.clone()) } /// list triggers in a collection diff --git a/sdk/data_cosmos/src/clients/document_client.rs b/sdk/data_cosmos/src/clients/document_client.rs index d3535c4fd7..45e52878e0 100644 --- a/sdk/data_cosmos/src/clients/document_client.rs +++ b/sdk/data_cosmos/src/clients/document_client.rs @@ -61,7 +61,7 @@ impl DocumentClient { GetDocumentBuilder::new(self.clone()) } - /// replace a document in a collection + /// Replace a document in a collection pub fn replace_document( &self, document: D, diff --git a/sdk/data_cosmos/src/operations/list_stored_procedures.rs b/sdk/data_cosmos/src/operations/list_stored_procedures.rs new file mode 100644 index 0000000000..d80b04b6bb --- /dev/null +++ b/sdk/data_cosmos/src/operations/list_stored_procedures.rs @@ -0,0 +1,122 @@ +use crate::headers::from_headers::*; +use crate::prelude::*; +use crate::resources::ResourceType; +use crate::resources::StoredProcedure; +use crate::ResourceQuota; +use azure_core::collect_pinned_stream; +use azure_core::headers; +use azure_core::headers::{continuation_token_from_headers_optional, session_token_from_headers}; +use azure_core::prelude::*; +use azure_core::{Pageable, Response as HttpResponse}; +use chrono::{DateTime, Utc}; + +#[derive(Debug, Clone)] +pub struct ListStoredProceduresBuilder { + client: CollectionClient, + consistency_level: Option, + max_item_count: MaxItemCount, + context: Context, +} + +impl ListStoredProceduresBuilder { + pub(crate) fn new(client: CollectionClient) -> Self { + Self { + client, + consistency_level: None, + max_item_count: MaxItemCount::new(-1), + context: Context::new(), + } + } + + setters! { + consistency_level: ConsistencyLevel => Some(consistency_level), + max_item_count: i32 => MaxItemCount::new(max_item_count), + context: Context => context, + } + + pub fn into_stream(self) -> ListStoredProcedures { + let make_request = move |continuation: Option| { + let this = self.clone(); + let ctx = self.context.clone(); + async move { + let mut request = this.client.cosmos_client().prepare_request_pipeline( + &format!( + "dbs/{}/colls/{}/sprocs", + this.client.database_client().database_name(), + this.client.collection_name(), + ), + http::Method::GET, + ); + + azure_core::headers::add_optional_header2(&this.consistency_level, &mut request)?; + azure_core::headers::add_mandatory_header2(&this.max_item_count, &mut request)?; + + if let Some(c) = continuation { + let h = http::HeaderValue::from_str(c.as_str()) + .map_err(azure_core::HttpHeaderError::InvalidHeaderValue)?; + request.headers_mut().append(headers::CONTINUATION, h); + } + + let response = this + .client + .pipeline() + .send( + ctx.clone().insert(ResourceType::StoredProcedures), + &mut request, + ) + .await?; + ListStoredProceduresResponse::try_from(response).await + } + }; + + Pageable::new(make_request) + } +} + +pub type ListStoredProcedures = Pageable; + +#[derive(Debug, Clone, PartialEq)] +pub struct ListStoredProceduresResponse { + pub stored_procedures: Vec, + pub charge: f64, + pub activity_id: uuid::Uuid, + pub session_token: String, + pub last_change: DateTime, + pub resource_quota: Vec, + pub resource_usage: Vec, + pub gateway_version: String, + pub continuation_token: Option, +} + +impl ListStoredProceduresResponse { + pub async fn try_from(response: HttpResponse) -> crate::Result { + let (_status_code, headers, pinned_stream) = response.deconstruct(); + let body = collect_pinned_stream(pinned_stream).await?; + + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] + struct Response { + pub _rid: String, + #[serde(rename = "StoredProcedures")] + pub stored_procedures: Vec, + pub _count: u64, + } + + Ok(Self { + stored_procedures: serde_json::from_slice::(&body)?.stored_procedures, + charge: request_charge_from_headers(&headers)?, + activity_id: activity_id_from_headers(&headers)?, + session_token: session_token_from_headers(&headers)?, + last_change: last_state_change_from_headers(&headers)?, + resource_quota: resource_quota_from_headers(&headers)?, + resource_usage: resource_usage_from_headers(&headers)?, + gateway_version: gateway_version_from_headers(&headers)?.to_owned(), + continuation_token: continuation_token_from_headers_optional(&headers)?, + }) + } +} + +impl Continuable for ListStoredProceduresResponse { + fn continuation(&self) -> Option { + self.continuation_token.clone() + } +} diff --git a/sdk/data_cosmos/src/operations/list_user_defined_functions.rs b/sdk/data_cosmos/src/operations/list_user_defined_functions.rs new file mode 100644 index 0000000000..5cf2da3efd --- /dev/null +++ b/sdk/data_cosmos/src/operations/list_user_defined_functions.rs @@ -0,0 +1,160 @@ +use crate::headers::from_headers::*; +use crate::prelude::*; +use crate::resources::ResourceType; +use crate::resources::UserDefinedFunction; +use crate::ResourceQuota; +use azure_core::collect_pinned_stream; +use azure_core::headers::{ + continuation_token_from_headers_optional, item_count_from_headers, session_token_from_headers, +}; +use azure_core::{headers, prelude::*, Pageable, Response as HttpResponse}; +use chrono::{DateTime, Utc}; + +#[derive(Debug, Clone)] +pub struct ListUserDefinedFunctionsBuilder { + client: CollectionClient, + if_match_condition: Option, + consistency_level: Option, + max_item_count: MaxItemCount, + context: Context, +} + +impl ListUserDefinedFunctionsBuilder { + pub(crate) fn new(client: CollectionClient) -> Self { + Self { + client, + if_match_condition: None, + consistency_level: None, + max_item_count: MaxItemCount::new(-1), + context: Context::new(), + } + } + + setters! { + consistency_level: ConsistencyLevel => Some(consistency_level), + max_item_count: i32 => MaxItemCount::new(max_item_count), + if_match_condition: IfMatchCondition => Some(if_match_condition), + context: Context => context, + } + + pub fn into_stream(self) -> ListUserDefinedFunctions { + let make_request = move |continuation: Option| { + let this = self.clone(); + let ctx = self.context.clone(); + async move { + let mut request = this.client.cosmos_client().prepare_request_pipeline( + &format!( + "dbs/{}/colls/{}/udfs", + this.client.database_client().database_name(), + this.client.collection_name() + ), + http::Method::GET, + ); + + azure_core::headers::add_optional_header2(&this.if_match_condition, &mut request)?; + azure_core::headers::add_optional_header2(&this.consistency_level, &mut request)?; + azure_core::headers::add_mandatory_header2(&this.max_item_count, &mut request)?; + + if let Some(c) = continuation { + let h = http::HeaderValue::from_str(c.as_str()) + .map_err(azure_core::HttpHeaderError::InvalidHeaderValue)?; + request.headers_mut().append(headers::CONTINUATION, h); + } + + let response = this + .client + .pipeline() + .send( + ctx.clone().insert(ResourceType::UserDefinedFunctions), + &mut request, + ) + .await?; + ListUserDefinedFunctionsResponse::try_from(response).await + } + }; + + Pageable::new(make_request) + } +} + +pub type ListUserDefinedFunctions = Pageable; + +#[derive(Debug, Clone, PartialEq)] +pub struct ListUserDefinedFunctionsResponse { + pub rid: String, + pub user_defined_functions: Vec, + pub content_location: String, + pub server: String, + pub last_state_change: DateTime, + pub continuation_token: Option, + pub resource_quota: Vec, + pub resource_usage: Vec, + pub lsn: u64, + pub item_count: u32, + pub schema_version: String, + pub alt_content_path: String, + pub content_path: String, + pub role: u32, + pub global_committed_lsn: u64, + pub number_of_read_regions: u32, + pub transport_request_id: u64, + pub cosmos_llsn: u64, + pub session_token: String, + pub charge: f64, + pub service_version: String, + pub activity_id: uuid::Uuid, + pub gateway_version: String, + pub date: DateTime, +} + +impl ListUserDefinedFunctionsResponse { + pub async fn try_from(response: HttpResponse) -> crate::Result { + let (_status_code, headers, pinned_stream) = response.deconstruct(); + let body = collect_pinned_stream(pinned_stream).await?; + + #[derive(Debug, Deserialize)] + struct Response<'a> { + #[serde(rename = "_rid")] + rid: &'a str, + #[serde(rename = "UserDefinedFunctions")] + user_defined_functions: Vec, + #[serde(rename = "_count")] + #[allow(unused)] + count: u32, + } + let response: Response = serde_json::from_slice(&body)?; + + Ok(Self { + rid: response.rid.to_owned(), + user_defined_functions: response.user_defined_functions, + content_location: content_location_from_headers(&headers)?.to_owned(), + server: server_from_headers(&headers)?.to_owned(), + last_state_change: last_state_change_from_headers(&headers)?, + continuation_token: continuation_token_from_headers_optional(&headers)?, + resource_quota: resource_quota_from_headers(&headers)?, + resource_usage: resource_usage_from_headers(&headers)?, + lsn: lsn_from_headers(&headers)?, + item_count: item_count_from_headers(&headers)?, + schema_version: schema_version_from_headers(&headers)?.to_owned(), + alt_content_path: alt_content_path_from_headers(&headers)?.to_owned(), + content_path: content_path_from_headers(&headers)?.to_owned(), + role: role_from_headers(&headers)?, + global_committed_lsn: global_committed_lsn_from_headers(&headers)?, + number_of_read_regions: number_of_read_regions_from_headers(&headers)?, + transport_request_id: transport_request_id_from_headers(&headers)?, + cosmos_llsn: cosmos_llsn_from_headers(&headers)?, + session_token: session_token_from_headers(&headers)?, + charge: request_charge_from_headers(&headers)?, + service_version: service_version_from_headers(&headers)?.to_owned(), + activity_id: activity_id_from_headers(&headers)?, + gateway_version: gateway_version_from_headers(&headers)?.to_owned(), + date: date_from_headers(&headers)?, + }) + } +} + +impl Continuable for ListUserDefinedFunctionsResponse { + fn continuation(&self) -> Option { + self.continuation_token.clone() + } +} diff --git a/sdk/data_cosmos/src/operations/mod.rs b/sdk/data_cosmos/src/operations/mod.rs index 3b3cd17f40..edf562e87b 100644 --- a/sdk/data_cosmos/src/operations/mod.rs +++ b/sdk/data_cosmos/src/operations/mod.rs @@ -24,7 +24,10 @@ mod list_collections; mod list_databases; mod list_documents; mod list_permissions; +mod list_stored_procedures; +mod list_user_defined_functions; mod list_users; +mod query_documents; mod replace_collection; mod replace_document; mod replace_permission; @@ -53,7 +56,10 @@ pub use list_collections::*; pub use list_databases::*; pub use list_documents::*; pub use list_permissions::*; +pub use list_stored_procedures::*; +pub use list_user_defined_functions::*; pub use list_users::*; +pub use query_documents::*; pub use replace_collection::*; pub use replace_document::*; pub use replace_permission::*; diff --git a/sdk/data_cosmos/src/responses/query_documents_response.rs b/sdk/data_cosmos/src/operations/query_documents.rs similarity index 58% rename from sdk/data_cosmos/src/responses/query_documents_response.rs rename to sdk/data_cosmos/src/operations/query_documents.rs index 6012a3fac2..ff2f6d75e7 100644 --- a/sdk/data_cosmos/src/responses/query_documents_response.rs +++ b/sdk/data_cosmos/src/operations/query_documents.rs @@ -1,9 +1,17 @@ use crate::headers::from_headers::*; +use crate::prelude::*; use crate::resources::document::DocumentAttributes; +use crate::resources::document::Query; +use crate::resources::ResourceType; use crate::ResourceQuota; +use azure_core::collect_pinned_stream; use azure_core::headers::{ - continuation_token_from_headers_optional, item_count_from_headers, session_token_from_headers, + self, continuation_token_from_headers_optional, item_count_from_headers, + session_token_from_headers, }; +use azure_core::prelude::*; +use azure_core::Pageable; +use azure_core::Response as HttpResponse; use azure_core::SessionToken; use chrono::{DateTime, Utc}; use http::response::Response; @@ -11,6 +19,120 @@ use serde::de::DeserializeOwned; use serde_json::Value; use std::convert::TryInto; +#[derive(Debug, Clone)] +pub struct QueryDocumentsBuilder { + client: CollectionClient, + query: Query, + if_match_condition: Option, + if_modified_since: Option, + consistency_level: Option, + max_item_count: MaxItemCount, + partition_key_serialized: Option, + query_cross_partition: QueryCrossPartition, + #[allow(unused)] + parallelize_cross_partition_query: ParallelizeCrossPartition, + context: Context, +} + +impl QueryDocumentsBuilder { + pub(crate) fn new(client: CollectionClient, query: Query) -> Self { + Self { + client, + query, + if_match_condition: None, + if_modified_since: None, + consistency_level: None, + max_item_count: MaxItemCount::new(-1), + partition_key_serialized: None, + query_cross_partition: QueryCrossPartition::No, + // TODO: use this in request + parallelize_cross_partition_query: ParallelizeCrossPartition::No, + context: Context::new(), + } + } + + setters! { + consistency_level: ConsistencyLevel => Some(consistency_level), + if_match_condition: IfMatchCondition => Some(if_match_condition), + max_item_count: i32 => MaxItemCount::new(max_item_count), + if_modified_since: DateTime => Some(IfModifiedSince::new(if_modified_since)), + query_cross_partition: bool => if query_cross_partition { QueryCrossPartition::Yes } else { QueryCrossPartition::No }, + parallelize_cross_partition_query: bool => if parallelize_cross_partition_query { ParallelizeCrossPartition::Yes } else { ParallelizeCrossPartition::No }, + context: Context => context, + } + + pub fn partition_key(self, pk: &PK) -> Result { + Ok(Self { + partition_key_serialized: Some(crate::cosmos_entity::serialize_partition_key(pk)?), + ..self + }) + } + + pub fn into_stream(self) -> QueryDocuments + where + T: DeserializeOwned, + { + let make_request = move |continuation: Option| { + let this = self.clone(); + let ctx = self.context.clone(); + async move { + let mut request = this.client.cosmos_client().prepare_request_pipeline( + &format!( + "dbs/{}/colls/{}/docs", + this.client.database_client().database_name(), + this.client.collection_name() + ), + http::Method::POST, + ); + + // signal that this is a query + request.headers_mut().insert( + crate::headers::HEADER_DOCUMENTDB_ISQUERY, + http::HeaderValue::from_str("true").unwrap(), + ); + request.headers_mut().insert( + http::header::CONTENT_TYPE, + http::HeaderValue::from_str("application/query+json").unwrap(), + ); + + azure_core::headers::add_optional_header2(&this.if_match_condition, &mut request)?; + azure_core::headers::add_optional_header2(&this.if_modified_since, &mut request)?; + azure_core::headers::add_optional_header2(&this.consistency_level, &mut request)?; + azure_core::headers::add_mandatory_header2(&this.max_item_count, &mut request)?; + azure_core::headers::add_mandatory_header2( + &this.query_cross_partition, + &mut request, + )?; + + request.set_body(bytes::Bytes::from(serde_json::to_string(&this.query)?).into()); + if let Some(partition_key_serialized) = this.partition_key_serialized.as_ref() { + crate::cosmos_entity::add_as_partition_key_header_serialized2( + partition_key_serialized, + &mut request, + ); + } + + if let Some(c) = continuation { + let h = http::HeaderValue::from_str(c.as_str()) + .map_err(azure_core::HttpHeaderError::InvalidHeaderValue)?; + request.headers_mut().append(headers::CONTINUATION, h); + } + + let response = this + .client + .pipeline() + .send(ctx.clone().insert(ResourceType::Documents), &mut request) + .await?; + QueryDocumentsResponse::try_from(response).await + } + }; + + Pageable::new(make_request) + } +} + +pub type QueryDocuments = Pageable, crate::Error>; + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct DocumentQueryResult { #[serde(flatten)] @@ -38,14 +160,6 @@ pub struct QueryResponseMeta { pub count: u64, } -impl std::convert::TryFrom> for QueryResponseMeta { - type Error = crate::Error; - - fn try_from(response: Response) -> Result { - Ok(serde_json::from_slice(response.body())?) - } -} - #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub enum QueryResult { Document(DocumentQueryResult), @@ -92,17 +206,15 @@ impl QueryDocumentsResponse { } } -impl std::convert::TryFrom> for QueryDocumentsResponse +impl QueryDocumentsResponse where T: DeserializeOwned, { - type Error = crate::Error; + pub async fn try_from(response: HttpResponse) -> crate::Result { + let (_status_code, headers, pinned_stream) = response.deconstruct(); + let body = collect_pinned_stream(pinned_stream).await?; - fn try_from(response: Response) -> Result { - let headers = response.headers(); - let body = response.body(); - - let inner: Value = serde_json::from_slice(body)?; + let inner: Value = serde_json::from_slice(&body)?; let mut results = Vec::new(); if let Value::Array(documents) = &inner["Documents"] { for doc in documents { @@ -131,32 +243,31 @@ where Ok(QueryDocumentsResponse { results, - last_state_change: last_state_change_from_headers(headers)?, - resource_quota: resource_quota_from_headers(headers)?, - resource_usage: resource_usage_from_headers(headers)?, - lsn: lsn_from_headers(headers)?, - item_count: item_count_from_headers(headers)?, - schema_version: schema_version_from_headers(headers)?.to_owned(), - alt_content_path: alt_content_path_from_headers(headers)?.to_owned(), - content_path: content_path_from_headers(headers)?.to_owned(), - quorum_acked_lsn: quorum_acked_lsn_from_headers_optional(headers)?, - current_write_quorum: current_write_quorum_from_headers_optional(headers)?, - current_replica_set_size: current_replica_set_size_from_headers_optional(headers)?, - role: role_from_headers(headers)?, - global_committed_lsn: global_committed_lsn_from_headers(headers)?, - number_of_read_regions: number_of_read_regions_from_headers(headers)?, - transport_request_id: transport_request_id_from_headers(headers)?, - cosmos_llsn: cosmos_llsn_from_headers(headers)?, - cosmos_quorum_acked_llsn: cosmos_quorum_acked_llsn_from_headers_optional(headers)?, - session_token: session_token_from_headers(headers)?, - charge: request_charge_from_headers(headers)?, - service_version: service_version_from_headers(headers)?.to_owned(), - activity_id: activity_id_from_headers(headers)?, - gateway_version: gateway_version_from_headers(headers)?.to_owned(), - continuation_token: continuation_token_from_headers_optional(headers)?, - date: date_from_headers(headers)?, - - query_response_meta: response.try_into()?, + last_state_change: last_state_change_from_headers(&headers)?, + resource_quota: resource_quota_from_headers(&headers)?, + resource_usage: resource_usage_from_headers(&headers)?, + lsn: lsn_from_headers(&headers)?, + item_count: item_count_from_headers(&headers)?, + schema_version: schema_version_from_headers(&headers)?.to_owned(), + alt_content_path: alt_content_path_from_headers(&headers)?.to_owned(), + content_path: content_path_from_headers(&headers)?.to_owned(), + quorum_acked_lsn: quorum_acked_lsn_from_headers_optional(&headers)?, + current_write_quorum: current_write_quorum_from_headers_optional(&headers)?, + current_replica_set_size: current_replica_set_size_from_headers_optional(&headers)?, + role: role_from_headers(&headers)?, + global_committed_lsn: global_committed_lsn_from_headers(&headers)?, + number_of_read_regions: number_of_read_regions_from_headers(&headers)?, + transport_request_id: transport_request_id_from_headers(&headers)?, + cosmos_llsn: cosmos_llsn_from_headers(&headers)?, + cosmos_quorum_acked_llsn: cosmos_quorum_acked_llsn_from_headers_optional(&headers)?, + session_token: session_token_from_headers(&headers)?, + charge: request_charge_from_headers(&headers)?, + service_version: service_version_from_headers(&headers)?.to_owned(), + activity_id: activity_id_from_headers(&headers)?, + gateway_version: gateway_version_from_headers(&headers)?.to_owned(), + continuation_token: continuation_token_from_headers_optional(&headers)?, + date: date_from_headers(&headers)?, + query_response_meta: serde_json::from_slice(&body)?, }) } } @@ -310,3 +421,8 @@ impl std::convert::TryFrom> for QueryDocumentsRespo }) } } +impl Continuable for QueryDocumentsResponse { + fn continuation(&self) -> Option { + self.continuation_token.clone() + } +} diff --git a/sdk/data_cosmos/src/requests/list_stored_procedures_builder.rs b/sdk/data_cosmos/src/requests/list_stored_procedures_builder.rs deleted file mode 100644 index 475ea7adce..0000000000 --- a/sdk/data_cosmos/src/requests/list_stored_procedures_builder.rs +++ /dev/null @@ -1,109 +0,0 @@ -use crate::prelude::*; -use crate::resources::ResourceType; -use crate::responses::ListStoredProceduresResponse; -use azure_core::prelude::*; -use futures::stream::{unfold, Stream}; -use http::StatusCode; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct ListStoredProceduresBuilder<'a, 'b> { - collection_client: &'a CollectionClient, - user_agent: Option>, - activity_id: Option>, - consistency_level: Option, - continuation: Option>, - max_item_count: MaxItemCount, -} - -impl<'a, 'b> ListStoredProceduresBuilder<'a, 'b> { - pub(crate) fn new(collection_client: &'a CollectionClient) -> Self { - Self { - collection_client, - user_agent: None, - activity_id: None, - consistency_level: None, - continuation: None, - max_item_count: MaxItemCount::new(-1), - } - } - - setters! { - user_agent: &'b str => Some(UserAgent::new(user_agent)), - activity_id: &'b str => Some(ActivityId::new(activity_id)), - consistency_level: ConsistencyLevel => Some(consistency_level), - continuation: &'b str => Some(Continuation::new(continuation)), - max_item_count: i32 => MaxItemCount::new(max_item_count), - } - - pub async fn execute(&self) -> crate::Result { - trace!("ListStoredProceduresBuilder::execute called"); - - let request = self.collection_client.cosmos_client().prepare_request( - &format!( - "dbs/{}/colls/{}/sprocs", - self.collection_client.database_client().database_name(), - self.collection_client.collection_name(), - ), - http::Method::GET, - ResourceType::StoredProcedures, - ); - - // add trait headers - let request = azure_core::headers::add_optional_header(&self.user_agent, request); - let request = azure_core::headers::add_optional_header(&self.activity_id, request); - let request = azure_core::headers::add_optional_header(&self.consistency_level, request); - let request = azure_core::headers::add_optional_header(&self.continuation, request); - let request = azure_core::headers::add_mandatory_header(&self.max_item_count, request); - - let request = request.body(azure_core::EMPTY_BODY)?; - - Ok(self - .collection_client - .http_client() - .execute_request_check_status(request, StatusCode::OK) - .await? - .try_into()?) - } - - pub fn stream(&self) -> impl Stream> + '_ { - #[derive(Debug, Clone, PartialEq)] - enum States { - Init, - Continuation(String), - } - - unfold( - Some(States::Init), - move |continuation_token: Option| { - async move { - debug!("continuation_token == {:?}", &continuation_token); - let response = match continuation_token { - Some(States::Init) => self.execute().await, - Some(States::Continuation(continuation_token)) => { - self.clone() - .continuation(continuation_token.as_str()) - .execute() - .await - } - None => return None, - }; - - // the ? operator does not work in async move (yet?) - // so we have to resort to this boilerplate - let response = match response { - Ok(response) => response, - Err(err) => return Some((Err(err), None)), - }; - - let continuation_token = response - .continuation_token - .as_ref() - .map(|ct| States::Continuation(ct.to_owned())); - - Some((Ok(response), continuation_token)) - } - }, - ) - } -} diff --git a/sdk/data_cosmos/src/requests/list_user_defined_functions_builder.rs b/sdk/data_cosmos/src/requests/list_user_defined_functions_builder.rs deleted file mode 100644 index 2dd1c8e9a1..0000000000 --- a/sdk/data_cosmos/src/requests/list_user_defined_functions_builder.rs +++ /dev/null @@ -1,115 +0,0 @@ -use crate::prelude::*; -use crate::resources::ResourceType; -use crate::responses::ListUserDefinedFunctionsResponse; -use azure_core::prelude::*; -use futures::stream::{unfold, Stream}; -use http::StatusCode; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct ListUserDefinedFunctionsBuilder<'a, 'b> { - collection_client: &'a CollectionClient, - if_match_condition: Option, - user_agent: Option>, - activity_id: Option>, - consistency_level: Option, - continuation: Option>, - max_item_count: MaxItemCount, -} - -impl<'a, 'b> ListUserDefinedFunctionsBuilder<'a, 'b> { - pub(crate) fn new(collection_client: &'a CollectionClient) -> Self { - Self { - collection_client, - if_match_condition: None, - user_agent: None, - activity_id: None, - consistency_level: None, - continuation: None, - max_item_count: MaxItemCount::new(-1), - } - } - - setters! { - user_agent: &'b str => Some(UserAgent::new(user_agent)), - activity_id: &'b str => Some(ActivityId::new(activity_id)), - consistency_level: ConsistencyLevel => Some(consistency_level), - continuation: &'b str => Some(Continuation::new(continuation)), - max_item_count: i32 => MaxItemCount::new(max_item_count), - if_match_condition: IfMatchCondition => Some(if_match_condition), - } - - pub async fn execute(&self) -> crate::Result { - trace!("ListUserDefinedFunctionsBuilder::execute called"); - - let request = self.collection_client.cosmos_client().prepare_request( - &format!( - "dbs/{}/colls/{}/udfs", - self.collection_client.database_client().database_name(), - self.collection_client.collection_name() - ), - http::Method::GET, - ResourceType::UserDefinedFunctions, - ); - - // add trait headers - let request = azure_core::headers::add_optional_header(&self.if_match_condition, request); - let request = azure_core::headers::add_optional_header(&self.user_agent, request); - let request = azure_core::headers::add_optional_header(&self.activity_id, request); - let request = azure_core::headers::add_optional_header(&self.consistency_level, request); - let request = azure_core::headers::add_optional_header(&self.continuation, request); - let request = azure_core::headers::add_mandatory_header(&self.max_item_count, request); - - let request = request.body(azure_core::EMPTY_BODY)?; - - Ok(self - .collection_client - .http_client() - .execute_request_check_status(request, StatusCode::OK) - .await? - .try_into()?) - } - - pub fn stream( - &self, - ) -> impl Stream> + '_ { - #[derive(Debug, Clone, PartialEq)] - enum States { - Init, - Continuation(String), - } - - unfold( - Some(States::Init), - move |continuation_token: Option| { - async move { - debug!("continuation_token == {:?}", &continuation_token); - let response = match continuation_token { - Some(States::Init) => self.execute().await, - Some(States::Continuation(continuation_token)) => { - self.clone() - .continuation(continuation_token.as_str()) - .execute() - .await - } - None => return None, - }; - - // the ? operator does not work in async move (yet?) - // so we have to resort to this boilerplate - let response = match response { - Ok(response) => response, - Err(err) => return Some((Err(err), None)), - }; - - let continuation_token = response - .continuation_token - .as_ref() - .map(|ct| States::Continuation(ct.to_owned())); - - Some((Ok(response), continuation_token)) - } - }, - ) - } -} diff --git a/sdk/data_cosmos/src/requests/mod.rs b/sdk/data_cosmos/src/requests/mod.rs index 34d54f14aa..5332c5ac54 100644 --- a/sdk/data_cosmos/src/requests/mod.rs +++ b/sdk/data_cosmos/src/requests/mod.rs @@ -16,10 +16,7 @@ mod delete_user_defined_function_builder; mod execute_stored_procedure_builder; mod get_attachment_builder; mod get_partition_key_ranges_builder; -mod list_stored_procedures_builder; mod list_triggers_builder; -mod list_user_defined_functions_builder; -mod query_documents_builder; mod replace_reference_attachment_builder; mod replace_slug_attachment_builder; @@ -33,9 +30,6 @@ pub use delete_user_defined_function_builder::DeleteUserDefinedFunctionBuilder; pub use execute_stored_procedure_builder::ExecuteStoredProcedureBuilder; pub use get_attachment_builder::GetAttachmentBuilder; pub use get_partition_key_ranges_builder::GetPartitionKeyRangesBuilder; -pub use list_stored_procedures_builder::ListStoredProceduresBuilder; pub use list_triggers_builder::ListTriggersBuilder; -pub use list_user_defined_functions_builder::ListUserDefinedFunctionsBuilder; -pub use query_documents_builder::QueryDocumentsBuilder; pub use replace_reference_attachment_builder::ReplaceReferenceAttachmentBuilder; pub use replace_slug_attachment_builder::ReplaceSlugAttachmentBuilder; diff --git a/sdk/data_cosmos/src/requests/query_documents_builder.rs b/sdk/data_cosmos/src/requests/query_documents_builder.rs deleted file mode 100644 index 9b041d4c23..0000000000 --- a/sdk/data_cosmos/src/requests/query_documents_builder.rs +++ /dev/null @@ -1,164 +0,0 @@ -use crate::prelude::*; -use crate::resources::document::Query; -use crate::resources::ResourceType; -use crate::responses::QueryDocumentsResponse; -use azure_core::prelude::*; -use chrono::{DateTime, Utc}; -use futures::stream::{unfold, Stream}; -use http::StatusCode; -use serde::de::DeserializeOwned; -use std::convert::TryInto; - -#[derive(Debug, Clone)] -pub struct QueryDocumentsBuilder<'a, 'b> { - collection_client: &'a CollectionClient, - if_match_condition: Option, - if_modified_since: Option, - user_agent: Option>, - activity_id: Option>, - consistency_level: Option, - continuation: Option>, - max_item_count: MaxItemCount, - partition_key_serialized: Option, - query_cross_partition: QueryCrossPartition, - #[allow(unused)] - parallelize_cross_partition_query: ParallelizeCrossPartition, -} - -impl<'a, 'b> QueryDocumentsBuilder<'a, 'b> { - pub(crate) fn new(collection_client: &'a CollectionClient) -> Self { - Self { - collection_client, - if_match_condition: None, - if_modified_since: None, - user_agent: None, - activity_id: None, - consistency_level: None, - continuation: None, - max_item_count: MaxItemCount::new(-1), - partition_key_serialized: None, - query_cross_partition: QueryCrossPartition::No, - // TODO: use this in request - parallelize_cross_partition_query: ParallelizeCrossPartition::No, - } - } -} - -impl<'a, 'b> QueryDocumentsBuilder<'a, 'b> { - setters! { - user_agent: &'b str => Some(UserAgent::new(user_agent)), - activity_id: &'b str => Some(ActivityId::new(activity_id)), - consistency_level: ConsistencyLevel => Some(consistency_level), - if_match_condition: IfMatchCondition => Some(if_match_condition), - continuation: &'b str => Some(Continuation::new(continuation)), - max_item_count: i32 => MaxItemCount::new(max_item_count), - if_modified_since: DateTime => Some(IfModifiedSince::new(if_modified_since)), - query_cross_partition: bool => if query_cross_partition { QueryCrossPartition::Yes } else { QueryCrossPartition::No }, - parallelize_cross_partition_query: bool => if parallelize_cross_partition_query { ParallelizeCrossPartition::Yes } else { ParallelizeCrossPartition::No }, - } - - pub fn partition_key(self, pk: &PK) -> Result { - Ok(Self { - partition_key_serialized: Some(crate::cosmos_entity::serialize_partition_key(pk)?), - ..self - }) - } - - pub async fn execute(&self, query: Q) -> crate::Result> - where - T: DeserializeOwned, - Q: Into>, - { - trace!("QueryDocumentBuilder::execute called"); - - let req = self.collection_client.cosmos_client().prepare_request( - &format!( - "dbs/{}/colls/{}/docs", - self.collection_client.database_client().database_name(), - self.collection_client.collection_name() - ), - http::Method::POST, - ResourceType::Documents, - ); - - let req = if let Some(partition_key_serialized) = self.partition_key_serialized.as_ref() { - crate::cosmos_entity::add_as_partition_key_header_serialized( - partition_key_serialized, - req, - ) - } else { - req - }; - - // signal that this is a query - let req = req.header(crate::headers::HEADER_DOCUMENTDB_ISQUERY, true.to_string()); - let req = req.header(http::header::CONTENT_TYPE, "application/query+json"); - - // add trait headers - let req = azure_core::headers::add_optional_header(&self.if_match_condition, req); - let req = azure_core::headers::add_optional_header(&self.if_modified_since, req); - let req = azure_core::headers::add_optional_header(&self.user_agent, req); - let req = azure_core::headers::add_optional_header(&self.activity_id, req); - let req = azure_core::headers::add_optional_header(&self.consistency_level, req); - let req = azure_core::headers::add_optional_header(&self.continuation, req); - let req = azure_core::headers::add_mandatory_header(&self.max_item_count, req); - let req = azure_core::headers::add_mandatory_header(&self.query_cross_partition, req); - - let body = azure_core::to_json(&query.into())?; - debug!("body == {:?}", body); - - let req = req.body(body)?; - debug!("{:?}", req); - - Ok(self - .collection_client - .http_client() - .execute_request_check_status(req, StatusCode::OK) - .await? - .try_into()?) - } - - pub fn stream( - &'a self, - query: Q, - ) -> impl Stream>> + 'a - where - T: DeserializeOwned, - Q: Into> + 'a + Copy, - { - #[derive(Debug, Clone, PartialEq)] - enum States { - Init, - Continuation(String), - } - - unfold( - Some(States::Init), - move |continuation_token: Option| async move { - debug!("continuation_token == {:?}", &continuation_token); - let response = match continuation_token { - Some(States::Init) => self.execute(query).await, - Some(States::Continuation(continuation_token)) => { - self.clone() - .continuation(continuation_token.as_str()) - .execute(query) - .await - } - None => return None, - }; - - let response = match response { - Ok(response) => response, - Err(err) => return Some((Err(err), None)), - }; - - let continuation_token = response - .continuation_token - .as_ref() - .map(|ct| States::Continuation(ct.to_owned())); - - Some((Ok(response), continuation_token)) - }, - ) - } -} diff --git a/sdk/data_cosmos/src/resources/document/query.rs b/sdk/data_cosmos/src/resources/document/query.rs index 1c650a41cb..05531b0aab 100644 --- a/sdk/data_cosmos/src/resources/document/query.rs +++ b/sdk/data_cosmos/src/resources/document/query.rs @@ -4,19 +4,19 @@ use serde_json::Value; /// /// You can learn more about how SQL queries work in Cosmos [here](https://docs.microsoft.com/azure/cosmos-db/sql-query-getting-started). #[derive(Debug, Serialize, Clone)] -pub struct Query<'a> { - query: &'a str, - parameters: Vec>, +pub struct Query { + query: String, + parameters: Vec, } -impl<'a> Query<'a> { +impl Query { /// A new SQL query with no parameters - pub fn new(query: &'a str) -> Self { + pub fn new(query: String) -> Self { Self::with_params(query, vec![]) } /// A new SQL query with the supplied parameters - pub fn with_params>>>(query: &'a str, params: T) -> Self { + pub fn with_params>>(query: String, params: T) -> Self { Self { query, parameters: params.into(), @@ -24,44 +24,32 @@ impl<'a> Query<'a> { } /// The query as a `&str` - pub fn query(&self) -> &'a str { - self.query + pub fn query(&self) -> &str { + &self.query } /// The supplied params - pub fn params(&self) -> &[Param<'a>] { + pub fn params(&self) -> &[Param] { &self.parameters } } -impl<'a> From<&'a str> for Query<'a> { - fn from(query: &'a str) -> Query<'a> { - Query::new(query) - } -} - -impl<'a> From<&'a Query<'a>> for Query<'a> { - fn from(query: &'a Query<'a>) -> Query<'a> { - query.clone() - } -} - -impl<'a> From<&'a String> for Query<'a> { - fn from(query: &'a String) -> Query<'a> { - query.as_str().into() +impl From<&'static str> for Query { + fn from(query: &'static str) -> Query { + Query::new(query.into()) } } /// A SQL query parameter #[derive(Debug, Serialize, Clone)] -pub struct Param<'a> { - name: &'a str, +pub struct Param { + name: String, value: Value, } -impl<'a> Param<'a> { +impl Param { /// Create a new `Param` with the supplied name and the JSON value - pub fn new>(name: &'a str, value: T) -> Self { + pub fn new>(name: String, value: T) -> Self { Self { name, value: value.into(), @@ -69,8 +57,8 @@ impl<'a> Param<'a> { } /// The param name - pub fn name(&self) -> &'a str { - self.name + pub fn name(&self) -> &str { + &self.name } /// The param value @@ -88,11 +76,11 @@ mod tests { fn tst_query() { let v3 = Value::from(vec![1, 2, 3]); let query = Query::with_params( - "SELECT * FROM t", + "SELECT * FROM t".into(), vec![ - Param::new("p1", "string"), - Param::new("p2", 100u64), - Param::new("p3", v3), + Param::new("p1".into(), "string"), + Param::new("p2".into(), 100u64), + Param::new("p3".into(), v3), ], ); diff --git a/sdk/data_cosmos/src/responses/list_stored_procedures_response.rs b/sdk/data_cosmos/src/responses/list_stored_procedures_response.rs deleted file mode 100644 index 58a51c4b3a..0000000000 --- a/sdk/data_cosmos/src/responses/list_stored_procedures_response.rs +++ /dev/null @@ -1,48 +0,0 @@ -use crate::headers::from_headers::*; -use crate::resources::StoredProcedure; -use crate::ResourceQuota; -use azure_core::headers::{continuation_token_from_headers_optional, session_token_from_headers}; -use chrono::{DateTime, Utc}; -use http::response::Response; - -#[derive(Debug, Clone, PartialEq)] -pub struct ListStoredProceduresResponse { - pub stored_procedures: Vec, - pub charge: f64, - pub activity_id: uuid::Uuid, - pub session_token: String, - pub last_change: DateTime, - pub resource_quota: Vec, - pub resource_usage: Vec, - pub gateway_version: String, - pub continuation_token: Option, -} - -impl std::convert::TryFrom> for ListStoredProceduresResponse { - type Error = crate::Error; - - fn try_from(response: Response) -> Result { - let headers = response.headers(); - let body = response.body(); - - #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] - struct Response { - pub _rid: String, - #[serde(rename = "StoredProcedures")] - pub stored_procedures: Vec, - pub _count: u64, - } - - Ok(Self { - stored_procedures: serde_json::from_slice::(body)?.stored_procedures, - charge: request_charge_from_headers(headers)?, - activity_id: activity_id_from_headers(headers)?, - session_token: session_token_from_headers(headers)?, - last_change: last_state_change_from_headers(headers)?, - resource_quota: resource_quota_from_headers(headers)?, - resource_usage: resource_usage_from_headers(headers)?, - gateway_version: gateway_version_from_headers(headers)?.to_owned(), - continuation_token: continuation_token_from_headers_optional(headers)?, - }) - } -} diff --git a/sdk/data_cosmos/src/responses/list_user_defined_functions_response.rs b/sdk/data_cosmos/src/responses/list_user_defined_functions_response.rs deleted file mode 100644 index fbe5c594ed..0000000000 --- a/sdk/data_cosmos/src/responses/list_user_defined_functions_response.rs +++ /dev/null @@ -1,84 +0,0 @@ -use crate::headers::from_headers::*; -use crate::resources::UserDefinedFunction; -use crate::ResourceQuota; -use azure_core::headers::{ - continuation_token_from_headers_optional, item_count_from_headers, session_token_from_headers, -}; -use chrono::{DateTime, Utc}; -use http::response::Response; - -#[derive(Debug, Clone, PartialEq)] -pub struct ListUserDefinedFunctionsResponse { - pub rid: String, - pub user_defined_functions: Vec, - pub content_location: String, - pub server: String, - pub last_state_change: DateTime, - pub continuation_token: Option, - pub resource_quota: Vec, - pub resource_usage: Vec, - pub lsn: u64, - pub item_count: u32, - pub schema_version: String, - pub alt_content_path: String, - pub content_path: String, - pub role: u32, - pub global_committed_lsn: u64, - pub number_of_read_regions: u32, - pub transport_request_id: u64, - pub cosmos_llsn: u64, - pub session_token: String, - pub charge: f64, - pub service_version: String, - pub activity_id: uuid::Uuid, - pub gateway_version: String, - pub date: DateTime, -} - -impl std::convert::TryFrom> for ListUserDefinedFunctionsResponse { - type Error = crate::Error; - - fn try_from(response: Response) -> Result { - let headers = response.headers(); - let body = response.body(); - - #[derive(Debug, Deserialize)] - struct Response<'a> { - #[serde(rename = "_rid")] - rid: &'a str, - #[serde(rename = "UserDefinedFunctions")] - user_defined_functions: Vec, - #[serde(rename = "_count")] - #[allow(unused)] - count: u32, - } - let response: Response = serde_json::from_slice(body)?; - - Ok(Self { - rid: response.rid.to_owned(), - user_defined_functions: response.user_defined_functions, - content_location: content_location_from_headers(headers)?.to_owned(), - server: server_from_headers(headers)?.to_owned(), - last_state_change: last_state_change_from_headers(headers)?, - continuation_token: continuation_token_from_headers_optional(headers)?, - resource_quota: resource_quota_from_headers(headers)?, - resource_usage: resource_usage_from_headers(headers)?, - lsn: lsn_from_headers(headers)?, - item_count: item_count_from_headers(headers)?, - schema_version: schema_version_from_headers(headers)?.to_owned(), - alt_content_path: alt_content_path_from_headers(headers)?.to_owned(), - content_path: content_path_from_headers(headers)?.to_owned(), - role: role_from_headers(headers)?, - global_committed_lsn: global_committed_lsn_from_headers(headers)?, - number_of_read_regions: number_of_read_regions_from_headers(headers)?, - transport_request_id: transport_request_id_from_headers(headers)?, - cosmos_llsn: cosmos_llsn_from_headers(headers)?, - session_token: session_token_from_headers(headers)?, - charge: request_charge_from_headers(headers)?, - service_version: service_version_from_headers(headers)?.to_owned(), - activity_id: activity_id_from_headers(headers)?, - gateway_version: gateway_version_from_headers(headers)?.to_owned(), - date: date_from_headers(headers)?, - }) - } -} diff --git a/sdk/data_cosmos/src/responses/mod.rs b/sdk/data_cosmos/src/responses/mod.rs index 66a2de9484..aac6522e4d 100644 --- a/sdk/data_cosmos/src/responses/mod.rs +++ b/sdk/data_cosmos/src/responses/mod.rs @@ -13,10 +13,7 @@ mod delete_user_defined_function_response; mod execute_stored_procedure_response; mod get_attachment_response; mod get_partition_key_ranges_response; -mod list_stored_procedures_response; mod list_triggers_response; -mod list_user_defined_functions_response; -mod query_documents_response; mod replace_reference_attachment_response; pub use create_collection_response::CreateCollectionResponse; @@ -30,11 +27,5 @@ pub use delete_user_defined_function_response::DeleteUserDefinedFunctionResponse pub use execute_stored_procedure_response::ExecuteStoredProcedureResponse; pub use get_attachment_response::GetAttachmentResponse; pub use get_partition_key_ranges_response::GetPartitionKeyRangesResponse; -pub use list_stored_procedures_response::ListStoredProceduresResponse; pub use list_triggers_response::ListTriggersResponse; -pub use list_user_defined_functions_response::ListUserDefinedFunctionsResponse; -pub use query_documents_response::{ - QueryDocumentsResponse, QueryDocumentsResponseDocuments, QueryDocumentsResponseRaw, - QueryResponseMeta, QueryResult, -}; pub use replace_reference_attachment_response::ReplaceReferenceAttachmentResponse; diff --git a/sdk/data_cosmos/tests/cosmos_document.rs b/sdk/data_cosmos/tests/cosmos_document.rs index 534f2dcc36..5251f90aff 100644 --- a/sdk/data_cosmos/tests/cosmos_document.rs +++ b/sdk/data_cosmos/tests/cosmos_document.rs @@ -179,11 +179,13 @@ async fn query_documents() { // now query all documents and see if we get the correct result let query_result = collection_client - .query_documents() + .query_documents("SELECT * FROM c") .query_cross_partition(true) - .execute::("SELECT * FROM c") + .into_stream::() + .next() .await .unwrap() + .unwrap() .into_documents() .unwrap() .results; diff --git a/sdk/data_cosmos/tests/user_defined_function00.rs b/sdk/data_cosmos/tests/user_defined_function00.rs index 97ec67bda9..d4aa517887 100644 --- a/sdk/data_cosmos/tests/user_defined_function00.rs +++ b/sdk/data_cosmos/tests/user_defined_function00.rs @@ -1,6 +1,5 @@ #![cfg(all(test, feature = "test_e2e"))] use azure_data_cosmos::prelude::*; -use azure_data_cosmos::responses::QueryDocumentsResponseRaw; use futures::stream::StreamExt; mod setup; @@ -57,7 +56,7 @@ async fn user_defined_function00() -> Result<(), azure_data_cosmos::Error> { .list_user_defined_functions() .max_item_count(3) .consistency_level(&ret); - let mut stream = Box::pin(stream.stream()); + let mut stream = stream.into_stream(); while let Some(ret) = stream.next().await { let ret = ret.unwrap(); assert_eq!(ret.item_count, 1); @@ -71,11 +70,13 @@ async fn user_defined_function00() -> Result<(), azure_data_cosmos::Error> { let query_stmt = format!("SELECT udf.{}(100)", USER_DEFINED_FUNCTION_NAME); let ret: QueryDocumentsResponseRaw = collection_client - .query_documents() + .query_documents(Query::new(query_stmt)) .consistency_level(&ret) .max_item_count(2i32) - .execute(&query_stmt) - .await? + .into_stream() + .next() + .await + .unwrap()? .into_raw(); assert_eq!(ret.item_count, 1); @@ -86,11 +87,13 @@ async fn user_defined_function00() -> Result<(), azure_data_cosmos::Error> { let query_stmt = format!("SELECT udf.{}(10000)", USER_DEFINED_FUNCTION_NAME); let ret: QueryDocumentsResponseRaw = collection_client - .query_documents() + .query_documents(Query::new(query_stmt)) .consistency_level(&ret) .max_item_count(2i32) - .execute(&query_stmt) - .await? + .into_stream() + .next() + .await + .unwrap()? .into_raw(); assert_eq!(ret.item_count, 1);