Skip to content

Commit 892e89b

Browse files
committed
GetCollections::into_stream
1 parent b4eab38 commit 892e89b

File tree

9 files changed

+150
-114
lines changed

9 files changed

+150
-114
lines changed

sdk/data_cosmos/examples/collection.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use azure_core::prelude::*;
21
use azure_data_cosmos::prelude::*;
32
use futures::stream::StreamExt;
43
use std::error::Error;
@@ -60,12 +59,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
6059

6160
for db in databases.databases {
6261
let database_client = client.clone().into_database_client(db.id.clone());
63-
let collections = Box::pin(
64-
database_client.list_collections(Context::new(), ListCollectionsOptions::new()),
65-
)
66-
.next()
67-
.await
68-
.unwrap()?;
62+
let collections = Box::pin(database_client.list_collections().into_stream())
63+
.next()
64+
.await
65+
.unwrap()?;
6966
println!(
7067
"database {} has {} collection(s)",
7168
db.id,

sdk/data_cosmos/examples/create_delete_database.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use azure_core::Context;
21
use azure_data_cosmos::prelude::*;
32
use futures::stream::StreamExt;
43
use std::error::Error;
@@ -61,7 +60,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
6160
let get_collection_response = db_collection.get_collection().into_future().await?;
6261
println!("get_collection_response == {:#?}", get_collection_response);
6362

64-
let stream = db_client.list_collections(Context::new(), ListCollectionsOptions::new());
63+
let stream = db_client.list_collections().into_stream();
6564
let mut stream = Box::pin(stream);
6665
while let Some(res) = stream.next().await {
6766
let res = res?;

sdk/data_cosmos/examples/database_00.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
2525
println!("database == {:?}", db);
2626
let database = client.clone().into_database_client(db.name().to_owned());
2727

28-
let collections =
29-
Box::pin(database.list_collections(Context::new(), ListCollectionsOptions::new()))
30-
.next()
31-
.await
32-
.unwrap()?;
28+
let collections = Box::pin(database.list_collections().into_stream())
29+
.next()
30+
.await
31+
.unwrap()?;
3332
for collection in collections.collections {
3433
println!("collection == {:?}", collection);
3534
let collection_client = database.clone().into_collection_client(collection.id);

sdk/data_cosmos/examples/database_01.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use azure_core::Context;
21
use azure_data_cosmos::prelude::*;
32
use futures::stream::StreamExt;
43
use std::error::Error;
@@ -18,11 +17,10 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1817
let database_client = client.into_database_client("pollo");
1918
println!("database_name == {}", database_client.database_name());
2019

21-
let collections =
22-
Box::pin(database_client.list_collections(Context::new(), ListCollectionsOptions::new()))
23-
.next()
24-
.await
25-
.unwrap()?;
20+
let collections = Box::pin(database_client.list_collections().into_stream())
21+
.next()
22+
.await
23+
.unwrap()?;
2624
println!("collections == {:#?}", collections);
2725

2826
let collection_client = database_client.into_collection_client("cnt");

sdk/data_cosmos/examples/document_00.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
8585
client
8686
.clone()
8787
.into_database_client(database.id.clone())
88-
.list_collections(Context::new(), ListCollectionsOptions::new()),
88+
.list_collections()
89+
.into_stream(),
8990
)
9091
.next()
9192
.await

sdk/data_cosmos/src/clients/database_client.rs

Lines changed: 2 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -68,61 +68,8 @@ impl DatabaseClient {
6868
}
6969

7070
/// List collections in the database
71-
pub fn list_collections(
72-
&self,
73-
ctx: Context,
74-
options: ListCollectionsOptions,
75-
) -> impl Stream<Item = crate::Result<ListCollectionsResponse>> + '_ {
76-
unfold(State::Init, move |state: State| {
77-
let this = self.clone();
78-
let ctx = ctx.clone();
79-
let options = options.clone();
80-
async move {
81-
let response = match state {
82-
State::Init => {
83-
let mut request = this.cosmos_client().prepare_request_pipeline(
84-
&format!("dbs/{}/colls", this.database_name()),
85-
http::Method::GET,
86-
);
87-
88-
r#try!(options.decorate_request(&mut request));
89-
let response = r#try!(
90-
this.pipeline()
91-
.send(ctx.clone().insert(ResourceType::Collections), &mut request)
92-
.await
93-
);
94-
ListCollectionsResponse::try_from(response).await
95-
}
96-
State::Continuation(continuation_token) => {
97-
let continuation = Continuation::new(continuation_token.as_str());
98-
let mut request = this.cosmos_client().prepare_request_pipeline(
99-
&format!("dbs/{}/colls", self.database_name()),
100-
http::Method::GET,
101-
);
102-
103-
r#try!(options.decorate_request(&mut request));
104-
r#try!(continuation.add_as_header2(&mut request));
105-
let response = r#try!(
106-
this.pipeline()
107-
.send(ctx.clone().insert(ResourceType::Collections), &mut request)
108-
.await
109-
);
110-
ListCollectionsResponse::try_from(response).await
111-
}
112-
State::Done => return None,
113-
};
114-
115-
let response = r#try!(response);
116-
117-
let next_state = response
118-
.continuation_token
119-
.clone()
120-
.map(State::Continuation)
121-
.unwrap_or(State::Done);
122-
123-
Some((Ok(response), next_state))
124-
}
125-
})
71+
pub fn list_collections(&self) -> ListCollectionsBuilder {
72+
ListCollectionsBuilder::new(self.clone())
12673
}
12774

12875
/// Create a collection

sdk/data_cosmos/src/operations/list_collections.rs

Lines changed: 109 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,34 +5,122 @@ use crate::ResourceQuota;
55
use azure_core::collect_pinned_stream;
66
use azure_core::headers::{continuation_token_from_headers_optional, session_token_from_headers};
77
use azure_core::prelude::*;
8-
use azure_core::Request as HttpRequest;
98
use azure_core::Response as HttpResponse;
109
use chrono::{DateTime, Utc};
10+
use futures::stream::unfold;
11+
use futures::Stream;
1112

13+
/// Macro for short cutting a stream on error
14+
macro_rules! r#try {
15+
($expr:expr $(,)?) => {
16+
match $expr {
17+
Result::Ok(val) => val,
18+
Result::Err(err) => {
19+
return Some((Err(err.into()), State::Done));
20+
}
21+
}
22+
};
23+
}
24+
25+
/// Stream state
26+
#[derive(Debug, Clone, PartialEq)]
27+
enum State {
28+
Init,
29+
Continuation(String),
30+
Done,
31+
}
1232
#[derive(Debug, Clone)]
13-
pub struct ListCollectionsOptions {
33+
pub struct ListCollectionsBuilder {
34+
client: DatabaseClient,
1435
consistency_level: Option<ConsistencyLevel>,
1536
max_item_count: MaxItemCount,
37+
context: Context,
1638
}
1739

18-
impl ListCollectionsOptions {
19-
pub fn new() -> Self {
40+
impl ListCollectionsBuilder {
41+
pub(crate) fn new(client: DatabaseClient) -> Self {
2042
Self {
43+
client,
2144
max_item_count: MaxItemCount::new(-1),
2245
consistency_level: None,
46+
context: Context::new(),
2347
}
2448
}
2549

2650
setters! {
2751
consistency_level: ConsistencyLevel => Some(consistency_level),
2852
max_item_count: i32 => MaxItemCount::new(max_item_count),
53+
context: Context => context,
2954
}
3055

31-
pub fn decorate_request(&self, request: &mut HttpRequest) -> crate::Result<()> {
32-
azure_core::headers::add_optional_header2(&self.consistency_level, request)?;
33-
azure_core::headers::add_mandatory_header2(&self.max_item_count, request)?;
56+
pub fn into_stream(
57+
self,
58+
) -> impl Stream<Item = crate::Result<ListCollectionsResponse>> + 'static {
59+
unfold(State::Init, move |state: State| {
60+
let this = self.clone();
61+
let ctx = self.context.clone();
62+
async move {
63+
let response = match state {
64+
State::Init => {
65+
let mut request = this.client.cosmos_client().prepare_request_pipeline(
66+
&format!("dbs/{}/colls", this.client.database_name()),
67+
http::Method::GET,
68+
);
69+
70+
r#try!(azure_core::headers::add_optional_header2(
71+
&this.consistency_level,
72+
&mut request,
73+
));
74+
r#try!(azure_core::headers::add_mandatory_header2(
75+
&this.max_item_count,
76+
&mut request,
77+
));
78+
let response = r#try!(
79+
this.client
80+
.pipeline()
81+
.send(ctx.clone().insert(ResourceType::Collections), &mut request)
82+
.await
83+
);
84+
ListCollectionsResponse::try_from(response).await
85+
}
86+
State::Continuation(continuation_token) => {
87+
let continuation = Continuation::new(continuation_token.as_str());
88+
let mut request = this.client.cosmos_client().prepare_request_pipeline(
89+
&format!("dbs/{}/colls", this.client.database_name()),
90+
http::Method::GET,
91+
);
92+
93+
r#try!(azure_core::headers::add_optional_header2(
94+
&this.consistency_level,
95+
&mut request,
96+
));
97+
r#try!(azure_core::headers::add_mandatory_header2(
98+
&this.max_item_count,
99+
&mut request,
100+
));
101+
r#try!(continuation.add_as_header2(&mut request));
102+
let response = r#try!(
103+
this.client
104+
.pipeline()
105+
.send(ctx.clone().insert(ResourceType::Collections), &mut request)
106+
.await
107+
);
108+
ListCollectionsResponse::try_from(response).await
109+
}
110+
State::Done => return None,
111+
};
34112

