Skip to content

Commit ee772bd

Browse files
authored
Merge pull request #7652 from ClSlaid/add-etag-to-stage-file
feat(query): Add etag to stage file
2 parents 14c6409 + f50a4c5 commit ee772bd

File tree

6 files changed

+106
-5
lines changed

6 files changed

+106
-5
lines changed

src/meta/proto-conv/src/user_from_to_protobuf_impl.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,7 @@ impl FromToProto for mt::StageFile {
657657
Some(c) => Some(mt::UserIdentity::from_pb(c)?),
658658
None => None,
659659
},
660+
etag: p.etag.clone(),
660661
})
661662
}
662663

@@ -672,6 +673,7 @@ impl FromToProto for mt::StageFile {
672673
Some(c) => Some(mt::UserIdentity::to_pb(c)?),
673674
None => None,
674675
},
676+
etag: self.etag.clone(),
675677
})
676678
}
677679
}

src/meta/proto-conv/src/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
4242
7,
4343
"2022-09-09: Add: table.proto/{TableCopiedFileInfo,TableCopiedFileLock} type",
4444
),
45+
(8, "2022-09-16: Add: users.proto/StageFile::entity_tag"),
4546
];
4647

4748
pub const VER: u64 = META_CHANGE_LOG.last().unwrap().0;

src/meta/proto-conv/tests/it/user_proto_conv.rs

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414

1515
use std::collections::HashSet;
1616

17+
use common_datavalues::chrono::DateTime;
18+
use common_datavalues::chrono::NaiveDate;
19+
use common_datavalues::chrono::NaiveDateTime;
20+
use common_datavalues::chrono::NaiveTime;
21+
use common_datavalues::chrono::Utc;
1722
use common_meta_types as mt;
1823
use common_meta_types::UserInfo;
1924
use common_meta_types::UserPrivilegeType;
@@ -148,6 +153,22 @@ pub(crate) fn test_gcs_stage_info() -> mt::UserStageInfo {
148153
}
149154
}
150155

