diff --git a/.github/actions/fuse_compat/action.yml b/.github/actions/fuse_compat/action.yml
index f865b3ee69be3..d0aef614195fd 100644
--- a/.github/actions/fuse_compat/action.yml
+++ b/.github/actions/fuse_compat/action.yml
@@ -31,6 +31,7 @@ runs:
         bash ./tests/fuse-compat/test-fuse-compat.sh 1.2.306 rbac
         bash ./tests/fuse-compat/test-fuse-forward-compat.sh 1.2.307 rbac
         bash ./tests/fuse-compat/test-fuse-forward-compat.sh 1.2.318 rbac
+        bash ./tests/fuse-compat/test-fuse-compat.sh 1.2.307 base
     - name: Upload failure
       if: failure()
       uses: ./.github/actions/artifact_failure
diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_table.rs b/src/query/ee/src/storages/fuse/operations/vacuum_table.rs
index 8a8b44dc81adf..8360f312c821a 100644
--- a/src/query/ee/src/storages/fuse/operations/vacuum_table.rs
+++ b/src/query/ee/src/storages/fuse/operations/vacuum_table.rs
@@ -97,7 +97,7 @@ pub async fn get_snapshot_referenced_files(
         format_version: ver,
         snapshot_id: root_snapshot.snapshot_id,
         timestamp: root_snapshot.timestamp,
-        segments: HashSet::from_iter(root_snapshot.segments.clone()),
+        segments: HashSet::from_iter(root_snapshot.segments.iter().map(|v| v.location.clone())),
         table_statistics_location: root_snapshot.table_statistics_location.clone(),
     });
     drop(root_snapshot);
diff --git a/src/query/ee/src/storages/fuse/operations/virtual_columns.rs b/src/query/ee/src/storages/fuse/operations/virtual_columns.rs
index 142cf0634885c..d6707e9e12153 100644
--- a/src/query/ee/src/storages/fuse/operations/virtual_columns.rs
+++ b/src/query/ee/src/storages/fuse/operations/virtual_columns.rs
@@ -99,7 +99,11 @@ pub async fn do_refresh_virtual_column(
     let segment_locs = if let Some(segment_locs) = segment_locs {
         segment_locs
     } else {
-        snapshot.segments.clone()
+        snapshot
+            .segments
+            .iter()
+            .map(|v| v.location.clone())
+            .collect()
     };
 
     // Read source variant columns and extract inner fields as virtual columns.
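In the vacuum path above, `SnapshotLiteExtended::segments` stays a `HashSet<Location>`, so the descriptor list is projected down to its locations before insertion. A minimal sketch of why a set is kept there: referenced-file collection subtracts the root snapshot's segments from those reachable via older snapshots. The types are stand-ins and `gc_candidates` is a hypothetical helper, not code from this patch:

    use std::collections::HashSet;

    type Location = (String, u64);

    // Hypothetical distillation of the vacuum bookkeeping: anything referenced
    // by an old snapshot but absent from the root snapshot is a purge candidate.
    fn gc_candidates(old: &[Location], root: &HashSet<Location>) -> Vec<Location> {
        old.iter().filter(|loc| !root.contains(*loc)).cloned().collect()
    }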
diff --git a/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs b/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs
index fe997b45f41cd..575cb1673b2cc 100644
--- a/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs
+++ b/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs
@@ -55,13 +55,20 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> {
     let write_settings = fuse_table.get_write_settings();
     let storage_format = write_settings.storage_format;
 
-    let segment_locs = Some(snapshot.segments.clone());
+    let segment_locs = Some(
+        snapshot
+            .segments
+            .iter()
+            .map(|v| v.location.clone())
+            .collect(),
+    );
     do_refresh_virtual_column(fuse_table, table_ctx, virtual_columns, segment_locs).await?;
 
     let segment_reader =
         MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema.clone());
 
-    for (location, ver) in &snapshot.segments {
+    for seg in &snapshot.segments {
+        let (location, ver) = &seg.location;
         let segment_info = segment_reader
             .read(&LoadParams {
                 location: location.to_string(),
diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs
index f1afea69f5f8a..2f3dab9f2bc8f 100644
--- a/src/query/service/src/interpreters/interpreter_merge_into.rs
+++ b/src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -441,6 +441,7 @@ impl MergeIntoInterpreter {
             .segments
             .clone()
             .into_iter()
+            .map(|v| v.location)
             .enumerate()
             .collect();
 
diff --git a/src/query/service/src/interpreters/interpreter_replace.rs b/src/query/service/src/interpreters/interpreter_replace.rs
index 644f0a06f8189..487dd8cdbcd91 100644
--- a/src/query/service/src/interpreters/interpreter_replace.rs
+++ b/src/query/service/src/interpreters/interpreter_replace.rs
@@ -308,6 +308,7 @@ impl ReplaceInterpreter {
                 .segments
                 .clone()
                 .into_iter()
+                .map(|v| v.location)
                 .enumerate()
                 .collect(),
             block_slots: None,
diff --git a/src/query/service/src/test_kits/fuse.rs b/src/query/service/src/test_kits/fuse.rs
index 573647b3151df..3776c13651d6b 100644
--- a/src/query/service/src/test_kits/fuse.rs
+++ b/src/query/service/src/test_kits/fuse.rs
@@ -67,7 +67,7 @@ pub async fn generate_snapshot_with_segments(
     let operator = fuse_table.get_operator();
     let location_gen = fuse_table.meta_location_generator();
     let mut new_snapshot = TableSnapshot::from_previous(current_snapshot.as_ref());
-    new_snapshot.segments = segment_locations;
+    new_snapshot.segments = segment_locations.into_iter().map(|v| v.into()).collect();
     let new_snapshot_location = location_gen
         .snapshot_location_from_uuid(&new_snapshot.snapshot_id, TableSnapshot::VERSION)?;
     if let Some(ts) = time_stamp {
@@ -199,6 +199,7 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> {
 
     // create snapshot 1, the format version is 3.
     let locations = vec![segments_v3[0].0.clone(), segments_v2[0].0.clone()];
+    let locations = locations.into_iter().map(|v| v.into()).collect();
     let mut snapshot_1 = TableSnapshot::new(
         Uuid::new_v4(),
         &snapshot_0.timestamp,
@@ -224,7 +225,7 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> {
         segments_v2[0].0.clone(),
     ];
     let mut snapshot_2 = TableSnapshot::from_previous(&snapshot_1);
-    snapshot_2.segments = locations;
+    snapshot_2.segments = locations.into_iter().map(|v| v.into()).collect();
     snapshot_2.timestamp = Some(now);
     snapshot_2.summary = merge_statistics(&snapshot_1.summary, &segments_v3[1].1.summary, None);
     let new_snapshot_location = location_gen
diff --git a/src/query/service/tests/it/storages/fuse/conflict.rs b/src/query/service/tests/it/storages/fuse/conflict.rs
index 6af61e6dd2e3f..ea58fef5a23c2 100644
--- a/src/query/service/tests/it/storages/fuse/conflict.rs
+++ b/src/query/service/tests/it/storages/fuse/conflict.rs
@@ -37,13 +37,13 @@ use databend_storages_common_table_meta::meta::TableSnapshot;
 fn test_unresolvable_delete_conflict() {
     let mut base_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default());
     base_snapshot.segments = vec![
-        ("1".to_string(), 1),
-        ("2".to_string(), 1),
-        ("3".to_string(), 1),
+        ("1".to_string(), 1).into(),
+        ("2".to_string(), 1).into(),
+        ("3".to_string(), 1).into(),
     ];
 
     let mut latest_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default());
-    latest_snapshot.segments = vec![("1".to_string(), 1), ("4".to_string(), 1)];
+    latest_snapshot.segments = vec![("1".to_string(), 1).into(), ("4".to_string(), 1).into()];
 
     let ctx = ConflictResolveContext::ModifiedSegmentExistsInLatest(SnapshotChanges {
         appended_segments: vec![],
@@ -77,9 +77,9 @@ fn test_unresolvable_delete_conflict() {
 fn test_resolvable_delete_conflict() {
     let mut base_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default());
     base_snapshot.segments = vec![
-        ("1".to_string(), 1),
-        ("2".to_string(), 1),
-        ("3".to_string(), 1),
+        ("1".to_string(), 1).into(),
("1".to_string(), 1).into(), + ("2".to_string(), 1).into(), + ("3".to_string(), 1).into(), ]; base_snapshot.summary = Statistics { @@ -198,9 +198,9 @@ fn test_resolvable_replace_conflict() { let mut latest_snapshot = TableSnapshot::new_empty_snapshot(TableSchema::default()); latest_snapshot.segments = vec![ - ("2".to_string(), 1), - ("3".to_string(), 1), - ("4".to_string(), 1), + ("2".to_string(), 1).into(), + ("3".to_string(), 1).into(), + ("4".to_string(), 1).into(), ]; latest_snapshot.summary = Statistics { @@ -237,8 +237,8 @@ fn test_resolvable_replace_conflict() { }; let ctx = ConflictResolveContext::ModifiedSegmentExistsInLatest(SnapshotChanges { - appended_segments: vec![("6".to_string(), 1)], - replaced_segments: HashMap::from([(2, ("5".to_string(), 1))]), + appended_segments: vec![("6".to_string(), 1).into()], + replaced_segments: HashMap::from([(2, ("5".to_string(), 1).into())]), removed_segment_indexes: vec![1], removed_statistics, merged_statistics, @@ -254,9 +254,9 @@ fn test_resolvable_replace_conflict() { ); let snapshot = result.unwrap(); let expected = vec![ - ("6".to_string(), 1), - ("5".to_string(), 1), - ("4".to_string(), 1), + ("6".to_string(), 1).into(), // TODO + ("5".to_string(), 1).into(), + ("4".to_string(), 1).into(), ]; assert_eq!(snapshot.segments, expected); diff --git a/src/query/service/tests/it/storages/fuse/operations/alter_table.rs b/src/query/service/tests/it/storages/fuse/operations/alter_table.rs index b1beece109fe5..fff023713a0f8 100644 --- a/src/query/service/tests/it/storages/fuse/operations/alter_table.rs +++ b/src/query/service/tests/it/storages/fuse/operations/alter_table.rs @@ -98,9 +98,9 @@ async fn check_segment_column_ids( fuse_table.get_operator(), TestFixture::default_table_schema(), ); - for (seg_loc, _) in &snapshot.segments { + for seg in &snapshot.segments { let params = LoadParams { - location: seg_loc.clone(), + location: seg.location.0.clone(), len_hint: None, ver: SegmentInfo::VERSION, put_cache: false, diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index b5db2eaa35631..0d6e5c6f26e47 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -251,7 +251,7 @@ async fn test_commit_to_meta_server() -> Result<()> { let table = fixture.latest_default_table().await?; let fuse_table = FuseTable::try_from_table(table.as_ref())?; - let new_segments = vec![("do not care".to_string(), SegmentInfo::VERSION)]; + let new_segments = vec![("do not care".to_string(), SegmentInfo::VERSION).into()]; let new_snapshot = TableSnapshot::new( Uuid::new_v4(), &None, diff --git a/src/query/service/tests/it/storages/fuse/operations/internal_column.rs b/src/query/service/tests/it/storages/fuse/operations/internal_column.rs index 21970e7017292..583ca5be9c184 100644 --- a/src/query/service/tests/it/storages/fuse/operations/internal_column.rs +++ b/src/query/service/tests/it/storages/fuse/operations/internal_column.rs @@ -103,14 +103,14 @@ async fn check_partitions(parts: &Partitions, fixture: &TestFixture) -> Result<( let snapshot = reader.read(&load_params).await?; for segment in &snapshot.segments { - segment_name.insert(segment.0.clone()); + segment_name.insert(segment.location.0.clone()); let compact_segment_reader = MetaReaders::segment_info_reader( fuse_table.get_operator(), TestFixture::default_table_schema(), ); let params = LoadParams { - location: segment.0.clone(), + 
             len_hint: None,
             ver: SegmentInfo::VERSION,
             put_cache: false,
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs
index 591d2c879de83..de5291ae1d9b1 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs
@@ -268,7 +268,7 @@ async fn test_safety() -> Result<()> {
                 let loc = locations.get(idx).unwrap();
                 let compact_segment = SegmentsIO::read_compact_segment(
                     ctx.get_data_operator()?.operator(),
-                    loc.clone(),
+                    loc.location.clone(),
                     TestFixture::default_table_schema(),
                     false,
                 )
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
index 0ad5808e99da2..a4d9fe61ee7ba 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
@@ -235,6 +235,7 @@ async fn test_safety_for_recluster() -> Result<()> {
     }
 
     let ctx: Arc<dyn TableContext> = ctx.clone();
+    let locations = locations.into_iter().map(|v| v.location).collect();
     let segment_locations = create_segment_location_vector(locations, None);
     let compact_segments = FuseTable::segment_pruning(
         &ctx,
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
index 6895e6d96252c..22f574b17daa6 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
@@ -59,7 +59,9 @@
 use databend_storages_common_table_meta::meta::BlockMeta;
 use databend_storages_common_table_meta::meta::ClusterStatistics;
 use databend_storages_common_table_meta::meta::Compression;
 use databend_storages_common_table_meta::meta::Location;
+use databend_storages_common_table_meta::meta::SegmentDescriptor;
 use databend_storages_common_table_meta::meta::SegmentInfo;
+use databend_storages_common_table_meta::meta::SegmentSummary;
 use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::Versioned;
 use futures_util::TryStreamExt;
@@ -712,7 +714,7 @@ impl CompactSegmentTestFixture {
         thresholds: BlockThresholds,
         cluster_key_id: Option<u32>,
         block_per_seg: usize,
-    ) -> Result<(Vec<Location>, Vec<BlockMeta>, Vec<SegmentInfo>)> {
+    ) -> Result<(Vec<SegmentDescriptor>, Vec<BlockMeta>, Vec<SegmentInfo>)> {
         let location_gen = TableMetaLocationGenerator::with_prefix("test/".to_owned());
         let data_accessor = ctx.get_data_operator()?.operator();
         let threads_nums = ctx.get_settings().get_max_threads()? as usize;
@@ -792,7 +794,15 @@ impl CompactSegmentTestFixture {
                 let segment_info = SegmentInfo::new(stats_acc.blocks_metas, summary);
                 let path = location_gen.gen_segment_info_location();
                 segment_info.write_meta(&data_accessor, &path).await?;
-                Ok::<_, ErrorCode>(((path, SegmentInfo::VERSION), collected_blocks, segment_info))
+
+                let segment_descr = SegmentDescriptor {
+                    location: (path, SegmentInfo::VERSION),
+                    summary: Some(SegmentSummary {
+                        row_count: segment_info.summary.row_count,
+                        block_count: segment_info.summary.block_count,
+                    }),
+                };
+                Ok::<_, ErrorCode>((segment_descr, collected_blocks, segment_info))
             });
         }
 
@@ -908,9 +918,9 @@ impl CompactCase {
         // 4. input blocks should be there and in the original order
         for location in r.segments_locations.iter().rev() {
             let load_params = LoadParams {
-                location: location.0.clone(),
+                location: location.location.0.clone(),
                 len_hint: None,
-                ver: location.1,
+                ver: location.location.1,
                 put_cache: false,
             };
 
@@ -1061,15 +1071,15 @@ async fn test_compact_segment_with_cluster() -> Result<()> {
     );
     let mut statistics_of_segments: Statistics = Statistics::default();
     let mut output_block_id = Vec::with_capacity(number_of_blocks);
-    for location in state.segments_locations.iter().rev() {
+    for segment_descr in state.segments_locations.iter().rev() {
         let load_params = LoadParams {
-            location: location.0.clone(),
+            location: segment_descr.location.0.clone(),
             len_hint: None,
-            ver: location.1,
+            ver: segment_descr.location.1,
             put_cache: false,
         };
 
         let compact_segment = compact_segment_reader.read(&load_params).await?;
         let segment = SegmentInfo::try_from(compact_segment)?;
         merge_statistics_mut(
             &mut statistics_of_segments,
diff --git a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs
index 4e68a763da9dd..1034f1f7935b6 100644
--- a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs
@@ -133,9 +133,9 @@ async fn test_table_update_analyze_statistics() -> Result<()> {
     );
     for segment in after_update.segments.iter() {
         let param = LoadParams {
-            location: segment.0.clone(),
+            location: segment.location.0.clone(),
             len_hint: None,
-            ver: segment.1,
+            ver: segment.location.1,
             put_cache: false,
         };
         let compact_segment = segment_reader.read(&param).await?;
diff --git a/src/query/service/tests/it/storages/fuse/pruning.rs b/src/query/service/tests/it/storages/fuse/pruning.rs
index 03d24183f44c4..e5f37f6954dbb 100644
--- a/src/query/service/tests/it/storages/fuse/pruning.rs
+++ b/src/query/service/tests/it/storages/fuse/pruning.rs
@@ -61,10 +61,14 @@ async fn apply_block_pruning(
     bloom_index_cols: BloomIndexColumns,
 ) -> Result<Vec<Arc<BlockMeta>>> {
     let ctx: Arc<dyn TableContext> = ctx;
-    let segment_locs = table_snapshot.segments.clone();
-    let segment_locs = create_segment_location_vector(segment_locs, None);
+    let locations = table_snapshot
+        .segments
+        .iter()
+        .map(|v| v.location.clone())
+        .collect();
+    let indexed_segment_locations = create_segment_location_vector(locations, None);
 
     FusePruner::create(&ctx, op, schema, push_down, bloom_index_cols)?
-        .read_pruning(segment_locs)
+        .read_pruning(indexed_segment_locations)
         .await
         .map(|v| v.into_iter().map(|(_, v)| v).collect())
 }
diff --git a/src/query/storages/common/table_meta/src/meta/current/mod.rs b/src/query/storages/common/table_meta/src/meta/current/mod.rs
index f7778cb977159..74fcb94b24b3c 100644
--- a/src/query/storages/common/table_meta/src/meta/current/mod.rs
+++ b/src/query/storages/common/table_meta/src/meta/current/mod.rs
@@ -20,7 +20,9 @@ pub use v2::ColumnMeta;
 pub use v2::ColumnStatistics;
 pub use v2::Statistics;
 pub use v4::CompactSegmentInfo;
+pub use v4::SegmentDescriptor;
 pub use v4::SegmentInfo;
+pub use v4::SegmentSummary;
 pub use v4::TableSnapshot;
 pub use v4::TableSnapshotLite;
 
diff --git a/src/query/storages/common/table_meta/src/meta/v4/mod.rs b/src/query/storages/common/table_meta/src/meta/v4/mod.rs
index ca6e09db166df..921102f626916 100644
--- a/src/query/storages/common/table_meta/src/meta/v4/mod.rs
+++ b/src/query/storages/common/table_meta/src/meta/v4/mod.rs
@@ -17,5 +17,7 @@ mod snapshot;
 
 pub use segment::CompactSegmentInfo;
 pub use segment::SegmentInfo;
+pub use snapshot::SegmentDescriptor;
+pub use snapshot::SegmentSummary;
 pub use snapshot::TableSnapshot;
 pub use snapshot::TableSnapshotLite;
diff --git a/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs b/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs
index 582032fe1ea63..2b543a9418115 100644
--- a/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs
+++ b/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs
@@ -78,13 +78,34 @@ pub struct TableSnapshot {
     ///
     /// We rely on background merge tasks to keep merging segments, so that
     /// the size of this vector can be kept reasonable.
-    pub segments: Vec<Location>,
+    pub segments: Vec<SegmentDescriptor>,
 
     // The metadata of the cluster keys.
     pub cluster_key_meta: Option<ClusterKey>,
     pub table_statistics_location: Option<String>,
 }
 
+// TODO remove PartialEq
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
+pub struct SegmentSummary {
+    pub row_count: u64,
+    pub block_count: u64,
+}
+
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
+pub struct SegmentDescriptor {
+    pub location: Location,
+    pub summary: Option<SegmentSummary>,
+}
+
+impl From<Location> for SegmentDescriptor {
+    fn from(location: Location) -> Self {
+        SegmentDescriptor {
+            location,
+            summary: None,
+        }
+    }
+}
+
 impl TableSnapshot {
     pub fn new(
         snapshot_id: SnapshotId,
@@ -92,7 +113,7 @@ impl TableSnapshot {
         prev_snapshot_id: Option<(SnapshotId, FormatVersion)>,
         schema: TableSchema,
         summary: Statistics,
-        segments: Vec<Location>,
+        segments: Vec<SegmentDescriptor>,
         cluster_key_meta: Option<ClusterKey>,
         table_statistics_location: Option<String>,
     ) -> Self {
@@ -218,7 +239,7 @@ impl From<v3::TableSnapshot> for TableSnapshot {
             prev_snapshot_id: s.prev_snapshot_id,
             schema: s.schema,
             summary: s.summary,
-            segments: s.segments,
+            segments: s.segments.into_iter().map(|v| v.into()).collect(),
             cluster_key_meta: s.cluster_key_meta,
             table_statistics_location: s.table_statistics_location,
         }
@@ -239,7 +260,7 @@ where T: Into<v2::TableSnapshot>
             prev_snapshot_id: s.prev_snapshot_id,
             schema: s.schema.into(),
             summary: s.summary.into(),
-            segments: s.segments,
+            segments: s.segments.into_iter().map(|v| v.into()).collect(),
             cluster_key_meta: s.cluster_key_meta,
             table_statistics_location: s.table_statistics_location,
         }
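For orientation, the shape of the new metadata: a `SegmentDescriptor` pairs a segment's `Location` with an optional `SegmentSummary`, and the `From<Location>` impl gives older call sites (and snapshots deserialized from earlier format versions) a summary-less descriptor. A minimal, self-contained sketch of the two paths, using stand-in types that mirror the patch rather than the crate's actual exports:

    type Location = (String, u64);

    #[derive(Debug, Clone, PartialEq)]
    struct SegmentSummary { row_count: u64, block_count: u64 }

    #[derive(Debug, Clone, PartialEq)]
    struct SegmentDescriptor { location: Location, summary: Option<SegmentSummary> }

    impl From<Location> for SegmentDescriptor {
        fn from(location: Location) -> Self {
            SegmentDescriptor { location, summary: None }
        }
    }

    fn main() {
        // Old-style call sites convert a bare location; no summary is attached.
        let legacy: SegmentDescriptor = ("1/2/seg_v4.mpk".to_string(), 4).into();
        assert_eq!(legacy.summary, None);

        // New write paths record per-segment statistics alongside the location.
        let fresh = SegmentDescriptor {
            location: ("1/2/seg_v4.mpk".to_string(), 4),
            summary: Some(SegmentSummary { row_count: 1000, block_count: 2 }),
        };
        assert_eq!(fresh.summary.unwrap().row_count, 1000);
    }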
diff --git a/src/query/storages/fuse/src/io/snapshots.rs b/src/query/storages/fuse/src/io/snapshots.rs
index 9c187020643a0..543821134ee14 100644
--- a/src/query/storages/fuse/src/io/snapshots.rs
+++ b/src/query/storages/fuse/src/io/snapshots.rs
@@ -267,10 +267,10 @@ impl SnapshotsIO {
         let mut segments = HashSet::new();
         // collects extended segments.
         for segment_location in &snapshot.segments {
-            if root_snapshot.segments.contains(segment_location) {
+            if root_snapshot.segments.contains(&segment_location.location) {
                 continue;
             }
-            segments.insert(segment_location.clone());
+            segments.insert(segment_location.location.clone());
         }
         let table_statistics_location = if snapshot.table_statistics_location
             != root_snapshot.table_statistics_location
diff --git a/src/query/storages/fuse/src/operations/analyze.rs b/src/query/storages/fuse/src/operations/analyze.rs
index 71c24c7f74213..815457a7b5848 100644
--- a/src/query/storages/fuse/src/operations/analyze.rs
+++ b/src/query/storages/fuse/src/operations/analyze.rs
@@ -72,8 +72,9 @@ impl FuseTable {
                 blocks_cluster_stats.push(cluster_stats.clone());
             }
 
+            let locations = chunk.iter().map(|v| v.location.clone()).collect::<Vec<_>>();
             let segments = segments_io
-                .read_segments::<SegmentInfo>(chunk, true)
+                .read_segments::<SegmentInfo>(&locations, true)
                 .await?;
             for segment in segments {
                 let segment = segment?;
diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs
index 99fc9ecd9f971..2b22c84397cad 100644
--- a/src/query/storages/fuse/src/operations/commit.rs
+++ b/src/query/storages/fuse/src/operations/commit.rs
@@ -37,7 +37,7 @@
 use databend_common_pipeline_transforms::processors::AsyncAccumulatingTransformer;
 use databend_common_sql::executor::physical_plans::MutationKind;
 use databend_storages_common_cache::CacheAccessor;
 use databend_storages_common_cache_manager::CachedObject;
-use databend_storages_common_table_meta::meta::Location;
+use databend_storages_common_table_meta::meta::SegmentDescriptor;
 use databend_storages_common_table_meta::meta::SegmentInfo;
 use databend_storages_common_table_meta::meta::SnapshotId;
 use databend_storages_common_table_meta::meta::Statistics;
@@ -287,7 +287,7 @@ impl FuseTable {
         &self,
         ctx: &Arc<dyn TableContext>,
         base_snapshot: Arc<TableSnapshot>,
-        base_segments: &[Location],
+        base_segments: &[SegmentDescriptor],
         base_summary: Statistics,
         abort_operation: AbortOperation,
         max_retry_elapsed: Option<Duration>,
@@ -303,7 +303,7 @@ impl FuseTable {
         let mut latest_table_ref: Arc<dyn Table>;
 
         // potentially concurrently appended segments, init it to empty
-        let mut concurrently_appended_segment_locations: &[Location] = &[];
+        let mut concurrently_appended_segment_locations: &[SegmentDescriptor] = &[];
 
         // Status
         ctx.set_status_info("mutation: begin try to commit");
@@ -422,12 +422,12 @@ impl FuseTable {
     async fn merge_with_base(
         ctx: Arc<dyn TableContext>,
         operator: Operator,
-        base_segments: &[Location],
+        base_segments: &[SegmentDescriptor],
         base_summary: &Statistics,
-        concurrently_appended_segment_locations: &[Location],
+        concurrently_appended_segment_locations: &[SegmentDescriptor],
         schema: TableSchemaRef,
         default_cluster_key_id: Option<u32>,
-    ) -> Result<(Vec<Location>, Statistics)> {
+    ) -> Result<(Vec<SegmentDescriptor>, Statistics)> {
         if concurrently_appended_segment_locations.is_empty() {
             Ok((base_segments.to_owned(), base_summary.clone()))
         } else {
@@ -439,8 +439,13 @@ impl FuseTable {
                 .collect();
 
             let fuse_segment_io = SegmentsIO::create(ctx, operator, schema);
+            // TODO: avoid the temporary projection to bare locations.
+            let locations = concurrently_appended_segment_locations
+                .iter()
+                .map(|v| v.location.clone())
+                .collect::<Vec<_>>();
             let concurrent_appended_segment_infos = fuse_segment_io
-                .read_segments::<SegmentInfo>(concurrently_appended_segment_locations, true)
+                .read_segments::<SegmentInfo>(&locations, true)
                 .await?;
 
             let mut new_statistics = base_summary.clone();
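Both the mutation aggregator below and the recluster path later in this patch build descriptors for freshly written segments by copying `row_count`/`block_count` out of the segment statistics. The shape of that construction, as a standalone sketch with stand-in types mirroring the patch (the `describe_new_segment` helper is illustrative, not part of the change):

    type Location = (String, u64);

    #[derive(Debug, Clone)]
    struct SegmentSummary { row_count: u64, block_count: u64 }

    #[derive(Debug, Clone)]
    struct SegmentDescriptor { location: Location, summary: Option<SegmentSummary> }

    // Mirrors the write path: a newly persisted segment knows its statistics,
    // so the descriptor carries Some(summary) from the start.
    fn describe_new_segment(path: String, ver: u64, rows: u64, blocks: u64) -> SegmentDescriptor {
        SegmentDescriptor {
            location: (path, ver),
            summary: Some(SegmentSummary { row_count: rows, block_count: blocks }),
        }
    }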
diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs
index bd1ed03e776d8..14d8344bb97cc 100644
--- a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs
+++ b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs
@@ -29,8 +29,9 @@
 use databend_common_expression::TableSchemaRef;
 use databend_common_pipeline_transforms::processors::AsyncAccumulatingTransform;
 use databend_common_sql::executor::physical_plans::MutationKind;
 use databend_storages_common_table_meta::meta::BlockMeta;
-use databend_storages_common_table_meta::meta::Location;
+use databend_storages_common_table_meta::meta::SegmentDescriptor;
 use databend_storages_common_table_meta::meta::SegmentInfo;
+use databend_storages_common_table_meta::meta::SegmentSummary;
 use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::Versioned;
 use itertools::Itertools;
@@ -62,10 +63,10 @@ pub struct TableMutationAggregator {
     location_gen: TableMetaLocationGenerator,
     thresholds: BlockThresholds,
     default_cluster_key_id: Option<u32>,
-    base_segments: Vec<Location>,
+    base_segments: Vec<SegmentDescriptor>,
     mutations: HashMap<SegmentIndex, BlockMutations>,
 
-    appended_segments: Vec<Location>,
+    appended_segments: Vec<SegmentDescriptor>,
     appended_statistics: Statistics,
     removed_segment_indexes: Vec<usize>,
     removed_statistics: Statistics,
@@ -105,7 +106,7 @@ impl TableMutationAggregator {
     pub fn new(
         table: &FuseTable,
         ctx: Arc<dyn TableContext>,
-        base_segments: Vec<Location>,
+        base_segments: Vec<SegmentDescriptor>,
         kind: MutationKind,
     ) -> Self {
         TableMutationAggregator {
@@ -184,8 +185,16 @@ impl TableMutationAggregator {
                     self.default_cluster_key_id,
                 );
 
+                let desc = SegmentDescriptor {
+                    location: (segment_location, format_version),
+                    summary: Some(SegmentSummary {
+                        row_count: summary.row_count,
+                        block_count: summary.block_count,
+                    }),
+                };
                 self.appended_segments
-                    .push((segment_location, format_version))
+                    .push(desc)
             }
             MutationLogEntry::CompactExtras { extras } => {
                 match self.mutations.entry(extras.segment_index) {
@@ -241,8 +250,14 @@ impl TableMutationAggregator {
                         &summary,
                         self.default_cluster_key_id,
                     );
-                    replaced_segments
-                        .insert(result.index, (location, SegmentInfo::VERSION));
+                    let desc = SegmentDescriptor {
+                        location: (location, SegmentInfo::VERSION),
+                        summary: Some(SegmentSummary {
+                            row_count: summary.row_count,
+                            block_count: summary.block_count,
+                        }),
+                    };
+                    replaced_segments.insert(result.index, desc);
                 } else {
                     self.removed_segment_indexes.push(result.index);
                 }
@@ -311,7 +326,7 @@ impl TableMutationAggregator {
         let (new_blocks, origin_summary) = if let Some(loc) = location {
             // read the old segment
             let compact_segment_info =
-                SegmentsIO::read_compact_segment(op.clone(), loc, schema, false).await?;
+                SegmentsIO::read_compact_segment(op.clone(), loc.location, schema, false).await?;
             let mut segment_info = SegmentInfo::try_from(compact_segment_info)?;
 
             // take away the blocks, they are being mutated
diff --git a/src/query/storages/fuse/src/operations/common/snapshot_generator.rs b/src/query/storages/fuse/src/operations/common/snapshot_generator.rs
index 41b51d70d68f7..487d952af10db 100644
--- a/src/query/storages/fuse/src/operations/common/snapshot_generator.rs
+++ b/src/query/storages/fuse/src/operations/common/snapshot_generator.rs
@@ -30,7 +30,7 @@
 use databend_common_metrics::storage::*;
 use databend_common_sql::field_default_value;
 use databend_storages_common_table_meta::meta::ClusterKey;
 use databend_storages_common_table_meta::meta::ColumnStatistics;
-use databend_storages_common_table_meta::meta::Location;
+use databend_storages_common_table_meta::meta::SegmentDescriptor;
 use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::TableSnapshot;
 use log::info;
@@ -63,8 +63,8 @@ pub trait SnapshotGenerator {
 
 #[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq, Default)]
 pub struct SnapshotChanges {
-    pub appended_segments: Vec<Location>,
-    pub replaced_segments: HashMap<usize, Location>,
+    pub appended_segments: Vec<SegmentDescriptor>,
+    pub replaced_segments: HashMap<usize, SegmentDescriptor>,
     pub removed_segment_indexes: Vec<usize>,
 
     pub merged_statistics: Statistics,
@@ -73,7 +73,20 @@ pub struct SnapshotChanges {
 
 impl SnapshotChanges {
     pub fn check_intersect(&self, other: &SnapshotChanges) -> bool {
-        if Self::is_slice_intersect(&self.appended_segments, &other.appended_segments) {
+        // TODO: optimize this; the location comparison should not need clones.
+        let left = self
+            .appended_segments
+            .iter()
+            .map(|i| i.location.clone())
+            .collect::<Vec<_>>();
+        let right = other
+            .appended_segments
+            .iter()
+            .map(|i| i.location.clone())
+            .collect::<Vec<_>>();
+
+        if Self::is_slice_intersect(&left, &right) {
             return true;
         }
         for o in &other.replaced_segments {
@@ -104,7 +117,7 @@ impl SnapshotChanges {
 
 #[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq)]
 pub struct SnapshotMerged {
-    pub merged_segments: Vec<Location>,
+    pub merged_segments: Vec<SegmentDescriptor>,
     pub merged_statistics: Statistics,
 }
 
@@ -139,19 +152,19 @@ impl ConflictResolveContext {
     pub fn is_modified_segments_exists_in_latest(
         base: &TableSnapshot,
         latest: &TableSnapshot,
-        replaced_segments: &HashMap<usize, Location>,
+        replaced_segments: &HashMap<usize, SegmentDescriptor>,
         removed_segments: &[usize],
-    ) -> Option<(Vec<usize>, HashMap<usize, Location>)> {
+    ) -> Option<(Vec<usize>, HashMap<usize, SegmentDescriptor>)> {
         let latest_segments = latest
             .segments
             .iter()
             .enumerate()
-            .map(|(i, x)| (x, i))
+            .map(|(i, x)| (&x.location, i))
             .collect::<HashMap<_, _>>();
         let mut removed = Vec::with_capacity(removed_segments.len());
         for removed_segment in removed_segments {
             let removed_segment = &base.segments[*removed_segment];
-            if let Some(position) = latest_segments.get(removed_segment) {
+            if let Some(position) = latest_segments.get(&removed_segment.location) {
                 removed.push(*position);
             } else {
                 return None;
@@ -161,7 +174,7 @@ impl ConflictResolveContext {
         let mut replaced = HashMap::with_capacity(replaced_segments.len());
         for (position, location) in replaced_segments {
             let origin_segment = &base.segments[*position];
-            if let Some(position) = latest_segments.get(origin_segment) {
+            if let Some(position) = latest_segments.get(&origin_segment.location) {
                 replaced.insert(*position, location.clone());
             } else {
                 return None;
@@ -171,11 +184,11 @@ impl ConflictResolveContext {
     }
 
     pub fn merge_segments(
-        mut base_segments: Vec<Location>,
-        appended_segments: Vec<Location>,
-        replaced_segments: HashMap<usize, Location>,
+        mut base_segments: Vec<SegmentDescriptor>,
+        appended_segments: Vec<SegmentDescriptor>,
+        replaced_segments: HashMap<usize, SegmentDescriptor>,
         removed_segment_indexes: Vec<usize>,
-    ) -> Vec<Location> {
+    ) -> Vec<SegmentDescriptor> {
         replaced_segments
             .into_iter()
             .for_each(|(k, v)| base_segments[k] = v);
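The `check_intersect` change above clones every appended location just to reuse `is_slice_intersect`. One possible follow-up for that `TODO`, sketched against stand-in types rather than the crate's code: build a set over borrowed locations and probe it, which avoids both the clones and the quadratic slice scan.

    use std::collections::HashSet;

    type Location = (String, u64);

    struct SegmentDescriptor {
        location: Location,
    }

    // Hypothetical replacement for the clone-heavy path: collect the smaller
    // side's locations by reference, then probe with the larger side.
    fn appended_intersect(left: &[SegmentDescriptor], right: &[SegmentDescriptor]) -> bool {
        let (small, large) = if left.len() <= right.len() { (left, right) } else { (right, left) };
        let set: HashSet<&Location> = small.iter().map(|d| &d.location).collect();
        large.iter().any(|d| set.contains(&d.location))
    }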
diff --git a/src/query/storages/fuse/src/operations/delete.rs b/src/query/storages/fuse/src/operations/delete.rs
index c8846a0d2d689..6312d4eb49f93 100644
--- a/src/query/storages/fuse/src/operations/delete.rs
+++ b/src/query/storages/fuse/src/operations/delete.rs
@@ -265,13 +265,23 @@ impl FuseTable {
         let partitions = if is_lazy {
             let mut segments = Vec::with_capacity(snapshot.segments.len());
             for (idx, segment_location) in snapshot.segments.iter().enumerate() {
-                segments.push(FuseLazyPartInfo::create(idx, segment_location.clone()));
+                segments.push(FuseLazyPartInfo::create(
+                    idx,
+                    segment_location.location.clone(),
+                ));
             }
             Partitions::create(PartitionsShuffleKind::Mod, segments, true)
         } else {
             let projection = Projection::Columns(col_indices.clone());
             let prune_ctx = MutationBlockPruningContext {
-                segment_locations: create_segment_location_vector(snapshot.segments.clone(), None),
+                segment_locations: create_segment_location_vector(
+                    snapshot
+                        .segments
+                        .iter()
+                        .map(|v| v.location.clone())
+                        .collect(),
+                    None,
+                ),
                 block_count: Some(snapshot.summary.block_count as usize),
             };
             let (partitions, info) = self
diff --git a/src/query/storages/fuse/src/operations/gc.rs b/src/query/storages/fuse/src/operations/gc.rs
index 0be42a6d95863..c1a816e4d5ae3 100644
--- a/src/query/storages/fuse/src/operations/gc.rs
+++ b/src/query/storages/fuse/src/operations/gc.rs
@@ -293,14 +293,20 @@ impl FuseTable {
         };
 
         // root snapshot cannot ignore storage not find error.
+        // TODO: avoid the temporary projection to bare locations.
+        let locations = root_snapshot
+            .segments
+            .iter()
+            .map(|v| v.location.clone())
+            .collect::<Vec<_>>();
         let referenced_locations = self
-            .get_block_locations(ctx.clone(), &root_snapshot.segments, put_cache, false)
+            .get_block_locations(ctx.clone(), &locations, put_cache, false)
             .await?;
         let snapshot_lite = Arc::new(SnapshotLiteExtended {
             format_version: ver,
             snapshot_id: root_snapshot.snapshot_id,
             timestamp: root_snapshot.timestamp,
-            segments: HashSet::from_iter(root_snapshot.segments.clone()),
+            segments: HashSet::from_iter(root_snapshot.segments.iter().map(|v| v.location.clone())),
             table_statistics_location: root_snapshot.table_statistics_location.clone(),
         });
         Ok(Some(RootSnapshotInfo {
diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs
index 4b4e58ac7cc53..7c864d8a2b483 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs
@@ -109,6 +109,7 @@ impl BlockCompactMutator {
         let chunk_size = self.ctx.get_settings().get_max_threads()? as usize * 4;
         for chunk in segment_locations.chunks(chunk_size) {
             // Read the segments information in parallel.
+            let chunk = &chunk.iter().map(|v| v.location.clone()).collect::<Vec<_>>();
             let mut segment_infos = segments_io
                 .read_segments::<Arc<CompactSegmentInfo>>(chunk, false)
                 .await?
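The same two-line adapter (project a chunk of descriptors down to `Location`s, then call `read_segments`) now recurs at every chunked read site, here and in the table-function changes below. If the `TODO`s are resolved by keeping `read_segments` location-based, a single shared helper would do; a sketch under that assumption, where `chunk_locations` is hypothetical and not part of this patch:

    type Location = (String, u64);

    struct SegmentDescriptor {
        location: Location,
    }

    // Hypothetical shared helper: borrow a chunk of descriptors and return the
    // owned locations that a location-based read API expects.
    fn chunk_locations(chunk: &[SegmentDescriptor]) -> Vec<Location> {
        chunk.iter().map(|v| v.location.clone()).collect()
    }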
diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/segment_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/segment_compact_mutator.rs
index 9af8dc7a13ee8..787fededf3a3e 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutator/segment_compact_mutator.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutator/segment_compact_mutator.rs
@@ -18,8 +18,9 @@
 use databend_common_catalog::lock::Lock;
 use databend_common_catalog::table::Table;
 use databend_common_exception::Result;
-use databend_storages_common_table_meta::meta::Location;
+use databend_storages_common_table_meta::meta::SegmentDescriptor;
 use databend_storages_common_table_meta::meta::SegmentInfo;
+use databend_storages_common_table_meta::meta::SegmentSummary;
 use databend_storages_common_table_meta::meta::Statistics;
 use log::info;
 use metrics::gauge;
@@ -38,7 +39,7 @@ use crate::TableContext;
 #[derive(Default)]
 pub struct SegmentCompactionState {
     // locations of all the segments (compacted, and unchanged)
-    pub segments_locations: Vec<Location>,
+    pub segments_locations: Vec<SegmentDescriptor>,
     // paths of all the newly created segments (which are compacted), need this to rollback the compaction
     pub new_segment_paths: Vec<String>,
     // number of fragmented segments compacted
@@ -177,7 +178,7 @@ pub struct SegmentCompactor<'a> {
     threshold: u64,
     default_cluster_key_id: Option<u32>,
     // fragmented segments collected so far; reset to empty if compaction occurs
-    fragmented_segments: Vec<(SegmentInfo, Location)>,
+    fragmented_segments: Vec<(SegmentInfo, SegmentDescriptor)>,
     // number of blocks in all the fragmented segments collected so far;
    // reset to 0 if compaction occurs
     accumulated_num_blocks: u64,
@@ -211,7 +212,7 @@ impl<'a> SegmentCompactor<'a> {
     #[async_backtrace::framed]
     pub async fn compact<T>(
         mut self,
-        reverse_locations: Vec<Location>,
+        reverse_locations: Vec<SegmentDescriptor>,
         limit: usize,
         status_callback: T,
     ) -> Result<SegmentCompactionState>
@@ -226,8 +227,9 @@ impl<'a> SegmentCompactor<'a> {
         let mut checked_end_at = 0;
         let mut is_end = false;
         for chunk in reverse_locations.chunks(chunk_size) {
+            let locations = chunk.iter().map(|v| v.location.clone()).collect::<Vec<_>>();
             let mut segment_infos = segments_io
-                .read_segments::<SegmentInfo>(chunk, false)
+                .read_segments::<SegmentInfo>(&locations, false)
                 .await?
                 .into_iter()
                 .zip(chunk.iter())
@@ -301,7 +303,11 @@ impl<'a> SegmentCompactor<'a> {
 
     // accumulate one segment
     #[async_backtrace::framed]
-    pub async fn add(&mut self, segment_info: SegmentInfo, location: Location) -> Result<()> {
+    pub async fn add(
+        &mut self,
+        segment_info: SegmentInfo,
+        location: SegmentDescriptor,
+    ) -> Result<()> {
         let num_blocks_current_segment = segment_info.blocks.len() as u64;
 
         if num_blocks_current_segment == 0 {
@@ -367,11 +381,21 @@ impl<'a> SegmentCompactor<'a> {
 
         // 2.2 write down new segment
         let new_segment = SegmentInfo::new(blocks, new_statistics);
+        let seg_summary = SegmentSummary {
+            row_count: new_segment.summary.row_count,
+            block_count: new_segment.summary.block_count,
+        };
+
         let location = self.segment_writer.write_segment(new_segment).await?;
         self.compacted_state
             .new_segment_paths
             .push(location.0.clone());
-        self.compacted_state.segments_locations.push(location);
+        self.compacted_state
+            .segments_locations
+            .push(SegmentDescriptor {
+                location,
+                summary: Some(seg_summary),
+            });
 
         Ok(())
     }
diff --git a/src/query/storages/fuse/src/operations/mutation/processors/recluster_aggregator.rs b/src/query/storages/fuse/src/operations/mutation/processors/recluster_aggregator.rs
index f6d878dae7e8a..d098a19d5c6d4 100644
--- a/src/query/storages/fuse/src/operations/mutation/processors/recluster_aggregator.rs
+++ b/src/query/storages/fuse/src/operations/mutation/processors/recluster_aggregator.rs
@@ -28,7 +28,9 @@
 use databend_common_expression::DataBlock;
 use databend_common_metrics::storage::metrics_inc_recluster_write_block_nums;
 use databend_common_pipeline_transforms::processors::AsyncAccumulatingTransform;
 use databend_storages_common_table_meta::meta::BlockMeta;
+use databend_storages_common_table_meta::meta::SegmentDescriptor;
 use databend_storages_common_table_meta::meta::SegmentInfo;
+use databend_storages_common_table_meta::meta::SegmentSummary;
 use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::Versioned;
 use itertools::Itertools;
@@ -107,7 +109,15 @@ impl AsyncAccumulatingTransform for ReclusterAggregator {
             let appended = new_segments.split_off(removed_segments_len);
             for (location, stats) in appended.into_iter().rev() {
                 self.abort_operation.add_segment(location.clone());
-                appended_segments.push((location, SegmentInfo::VERSION));
+                let desc = SegmentDescriptor {
+                    location: (location, SegmentInfo::VERSION),
+                    summary: Some(SegmentSummary {
+                        block_count: stats.block_count,
+                        row_count: stats.row_count,
+                    }),
+                };
+                appended_segments.push(desc);
                 merge_statistics_mut(&mut merged_statistics, &stats, default_cluster_key);
             }
         }
 
         for (i, (location, stats)) in new_segments.into_iter().enumerate() {
             // The old segments will be replaced with the new ones.
             self.abort_operation.add_segment(location.clone());
+            let desc = SegmentDescriptor {
+                location: (location, SegmentInfo::VERSION),
+                summary: Some(SegmentSummary {
+                    block_count: stats.block_count,
+                    row_count: stats.row_count,
+                }),
+            };
             replaced_segments.insert(
                 self.removed_segment_indexes[i],
-                (location, SegmentInfo::VERSION),
+                desc,
             );
             merge_statistics_mut(&mut merged_statistics, &stats, default_cluster_key);
         }
diff --git a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs
index e23d1a362e4c8..00dcfd1bf2e67 100644
--- a/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/native_rows_fetcher.rs
@@ -197,7 +197,7 @@ impl<const BLOCKING_IO: bool> NativeRowsFetcher<BLOCKING_IO> {
             }
 
             let (segment, block) = split_prefix(prefix);
-            let (location, ver) = snapshot.segments[segment as usize].clone();
+            let (location, ver) = snapshot.segments[segment as usize].location.clone();
             let compact_segment_info = self
                 .segment_reader
                 .read(&LoadParams {
diff --git a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
index e53c7b383a9d6..db4469f9a2e4f 100644
--- a/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
+++ b/src/query/storages/fuse/src/operations/read/parquet_rows_fetcher.rs
@@ -190,7 +190,7 @@ impl<const BLOCKING_IO: bool> ParquetRowsFetcher<BLOCKING_IO> {
             }
 
             let (segment, block) = split_prefix(prefix);
-            let (location, ver) = snapshot.segments[segment as usize].clone();
+            let (location, ver) = snapshot.segments[segment as usize].location.clone();
             let compact_segment_info = self
                 .segment_reader
                 .read(&LoadParams {
diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs
index 85077576d0403..26eacd3dda75a 100644
--- a/src/query/storages/fuse/src/operations/read_partitions.rs
+++ b/src/query/storages/fuse/src/operations/read_partitions.rs
@@ -79,7 +79,10 @@ impl FuseTable {
         if (!dry_run && snapshot.segments.len() > nodes_num) || is_lazy {
             let mut segments = Vec::with_capacity(snapshot.segments.len());
             for (idx, segment_location) in snapshot.segments.iter().enumerate() {
-                segments.push(FuseLazyPartInfo::create(idx, segment_location.clone()))
+                segments.push(FuseLazyPartInfo::create(
+                    idx,
+                    segment_location.location.clone(),
+                ))
             }
 
             return Ok((
@@ -101,7 +104,7 @@ impl FuseTable {
         for (idx, segment_location) in snapshot.segments.iter().enumerate() {
             segments_location.push(SegmentLocation {
                 segment_idx: idx,
                location: segment_location.location.clone(),
                 snapshot_loc: snapshot_loc.clone(),
             });
         }
diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs
index a09489bab4521..12dbe2529219f 100644
--- a/src/query/storages/fuse/src/operations/recluster.rs
+++ b/src/query/storages/fuse/src/operations/recluster.rs
@@ -114,7 +114,11 @@ impl FuseTable {
             max_tasks,
         )?;
 
-        let segment_locations = snapshot.segments.clone();
+        let segment_locations = snapshot
+            .segments
+            .iter()
+            .map(|v| v.location.clone())
+            .collect();
         let segment_locations = create_segment_location_vector(segment_locations, None);
 
         let max_threads = settings.get_max_threads()? as usize;
diff --git a/src/query/storages/fuse/src/table_functions/clustering_information/clustering_information.rs b/src/query/storages/fuse/src/table_functions/clustering_information/clustering_information.rs
index db613bad5184d..4cba52c7922f5 100644
--- a/src/query/storages/fuse/src/table_functions/clustering_information/clustering_information.rs
+++ b/src/query/storages/fuse/src/table_functions/clustering_information/clustering_information.rs
@@ -108,8 +108,10 @@ impl<'a> ClusteringInformation<'a> {
         let total_block_count = snapshot.summary.block_count;
         let chunk_size = self.ctx.get_settings().get_max_threads()? as usize * 4;
         for chunk in snapshot.segments.chunks(chunk_size) {
+            // TODO: avoid the temporary projection to bare locations.
+            let locations = chunk.iter().map(|v| v.location.clone()).collect::<Vec<_>>();
             let segments = segments_io
-                .read_segments::<SegmentInfo>(chunk, true)
+                .read_segments::<SegmentInfo>(&locations, true)
                 .await?;
 
             for segment in segments.into_iter().flatten() {
diff --git a/src/query/storages/fuse/src/table_functions/fuse_blocks/fuse_block.rs b/src/query/storages/fuse/src/table_functions/fuse_blocks/fuse_block.rs
index 65653201fa7c2..a90327634aeaf 100644
--- a/src/query/storages/fuse/src/table_functions/fuse_blocks/fuse_block.rs
+++ b/src/query/storages/fuse/src/table_functions/fuse_blocks/fuse_block.rs
@@ -123,8 +123,10 @@ impl<'a> FuseBlock<'a> {
         let chunk_size =
             std::cmp::min(self.ctx.get_settings().get_max_threads()? as usize * 4, len).max(1);
         'FOR: for chunk in snapshot.segments.chunks(chunk_size) {
+            // TODO: avoid the temporary projection to bare locations.
+            let locations = chunk.iter().map(|v| v.location.clone()).collect::<Vec<_>>();
             let segments = segments_io
-                .read_segments::<SegmentInfo>(chunk, true)
+                .read_segments::<SegmentInfo>(&locations, true)
                 .await?;
             for segment in segments {
                 let segment = segment?;
diff --git a/src/query/storages/fuse/src/table_functions/fuse_columns/fuse_column.rs b/src/query/storages/fuse/src/table_functions/fuse_columns/fuse_column.rs
index b4082995abbd8..c3290210d45c7 100644
--- a/src/query/storages/fuse/src/table_functions/fuse_columns/fuse_column.rs
+++ b/src/query/storages/fuse/src/table_functions/fuse_columns/fuse_column.rs
@@ -131,8 +131,10 @@ impl<'a> FuseColumn<'a> {
         let mut end = false;
 
         'FOR: for chunk in snapshot.segments.chunks(chunk_size) {
+            // TODO: avoid the temporary projection to bare locations.
+            let locations = chunk.iter().map(|v| v.location.clone()).collect::<Vec<_>>();
             let segments = segments_io
-                .read_segments::<SegmentInfo>(chunk, true)
+                .read_segments::<SegmentInfo>(&locations, true)
                 .await?;
             for segment in segments {
                 let segment = segment?;
diff --git a/src/query/storages/fuse/src/table_functions/fuse_encodings/fuse_encoding.rs b/src/query/storages/fuse/src/table_functions/fuse_encodings/fuse_encoding.rs
index fd88c487e9bd4..a91ed11eb992b 100644
--- a/src/query/storages/fuse/src/table_functions/fuse_encodings/fuse_encoding.rs
+++ b/src/query/storages/fuse/src/table_functions/fuse_encodings/fuse_encoding.rs
@@ -97,8 +97,10 @@ impl<'a> FuseEncoding<'a> {
         let schema = table.schema();
         let fields = schema.fields();
         for chunk in snapshot.segments.chunks(chunk_size) {
+            // TODO: avoid the temporary projection to bare locations.
+            let locations = chunk.iter().map(|v| v.location.clone()).collect::<Vec<_>>();
             let segments = segments_io
-                .read_segments::<SegmentInfo>(chunk, false)
+                .read_segments::<SegmentInfo>(&locations, false)
                 .await?;
             for segment in segments {
                 let segment = segment?;
diff --git a/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment.rs b/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment.rs
index 8c109b65251d7..d586f33f3436a 100644
--- a/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment.rs
+++ b/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment.rs
@@ -25,7 +25,7 @@
 use databend_common_expression::TableDataType;
 use databend_common_expression::TableField;
 use databend_common_expression::TableSchema;
 use databend_common_expression::TableSchemaRefExt;
-use databend_storages_common_table_meta::meta::Location;
+use databend_storages_common_table_meta::meta::SegmentDescriptor;
 use databend_storages_common_table_meta::meta::SegmentInfo;
 use futures_util::TryStreamExt;
 
@@ -92,7 +92,7 @@ impl<'a> FuseSegment<'a> {
     }
 
     #[async_backtrace::framed]
-    async fn to_block(&self, segment_locations: &[Location]) -> Result<DataBlock> {
+    async fn to_block(&self, segment_locations: &[SegmentDescriptor]) -> Result<DataBlock> {
         let limit = self.limit.unwrap_or(usize::MAX);
         let len = std::cmp::min(segment_locations.len(), limit);
 
@@ -114,18 +114,20 @@ impl<'a> FuseSegment<'a> {
         let chunk_size =
             std::cmp::min(self.ctx.get_settings().get_max_threads()? as usize * 4, len).max(1);
         for chunk in segment_locations.chunks(chunk_size) {
+            // TODO: avoid the temporary projection to bare locations.
+            let locations = chunk.iter().map(|v| v.location.clone()).collect::<Vec<_>>();
             let segments = segments_io
-                .read_segments::<SegmentInfo>(chunk, true)
+                .read_segments::<SegmentInfo>(&locations, true)
                 .await?;
             for (idx, segment) in segments.into_iter().enumerate() {
                 let segment = segment?;
-                format_versions.push(segment_locations[idx].1);
+                format_versions.push(segment_locations[idx].location.1);
                 block_count.push(segment.summary.block_count);
                 row_count.push(segment.summary.row_count);
                 compressed.push(segment.summary.compressed_byte_size);
                 uncompressed.push(segment.summary.uncompressed_byte_size);
-                file_location.push(segment_locations[idx].0.clone());
+                file_location.push(segment_locations[idx].location.0.clone());
 
                 row_num += 1;
                 if row_num >= limit {
diff --git a/src/query/storages/stream/src/stream_table.rs b/src/query/storages/stream/src/stream_table.rs
index 2bb9cf923f076..38f4fdbe27fb2 100644
--- a/src/query/storages/stream/src/stream_table.rs
+++ b/src/query/storages/stream/src/stream_table.rs
@@ -192,7 +192,8 @@ impl StreamTable {
     ) -> Result<(Vec<Arc<BlockMeta>>, Vec<Arc<BlockMeta>>)> {
         let operator = fuse_table.get_operator();
         let latest_segments = if let Some(snapshot) = fuse_table.read_table_snapshot().await? {
-            HashSet::from_iter(snapshot.segments.clone())
+            // TODO: avoid the temporary projection to bare locations.
+            HashSet::from_iter(snapshot.segments.iter().map(|v| v.location.clone()))
         } else {
             HashSet::new()
         };
@@ -200,7 +201,8 @@ impl StreamTable {
         let base_segments = if let Some(snapshot_location) = &self.snapshot_location {
             let (base_snapshot, _) =
                 SnapshotsIO::read_snapshot(snapshot_location.clone(), operator.clone()).await?;
-            HashSet::from_iter(base_snapshot.segments.clone())
+            // TODO: avoid the temporary projection to bare locations.
+            HashSet::from_iter(base_snapshot.segments.iter().map(|v| v.location.clone()))
         } else {
             HashSet::new()
         };
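Once writers populate `summary`, readers can derive table-level counts from the snapshot alone, falling back to a full segment read only when an older snapshot left `summary` as `None`. A sketch of that consumption pattern with stand-in types; the `row_count_from_summaries` helper is illustrative and not part of this patch:

    type Location = (String, u64);

    #[derive(Debug)]
    struct SegmentSummary { row_count: u64, block_count: u64 }

    #[derive(Debug)]
    struct SegmentDescriptor { location: Location, summary: Option<SegmentSummary> }

    // Sum row counts from the descriptors alone; a None result signals that at
    // least one segment predates the summary field and must be read from storage.
    fn row_count_from_summaries(segments: &[SegmentDescriptor]) -> Option<u64> {
        segments
            .iter()
            .map(|d| d.summary.as_ref().map(|s| s.row_count))
            .sum()
    }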