Skip to content

into_future more #671

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions sdk/data_cosmos/examples/document_entries_01.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
println!("list_documents_response == {:#?}", list_documents_response);

let query_documents_response = client
.query_documents()
.query_documents("SELECT * FROM c WHERE c.a_number = 600")
.consistency_level(&list_documents_response)
.query_cross_partition(true)
.execute::<serde_json::Value, _>("SELECT * FROM c WHERE c.a_number = 600")
.await?;
.into_stream::<serde_json::Value>()
.next()
.await
.unwrap()?;
println!(
"query_documents_response == {:#?}",
query_documents_response
Expand Down
20 changes: 12 additions & 8 deletions sdk/data_cosmos/examples/query_document_00.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use azure_data_cosmos::prelude::*;
use azure_data_cosmos::responses::QueryDocumentsResponse;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::error::Error;

Expand Down Expand Up @@ -46,23 +46,27 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let client = client.into_database_client(database_name);
let client = client.into_collection_client(collection_name);

let query_obj = Query::new(&query);
let query_obj = Query::new(query);

let respo: QueryDocumentsResponse<serde_json::Value> = client
.query_documents()
.query_documents(query_obj.clone())
.query_cross_partition(true)
.max_item_count(3i32)
.execute(&query_obj)
.await?;
.into_stream()
.next()
.await
.unwrap()?;
println!("as json == {:?}", respo);

let respo: QueryDocumentsResponse<MySecondSampleStructOwned> = client
.query_documents()
.query_documents(query_obj)
.query_cross_partition(true)
.parallelize_cross_partition_query(true)
.max_item_count(2)
.execute(&query_obj)
.await?;
.into_stream()
.next()
.await
.unwrap()?;
println!("as items == {:?}", respo);

//let ret = client
Expand Down
8 changes: 5 additions & 3 deletions sdk/data_cosmos/examples/readme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
// TASK 3
println!("\nQuerying documents");
let query_documents_response = collection_client
.query_documents()
.query_documents("SELECT * FROM A WHERE A.a_number < 600")
.query_cross_partition(true) // this will perform a cross partition query! notice how simple it is!
.consistency_level(session_token)
.execute::<MySampleStruct, _>("SELECT * FROM A WHERE A.a_number < 600") // there are other ways to construct a query, this is the simplest.
.await?
.into_stream::<MySampleStruct>() // there are other ways to construct a query, this is the simplest.
.next()
.await
.unwrap()?
.into_documents() // queries can return Documents or Raw json (ie without etag, _rid, etc...). Since our query return docs we convert with this function.
.unwrap(); // we know in advance that the conversion to Document will not fail since we SELECT'ed * FROM table

Expand Down
9 changes: 7 additions & 2 deletions sdk/data_cosmos/examples/stored_proc_01.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
/// response.setBody("Hello, " + personToGreet);
/// }
use azure_data_cosmos::prelude::*;
use futures::StreamExt;
use std::error::Error;

#[tokio::main]
Expand Down Expand Up @@ -46,8 +47,12 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.clone()
.into_stored_procedure_client(stored_procedure_name);

let list_stored_procedures_response =
collection_client.list_stored_procedures().execute().await?;
let list_stored_procedures_response = collection_client
.list_stored_procedures()
.into_stream()
.next()
.await
.unwrap()?;
println!(
"list_stored_procedures_response == {:#?}",
list_stored_procedures_response
Expand Down
10 changes: 6 additions & 4 deletions sdk/data_cosmos/examples/user_defined_function_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.list_user_defined_functions()
.max_item_count(3)
.consistency_level(&ret);
let mut stream = Box::pin(stream.stream());
let mut stream = stream.into_stream();
while let Some(ret) = stream.next().await {
let ret = ret.unwrap();
println!(
Expand All @@ -68,11 +68,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
println!("Replace response object:\n{:#?}", ret);

let ret = collection_client
.query_documents()
.query_documents("SELECT udf.test15(100)")
.consistency_level(&ret)
.max_item_count(2i32)
.execute::<serde_json::Value, _>("SELECT udf.test15(100)")
.await?
.into_stream::<serde_json::Value>()
.next()
.await
.unwrap()?
.into_raw();
println!("Query response object:\n{:#?}", ret);

Expand Down
13 changes: 7 additions & 6 deletions sdk/data_cosmos/src/clients/collection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::clients::*;
use crate::operations::*;
use crate::requests;
use crate::resources::collection::PartitionKey;
use crate::resources::document::Query;
use crate::CosmosEntity;
use crate::ReadonlyString;
use azure_core::{HttpClient, Pipeline, Request};
Expand Down Expand Up @@ -73,18 +74,18 @@ impl CollectionClient {
}

/// query documents in a collection
pub fn query_documents(&self) -> requests::QueryDocumentsBuilder<'_, '_> {
requests::QueryDocumentsBuilder::new(self)
pub fn query_documents<Q: Into<Query>>(&self, query: Q) -> QueryDocumentsBuilder {
QueryDocumentsBuilder::new(self.clone(), query.into())
}

/// list stored procedures in a collection
pub fn list_stored_procedures(&self) -> requests::ListStoredProceduresBuilder<'_, '_> {
requests::ListStoredProceduresBuilder::new(self)
pub fn list_stored_procedures(&self) -> ListStoredProceduresBuilder {
ListStoredProceduresBuilder::new(self.clone())
}

/// list user defined functions in a collection
pub fn list_user_defined_functions(&self) -> requests::ListUserDefinedFunctionsBuilder<'_, '_> {
requests::ListUserDefinedFunctionsBuilder::new(self)
pub fn list_user_defined_functions(&self) -> ListUserDefinedFunctionsBuilder {
ListUserDefinedFunctionsBuilder::new(self.clone())
}

/// list triggers in a collection
Expand Down
2 changes: 1 addition & 1 deletion sdk/data_cosmos/src/clients/document_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl DocumentClient {
GetDocumentBuilder::new(self.clone())
}

/// replace a document in a collection
/// Replace a document in a collection
pub fn replace_document<D: Serialize + Send + 'static>(
&self,
document: D,
Expand Down
122 changes: 122 additions & 0 deletions sdk/data_cosmos/src/operations/list_stored_procedures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use crate::headers::from_headers::*;
use crate::prelude::*;
use crate::resources::ResourceType;
use crate::resources::StoredProcedure;
use crate::ResourceQuota;
use azure_core::collect_pinned_stream;
use azure_core::headers;
use azure_core::headers::{continuation_token_from_headers_optional, session_token_from_headers};
use azure_core::prelude::*;
use azure_core::{Pageable, Response as HttpResponse};
use chrono::{DateTime, Utc};

#[derive(Debug, Clone)]
pub struct ListStoredProceduresBuilder {
client: CollectionClient,
consistency_level: Option<ConsistencyLevel>,
max_item_count: MaxItemCount,
context: Context,
}

impl ListStoredProceduresBuilder {
pub(crate) fn new(client: CollectionClient) -> Self {
Self {
client,
consistency_level: None,
max_item_count: MaxItemCount::new(-1),
context: Context::new(),
}
}

setters! {
consistency_level: ConsistencyLevel => Some(consistency_level),
max_item_count: i32 => MaxItemCount::new(max_item_count),
context: Context => context,
}

pub fn into_stream(self) -> ListStoredProcedures {
let make_request = move |continuation: Option<String>| {
let this = self.clone();
let ctx = self.context.clone();
async move {
let mut request = this.client.cosmos_client().prepare_request_pipeline(
&format!(
"dbs/{}/colls/{}/sprocs",
this.client.database_client().database_name(),
this.client.collection_name(),
),
http::Method::GET,
);

azure_core::headers::add_optional_header2(&this.consistency_level, &mut request)?;
azure_core::headers::add_mandatory_header2(&this.max_item_count, &mut request)?;

if let Some(c) = continuation {
let h = http::HeaderValue::from_str(c.as_str())
.map_err(azure_core::HttpHeaderError::InvalidHeaderValue)?;
request.headers_mut().append(headers::CONTINUATION, h);
}

let response = this
.client
.pipeline()
.send(
ctx.clone().insert(ResourceType::StoredProcedures),
&mut request,
)
.await?;
ListStoredProceduresResponse::try_from(response).await
}
};

Pageable::new(make_request)
}
}

pub type ListStoredProcedures = Pageable<ListStoredProceduresResponse, crate::Error>;

#[derive(Debug, Clone, PartialEq)]
pub struct ListStoredProceduresResponse {
pub stored_procedures: Vec<StoredProcedure>,
pub charge: f64,
pub activity_id: uuid::Uuid,
pub session_token: String,
pub last_change: DateTime<Utc>,
pub resource_quota: Vec<ResourceQuota>,
pub resource_usage: Vec<ResourceQuota>,
pub gateway_version: String,
pub continuation_token: Option<String>,
}

impl ListStoredProceduresResponse {
pub async fn try_from(response: HttpResponse) -> crate::Result<Self> {
let (_status_code, headers, pinned_stream) = response.deconstruct();
let body = collect_pinned_stream(pinned_stream).await?;

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct Response {
pub _rid: String,
#[serde(rename = "StoredProcedures")]
pub stored_procedures: Vec<StoredProcedure>,
pub _count: u64,
}

Ok(Self {
stored_procedures: serde_json::from_slice::<Response>(&body)?.stored_procedures,
charge: request_charge_from_headers(&headers)?,
activity_id: activity_id_from_headers(&headers)?,
session_token: session_token_from_headers(&headers)?,
last_change: last_state_change_from_headers(&headers)?,
resource_quota: resource_quota_from_headers(&headers)?,
resource_usage: resource_usage_from_headers(&headers)?,
gateway_version: gateway_version_from_headers(&headers)?.to_owned(),
continuation_token: continuation_token_from_headers_optional(&headers)?,
})
}
}

impl Continuable for ListStoredProceduresResponse {
fn continuation(&self) -> Option<String> {
self.continuation_token.clone()
}
}
Loading