156+
pub(crate) fn test_stage_file() -> mt::StageFile {
157+
let dt = NaiveDateTime::new(
158+
NaiveDate::from_ymd(2022, 9, 16),
159+
NaiveTime::from_hms(0, 1, 2),
160+
);
161+
let user_id = mt::UserIdentity::new("datafuselabs", "datafuselabs.rs");
162+
mt::StageFile {
163+
path: "/path/to/stage".to_string(),
164+
size: 233,
165+
md5: None,
166+
last_modified: DateTime::from_utc(dt, Utc),
167+
creator: Some(user_id),
168+
etag: None,
169+
}
170+
}
171+
151172
#[test]
152173
fn test_user_pb_from_to() -> anyhow::Result<()> {
153174
let test_user_info = test_user_info();
@@ -158,6 +179,16 @@ fn test_user_pb_from_to() -> anyhow::Result<()> {
158179
Ok(())
159180
}
160181

182+
#[test]
183+
fn test_stage_file_pb_from_to() -> anyhow::Result<()> {
184+
let test_stage_file = test_stage_file();
185+
let test_stage_file_pb = test_stage_file.to_pb()?;
186+
let got = mt::StageFile::from_pb(test_stage_file_pb)?;
187+
assert_eq!(got, test_stage_file);
188+
189+
Ok(())
190+
}
191+
161192
#[test]
162193
fn test_user_incompatible() -> anyhow::Result<()> {
163194
{
@@ -179,6 +210,25 @@ fn test_user_incompatible() -> anyhow::Result<()> {
179210
);
180211
}
181212

213+
{
214+
let stage_file = test_stage_file();
215+
let mut p = stage_file.to_pb()?;
216+
p.ver = VER + 1;
217+
p.min_compatible = VER + 1;
218+
219+
let res = mt::StageFile::from_pb(p);
220+
assert_eq!(
221+
Incompatible {
222+
reason: format!(
223+
"executable ver={} is smaller than the message min compatible ver: {}",
224+
VER,
225+
VER + 1
226+
)
227+
},
228+
res.unwrap_err()
229+
)
230+
}
231+
182232
{
183233
let fs_stage_info = test_fs_stage_info();
184234
let mut p = fs_stage_info.to_pb()?;
@@ -250,7 +300,16 @@ fn test_build_user_pb_buf() -> anyhow::Result<()> {
250300

251301
let mut buf = vec![];
252302
common_protos::prost::Message::encode(&p, &mut buf)?;
253-
println!("{:?}", buf);
303+
println!("user_info: {:?}", buf);
304+
}
305+
306+
// StageFile
307+
{
308+
let stage_file = test_stage_file();
309+
let p = stage_file.to_pb()?;
310+
let mut buf = vec![];
311+
common_protos::prost::Message::encode(&p, &mut buf)?;
312+
println!("stage_file: {:?}", buf);
254313
}
255314

256315
// Stage on local file system
@@ -261,7 +320,7 @@ fn test_build_user_pb_buf() -> anyhow::Result<()> {
261320

262321
let mut buf = vec![];
263322
common_protos::prost::Message::encode(&p, &mut buf)?;
264-
println!("{:?}", buf);
323+
println!("fs_stage_info: {:?}", buf);
265324
}
266325

267326
// Stage on S3
@@ -272,7 +331,7 @@ fn test_build_user_pb_buf() -> anyhow::Result<()> {
272331

273332
let mut buf = vec![];
274333
common_protos::prost::Message::encode(&p, &mut buf)?;
275-
println!("{:?}", buf);
334+
println!("s3_stage_info: {:?}", buf);
276335
}
277336

278337
// Stage on GCS, supported in version >=4.
@@ -281,7 +340,7 @@ fn test_build_user_pb_buf() -> anyhow::Result<()> {
281340
let p = gcs_stage_info.to_pb()?;
282341
let mut buf = vec![];
283342
common_protos::prost::Message::encode(&p, &mut buf)?;
284-
println!("{:?}", buf);
343+
println!("gcs_stage_info: {:?}", buf);
285344
}
286345

287346
Ok(())
@@ -360,3 +419,39 @@ fn test_load_old_user() -> anyhow::Result<()> {
360419

361420
Ok(())
362421
}
422+
423+
#[test]
424+
fn test_old_stage_file() -> anyhow::Result<()> {
425+
// Encoded data of version 7 of StageFile:
426+
// Generated with `test_build_user_pb_buf`
427+
{
428+
let stage_file_v7 = vec![
429+
10, 14, 47, 112, 97, 116, 104, 47, 116, 111, 47, 115, 116, 97, 103, 101, 16, 233, 1,
430+
34, 23, 50, 48, 50, 50, 45, 48, 57, 45, 49, 54, 32, 48, 48, 58, 48, 49, 58, 48, 50, 32,
431+
85, 84, 67, 42, 37, 10, 12, 100, 97, 116, 97, 102, 117, 115, 101, 108, 97, 98, 115, 18,
432+
15, 100, 97, 116, 97, 102, 117, 115, 101, 108, 97, 98, 115, 46, 114, 115, 160, 6, 8,
433+
168, 6, 1, 160, 6, 8, 168, 6, 1,
434+
];
435+
let p: pb::StageFile =
436+
common_protos::prost::Message::decode(stage_file_v7.as_slice()).map_err(print_err)?;
437+
let got = mt::StageFile::from_pb(p).map_err(print_err)?;
438+
439+
let dt = NaiveDateTime::new(
440+
NaiveDate::from_ymd(2022, 9, 16),
441+
NaiveTime::from_hms(0, 1, 2),
442+
);
443+
let user_id = mt::UserIdentity::new("datafuselabs", "datafuselabs.rs");
444+
let want = mt::StageFile {
445+
path: "/path/to/stage".to_string(),
446+
size: 233,
447+
md5: None,
448+
last_modified: DateTime::from_utc(dt, Utc),
449+
creator: Some(user_id),
450+
..Default::default()
451+
};
452+
453+
assert_eq!(got, want);
454+
}
455+
456+
Ok(())
457+
}

src/meta/protos/proto/user.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,4 +222,5 @@ message StageFile {
222222
optional string md5 = 3;
223223
string last_modified = 4;
224224
optional UserIdentity creator = 5;
225+
optional string etag = 6;
225226
}

src/meta/types/src/user_stage.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,11 +258,12 @@ impl UserStageInfo {
258258
}
259259
}
260260

261-
#[derive(Default, Debug, Clone)]
261+
#[derive(Default, Debug, Clone, PartialEq, Eq)]
262262
pub struct StageFile {
263263
pub path: String,
264264
pub size: u64,
265265
pub md5: Option<String>,
266266
pub last_modified: DateTime<Utc>,
267267
pub creator: Option<UserIdentity>,
268+
pub etag: Option<String>,
268269
}

src/query/service/src/interpreters/interpreter_common.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ pub async fn list_files_from_dal(
232232
.last_modified()
233233
.map_or(Utc::now(), |t| Utc.timestamp(t.unix_timestamp(), 0)),
234234
creator: None,
235+
etag: meta.etag().map(str::to_string),
235236
})
236237
.collect::<Vec<StageFile>>();
237238

0 commit comments

Comments
 (0)