Merged · Changes from 3 commits
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -34,6 +34,8 @@ homepage = "https://github.com/apache/hudi-rs"
repository = "https://github.com/apache/hudi-rs"

[workspace.dependencies]
log = "^0.4"

# arrow
arrow = { version = "= 52.2.0", features = ["pyarrow"] }
arrow-arith = { version = "= 52.2.0" }
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
@@ -65,6 +65,7 @@ datafusion = { workspace = true, optional = true }
datafusion-expr = { workspace = true, optional = true }
datafusion-common = { workspace = true, optional = true }
datafusion-physical-expr = { workspace = true, optional = true }
log = { workspace = true }

[dev-dependencies]
hudi-tests = { path = "../tests" }
@@ -73,6 +74,5 @@ hudi-tests = { path = "../tests" }
datafusion = [
"dep:datafusion",
"datafusion-expr",
"datafusion-common",
"datafusion-physical-expr",
]
1 change: 1 addition & 0 deletions crates/core/src/config/table.rs
@@ -130,6 +130,7 @@ impl ConfigParser for HudiTableConfig {
match self {
Self::DatabaseName => Some(HudiConfigValue::String("default".to_string())),
Self::DropsPartitionFields => Some(HudiConfigValue::Boolean(false)),
Self::PartitionFields => Some(HudiConfigValue::List(vec!["".to_string()])),
Self::PopulatesMetaFields => Some(HudiConfigValue::Boolean(true)),
_ => None,
}
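A note on the `PartitionFields` default added above: supplying `Some(HudiConfigValue::List(vec!["".to_string()]))` gives non-partitioned tables a usable fallback instead of a missing-key lookup failure. Below is a minimal, self-contained sketch of this per-key default pattern; the enum and value types are illustrative stand-ins, not the crate's actual API.

```rust
// Hypothetical stand-ins for HudiConfigValue / HudiTableConfig, showing the
// per-variant default pattern used in the change above.
#[derive(Debug)]
enum ConfigValue {
    Boolean(bool),
    List(Vec<String>),
}

enum TableConfig {
    DropsPartitionFields,
    PartitionFields,
}

impl TableConfig {
    // Each key may carry a fallback, so optional keys resolve without error.
    fn default_value(&self) -> Option<ConfigValue> {
        match self {
            Self::DropsPartitionFields => Some(ConfigValue::Boolean(false)),
            // A single empty entry stands in for "table has no partition fields".
            Self::PartitionFields => Some(ConfigValue::List(vec!["".to_string()])),
        }
    }
}

fn main() {
    let default = TableConfig::PartitionFields.default_value();
    println!("{default:?}"); // Some(List([""]))
}
```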
3 changes: 2 additions & 1 deletion crates/core/src/file_group/mod.rs
@@ -257,7 +257,8 @@ mod tests {
.base_file
.commit_time,
"20240402123035233"
)
);
assert!(fg.get_file_slice_as_of("-1").is_none());
}

#[test]
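The added assertion covers the lower boundary: an as-of timestamp of `"-1"` sorts before every commit instant, so no file slice qualifies. A hedged sketch of the lookup semantics this test exercises, assuming instants compare lexicographically (which Hudi's timestamp format permits); the types are illustrative, not the crate's:

```rust
use std::collections::BTreeMap;

// Illustrative stand-in for a file group: commit time -> file slice id.
struct FileGroup {
    slices: BTreeMap<String, String>,
}

impl FileGroup {
    // Return the newest slice whose commit time is <= the given timestamp.
    fn get_file_slice_as_of(&self, timestamp: &str) -> Option<&String> {
        self.slices
            .range(..=timestamp.to_string())
            .next_back()
            .map(|(_, slice)| slice)
    }
}

fn main() {
    let mut slices = BTreeMap::new();
    slices.insert("20240402123035233".to_string(), "slice-a".to_string());
    let fg = FileGroup { slices };
    // Inclusive bound: querying at the exact commit time finds the slice.
    assert!(fg.get_file_slice_as_of("20240402123035233").is_some());
    // "-1" precedes all instants, so nothing qualifies.
    assert!(fg.get_file_slice_as_of("-1").is_none());
}
```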
226 changes: 178 additions & 48 deletions crates/core/src/table/fs_view.rs
@@ -20,15 +20,17 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::record_batch::RecordBatch;
use dashmap::DashMap;
use url::Url;

use crate::config::HudiConfigs;
use crate::file_group::{BaseFile, FileGroup, FileSlice};
use crate::storage::file_info::FileInfo;
use crate::storage::{get_leaf_dirs, Storage};
use crate::table::partitions::{HudiTablePartition, PartitionFilter};
use anyhow::{anyhow, Result};
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use dashmap::DashMap;
use log::warn;
use url::Url;

#[derive(Clone, Debug)]
#[allow(dead_code)]
@@ -45,18 +47,19 @@
configs: Arc<HudiConfigs>,
) -> Result<Self> {
let storage = Storage::new(base_url, &storage_options)?;
let partition_paths = Self::load_partition_paths(&storage).await?;
let partition_to_file_groups =
Self::load_file_groups_for_partitions(&storage, partition_paths).await?;
let partition_to_file_groups = Arc::new(DashMap::from_iter(partition_to_file_groups));
let partition_to_file_groups = Arc::new(DashMap::new());

[Codecov / codecov/patch: added line #L50 in crates/core/src/table/fs_view.rs was not covered by tests]
Ok(FileSystemView {
configs,
storage,
partition_to_file_groups,
})
}

async fn load_partition_paths(storage: &Storage) -> Result<Vec<String>> {
async fn load_partition_paths(
storage: &Storage,
partition_filters: &[PartitionFilter],
partition_schema: &Schema,
) -> Result<Vec<String>> {
let top_level_dirs: Vec<String> = storage
.list_dirs(None)
.await?
@@ -70,7 +73,31 @@
if partition_paths.is_empty() {
partition_paths.push("".to_string())
}
Ok(partition_paths)
if partition_filters.is_empty() {
return Ok(partition_paths);
}
let field_and_data_type: HashMap<_, _> = partition_schema
.fields()
.iter()
.map(|field| (field.name().to_string(), field.data_type().clone()))
.collect();

Ok(partition_paths
.into_iter()
.filter(|path_str| {
match path_str
.split('/')
.map(|s| HudiTablePartition::try_from((s, &field_and_data_type)))
.collect::<Result<Vec<_>>>()
{
Ok(parts) => partition_filters.iter().all(|filter| filter.match_partitions(&parts)),
Err(e) => {
warn!("Failed to parse partitions for path {}: {}. Including this partition by default.", path_str, e);
true // include the partition despite the error
},
}
})
.collect())
}
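For reviewers following the new filtering in `load_partition_paths`: each candidate path is split into `key=value` segments, the segments are parsed against the partition schema, and the path survives only if every pushed-down filter matches; on a parse failure the path is kept and a warning is logged, which errs on the side of returning too much data rather than silently dropping a partition. A self-contained sketch of that flow under simplified, hypothetical types (equality-only filters, no Arrow data types):

```rust
#[derive(Debug)]
struct Partition {
    key: String,
    value: String,
}

// Equality-only filter; the real PartitionFilter also carries an operator
// and a typed value.
struct Filter {
    key: String,
    value: String,
}

impl Filter {
    fn match_partitions(&self, parts: &[Partition]) -> bool {
        parts.iter().any(|p| p.key == self.key && p.value == self.value)
    }
}

// Parse "k1=v1/k2=v2" into partitions; None models a parse error.
fn parse_path(path: &str) -> Option<Vec<Partition>> {
    path.split('/')
        .map(|seg| {
            let (k, v) = seg.split_once('=')?;
            Some(Partition { key: k.to_string(), value: v.to_string() })
        })
        .collect()
}

fn main() {
    let filters = [Filter { key: "byteField".to_string(), value: "10".to_string() }];
    let paths = ["byteField=10/shortField=300", "byteField=20/shortField=100"];
    let kept: Vec<_> = paths
        .iter()
        .filter(|path| match parse_path(path) {
            Some(parts) => filters.iter().all(|f| f.match_partitions(&parts)),
            // Mirror the change above: keep the partition when parsing fails.
            None => true,
        })
        .collect();
    println!("{kept:?}"); // ["byteField=10/shortField=300"]
}
```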

async fn load_file_groups_for_partitions(
@@ -121,47 +148,39 @@
Ok(file_groups)
}

pub fn get_file_slices_as_of(
pub async fn get_file_slices_as_of(
&self,
timestamp: &str,
excluding_file_groups: &HashSet<FileGroup>,
partition_filter: &[PartitionFilter],
partition_schema: &Schema,
) -> Result<Vec<FileSlice>> {
let mut file_slices = Vec::new();
for fgs in self.partition_to_file_groups.iter() {
let fgs_ref = fgs.value();
for fg in fgs_ref {
if excluding_file_groups.contains(fg) {
continue;
}
if let Some(fsl) = fg.get_file_slice_as_of(timestamp) {
// TODO: pass ref instead of copying
file_slices.push(fsl.clone());
}
}
if self.partition_to_file_groups.is_empty() {
let partition_paths =
Self::load_partition_paths(&self.storage, partition_filter, partition_schema)
.await?;
let partition_to_file_groups =
Self::load_file_groups_for_partitions(&self.storage, partition_paths).await?;
partition_to_file_groups.into_iter().for_each(|pair| {
self.partition_to_file_groups.insert(pair.0, pair.1);
});
}
Ok(file_slices)
}

pub async fn load_file_slices_stats_as_of(
&self,
timestamp: &str,
exclude_file_groups: &HashSet<FileGroup>,
) -> Result<()> {
for mut fgs in self.partition_to_file_groups.iter_mut() {
let fgs_ref = fgs.value_mut();
for fg in fgs_ref {
if exclude_file_groups.contains(fg) {
if excluding_file_groups.contains(fg) {

[Codecov / codecov/patch: added line #L172 in crates/core/src/table/fs_view.rs was not covered by tests]
continue;
}
if let Some(file_slice) = fg.get_file_slice_mut_as_of(timestamp) {
file_slice
.load_stats(&self.storage)
.await
.expect("Successful loading file stats.");
if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
// TODO: pass ref instead of copying
fsl.load_stats(&self.storage).await?;
let immut_fsl: &FileSlice = fsl;
file_slices.push(immut_fsl.clone());
}
}
}
Ok(())
Ok(file_slices)
}
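The restructuring above folds the old `load_file_slices_stats_as_of` into `get_file_slices_as_of` and makes the view lazy: `partition_to_file_groups` starts empty and is populated on the first query rather than in `new`. A minimal sketch of that lazy-population pattern, assuming only the `dashmap` crate; the struct and the stand-in loader are hypothetical:

```rust
use dashmap::DashMap;

struct View {
    partition_to_file_groups: DashMap<String, Vec<String>>,
}

impl View {
    fn new() -> Self {
        // Construction is now cheap: no storage listing happens here.
        Self { partition_to_file_groups: DashMap::new() }
    }

    // Stand-in for load_partition_paths + load_file_groups_for_partitions.
    fn load_partitions(&self) -> Vec<(String, Vec<String>)> {
        vec![("byteField=10".to_string(), vec!["fg-1".to_string()])]
    }

    fn file_groups(&self) -> Vec<String> {
        // Populate the cache on first use, as get_file_slices_as_of now does.
        if self.partition_to_file_groups.is_empty() {
            for (partition, groups) in self.load_partitions() {
                self.partition_to_file_groups.insert(partition, groups);
            }
        }
        self.partition_to_file_groups
            .iter()
            .flat_map(|entry| entry.value().clone())
            .collect()
    }
}

fn main() {
    let view = View::new();
    assert!(view.partition_to_file_groups.is_empty());
    let fgs = view.file_groups(); // first call triggers the load
    assert_eq!(fgs, vec!["fg-1".to_string()]);
}
```

One consequence, visible in the tests below, is that the map is only non-empty after the first `get_file_slices_as_of` call; two racing first calls could both run the load, which is benign here because the inserts are idempotent.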

pub async fn read_file_slice_by_path_unchecked(
@@ -174,26 +193,36 @@
self.read_file_slice_by_path_unchecked(&file_slice.base_file_relative_path())
.await
}

pub fn reset(&mut self) {
self.partition_to_file_groups = Arc::new(DashMap::new())
}
}

#[cfg(test)]
mod tests {
use hudi_tests::TestTable;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use hudi_tests::TestTable;

use crate::config::HudiConfigs;
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::partitions::PartitionFilter;
use crate::table::Table;

#[tokio::test]
async fn get_partition_paths_for_nonpartitioned_table() {
let base_url = TestTable::V6Nonpartitioned.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
let partition_paths = FileSystemView::load_partition_paths(&storage)
.await
.unwrap();
let partition_paths = FileSystemView::load_partition_paths(
&storage,
&hudi_table.partition_filters,
&hudi_table.get_partition_schema().await.unwrap(),
)
.await
.unwrap();
let partition_path_set: HashSet<&str> =
HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
assert_eq!(partition_path_set, HashSet::from([""]))
@@ -202,10 +231,15 @@
#[tokio::test]
async fn get_partition_paths_for_complexkeygen_table() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
let partition_paths = FileSystemView::load_partition_paths(&storage)
.await
.unwrap();
let partition_paths = FileSystemView::load_partition_paths(
&storage,
&hudi_table.partition_filters,
&hudi_table.get_partition_schema().await.unwrap(),
)
.await
.unwrap();
let partition_path_set: HashSet<&str> =
HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
assert_eq!(
@@ -221,6 +255,7 @@
#[tokio::test]
async fn fs_view_get_latest_file_slices() {
let base_url = TestTable::V6Nonpartitioned.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let fs_view = FileSystemView::new(
Arc::new(base_url),
Arc::new(HashMap::new()),
@@ -229,15 +264,110 @@
.await
.unwrap();

assert!(fs_view.partition_to_file_groups.is_empty());
let excludes = HashSet::new();
let file_slices = fs_view
.get_file_slices_as_of("20240418173551906", &excludes)
.get_file_slices_as_of(
"20240418173551906",
&excludes,
&hudi_table.partition_filters,
&hudi_table.get_partition_schema().await.unwrap(),
)
.await
.unwrap();
assert_eq!(fs_view.partition_to_file_groups.len(), 1);
assert_eq!(file_slices.len(), 1);
let fg_ids = file_slices
.iter()
.map(|fsl| fsl.file_group_id())
.collect::<Vec<_>>();
assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]);
for fsl in file_slices.iter() {
assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 4);
}
}

#[tokio::test]
async fn fs_view_get_latest_file_slices_with_replace_commit() {
let base_url = TestTable::V6SimplekeygenNonhivestyleOverwritetable.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let fs_view = FileSystemView::new(
Arc::new(base_url),
Arc::new(HashMap::new()),
Arc::new(HudiConfigs::empty()),
)
.await
.unwrap();

assert_eq!(fs_view.partition_to_file_groups.len(), 0);
let excludes = &hudi_table
.timeline
.get_replaced_file_groups()
.await
.unwrap();
let file_slices = fs_view
.get_file_slices_as_of(
"20240707001303088",
excludes,
&hudi_table.partition_filters,
&hudi_table.get_partition_schema().await.unwrap(),
)
.await
.unwrap();
assert_eq!(fs_view.partition_to_file_groups.len(), 3);
assert_eq!(file_slices.len(), 1);
let fg_ids = file_slices
.iter()
.map(|fsl| fsl.file_group_id())
.collect::<Vec<_>>();
assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"])
assert_eq!(fg_ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]);
for fsl in file_slices.iter() {
assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 1);
}
}

#[tokio::test]
async fn fs_view_get_latest_file_slices_no_hive_style_filter() {
let base_url = TestTable::V6SimplekeygenNonhivestyleOverwritetable.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let fs_view = FileSystemView::new(
Arc::new(base_url),
Arc::new(HashMap::new()),
Arc::new(HudiConfigs::empty()),
)
.await
.unwrap();
assert_eq!(fs_view.partition_to_file_groups.len(), 0);
let excludes = &hudi_table
.timeline
.get_replaced_file_groups()
.await
.unwrap();
let partition_schema = hudi_table.get_partition_schema().await.unwrap();
let byte_field_data_type = partition_schema
.field_with_name("byteField")
.unwrap()
.data_type();
let short_eq_10 =
PartitionFilter::try_from(("byteField", "=", "10", byte_field_data_type)).unwrap();
let file_slices = fs_view
.get_file_slices_as_of(
"20240707001303088",
excludes,
&[short_eq_10],
&hudi_table.get_partition_schema().await.unwrap(),
)
.await
.unwrap();
assert_eq!(fs_view.partition_to_file_groups.len(), 3);
assert_eq!(file_slices.len(), 1);
let fg_ids = file_slices
.iter()
.map(|fsl| fsl.file_group_id())
.collect::<Vec<_>>();
assert_eq!(fg_ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]);
for fsl in file_slices.iter() {
assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 1);
}
}
}