Skip to content

chore: basic segment info at snapshot level #14677

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/actions/fuse_compat/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ runs:
bash ./tests/fuse-compat/test-fuse-compat.sh 1.2.306 rbac
bash ./tests/fuse-compat/test-fuse-forward-compat.sh 1.2.307 rbac
bash ./tests/fuse-compat/test-fuse-forward-compat.sh 1.2.318 rbac
bash ./tests/fuse-compat/test-fuse-compat.sh 1.2.307 base
- name: Upload failure
if: failure()
uses: ./.github/actions/artifact_failure
Expand Down
2 changes: 1 addition & 1 deletion src/query/ee/src/storages/fuse/operations/vacuum_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub async fn get_snapshot_referenced_files(
format_version: ver,
snapshot_id: root_snapshot.snapshot_id,
timestamp: root_snapshot.timestamp,
segments: HashSet::from_iter(root_snapshot.segments.clone()),
segments: HashSet::from_iter(root_snapshot.segments.iter().map(|v| v.location.clone())),
table_statistics_location: root_snapshot.table_statistics_location.clone(),
});
drop(root_snapshot);
Expand Down
6 changes: 5 additions & 1 deletion src/query/ee/src/storages/fuse/operations/virtual_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ pub async fn do_refresh_virtual_column(
let segment_locs = if let Some(segment_locs) = segment_locs {
segment_locs
} else {
snapshot.segments.clone()
snapshot
.segments
.iter()
.map(|v| v.location.clone())
.collect()
};

// Read source variant columns and extract inner fields as virtual columns.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,20 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> {
let write_settings = fuse_table.get_write_settings();
let storage_format = write_settings.storage_format;

let segment_locs = Some(snapshot.segments.clone());
let segment_locs = Some(
snapshot
.segments
.iter()
.map(|v| v.location.clone())
.collect(),
);
do_refresh_virtual_column(fuse_table, table_ctx, virtual_columns, segment_locs).await?;

let segment_reader =
MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema.clone());