35-
Ok(())
113+
let response = r#try!(response);
114+
115+
let next_state = response
116+
.continuation_token
117+
.clone()
118+
.map(State::Continuation)
119+
.unwrap_or(State::Done);
120+
121+
Some((Ok(response), next_state))
122+
}
123+
})
36124
}
37125
}
38126

@@ -90,3 +178,16 @@ impl ListCollectionsResponse {
90178
})
91179
}
92180
}
181+
182+
/// The future returned by calling `into_future` on the builder.
183+
pub type ListCollections =
184+
futures::future::BoxFuture<'static, crate::Result<ListCollectionsResponse>>;
185+
186+
#[cfg(feature = "into_future")]
187+
impl std::future::IntoFuture for ListCollectionsBuilder {
188+
type Future = ListCollections;
189+
type Output = <ListCollections as std::future::Future>::Output;
190+
fn into_future(self) -> Self::Future {
191+
Self::into_future(self)
192+
}
193+
}

sdk/data_cosmos/tests/cosmos_collection.rs

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,11 @@ async fn create_and_delete_collection() {
2727
.into_future()
2828
.await
2929
.unwrap();
30-
let collections =
31-
Box::pin(database_client.list_collections(Context::new(), ListCollectionsOptions::new()))
32-
.next()
33-
.await
34-
.unwrap()
35-
.unwrap();
30+
let collections = Box::pin(database_client.list_collections().into_stream())
31+
.next()
32+
.await
33+
.unwrap()
34+
.unwrap();
3635
assert!(collections.collections.len() == 1);
3736

3837
// try to get the previously created collection
@@ -60,12 +59,11 @@ async fn create_and_delete_collection() {
6059
.into_future()
6160
.await
6261
.unwrap();
63-
let collections =
64-
Box::pin(database_client.list_collections(Context::new(), ListCollectionsOptions::new()))
65-
.next()
66-
.await
67-
.unwrap()
68-
.unwrap();
62+
let collections = Box::pin(database_client.list_collections().into_stream())
63+
.next()
64+
.await
65+
.unwrap()
66+
.unwrap();
6967
assert!(collections.collections.len() == 0);
7068

7169
database_client
@@ -105,12 +103,11 @@ async fn replace_collection() {
105103
.await
106104
.unwrap();
107105

108-
let collections =
109-
Box::pin(database_client.list_collections(Context::new(), ListCollectionsOptions::new()))
110-
.next()
111-
.await
112-
.unwrap()
113-
.unwrap();
106+
let collections = Box::pin(database_client.list_collections().into_stream())
107+
.next()
108+
.await
109+
.unwrap()
110+
.unwrap();
114111
assert_eq!(collections.collections.len(), 1);
115112
assert_eq!(
116113
collection.collection.indexing_policy,
@@ -152,12 +149,11 @@ async fn replace_collection() {
152149
.await
153150
.unwrap();
154151

155-
let collections =
156-
Box::pin(database_client.list_collections(Context::new(), ListCollectionsOptions::new()))
157-
.next()
158-
.await
159-
.unwrap()
160-
.unwrap();
152+
let collections = Box::pin(database_client.list_collections().into_stream())
153+
.next()
154+
.await
155+
.unwrap()
156+
.unwrap();
161157
assert_eq!(collections.collections.len(), 1);
162158
let eps: Vec<&ExcludedPath> = collections.collections[0]
163159
.indexing_policy

sdk/data_cosmos/tests/create_database_and_collection.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ async fn create_database_and_collection() -> Result<(), BoxedError> {
1515

1616
let client = setup::initialize("create_database_and_collection")?;
1717
let database_name = "test-create-database-and-collection";
18-
let context = Context::new();
1918

2019
// create database!
2120
log::info!("Creating a database with name '{}'...", database_name);
@@ -39,11 +38,10 @@ async fn create_database_and_collection() -> Result<(), BoxedError> {
3938

4039
// list collections!
4140
log::info!("Listing all collections...");
42-
let collections =
43-
Box::pin(db_client.list_collections(context.clone(), ListCollectionsOptions::new()))
44-
.next()
45-
.await
46-
.expect("No collection page")?;
41+
let collections = Box::pin(db_client.list_collections().into_stream())
42+
.next()
43+
.await
44+
.expect("No collection page")?;
4745
assert_eq!(collections.count, 1);
4846
log::info!("Successfully listed collections");
4947
log::debug!("The list_collection response: {:#?}", collections);

0 commit comments

Comments
 (0)