Skip to content

Commit 11fc4bb

Browse files
authored
into_future more (#671)
* QueryDocument::into_stream * ListStoredProcedures::into_stream * ListUserDefinedFunctions::into_stream * fmt * Remove empty file
1 parent 15de10c commit 11fc4bb

21 files changed

+526
-648
lines changed

sdk/data_cosmos/examples/document_entries_01.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
9090
println!("list_documents_response == {:#?}", list_documents_response);
9191

9292
let query_documents_response = client
93-
.query_documents()
93+
.query_documents("SELECT * FROM c WHERE c.a_number = 600")
9494
.consistency_level(&list_documents_response)
9595
.query_cross_partition(true)
96-
.execute::<serde_json::Value, _>("SELECT * FROM c WHERE c.a_number = 600")
97-
.await?;
96+
.into_stream::<serde_json::Value>()
97+
.next()
98+
.await
99+
.unwrap()?;
98100
println!(
99101
"query_documents_response == {:#?}",
100102
query_documents_response

sdk/data_cosmos/examples/query_document_00.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use azure_data_cosmos::prelude::*;
2-
use azure_data_cosmos::responses::QueryDocumentsResponse;
2+
use futures::StreamExt;
33
use serde::{Deserialize, Serialize};
44
use std::error::Error;
55

@@ -46,23 +46,27 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
4646
let client = client.into_database_client(database_name);
4747
let client = client.into_collection_client(collection_name);
4848

49-
let query_obj = Query::new(&query);
49+
let query_obj = Query::new(query);
5050

5151
let respo: QueryDocumentsResponse<serde_json::Value> = client
52-
.query_documents()
52+
.query_documents(query_obj.clone())
5353
.query_cross_partition(true)
5454
.max_item_count(3i32)
55-
.execute(&query_obj)
56-
.await?;
55+
.into_stream()
56+
.next()
57+
.await
58+
.unwrap()?;
5759
println!("as json == {:?}", respo);
5860

5961
let respo: QueryDocumentsResponse<MySecondSampleStructOwned> = client
60-
.query_documents()
62+
.query_documents(query_obj)
6163
.query_cross_partition(true)
6264
.parallelize_cross_partition_query(true)
6365
.max_item_count(2)
64-
.execute(&query_obj)
65-
.await?;
66+
.into_stream()
67+
.next()
68+
.await
69+
.unwrap()?;
6670
println!("as items == {:?}", respo);
6771

6872
//let ret = client

sdk/data_cosmos/examples/readme.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
113113
// TASK 3
114114
println!("\nQuerying documents");
115115
let query_documents_response = collection_client
116-
.query_documents()
116+
.query_documents("SELECT * FROM A WHERE A.a_number < 600")
117117
.query_cross_partition(true) // this will perform a cross partition query! notice how simple it is!
118118
.consistency_level(session_token)
119-
.execute::<MySampleStruct, _>("SELECT * FROM A WHERE A.a_number < 600") // there are other ways to construct a query, this is the simplest.
120-
.await?
119+
.into_stream::<MySampleStruct>() // there are other ways to construct a query, this is the simplest.
120+
.next()
121+
.await
122+
.unwrap()?
121123
.into_documents() // queries can return Documents or Raw json (ie without etag, _rid, etc...). Since our query return docs we convert with this function.
122124
.unwrap(); // we know in advance that the conversion to Document will not fail since we SELECT'ed * FROM table
123125

sdk/data_cosmos/examples/stored_proc_01.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
/// response.setBody("Hello, " + personToGreet);
77
/// }
88
use azure_data_cosmos::prelude::*;
9+
use futures::StreamExt;
910
use std::error::Error;
1011

1112
#[tokio::main]
@@ -46,8 +47,12 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
4647
.clone()
4748
.into_stored_procedure_client(stored_procedure_name);
4849

49-
let list_stored_procedures_response =
50-
collection_client.list_stored_procedures().execute().await?;
50+
let list_stored_procedures_response = collection_client
51+
.list_stored_procedures()
52+
.into_stream()
53+
.next()
54+
.await
55+
.unwrap()?;
5156
println!(
5257
"list_stored_procedures_response == {:#?}",
5358
list_stored_procedures_response

sdk/data_cosmos/examples/user_defined_function_00.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
5151
.list_user_defined_functions()
5252
.max_item_count(3)
5353
.consistency_level(&ret);
54-
let mut stream = Box::pin(stream.stream());
54+
let mut stream = stream.into_stream();
5555
while let Some(ret) = stream.next().await {
5656
let ret = ret.unwrap();
5757
println!(
@@ -68,11 +68,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
6868
println!("Replace response object:\n{:#?}", ret);
6969

7070
let ret = collection_client
71-
.query_documents()
71+
.query_documents("SELECT udf.test15(100)")
7272
.consistency_level(&ret)
7373
.max_item_count(2i32)
74-
.execute::<serde_json::Value, _>("SELECT udf.test15(100)")
75-
.await?
74+
.into_stream::<serde_json::Value>()
75+
.next()
76+
.await
77+
.unwrap()?
7678
.into_raw();
7779
println!("Query response object:\n{:#?}", ret);
7880

sdk/data_cosmos/src/clients/collection_client.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::clients::*;
33
use crate::operations::*;
44
use crate::requests;
55
use crate::resources::collection::PartitionKey;
6+
use crate::resources::document::Query;
67
use crate::CosmosEntity;
78
use crate::ReadonlyString;
89
use azure_core::{HttpClient, Pipeline, Request};
@@ -73,18 +74,18 @@ impl CollectionClient {
7374
}
7475

7576
/// query documents in a collection
76-
pub fn query_documents(&self) -> requests::QueryDocumentsBuilder<'_, '_> {
77-
requests::QueryDocumentsBuilder::new(self)
77+
pub fn query_documents<Q: Into<Query>>(&self, query: Q) -> QueryDocumentsBuilder {
78+
QueryDocumentsBuilder::new(self.clone(), query.into())
7879
}
7980

8081
/// list stored procedures in a collection
81-
pub fn list_stored_procedures(&self) -> requests::ListStoredProceduresBuilder<'_, '_> {
82-
requests::ListStoredProceduresBuilder::new(self)
82+
pub fn list_stored_procedures(&self) -> ListStoredProceduresBuilder {
83+
ListStoredProceduresBuilder::new(self.clone())
8384
}
8485

8586
/// list user defined functions in a collection
86-
pub fn list_user_defined_functions(&self) -> requests::ListUserDefinedFunctionsBuilder<'_, '_> {
87-
requests::ListUserDefinedFunctionsBuilder::new(self)
87+
pub fn list_user_defined_functions(&self) -> ListUserDefinedFunctionsBuilder {
88+
ListUserDefinedFunctionsBuilder::new(self.clone())
8889
}
8990

9091
/// list triggers in a collection

sdk/data_cosmos/src/clients/document_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ impl DocumentClient {
6161
GetDocumentBuilder::new(self.clone())
6262
}
6363

64-
/// replace a document in a collection
64+
/// Replace a document in a collection
6565
pub fn replace_document<D: Serialize + Send + 'static>(
6666
&self,
6767
document: D,
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
use crate::headers::from_headers::*;
2+
use crate::prelude::*;
3+
use crate::resources::ResourceType;
4+
use crate::resources::StoredProcedure;
5+
use crate::ResourceQuota;
6+
use azure_core::collect_pinned_stream;
7+
use azure_core::headers;
8+
use azure_core::headers::{continuation_token_from_headers_optional, session_token_from_headers};
9+
use azure_core::prelude::*;
10+
use azure_core::{Pageable, Response as HttpResponse};
11+
use chrono::{DateTime, Utc};
12+
13+
#[derive(Debug, Clone)]
14+
pub struct ListStoredProceduresBuilder {
15+
client: CollectionClient,
16+
consistency_level: Option<ConsistencyLevel>,
17+
max_item_count: MaxItemCount,
18+
context: Context,
19+
}
20+
21+
impl ListStoredProceduresBuilder {
22+
pub(crate) fn new(client: CollectionClient) -> Self {
23+
Self {
24+
client,
25+
consistency_level: None,
26+
max_item_count: MaxItemCount::new(-1),
27+
context: Context::new(),
28+
}
29+
}
30+
31+
setters! {
32+
consistency_level: ConsistencyLevel => Some(consistency_level),
33+
max_item_count: i32 => MaxItemCount::new(max_item_count),
34+
context: Context => context,
35+
}
36+
37+
pub fn into_stream(self) -> ListStoredProcedures {
38+
let make_request = move |continuation: Option<String>| {
39+
let this = self.clone();
40+
let ctx = self.context.clone();
41+
async move {
42+
let mut request = this.client.cosmos_client().prepare_request_pipeline(
43+
&format!(
44+
"dbs/{}/colls/{}/sprocs",
45+
this.client.database_client().database_name(),
46+
this.client.collection_name(),
47+
),
48+
http::Method::GET,
49+
);
50+
51+
azure_core::headers::add_optional_header2(&this.consistency_level, &mut request)?;
52+
azure_core::headers::add_mandatory_header2(&this.max_item_count, &mut request)?;
53+
54+
if let Some(c) = continuation {
55+
let h = http::HeaderValue::from_str(c.as_str())
56+
.map_err(azure_core::HttpHeaderError::InvalidHeaderValue)?;
57+
request.headers_mut().append(headers::CONTINUATION, h);
58+
}
59+
60+
let response = this
61+
.client
62+
.pipeline()
63+
.send(
64+
ctx.clone().insert(ResourceType::StoredProcedures),
65+
&mut request,
66+
)
67+
.await?;
68+
ListStoredProceduresResponse::try_from(response).await
69+
}
70+
};
71+
72+
Pageable::new(make_request)
73+
}
74+
}
75+
76+
pub type ListStoredProcedures = Pageable<ListStoredProceduresResponse, crate::Error>;
77+
78+
#[derive(Debug, Clone, PartialEq)]
79+
pub struct ListStoredProceduresResponse {
80+
pub stored_procedures: Vec<StoredProcedure>,
81+
pub charge: f64,
82+
pub activity_id: uuid::Uuid,
83+
pub session_token: String,
84+
pub last_change: DateTime<Utc>,
85+
pub resource_quota: Vec<ResourceQuota>,
86+
pub resource_usage: Vec<ResourceQuota>,
87+
pub gateway_version: String,
88+
pub continuation_token: Option<String>,
89+
}
90+
91+
impl ListStoredProceduresResponse {
92+
pub async fn try_from(response: HttpResponse) -> crate::Result<Self> {
93+
let (_status_code, headers, pinned_stream) = response.deconstruct();
94+
let body = collect_pinned_stream(pinned_stream).await?;
95+
96+
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
97+
struct Response {
98+
pub _rid: String,
99+
#[serde(rename = "StoredProcedures")]
100+
pub stored_procedures: Vec<StoredProcedure>,
101+
pub _count: u64,
102+
}
103+
104+
Ok(Self {
105+
stored_procedures: serde_json::from_slice::<Response>(&body)?.stored_procedures,
106+
charge: request_charge_from_headers(&headers)?,
107+
activity_id: activity_id_from_headers(&headers)?,
108+
session_token: session_token_from_headers(&headers)?,
109+
last_change: last_state_change_from_headers(&headers)?,
110+
resource_quota: resource_quota_from_headers(&headers)?,
111+
resource_usage: resource_usage_from_headers(&headers)?,
112+
gateway_version: gateway_version_from_headers(&headers)?.to_owned(),
113+
continuation_token: continuation_token_from_headers_optional(&headers)?,
114+
})
115+
}
116+
}
117+
118+
impl Continuable for ListStoredProceduresResponse {
119+
fn continuation(&self) -> Option<String> {
120+
self.continuation_token.clone()
121+
}
122+
}

0 commit comments

Comments
 (0)