for (location, ver) in &snapshot.segments {
for seg in &snapshot.segments {
let (location, ver) = &seg.location;
let segment_info = segment_reader
.read(&LoadParams {
location: location.to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ impl MergeIntoInterpreter {
.segments
.clone()
.into_iter()
.map(|v| v.location)
.enumerate()
.collect();

Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/interpreters/interpreter_replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ impl ReplaceInterpreter {
.segments
.clone()
.into_iter()
.map(|v| v.location)
.enumerate()
.collect(),
block_slots: None,
Expand Down
5 changes: 3 additions & 2 deletions src/query/service/src/test_kits/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub async fn generate_snapshot_with_segments(
let operator = fuse_table.get_operator();
let location_gen = fuse_table.meta_location_generator();
let mut new_snapshot = TableSnapshot::from_previous(current_snapshot.as_ref());
new_snapshot.segments = segment_locations;
new_snapshot.segments = segment_locations.into_iter().map(|v| v.into()).collect();
let new_snapshot_location = location_gen
.snapshot_location_from_uuid(&new_snapshot.snapshot_id, TableSnapshot::VERSION)?;
if let Some(ts) = time_stamp {
Expand Down Expand Up @@ -199,6 +199,7 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> {

// create snapshot 1, the format version is 3.
let locations = vec![segments_v3[0].0.clone(), segments_v2[0].0.clone()];
let locations = locations.into_iter().map(|v| v.into()).collect();
let mut snapshot_1 = TableSnapshot::new(
Uuid::new_v4(),
&snapshot_0.timestamp,
Expand All @@ -224,7 +225,7 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> {
segments_v2[0].0.clone(),
];
let mut snapshot_2 = TableSnapshot::from_previous(&snapshot_1);
snapshot_2.segments = locations;
snapshot_2.segments = locations.into_iter().map(|v| v.into()).collect();
snapshot_2.timestamp = Some(now);
snapshot_2.summary = merge_statistics(&snapshot_1.summary, &segments_v3[1].1.summary, None);
let new_snapshot_location = location_gen
Expand Down
46 changes: 23 additions & 23 deletions src/query/service/tests/it/storages/fuse/conflict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ use databend_storages_common_table_meta::meta::TableSnapshot;
fn test_unresolvable_delete_conflict() {
let mut base_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default());
base_snapshot.segments = vec![
("1".to_string(), 1),
("2".to_string(), 1),
("3".to_string(), 1),
("1".to_string(), 1).into(),
("2".to_string(), 1).into(),
("3".to_string(), 1).into(),
];

let mut latest_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default());
latest_snapshot.segments = vec![("1".to_string(), 1), ("4".to_string(), 1)];
latest_snapshot.segments = vec![("1".to_string(), 1).into(), ("4".to_string(), 1).into()];

let ctx = ConflictResolveContext::ModifiedSegmentExistsInLatest(SnapshotChanges {
appended_segments: vec![],
Expand Down Expand Up @@ -77,9 +77,9 @@ fn test_unresolvable_delete_conflict() {
fn test_resolvable_delete_conflict() {
let mut base_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default());
base_snapshot.segments = vec![
("1".to_string(), 1),
("2".to_string(), 1),
("3".to_string(), 1),
("1".to_string(), 1).into(),
("2".to_string(), 1).into(),
("3".to_string(), 1).into(),
];

base_snapshot.summary = Statistics {
Expand All @@ -95,9 +95,9 @@ fn test_resolvable_delete_conflict() {

let mut latest_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default());
latest_snapshot.segments = vec![
("2".to_string(), 1),
("3".to_string(), 1),
("4".to_string(), 1),
("2".to_string(), 1).into(),
("3".to_string(), 1).into(),
("4".to_string(), 1).into(),
];

latest_snapshot.summary = Statistics {
Expand Down Expand Up @@ -135,7 +135,7 @@ fn test_resolvable_delete_conflict() {

let ctx = ConflictResolveContext::ModifiedSegmentExistsInLatest(SnapshotChanges {
appended_segments: vec![],
replaced_segments: HashMap::from([(2, ("8".to_string(), 1))]),
replaced_segments: HashMap::from([(2, ("8".to_string(), 1).into())]),
removed_segment_indexes: vec![1],
removed_statistics,
merged_statistics,
Expand All @@ -150,7 +150,7 @@ fn test_resolvable_delete_conflict() {
Some(Arc::new(latest_snapshot)),
);
let snapshot = result.unwrap();
let expected = vec![("8".to_string(), 1), ("4".to_string(), 1)];
let expected = vec![("8".to_string(), 1).into(), ("4".to_string(), 1).into()];
assert_eq!(snapshot.segments, expected);

let actual = snapshot.summary;
Expand Down Expand Up @@ -180,9 +180,9 @@ fn test_resolvable_delete_conflict() {
fn test_resolvable_replace_conflict() {
let mut base_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default());
base_snapshot.segments = vec![
("1".to_string(), 1),
("2".to_string(), 1),
("3".to_string(), 1),
("1".to_string(), 1).into(),
("2".to_string(), 1).into(),
("3".to_string(), 1).into(),
];

base_snapshot.summary = Statistics {
Expand All @@ -198,9 +198,9 @@ fn test_resolvable_replace_conflict() {

let mut latest_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default());
latest_snapshot.segments = vec![
("2".to_string(), 1),
("3".to_string(), 1),
("4".to_string(), 1),
("2".to_string(), 1).into(),
("3".to_string(), 1).into(),
("4".to_string(), 1).into(),
];

latest_snapshot.summary = Statistics {
Expand Down Expand Up @@ -237,8 +237,8 @@ fn test_resolvable_replace_conflict() {
};

let ctx = ConflictResolveContext::ModifiedSegmentExistsInLatest(SnapshotChanges {
appended_segments: vec![("6".to_string(), 1)],
replaced_segments: HashMap::from([(2, ("5".to_string(), 1))]),
appended_segments: vec![("6".to_string(), 1).into()],
replaced_segments: HashMap::from([(2, ("5".to_string(), 1).into())]),
removed_segment_indexes: vec![1],
removed_statistics,
merged_statistics,
Expand All @@ -254,9 +254,9 @@ fn test_resolvable_replace_conflict() {
);
let snapshot = result.unwrap();
let expected = vec![
("6".to_string(), 1),
("5".to_string(), 1),
("4".to_string(), 1),
("6".to_string(), 1).into(), // TODO
("5".to_string(), 1).into(),
("4".to_string(), 1).into(),
];
assert_eq!(snapshot.segments, expected);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ async fn check_segment_column_ids(
fuse_table.get_operator(),
TestFixture::default_table_schema(),
);
for (seg_loc, _) in &snapshot.segments {
for seg in &snapshot.segments {
let params = LoadParams {
location: seg_loc.clone(),
location: seg.location.0.clone(),
len_hint: None,
ver: SegmentInfo::VERSION,
put_cache: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ async fn test_commit_to_meta_server() -> Result<()> {
let table = fixture.latest_default_table().await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;

let new_segments = vec![("do not care".to_string(), SegmentInfo::VERSION)];
let new_segments = vec![("do not care".to_string(), SegmentInfo::VERSION).into()];
let new_snapshot = TableSnapshot::new(
Uuid::new_v4(),
&None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ async fn check_partitions(parts: &Partitions, fixture: &TestFixture) -> Result<(

let snapshot = reader.read(&load_params).await?;
for segment in &snapshot.segments {
segment_name.insert(segment.0.clone());
segment_name.insert(segment.location.0.clone());

let compact_segment_reader = MetaReaders::segment_info_reader(
fuse_table.get_operator(),
TestFixture::default_table_schema(),
);
let params = LoadParams {
location: segment.0.clone(),
location: segment.location.0.clone(),
len_hint: None,
ver: SegmentInfo::VERSION,
put_cache: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ async fn test_safety() -> Result<()> {
let loc = locations.get(idx).unwrap();
let compact_segment = SegmentsIO::read_compact_segment(
ctx.get_data_operator()?.operator(),
loc.clone(),
loc.location.clone(),
TestFixture::default_table_schema(),
false,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ async fn test_safety_for_recluster() -> Result<()> {
}

let ctx: Arc<dyn TableContext> = ctx.clone();
let locations = locations.into_iter().map(|v| v.location).collect();
let segment_locations = create_segment_location_vector(locations, None);
let compact_segments = FuseTable::segment_pruning(
&ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ use databend_storages_common_table_meta::meta::BlockMeta;
use databend_storages_common_table_meta::meta::ClusterStatistics;
use databend_storages_common_table_meta::meta::Compression;
use databend_storages_common_table_meta::meta::Location;
use databend_storages_common_table_meta::meta::SegmentDescriptor;
use databend_storages_common_table_meta::meta::SegmentInfo;
use databend_storages_common_table_meta::meta::SegmentSummary;
use databend_storages_common_table_meta::meta::Statistics;
use databend_storages_common_table_meta::meta::Versioned;
use futures_util::TryStreamExt;
Expand Down Expand Up @@ -712,7 +714,7 @@ impl CompactSegmentTestFixture {
thresholds: BlockThresholds,
cluster_key_id: Option<u32>,
block_per_seg: usize,
) -> Result<(Vec<Location>, Vec<BlockMeta>, Vec<SegmentInfo>)> {
) -> Result<(Vec<SegmentDescriptor>, Vec<BlockMeta>, Vec<SegmentInfo>)> {
let location_gen = TableMetaLocationGenerator::with_prefix("test/".to_owned());
let data_accessor = ctx.get_data_operator()?.operator();
let threads_nums = ctx.get_settings().get_max_threads()? as usize;
Expand Down Expand Up @@ -792,7 +794,15 @@ impl CompactSegmentTestFixture {
let segment_info = SegmentInfo::new(stats_acc.blocks_metas, summary);
let path = location_gen.gen_segment_info_location();
segment_info.write_meta(&data_accessor, &path).await?;
Ok::<_, ErrorCode>(((path, SegmentInfo::VERSION), collected_blocks, segment_info))

let segment_descr = SegmentDescriptor {
location: (path, SegmentInfo::VERSION),
summary: Some(SegmentSummary {
row_count: segment_info.summary.row_count,
block_count: segment_info.summary.block_count,
}),
};
Ok::<_, ErrorCode>((segment_descr, collected_blocks, segment_info))
});
}

Expand Down Expand Up @@ -908,9 +918,9 @@ impl CompactCase {
// 4. input blocks should be there and in the original order
for location in r.segments_locations.iter().rev() {
let load_params = LoadParams {
location: location.0.clone(),
location: location.location.0.clone(),
len_hint: None,
ver: location.1,
ver: location.location.1,
put_cache: false,
};

Expand Down Expand Up @@ -1061,15 +1071,15 @@ async fn test_compact_segment_with_cluster() -> Result<()> {
);
let mut statistics_of_segments: Statistics = Statistics::default();
let mut output_block_id = Vec::with_capacity(number_of_blocks);
for location in state.segments_locations.iter().rev() {
let load_params = LoadParams {
location: location.0.clone(),
for segment_descr in state.segments_locations.iter().rev() {
let location = LoadParams {
location: segment_descr.location.0.clone(),
len_hint: None,
ver: location.1,
ver: segment_descr.location.1,
put_cache: false,
};

let compact_segment = compact_segment_reader.read(&load_params).await?;
let compact_segment = compact_segment_reader.read(&location).await?;
let segment = SegmentInfo::try_from(compact_segment)?;
merge_statistics_mut(
&mut statistics_of_segments,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ async fn test_table_update_analyze_statistics() -> Result<()> {
);
for segment in after_update.segments.iter() {
let param = LoadParams {
location: segment.0.clone(),
location: segment.location.0.clone(),
len_hint: None,
ver: segment.1,
ver: segment.location.1,
put_cache: false,
};
let compact_segment = segment_reader.read(&param).await?;
Expand Down
10 changes: 7 additions & 3 deletions src/query/service/tests/it/storages/fuse/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,14 @@ async fn apply_block_pruning(
bloom_index_cols: BloomIndexColumns,
) -> Result<Vec<Arc<BlockMeta>>> {
let ctx: Arc<dyn TableContext> = ctx;
let segment_locs = table_snapshot.segments.clone();
let segment_locs = create_segment_location_vector(segment_locs, None);
let locations = table_snapshot
.segments
.iter()
.map(|v| v.location.clone())
.collect();
let indexed_segment_locations = create_segment_location_vector(locations, None);
FusePruner::create(&ctx, op, schema, push_down, bloom_index_cols)?
.read_pruning(segment_locs)
.read_pruning(indexed_segment_locations)
.await
.map(|v| v.into_iter().map(|(_, v)| v).collect())
}
Expand Down
2 changes: 2 additions & 0 deletions src/query/storages/common/table_meta/src/meta/current/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ pub use v2::ColumnMeta;
pub use v2::ColumnStatistics;
pub use v2::Statistics;
pub use v4::CompactSegmentInfo;
pub use v4::SegmentDescriptor;
pub use v4::SegmentInfo;
pub use v4::SegmentSummary;
pub use v4::TableSnapshot;
pub use v4::TableSnapshotLite;

Expand Down
2 changes: 2 additions & 0 deletions src/query/storages/common/table_meta/src/meta/v4/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,7 @@ mod snapshot;

pub use segment::CompactSegmentInfo;
pub use segment::SegmentInfo;
pub use snapshot::SegmentDescriptor;
pub use snapshot::SegmentSummary;
pub use snapshot::TableSnapshot;
pub use snapshot::TableSnapshotLite;
Loading