Skip to content

Commit f8862d2

Browse files
committed
add SegmentDescriptor
1 parent efa0e46 commit f8862d2

File tree

38 files changed

+269
-112
lines changed

38 files changed

+269
-112
lines changed

src/query/ee/src/storages/fuse/operations/vacuum_table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ pub async fn get_snapshot_referenced_files(
9797
format_version: ver,
9898
snapshot_id: root_snapshot.snapshot_id,
9999
timestamp: root_snapshot.timestamp,
100-
segments: HashSet::from_iter(root_snapshot.segments.clone()),
100+
segments: HashSet::from_iter(root_snapshot.segments.iter().map(|v| v.location.clone())),
101101
table_statistics_location: root_snapshot.table_statistics_location.clone(),
102102
});
103103
drop(root_snapshot);

src/query/ee/src/storages/fuse/operations/virtual_columns.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,11 @@ pub async fn do_refresh_virtual_column(
9999
let segment_locs = if let Some(segment_locs) = segment_locs {
100100
segment_locs
101101
} else {
102-
snapshot.segments.clone()
102+
snapshot
103+
.segments
104+
.iter()
105+
.map(|v| v.location.clone())
106+
.collect()
103107
};
104108

105109
// Read source variant columns and extract inner fields as virtual columns.

src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,14 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> {
5555
let write_settings = fuse_table.get_write_settings();
5656
let storage_format = write_settings.storage_format;
5757

58-
let segment_locs = Some(snapshot.segments.clone());
58+
let segment_locs = Some(snapshot.segments.iter().map(|v| v.location.clone()).collect());
5959
do_refresh_virtual_column(fuse_table, table_ctx, virtual_columns, segment_locs).await?;
6060

6161
let segment_reader =
6262
MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema.clone());
6363

64-
for (location, ver) in &snapshot.segments {
64+
for seg in &snapshot.segments {
65+
let (location, ver) = &seg.location;
6566
let segment_info = segment_reader
6667
.read(&LoadParams {
6768
location: location.to_string(),

src/query/service/src/interpreters/interpreter_merge_into.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ impl MergeIntoInterpreter {
441441
.segments
442442
.clone()
443443
.into_iter()
444+
.map(|v| v.location)
444445
.enumerate()
445446
.collect();
446447

src/query/service/src/interpreters/interpreter_replace.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ impl ReplaceInterpreter {
308308
.segments
309309
.clone()
310310
.into_iter()
311+
.map(|v| v.location)
311312
.enumerate()
312313
.collect(),
313314
block_slots: None,

src/query/service/src/test_kits/fuse.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ pub async fn generate_snapshot_with_segments(
6767
let operator = fuse_table.get_operator();
6868
let location_gen = fuse_table.meta_location_generator();
6969
let mut new_snapshot = TableSnapshot::from_previous(current_snapshot.as_ref());
70-
new_snapshot.segments = segment_locations;
70+
new_snapshot.segments = segment_locations.into_iter().map(|v| v.into()).collect();
7171
let new_snapshot_location = location_gen
7272
.snapshot_location_from_uuid(&new_snapshot.snapshot_id, TableSnapshot::VERSION)?;
7373
if let Some(ts) = time_stamp {
@@ -199,6 +199,7 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> {
199199

200200
// create snapshot 1, the format version is 3.
201201
let locations = vec![segments_v3[0].0.clone(), segments_v2[0].0.clone()];
202+
let locations = locations.into_iter().map(|v| v.into()).collect();
202203
let mut snapshot_1 = TableSnapshot::new(
203204
Uuid::new_v4(),
204205
&snapshot_0.timestamp,
@@ -224,7 +225,7 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> {
224225
segments_v2[0].0.clone(),
225226
];
226227
let mut snapshot_2 = TableSnapshot::from_previous(&snapshot_1);
227-
snapshot_2.segments = locations;
228+
snapshot_2.segments = locations.into_iter().map(|v| v.into()).collect();
228229
snapshot_2.timestamp = Some(now);
229230
snapshot_2.summary = merge_statistics(&snapshot_1.summary, &segments_v3[1].1.summary, None);
230231
let new_snapshot_location = location_gen

src/query/service/tests/it/storages/fuse/conflict.rs

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@ use databend_storages_common_table_meta::meta::TableSnapshot;
3737
fn test_unresolvable_delete_conflict() {
3838
let mut base_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default());
3939
base_snapshot.segments = vec![
40-
("1".to_string(), 1),
41-
("2".to_string(), 1),
42-
("3".to_string(), 1),
40+
("1".to_string(), 1).into(),
41+
("2".to_string(), 1).into(),
42+
("3".to_string(), 1).into(),
4343
];
4444

4545
let mut latest_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default());
46-
latest_snapshot.segments = vec![("1".to_string(), 1), ("4".to_string(), 1)];
46+
latest_snapshot.segments = vec![("1".to_string(), 1).into(), ("4".to_string(), 1).into()];
4747

4848
let ctx = ConflictResolveContext::ModifiedSegmentExistsInLatest(SnapshotChanges {
4949
appended_segments: vec![],
@@ -77,9 +77,9 @@ fn test_unresolvable_delete_conflict() {
7777
fn test_resolvable_delete_conflict() {
7878
let mut base_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default());
7979
base_snapshot.segments = vec![
80-
("1".to_string(), 1),
81-
("2".to_string(), 1),
82-
("3".to_string(), 1),
80+
("1".to_string(), 1).into(),
81+
("2".to_string(), 1).into(),
82+
("3".to_string(), 1).into(),
8383
];
8484

8585
base_snapshot.summary = Statistics {
@@ -95,9 +95,9 @@ fn test_resolvable_delete_conflict() {
9595

9696
let mut latest_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default());
9797
latest_snapshot.segments = vec![
98-
("2".to_string(), 1),
99-
("3".to_string(), 1),
100-
("4".to_string(), 1),
98+
("2".to_string(), 1).into(),
99+
("3".to_string(), 1).into(),
100+
("4".to_string(), 1).into(),
101101
];
102102

103103
latest_snapshot.summary = Statistics {
@@ -135,7 +135,7 @@ fn test_resolvable_delete_conflict() {
135135

136136
let ctx = ConflictResolveContext::ModifiedSegmentExistsInLatest(SnapshotChanges {
137137
appended_segments: vec![],
138-
replaced_segments: HashMap::from([(2, ("8".to_string(), 1))]),
138+
replaced_segments: HashMap::from([(2, ("8".to_string(), 1).into())]),
139139
removed_segment_indexes: vec![1],
140140
removed_statistics,
141141
merged_statistics,
@@ -150,7 +150,7 @@ fn test_resolvable_delete_conflict() {
150150
Some(Arc::new(latest_snapshot)),
151151
);
152152
let snapshot = result.unwrap();
153-
let expected = vec![("8".to_string(), 1), ("4".to_string(), 1)];
153+
let expected = vec![("8".to_string(), 1).into(), ("4".to_string(), 1).into()];
154154
assert_eq!(snapshot.segments, expected);
155155

156156
let actual = snapshot.summary;
@@ -180,9 +180,9 @@ fn test_resolvable_delete_conflict() {
180180
fn test_resolvable_replace_conflict() {
181181
let mut base_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default());
182182
base_snapshot.segments = vec![
183-
("1".to_string(), 1),
184-
("2".to_string(), 1),
185-
("3".to_string(), 1),
183+
("1".to_string(), 1).into(),
184+
("2".to_string(), 1).into(),
185+
("3".to_string(), 1).into(),
186186
];
187187

188188
base_snapshot.summary = Statistics {
@@ -198,9 +198,9 @@ fn test_resolvable_replace_conflict() {
198198

199199
let mut latest_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default());
200200
latest_snapshot.segments = vec![
201-
("2".to_string(), 1),
202-
("3".to_string(), 1),
203-
("4".to_string(), 1),
201+
("2".to_string(), 1).into(),
202+
("3".to_string(), 1).into(),
203+
("4".to_string(), 1).into(),
204204
];
205205

206206
latest_snapshot.summary = Statistics {
@@ -237,8 +237,8 @@ fn test_resolvable_replace_conflict() {
237237
};
238238

239239
let ctx = ConflictResolveContext::ModifiedSegmentExistsInLatest(SnapshotChanges {
240-
appended_segments: vec![("6".to_string(), 1)],
241-
replaced_segments: HashMap::from([(2, ("5".to_string(), 1))]),
240+
appended_segments: vec![("6".to_string(), 1).into()],
241+
replaced_segments: HashMap::from([(2, ("5".to_string(), 1).into())]),
242242
removed_segment_indexes: vec![1],
243243
removed_statistics,
244244
merged_statistics,
@@ -254,9 +254,9 @@ fn test_resolvable_replace_conflict() {
254254
);
255255
let snapshot = result.unwrap();
256256
let expected = vec![
257-
("6".to_string(), 1),
258-
("5".to_string(), 1),
259-
("4".to_string(), 1),
257+
("6".to_string(), 1).into(), // TODO
258+
("5".to_string(), 1).into(),
259+
("4".to_string(), 1).into(),
260260
];
261261
assert_eq!(snapshot.segments, expected);
262262

src/query/service/tests/it/storages/fuse/operations/alter_table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ async fn check_segment_column_ids(
9898
fuse_table.get_operator(),
9999
TestFixture::default_table_schema(),
100100
);
101-
for (seg_loc, _) in &snapshot.segments {
101+
for seg in &snapshot.segments {
102102
let params = LoadParams {
103-
location: seg_loc.clone(),
103+
location: seg.location.0.clone(),
104104
len_hint: None,
105105
ver: SegmentInfo::VERSION,
106106
put_cache: false,

src/query/service/tests/it/storages/fuse/operations/commit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ async fn test_commit_to_meta_server() -> Result<()> {
251251
let table = fixture.latest_default_table().await?;
252252
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
253253

254-
let new_segments = vec![("do not care".to_string(), SegmentInfo::VERSION)];
254+
let new_segments = vec![("do not care".to_string(), SegmentInfo::VERSION).into()];
255255
let new_snapshot = TableSnapshot::new(
256256
Uuid::new_v4(),
257257
&None,

src/query/service/tests/it/storages/fuse/operations/internal_column.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,14 @@ async fn check_partitions(parts: &Partitions, fixture: &TestFixture) -> Result<(
103103

104104
let snapshot = reader.read(&load_params).await?;
105105
for segment in &snapshot.segments {
106-
segment_name.insert(segment.0.clone());
106+
segment_name.insert(segment.location.0.clone());
107107

108108
let compact_segment_reader = MetaReaders::segment_info_reader(
109109
fuse_table.get_operator(),
110110
TestFixture::default_table_schema(),
111111
);
112112
let params = LoadParams {
113-
location: segment.0.clone(),
113+
location: segment.location.0.clone(),
114114
len_hint: None,
115115
ver: SegmentInfo::VERSION,
116116
put_cache: false,

0 commit comments

Comments
 (0)