Skip to content

into_future again #670

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion sdk/data_cosmos/examples/attachments_00.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use azure_data_cosmos::prelude::*;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::error::Error;

Expand Down Expand Up @@ -62,7 +63,12 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let document_client = client.into_document_client(doc.id.clone(), &doc.id)?;

// list attachments
let ret = document_client.list_attachments().execute().await?;
let ret = document_client
.list_attachments()
.into_stream()
.next()
.await
.unwrap()?;
println!("list attachments == {:#?}", ret);

// reference attachment
Expand Down
6 changes: 4 additions & 2 deletions sdk/data_cosmos/examples/database_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {

let documents = collection_client
.list_documents()
.execute::<Value>()
.await?;
.into_stream::<Value>()
.next()
.await
.unwrap()?;
println!("\ndocuments as json == {:?}", documents);
}
}
Expand Down
6 changes: 4 additions & 2 deletions sdk/data_cosmos/examples/document_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
println!("Listing documents...");
let list_documents_response = collection_client
.list_documents()
.execute::<MySampleStruct>()
.await?;
.into_stream::<MySampleStruct>()
.next()
.await
.unwrap()?;
println!(
"list_documents_response contains {} documents",
list_documents_response.documents.len()
Expand Down
20 changes: 6 additions & 14 deletions sdk/data_cosmos/examples/document_entries_00.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
println!("Created 5 documents.");

// Let's get 3 entries at a time.
let response = client
let mut paged = client
.list_documents()
.consistency_level(response.unwrap())
.max_item_count(3i32)
.execute::<MySampleStruct>()
.await?;
.into_stream::<MySampleStruct>();

let response = paged.next().await.unwrap()?;

assert_eq!(response.documents.len(), 3);
println!("response == {:#?}", response);
Expand All @@ -72,16 +73,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
// continuation_token must be present
assert!(response.continuation_token.is_some());

let session_token = &response;
let ct = response.continuation_token.clone().unwrap();
println!("ct == {}", ct);

let response = client
.list_documents()
.consistency_level(session_token)
.continuation(ct.as_str())
.execute::<MySampleStruct>()
.await?;
let response = paged.next().await.unwrap()?;

assert_eq!(response.documents.len(), 2);
println!("response == {:#?}", response);
Expand All @@ -101,7 +93,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.list_documents()
.consistency_level(session_token.clone())
.max_item_count(3);
let mut stream = Box::pin(stream.stream::<MySampleStruct>());
let mut stream = stream.into_stream::<MySampleStruct>();
// TODO: As soon as the streaming functionality is completed
// in Rust substitute this while let Some... into
// for each (or whatever the Rust team picks).
Expand Down
7 changes: 5 additions & 2 deletions sdk/data_cosmos/examples/document_entries_01.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use azure_data_cosmos::prelude::*;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::error::Error;

Expand Down Expand Up @@ -82,8 +83,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let list_documents_response = client
.list_documents()
.consistency_level(&get_document_response)
.execute::<serde_json::Value>()
.await?;
.into_stream::<serde_json::Value>()
.next()
.await
.unwrap()?;
println!("list_documents_response == {:#?}", list_documents_response);

let query_documents_response = client
Expand Down
7 changes: 5 additions & 2 deletions sdk/data_cosmos/examples/permission_00.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use azure_data_cosmos::prelude::*;
use futures::StreamExt;
use std::error::Error;

#[tokio::main]
Expand Down Expand Up @@ -92,8 +93,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.consistency_level(ConsistencyLevel::Session(
create_permission2_response.session_token,
))
.execute()
.await?;
.into_stream()
.next()
.await
.unwrap()?;
println!(
"list_permissions_response == {:#?}",
list_permissions_response
Expand Down
8 changes: 5 additions & 3 deletions sdk/data_cosmos/examples/readme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.list_documents()
.consistency_level(session_token.clone())
.max_item_count(3);
let mut stream = Box::pin(stream.stream::<MySampleStruct>());
let mut stream = stream.into_stream::<MySampleStruct>();
// TODO: As soon as the streaming functionality is stabilized
// in Rust we can substitute this while let Some... into
// for each (or whatever the Rust team picks).
Expand Down Expand Up @@ -158,8 +158,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let list_documents_response = collection_client
.list_documents()
.consistency_level(session_token)
.execute::<serde_json::Value>() // you can use this if you don't know/care about the return type!
.await?;
.into_stream::<serde_json::Value>() // you can use this if you don't know/care about the return type!
.next()
.await
.unwrap()?;
assert_eq!(list_documents_response.documents.len(), 4);

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion sdk/data_cosmos/examples/remove_all_documents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let mut documents = Vec::new();

let stream = client.list_documents();
let mut stream = Box::pin(stream.stream::<serde_json::Value>());
let mut stream = stream.into_stream::<serde_json::Value>();
while let Some(res) = stream.next().await {
for doc in res?.documents {
documents.push(doc);
Expand Down
2 changes: 1 addition & 1 deletion sdk/data_cosmos/examples/stored_proc_01.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {

let delete_stored_procedure_response = stored_procedure_client
.delete_stored_procedure()
.execute()
.into_future()
.await?;
println!(
"delete_stored_procedure_response == {:#?}",
Expand Down
11 changes: 7 additions & 4 deletions sdk/data_cosmos/examples/user_permission_token.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use azure_data_cosmos::prelude::*;
use futures::StreamExt;
use std::error::Error;

#[tokio::main]
Expand Down Expand Up @@ -42,9 +43,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
// test list documents
let list_documents_response = collection_client
.list_documents()
.execute::<serde_json::Value>()
.into_stream::<serde_json::Value>()
.next()
.await
.unwrap();
.unwrap()?;
println!(
"list_documents_response got {} document(s).",
list_documents_response.documents.len()
Expand Down Expand Up @@ -86,9 +88,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.into_database_client(database_name.clone())
.into_collection_client(collection_name.clone())
.list_documents()
.execute::<serde_json::Value>()
.into_stream::<serde_json::Value>()
.next()
.await
.unwrap();
.unwrap()?;
println!(
"second list_documents_response got {} document(s).",
list_documents_response.documents.len()
Expand Down
4 changes: 2 additions & 2 deletions sdk/data_cosmos/src/clients/collection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ impl CollectionClient {
}

/// list documents in a collection
pub fn list_documents(&self) -> requests::ListDocumentsBuilder<'_, '_> {
requests::ListDocumentsBuilder::new(self)
pub fn list_documents(&self) -> ListDocumentsBuilder {
ListDocumentsBuilder::new(self.clone())
}

/// create a document in a collection
Expand Down
12 changes: 4 additions & 8 deletions sdk/data_cosmos/src/clients/document_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{AttachmentClient, CollectionClient, CosmosClient, DatabaseClient};
use crate::operations::*;
use crate::{requests, ReadonlyString};
use azure_core::{HttpClient, Request};
use crate::ReadonlyString;
use azure_core::Request;
use serde::Serialize;

/// A client for Cosmos document resources.
Expand Down Expand Up @@ -75,8 +75,8 @@ impl DocumentClient {
}

/// List all attachments for a document
pub fn list_attachments(&self) -> requests::ListAttachmentsBuilder<'_, '_> {
requests::ListAttachmentsBuilder::new(self)
pub fn list_attachments(&self) -> ListAttachmentsBuilder {
ListAttachmentsBuilder::new(self.clone())
}

/// Convert into an [`AttachmentClient`]
Expand All @@ -101,8 +101,4 @@ impl DocumentClient {
method,
)
}

pub(crate) fn http_client(&self) -> &dyn HttpClient {
self.cosmos_client().http_client()
}
}
4 changes: 2 additions & 2 deletions sdk/data_cosmos/src/clients/stored_procedure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ impl StoredProcedureClient {
}

/// Delete the stored procedure
pub fn delete_stored_procedure(&self) -> requests::DeleteStoredProcedureBuilder<'_, '_> {
requests::DeleteStoredProcedureBuilder::new(self)
pub fn delete_stored_procedure(&self) -> DeleteStoredProcedureBuilder {
DeleteStoredProcedureBuilder::new(self.clone())
}

pub(crate) fn prepare_request_with_stored_procedure_name(
Expand Down
12 changes: 4 additions & 8 deletions sdk/data_cosmos/src/clients/user_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::*;
use crate::prelude::*;
use crate::{requests, ReadonlyString};
use azure_core::{HttpClient, Pipeline, Request};
use crate::ReadonlyString;
use azure_core::{Pipeline, Request};

/// A client for Cosmos user resources.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -57,8 +57,8 @@ impl UserClient {
}

/// List the user's permissions
pub fn list_permissions(&self) -> requests::ListPermissionsBuilder<'_, '_> {
requests::ListPermissionsBuilder::new(self)
pub fn list_permissions(&self) -> ListPermissionsBuilder {
ListPermissionsBuilder::new(self.clone())
}

/// Convert into a [`PermissionClient`]
Expand All @@ -80,10 +80,6 @@ impl UserClient {
)
}

pub(crate) fn http_client(&self) -> &dyn HttpClient {
self.cosmos_client().http_client()
}

pub(crate) fn pipeline(&self) -> &Pipeline {
self.cosmos_client().pipeline()
}
Expand Down
87 changes: 87 additions & 0 deletions sdk/data_cosmos/src/operations/delete_stored_procedure.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use crate::headers::from_headers::*;
use crate::prelude::*;
use crate::ResourceQuota;
use azure_core::headers::session_token_from_headers;
use azure_core::prelude::*;
use azure_core::Response as HttpResponse;
use chrono::{DateTime, Utc};

#[derive(Debug, Clone)]
pub struct DeleteStoredProcedureBuilder {
client: StoredProcedureClient,
consistency_level: Option<ConsistencyLevel>,
context: Context,
}

impl DeleteStoredProcedureBuilder {
pub(crate) fn new(client: StoredProcedureClient) -> Self {
Self {
client,
consistency_level: None,
context: Context::new(),
}
}

setters! {
consistency_level: ConsistencyLevel => Some(consistency_level),
}

pub fn into_future(self) -> DeleteStoredProcedure {
Box::pin(async move {
let mut request = self
.client
.prepare_pipeline_with_stored_procedure_name(http::Method::DELETE);

azure_core::headers::add_optional_header2(&self.consistency_level, &mut request)?;

let response = self
.client
.pipeline()
.send(
self.context.clone().insert(ResourceType::Permissions),
&mut request,
)
.await?;

DeleteStoredProcedureResponse::try_from(response).await
})
}
}

/// The future returned by calling `into_future` on the builder.
pub type DeleteStoredProcedure =
futures::future::BoxFuture<'static, crate::Result<DeleteStoredProcedureResponse>>;

#[cfg(feature = "into_future")]
impl std::future::IntoFuture for DeleteStoredProcedureBuilder {
type Future = DeleteStoredProcedure;
type Output = <DeleteStoredProcedure as std::future::Future>::Output;
fn into_future(self) -> Self::Future {
Self::into_future(self)
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct DeleteStoredProcedureResponse {
pub charge: f64,
pub activity_id: uuid::Uuid,
pub session_token: String,
pub last_change: DateTime<Utc>,
pub resource_quota: Vec<ResourceQuota>,
pub resource_usage: Vec<ResourceQuota>,
}

impl DeleteStoredProcedureResponse {
pub async fn try_from(response: HttpResponse) -> crate::Result<Self> {
let headers = response.headers();

Ok(Self {
charge: request_charge_from_headers(headers)?,
activity_id: activity_id_from_headers(headers)?,
session_token: session_token_from_headers(headers)?,
last_change: last_state_change_from_headers(headers)?,
resource_quota: resource_quota_from_headers(headers)?,
resource_usage: resource_usage_from_headers(headers)?,
})
}
}
Loading