Skip to content

Commit 8edf216

Browse files
authored
Add more data_lake operations (#495)
* Initial implementation of FileSystemClient.update_path(). * Better names for handcrafted API. * Add FileSystemClient.create_file_if_not_exists() * Better name. * Make append_to_file work. * Add file_flush operation. * Make position a parameter for appending to file. * Better name. * Remove comment. * Add basic rename file operation. * Take "close" as parameter for flush_file. * Separate file system and file operations from each other in test and examples. * Align client names in data_lake examples to client names in data_lake e2e test. * PR feedback: less println!(). * Make rename source a local thing.
1 parent 1e525ae commit 8edf216

File tree

10 files changed

+547
-96
lines changed

10 files changed

+547
-96
lines changed

sdk/core/src/request_options/content_length.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use http::request::Builder;
55
pub struct ContentLength(i32);
66

77
impl ContentLength {
8-
/// Create a new `ContentLength`
98
pub fn new(count: i32) -> Self {
109
Self(count)
1110
}

sdk/storage/examples/data_lake_00.rs

Lines changed: 17 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use azure_core::prelude::IfMatchCondition;
21
use azure_core::prelude::*;
32
use azure_identity::token_credentials::DefaultAzureCredential;
43
use azure_identity::token_credentials::TokenCredential;
@@ -17,7 +16,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1716
.expect("Set env variable ADLSGEN2_STORAGE_MASTER_KEY first!");
1817

1918
let now = Utc::now();
20-
let file_system_name = format!("azurerustsdk-datalake-example-{}", now.timestamp());
19+
let file_system_name = format!("azurerustsdk-datalake-example00-{}", now.timestamp());
2120

2221
let http_client = new_http_client();
2322

@@ -29,107 +28,72 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
2928
let bearer_token = DefaultAzureCredential::default()
3029
.get_token(resource_id)
3130
.await?;
32-
println!("token expires on {}", bearer_token.expires_on);
33-
println!();
31+
println!("token expires on {}\n", bearer_token.expires_on);
3432

3533
let storage_client = storage_account_client.as_storage_client();
36-
let data_lake = DataLakeClient::new(
34+
let data_lake_client = DataLakeClient::new(
3735
storage_client,
3836
account,
3937
bearer_token.token.secret().to_owned(),
4038
None,
4139
);
4240

43-
let file_system = data_lake
41+
let file_system_client = data_lake_client
4442
.clone()
4543
.into_file_system_client(file_system_name.to_string());
4644

4745
let mut fs_properties = Properties::new();
4846
fs_properties.insert("AddedVia", "Azure SDK for Rust");
4947

5048
println!("creating file system '{}'...", &file_system_name);
51-
let create_fs_response = file_system
49+
let create_fs_response = file_system_client
5250
.create()
5351
.properties(&fs_properties)
5452
.execute()
5553
.await?;
56-
println!("create file system response == {:?}", create_fs_response);
57-
println!();
54+
println!("create file system response == {:?}\n", create_fs_response);
5855

5956
println!("listing file systems...");
6057
let mut stream = Box::pin(
61-
data_lake
58+
data_lake_client
6259
.list()
6360
.max_results(NonZeroU32::new(3).unwrap())
6461
.stream(),
6562
);
6663
while let Some(list_fs_response) = stream.next().await {
67-
println!("list file system response == {:?}", list_fs_response);
68-
println!();
64+
println!("list file system response == {:?}\n", list_fs_response);
6965
}
7066

7167
println!("getting file system properties...");
72-
let get_fs_props_response = file_system.get_properties().execute().await?;
68+
let get_fs_props_response = file_system_client.get_properties().execute().await?;
7369
println!(
74-
"get file system properties response == {:?}",
70+
"get file system properties response == {:?}\n",
7571
get_fs_props_response
7672
);
77-
println!();
78-
79-
let file_name = "example-file.txt";
80-
81-
println!("creating path '{}'...", file_name);
82-
let create_path_response = file_system
83-
.create_path(Context::default(), file_name, CreatePathOptions::default())
84-
.await?;
85-
println!("create path response == {:?}", create_path_response);
86-
println!();
87-
88-
println!("creating path '{}' (overwrite)...", file_name);
89-
let create_path_response = file_system
90-
.create_path(Context::default(), file_name, CreatePathOptions::default())
91-
.await?;
92-
println!("create path response == {:?}", create_path_response);
93-
println!();
94-
95-
println!("creating path '{}' (do not overwrite)...", file_name);
96-
let do_not_overwrite =
97-
CreatePathOptions::new().if_match_condition(IfMatchCondition::NotMatch("*"));
98-
let create_path_result = file_system
99-
.create_path(Context::default(), file_name, do_not_overwrite)
100-
.await;
101-
println!(
102-
"create path result (should fail) == {:?}",
103-
create_path_result
104-
);
105-
println!();
10673

10774
println!("setting file system properties...");
10875
fs_properties.insert("ModifiedBy", "Iota");
109-
let set_fs_props_response = file_system
76+
let set_fs_props_response = file_system_client
11077
.set_properties(Some(&fs_properties))
11178
.execute()
11279
.await?;
11380
println!(
114-
"set file system properties response == {:?}",
81+
"set file system properties response == {:?}\n",
11582
set_fs_props_response
11683
);
117-
println!();
11884

11985
println!("getting file system properties...");
120-
let get_fs_props_response = file_system.get_properties().execute().await?;
86+
let get_fs_props_response = file_system_client.get_properties().execute().await?;
12187
println!(
122-
"get file system properties response == {:?}",
88+
"get file system properties response == {:?}\n",
12389
get_fs_props_response
12490
);
125-
println!();
12691

12792
println!("deleting file system...");
128-
let delete_fs_response = file_system.delete().execute().await?;
129-
println!("delete file system response == {:?}", delete_fs_response);
130-
println!();
93+
let delete_fs_response = file_system_client.delete().execute().await?;
94+
println!("delete file system response == {:?}\n", delete_fs_response);
13195

132-
println!("data lake example done.");
96+
println!("data lake example 00 done.");
13397

13498
Ok(())
13599
}

sdk/storage/examples/data_lake_01.rs

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
use azure_core::prelude::*;
2+
use azure_identity::token_credentials::DefaultAzureCredential;
3+
use azure_identity::token_credentials::TokenCredential;
4+
use azure_storage::core::prelude::*;
5+
use azure_storage::data_lake::prelude::*;
6+
use chrono::Utc;
7+
use std::error::Error;
8+
9+
#[tokio::main]
10+
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
11+
let account = std::env::var("ADLSGEN2_STORAGE_ACCOUNT")
12+
.expect("Set env variable ADLSGEN2_STORAGE_ACCOUNT first!");
13+
let master_key = std::env::var("ADLSGEN2_STORAGE_MASTER_KEY")
14+
.expect("Set env variable ADLSGEN2_STORAGE_MASTER_KEY first!");
15+
16+
let now = Utc::now();
17+
let file_system_name = format!("azurerustsdk-datalake-example01-{}", now.timestamp());
18+
19+
let http_client = new_http_client();
20+
21+
let storage_account_client =
22+
StorageAccountClient::new_access_key(http_client.clone(), &account, &master_key);
23+
24+
let resource_id = "https://storage.azure.com/";
25+
println!("getting bearer token for '{}'...", resource_id);
26+
let bearer_token = DefaultAzureCredential::default()
27+
.get_token(resource_id)
28+
.await?;
29+
println!("token expires on {}\n", bearer_token.expires_on);
30+
31+
let storage_client = storage_account_client.as_storage_client();
32+
let data_lake_client = DataLakeClient::new(
33+
storage_client,
34+
account,
35+
bearer_token.token.secret().to_owned(),
36+
None,
37+
);
38+
39+
let file_system_client = data_lake_client
40+
.clone()
41+
.into_file_system_client(file_system_name.to_string());
42+
43+
println!("creating file system '{}'...", &file_system_name);
44+
let create_fs_response = file_system_client.create().execute().await?;
45+
println!("create file system response == {:?}\n", create_fs_response);
46+
47+
let file_path = "some/path/example-file.txt";
48+
49+
println!("creating file '{}'...", file_path);
50+
let create_file_response = file_system_client
51+
.create_file(Context::default(), file_path, FileCreateOptions::default())
52+
.await?;
53+
println!("create file response == {:?}\n", create_file_response);
54+
55+
println!("creating file '{}' (overwrite)...", file_path);
56+
let create_file_response = file_system_client
57+
.create_file(Context::default(), file_path, FileCreateOptions::default())
58+
.await?;
59+
println!("create file response == {:?}\n", create_file_response);
60+
61+
println!("creating file '{}' if not exists...", file_path);
62+
let create_file_if_not_exists_result = file_system_client
63+
.create_file_if_not_exists(Context::default(), file_path)
64+
.await;
65+
println!(
66+
"create file result (should fail) == {:?}\n",
67+
create_file_if_not_exists_result
68+
);
69+
70+
println!("appending to file '{}'...", file_path);
71+
let bytes = bytes::Bytes::from("some data");
72+
let file_length = bytes.len() as i64;
73+
let append_to_file_response = file_system_client
74+
.append_to_file(
75+
Context::default(),
76+
file_path,
77+
bytes,
78+
0,
79+
FileAppendOptions::default(),
80+
)
81+
.await?;
82+
println!("append to file response == {:?}\n", append_to_file_response);
83+
84+
println!("flushing file '{}'...", file_path);
85+
let flush_file_response = file_system_client
86+
.flush_file(
87+
Context::default(),
88+
file_path,
89+
file_length,
90+
true,
91+
FileFlushOptions::default(),
92+
)
93+
.await?;
94+
println!("flush file response == {:?}\n", flush_file_response);
95+
96+
let destination_file_path = "some/path/example-file-renamed.txt";
97+
println!(
98+
"renaming file '{}' to '{}'...",
99+
file_path, destination_file_path
100+
);
101+
let rename_file_response = file_system_client
102+
.rename_file(
103+
Context::default(),
104+
file_path,
105+
destination_file_path,
106+
FileRenameOptions::default(),
107+
)
108+
.await?;
109+
println!("rename file response == {:?}\n", rename_file_response);
110+
111+
println!("deleting file system...");
112+
let delete_fs_response = file_system_client.delete().execute().await?;
113+
println!("delete file system response == {:?}\n", delete_fs_response);
114+
115+
println!("data lake example 01 done.");
116+
117+
Ok(())
118+
}

0 commit comments

Comments
 (0)