diff --git a/Cargo.toml b/Cargo.toml index 09c3ff75..595aeda5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ resolver = "2" version = "0.5.0-dev" edition = "2021" license = "Apache-2.0" -rust-version = "1.84" +rust-version = "1.85" keywords = ["apachehudi", "hudi", "datalake", "arrow"] readme = "README.md" description = "The native Rust implementation for Apache Hudi" diff --git a/crates/core/src/config/internal.rs b/crates/core/src/config/internal.rs index 61f74c68..ae9440a5 100644 --- a/crates/core/src/config/internal.rs +++ b/crates/core/src/config/internal.rs @@ -37,18 +37,26 @@ use crate::config::{ConfigParser, HudiConfigValue}; /// use hudi_core::table::Table as HudiTable; /// /// let options = [(SkipConfigValidation, "true")]; -/// HudiTable::new_with_options("/tmp/hudi_data", options) +/// HudiTable::new_with_options_blocking("/tmp/hudi_data", options); /// ``` /// #[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)] pub enum HudiInternalConfig { SkipConfigValidation, + /// Enable reading archived timeline (v1) and LSM history (v2) + TimelineArchivedReadEnabled, + /// Behavior when archived timeline is requested but not available/enabled: "error" or "continue" + TimelineArchivedUnavailableBehavior, } impl AsRef for HudiInternalConfig { fn as_ref(&self) -> &str { match self { Self::SkipConfigValidation => "hoodie.internal.skip.config.validation", + Self::TimelineArchivedReadEnabled => "hoodie.internal.timeline.archived.enabled", + Self::TimelineArchivedUnavailableBehavior => { + "hoodie.internal.timeline.archived.unavailable.behavior" + } } } } @@ -65,6 +73,11 @@ impl ConfigParser for HudiInternalConfig { fn default_value(&self) -> Option { match self { Self::SkipConfigValidation => Some(HudiConfigValue::Boolean(false)), + Self::TimelineArchivedReadEnabled => Some(HudiConfigValue::Boolean(false)), + // default to continue (graceful) when archived is unavailable/disabled + Self::TimelineArchivedUnavailableBehavior => { + Some(HudiConfigValue::String("continue".to_string())) + } } } @@ -80,6 +93,14 @@ impl ConfigParser for HudiInternalConfig { bool::from_str(v).map_err(|e| ParseBool(self.key(), v.to_string(), e)) }) .map(HudiConfigValue::Boolean), + Self::TimelineArchivedReadEnabled => get_result + .and_then(|v| { + bool::from_str(v).map_err(|e| ParseBool(self.key(), v.to_string(), e)) + }) + .map(HudiConfigValue::Boolean), + Self::TimelineArchivedUnavailableBehavior => { + get_result.map(|v| HudiConfigValue::String(v.to_string())) + } } } } diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs index f550724c..e4c14213 100644 --- a/crates/core/src/config/read.rs +++ b/crates/core/src/config/read.rs @@ -37,7 +37,7 @@ use crate::config::{ConfigParser, HudiConfigValue}; /// use hudi_core::table::Table as HudiTable; /// /// let options = [(InputPartitions, "2")]; -/// HudiTable::new_with_options("/tmp/hudi_data", options) +/// HudiTable::new_with_options_blocking("/tmp/hudi_data", options); /// ``` /// diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs index 5264236c..092e29f7 100644 --- a/crates/core/src/config/table.rs +++ b/crates/core/src/config/table.rs @@ -117,6 +117,12 @@ pub enum HudiTableConfig { /// /// - [`TimelineTimezoneValue`] - Possible values for this configuration. TimelineTimezone, + + /// Folder for archived timeline files for layout v1 (default: .hoodie/archived) + ArchiveLogFolder, + + /// Path for LSM timeline history for layout v2 (default: .hoodie/timeline/history) + TimelineHistoryPath, } impl AsRef for HudiTableConfig { @@ -141,6 +147,8 @@ impl AsRef for HudiTableConfig { Self::TableVersion => "hoodie.table.version", Self::TimelineLayoutVersion => "hoodie.timeline.layout.version", Self::TimelineTimezone => "hoodie.table.timeline.timezone", + Self::ArchiveLogFolder => "hoodie.archivelog.folder", + Self::TimelineHistoryPath => "hoodie.timeline.history.path", } } } @@ -166,6 +174,9 @@ impl ConfigParser for HudiTableConfig { Self::TimelineTimezone => Some(HudiConfigValue::String( TimelineTimezoneValue::UTC.as_ref().to_string(), )), + // Fallbacks are handled in callers; no defaults here + Self::ArchiveLogFolder => None, + Self::TimelineHistoryPath => None, _ => None, } } @@ -238,6 +249,8 @@ impl ConfigParser for HudiTableConfig { Self::TimelineTimezone => get_result .and_then(TimelineTimezoneValue::from_str) .map(|v| HudiConfigValue::String(v.as_ref().to_string())), + Self::ArchiveLogFolder => get_result.map(|v| HudiConfigValue::String(v.to_string())), + Self::TimelineHistoryPath => get_result.map(|v| HudiConfigValue::String(v.to_string())), } } diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs index b5623381..1b72f628 100644 --- a/crates/core/src/file_group/reader.rs +++ b/crates/core/src/file_group/reader.rs @@ -75,8 +75,8 @@ impl FileGroupReader { /// Creates a new reader with the given base URI and options. /// /// # Arguments - /// * `base_uri` - The base URI of the file group's residing table. - /// * `options` - Additional options for the reader. + /// * `base_uri` - The base URI of the file group's residing table. + /// * `options` - Additional options for the reader. /// /// # Notes /// This API uses [`OptionResolver`] that loads table properties from storage to resolve options. @@ -164,7 +164,7 @@ impl FileGroupReader { /// Reads the data from the base file at the given relative path. /// /// # Arguments - /// * `relative_path` - The relative path to the base file. + /// * `relative_path` - The relative path to the base file. /// /// # Returns /// A record batch read from the base file. @@ -216,7 +216,7 @@ impl FileGroupReader { /// Reads the data from the given file slice. /// /// # Arguments - /// * `file_slice` - The file slice to read. + /// * `file_slice` - The file slice to read. /// /// # Returns /// A record batch read from the file slice. diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 3ba206a6..9438db5a 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -23,11 +23,11 @@ //! **Example** //! //! ```rust -//! use hudi_core::config::read::HudiReadConfig::{AsOfTimestamp, InputPartitions}; +//! use hudi_core::config::read::HudiReadConfig::InputPartitions; //! use hudi_core::table::Table as HudiTable; //! -//! let options = [(InputPartitions, "2"), (AsOfTimestamp, "20240101010100000")]; -//! HudiTable::new_with_options("/tmp/hudi_data", options); +//! let options = [(InputPartitions, "2")]; +//! HudiTable::new_with_options_blocking("/tmp/hudi_data", options); //! ``` //! //! # The [table] module is responsible for managing Hudi tables. diff --git a/crates/core/src/timeline/builder.rs b/crates/core/src/timeline/builder.rs new file mode 100644 index 00000000..432a8aa1 --- /dev/null +++ b/crates/core/src/timeline/builder.rs @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::config::table::HudiTableConfig::{TableVersion, TimelineLayoutVersion}; +use crate::config::HudiConfigs; +use crate::error::CoreError; +use crate::storage::Storage; +use crate::timeline::loader::TimelineLoader; +use crate::timeline::Timeline; +use crate::Result; +use std::sync::Arc; + +pub struct TimelineBuilder { + hudi_configs: Arc, + storage: Arc, +} + +impl TimelineBuilder { + pub fn new(hudi_configs: Arc, storage: Arc) -> Self { + Self { + hudi_configs, + storage, + } + } + + pub async fn build(self) -> Result { + let (active_loader, archived_loader) = self.resolve_loader_config()?; + let timeline = Timeline::new( + self.hudi_configs, + self.storage, + active_loader, + archived_loader, + ); + Ok(timeline) + } + + fn resolve_loader_config(&self) -> Result<(TimelineLoader, Option)> { + let table_version = match self.hudi_configs.get(TableVersion) { + Ok(v) => v.to::(), + Err(_) => { + return Ok(( + TimelineLoader::LayoutOneActive(self.storage.clone()), + Some(TimelineLoader::LayoutOneArchived(self.storage.clone())), + )) + } + }; + + let layout_version = self + .hudi_configs + .try_get(TimelineLayoutVersion) + .map(|v| v.to::()) + .unwrap_or_else(|| if table_version == 8 { 2 } else { 1 }); + + match layout_version { + 1 => { + if !(6..=8).contains(&table_version) { + return Err(CoreError::Unsupported(format!( + "Unsupported table version {} with timeline layout version {}", + table_version, layout_version + ))); + } + Ok(( + TimelineLoader::LayoutOneActive(self.storage.clone()), + Some(TimelineLoader::LayoutOneArchived(self.storage.clone())), + )) + } + 2 => { + if table_version != 8 { + return Err(CoreError::Unsupported(format!( + "Unsupported table version {} with timeline layout version {}", + table_version, layout_version + ))); + } + Ok(( + TimelineLoader::LayoutTwoActive(self.storage.clone()), + Some(TimelineLoader::LayoutTwoArchived(self.storage.clone())), + )) + } + _ => Err(CoreError::Unsupported(format!( + "Unsupported timeline layout version: {}", + layout_version + ))), + } + } +} diff --git a/crates/core/src/timeline/loader.rs b/crates/core/src/timeline/loader.rs new file mode 100644 index 00000000..730fb281 --- /dev/null +++ b/crates/core/src/timeline/loader.rs @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::config::internal::HudiInternalConfig::{ + TimelineArchivedReadEnabled, TimelineArchivedUnavailableBehavior, +}; +use crate::config::table::HudiTableConfig::ArchiveLogFolder; +use crate::config::HudiConfigs; +use crate::error::CoreError; +use crate::metadata::HUDI_METADATA_DIR; +use crate::storage::Storage; +use crate::timeline::instant::Instant; +use crate::timeline::lsm_tree::LSM_TIMELINE_DIR; +use crate::timeline::selector::TimelineSelector; +use crate::Result; +use log::debug; +use std::sync::Arc; + +#[derive(Debug, Clone)] +pub enum TimelineLoader { + LayoutOneActive(Arc), + LayoutOneArchived(Arc), + LayoutTwoActive(Arc), + LayoutTwoArchived(Arc), +} + +impl TimelineLoader { + pub async fn load_instants( + &self, + selector: &TimelineSelector, + desc: bool, + ) -> Result> { + match self { + TimelineLoader::LayoutOneActive(storage) => { + let files = storage.list_files(Some(HUDI_METADATA_DIR)).await?; + let mut instants = Vec::with_capacity(files.len() / 3); + + for file_info in files { + match selector.try_create_instant(file_info.name.as_str()) { + Ok(instant) => instants.push(instant), + Err(e) => { + debug!( + "Instant not created from file {:?} due to: {:?}", + file_info, e + ); + } + } + } + + instants.sort_unstable(); + instants.shrink_to_fit(); + + if desc { + Ok(instants.into_iter().rev().collect()) + } else { + Ok(instants) + } + } + TimelineLoader::LayoutTwoActive(storage) => { + let files = storage.list_files(Some(LSM_TIMELINE_DIR)).await?; + let mut instants = Vec::new(); + + for file_info in files { + if file_info.name.starts_with("history/") || file_info.name.ends_with(".crc") { + continue; + } + match selector.try_create_instant(file_info.name.as_str()) { + Ok(instant) => instants.push(instant), + Err(e) => { + debug!( + "Instant not created from file {:?} due to: {:?}", + file_info, e + ); + } + } + } + + instants.sort_unstable(); + instants.shrink_to_fit(); + + if desc { + Ok(instants.into_iter().rev().collect()) + } else { + Ok(instants) + } + } + _ => Err(CoreError::Unsupported( + "Loading from this timeline layout is not implemented yet.".to_string(), + )), + } + } + + pub(crate) async fn load_archived_instants( + &self, + selector: &TimelineSelector, + desc: bool, + ) -> Result> { + // Config wiring: if archived read not enabled, return behavior based on policy + let storage = match self { + TimelineLoader::LayoutOneArchived(storage) + | TimelineLoader::LayoutTwoArchived(storage) => storage.clone(), + _ => return Ok(Vec::new()), // Active loaders don't have archived parts + }; + + let configs: Arc = storage.hudi_configs.clone(); + let enabled = configs + .get_or_default(TimelineArchivedReadEnabled) + .to::(); + if !enabled { + let behavior = configs + .get_or_default(TimelineArchivedUnavailableBehavior) + .to::() + .to_ascii_lowercase(); + return match behavior.as_str() { + "error" => Err(CoreError::Unsupported( + "Archived timeline read is disabled; shorten time range or enable archived read" + .to_string(), + )), + _ => Ok(Vec::new()), // continue silently with empty archived + }; + } + + match self { + TimelineLoader::LayoutOneArchived(storage) => { + // Resolve archive folder from configs or fallback + let archive_dir = configs + .try_get(ArchiveLogFolder) + .map(|v| v.to::()) + .unwrap_or_else(|| ".hoodie/archived".to_string()); + + // List files and try creating instants through selector + let files = storage.list_files(Some(&archive_dir)).await?; + let mut instants = Vec::new(); + for file_info in files { + if file_info.name.ends_with(".crc") { + continue; + } + if let Ok(instant) = selector.try_create_instant(file_info.name.as_str()) { + instants.push(instant); + } + } + instants.sort_unstable(); + if desc { + instants.reverse(); + } + Ok(instants) + } + TimelineLoader::LayoutTwoArchived(storage) => { + // TODO: Implement v2 LSM history reader (hoodie.timeline.history.path). For now, return empty. + let _ = (storage, selector, desc); + Ok(Vec::new()) + } + _ => Ok(Vec::new()), + } + } +} diff --git a/crates/core/src/timeline/lsm_tree.rs b/crates/core/src/timeline/lsm_tree.rs new file mode 100644 index 00000000..5d4724bb --- /dev/null +++ b/crates/core/src/timeline/lsm_tree.rs @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::error::CoreError; +use crate::storage::Storage; +use crate::Result; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +pub const LSM_TIMELINE_DIR: &str = ".hoodie/timeline"; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TimelineManifest { + pub version: i64, + pub entries: Vec, +} + +/// Entry in an LSM timeline manifest. +/// Each entry describes a compacted timeline file covering a time range +/// in the LSM history directory. +/// - `file_name`: relative path of the compacted timeline file under history +/// - `min_instant`/`max_instant`: smallest/largest instant timestamps covered +/// - `level`: LSM level where this file resides +/// - `file_size`: size in bytes of the file +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ManifestEntry { + pub file_name: String, + pub min_instant: String, + pub max_instant: String, + pub level: i32, + pub file_size: i64, +} + +pub struct LSMTree { + storage: Arc, +} + +impl LSMTree { + pub fn new(storage: Arc) -> Self { + Self { storage } + } + + pub async fn read_manifest(&self) -> Result> { + let version_path = format!("{}/history/_version_", LSM_TIMELINE_DIR); + if let Ok(data) = self.storage.get_file_data(&version_path).await { + let version_str = + String::from_utf8(data.to_vec()).map_err(|e| CoreError::Timeline(e.to_string()))?; + let version = version_str + .trim() + .parse::() + .map_err(|e| CoreError::Timeline(e.to_string()))?; + let manifest_path = format!("{}/history/manifest_{}", LSM_TIMELINE_DIR, version); + let data = self.storage.get_file_data(&manifest_path).await?; + let manifest: TimelineManifest = + serde_json::from_slice(&data).map_err(|e| CoreError::Timeline(e.to_string()))?; + Ok(Some(manifest)) + } else { + Ok(None) + } + } +} diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs index 094c07d4..87ddd6d3 100644 --- a/crates/core/src/timeline/mod.rs +++ b/crates/core/src/timeline/mod.rs @@ -16,7 +16,10 @@ * specific language governing permissions and limitations * under the License. */ +pub mod builder; pub mod instant; +pub mod loader; +pub mod lsm_tree; pub(crate) mod selector; pub(crate) mod util; @@ -24,17 +27,19 @@ use crate::config::HudiConfigs; use crate::error::CoreError; use crate::file_group::builder::{build_file_groups, build_replaced_file_groups, FileGroupMerger}; use crate::file_group::FileGroup; -use crate::metadata::HUDI_METADATA_DIR; + use crate::schema::resolver::{ resolve_avro_schema_from_commit_metadata, resolve_schema_from_commit_metadata, }; use crate::storage::Storage; +use crate::timeline::builder::TimelineBuilder; use crate::timeline::instant::Action; +use crate::timeline::loader::TimelineLoader; use crate::timeline::selector::TimelineSelector; use crate::Result; use arrow_schema::Schema; use instant::Instant; -use log::debug; + use serde_json::{Map, Value}; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; @@ -46,6 +51,8 @@ use std::sync::Arc; pub struct Timeline { hudi_configs: Arc, pub(crate) storage: Arc, + active_loader: TimelineLoader, + archived_loader: Option, pub completed_commits: Vec, } @@ -54,18 +61,19 @@ pub const DEFAULT_LOADING_ACTIONS: &[Action] = &[Action::Commit, Action::DeltaCommit, Action::ReplaceCommit]; impl Timeline { - #[cfg(test)] - pub(crate) async fn new_from_completed_commits( + pub(crate) fn new( hudi_configs: Arc, - storage_options: Arc>, - completed_commits: Vec, - ) -> Result { - let storage = Storage::new(storage_options.clone(), hudi_configs.clone())?; - Ok(Self { + storage: Arc, + active_loader: TimelineLoader, + archived_loader: Option, + ) -> Self { + Self { hudi_configs, storage, - completed_commits, - }) + active_loader, + archived_loader, + completed_commits: Vec::new(), + } } pub(crate) async fn new_from_storage( @@ -73,58 +81,51 @@ impl Timeline { storage_options: Arc>, ) -> Result { let storage = Storage::new(storage_options.clone(), hudi_configs.clone())?; + let mut timeline = TimelineBuilder::new(hudi_configs, storage).build().await?; let selector = TimelineSelector::completed_actions_in_range( DEFAULT_LOADING_ACTIONS, - hudi_configs.clone(), + timeline.hudi_configs.clone(), None, None, )?; - let completed_commits = Self::load_instants(&selector, &storage, false).await?; - Ok(Self { - hudi_configs, - storage, - completed_commits, - }) + timeline.completed_commits = timeline.load_instants(&selector, false).await?; + Ok(timeline) } - async fn load_instants( + pub async fn load_instants( + &self, selector: &TimelineSelector, - storage: &Storage, desc: bool, ) -> Result> { - let files = storage.list_files(Some(HUDI_METADATA_DIR)).await?; - - // For most cases, we load completed instants, so we can pre-allocate the vector with a - // capacity of 1/3 of the total number of listed files, - // ignoring requested and inflight instants. - let mut instants = Vec::with_capacity(files.len() / 3); - - for file_info in files { - match selector.try_create_instant(file_info.name.as_str()) { - Ok(instant) => instants.push(instant), - Err(e) => { - // Ignore files that are not valid or desired instants. - debug!( - "Instant not created from file {:?} due to: {:?}", - file_info, e - ); + // If a time filter is present and we have an archived loader, include archived as well. + if selector.has_time_filter() { + let mut instants = self.active_loader.load_instants(selector, desc).await?; + if let Some(archived_loader) = &self.archived_loader { + let mut archived = archived_loader + .load_archived_instants(selector, desc) + .await + .unwrap_or_default(); + if !archived.is_empty() { + // Both sides already sorted by loaders; append is fine for now. + instants.append(&mut archived); } } - } - - instants.sort_unstable(); - - // As of current impl., we don't mutate instants once timeline is created, - // so we can save some memory by shrinking the capacity. - instants.shrink_to_fit(); - - if desc { - Ok(instants.into_iter().rev().collect()) - } else { Ok(instants) + } else { + self.active_loader.load_instants(selector, desc).await } } + async fn load_instants_internal( + &self, + selector: &TimelineSelector, + desc: bool, + ) -> Result> { + // For now, just load active. Archived support will be added internally later + // based on selector ranges. + self.active_loader.load_instants(selector, desc).await + } + /// Get the completed commit [Instant]s in the timeline. /// /// * For Copy-on-write tables, this includes commit instants. @@ -136,7 +137,7 @@ impl Timeline { pub async fn get_completed_commits(&self, desc: bool) -> Result> { let selector = TimelineSelector::completed_commits_in_range(self.hudi_configs.clone(), None, None)?; - Self::load_instants(&selector, &self.storage, desc).await + self.load_instants_internal(&selector, desc).await } /// Get the completed deltacommit [Instant]s in the timeline. @@ -152,7 +153,7 @@ impl Timeline { None, None, )?; - Self::load_instants(&selector, &self.storage, desc).await + self.load_instants_internal(&selector, desc).await } /// Get the completed replacecommit [Instant]s in the timeline. @@ -166,7 +167,7 @@ impl Timeline { None, None, )?; - Self::load_instants(&selector, &self.storage, desc).await + self.load_instants_internal(&selector, desc).await } /// Get the completed clustering commit [Instant]s in the timeline. @@ -180,7 +181,7 @@ impl Timeline { None, None, )?; - let instants = Self::load_instants(&selector, &self.storage, desc).await?; + let instants = self.load_instants_internal(&selector, desc).await?; let mut clustering_instants = Vec::new(); for instant in instants { let metadata = self.get_instant_metadata(&instant).await?; @@ -322,14 +323,58 @@ mod tests { use crate::config::table::HudiTableConfig; use crate::metadata::meta_field::MetaField; + #[tokio::test] + async fn test_timeline_v8_nonpartitioned() { + let base_url = SampleTable::V8Nonpartitioned.url_to_cow(); + let timeline = create_test_timeline(base_url).await; + assert_eq!(timeline.completed_commits.len(), 2); + assert!(matches!( + timeline.active_loader, + TimelineLoader::LayoutTwoActive(_) + )); + assert!(matches!( + timeline.archived_loader, + Some(TimelineLoader::LayoutTwoArchived(_)) + )); + } async fn create_test_timeline(base_url: Url) -> Timeline { - Timeline::new_from_storage( - Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath, base_url)])), + let storage = Storage::new( Arc::new(HashMap::new()), + Arc::new(HudiConfigs::new([( + HudiTableConfig::BasePath, + base_url.to_string(), + )])), + ) + .unwrap(); + + let hudi_configs = HudiConfigs::new([(HudiTableConfig::BasePath, base_url.to_string())]); + let table_properties = crate::config::util::parse_data_for_options( + &storage + .get_file_data(".hoodie/hoodie.properties") + .await + .unwrap(), + "=", ) - .await - .unwrap() + .unwrap(); + let mut hudi_configs_map = hudi_configs.as_options(); + hudi_configs_map.extend(table_properties); + let hudi_configs = Arc::new(HudiConfigs::new(hudi_configs_map)); + + let mut timeline = TimelineBuilder::new(hudi_configs, storage) + .build() + .await + .unwrap(); + + let selector = TimelineSelector::completed_actions_in_range( + DEFAULT_LOADING_ACTIONS, + timeline.hudi_configs.clone(), + None, + None, + ) + .unwrap(); + timeline.completed_commits = timeline.load_instants(&selector, false).await.unwrap(); + timeline } #[tokio::test] diff --git a/crates/core/src/timeline/selector.rs b/crates/core/src/timeline/selector.rs index 078ff05c..f231add6 100644 --- a/crates/core/src/timeline/selector.rs +++ b/crates/core/src/timeline/selector.rs @@ -201,6 +201,11 @@ impl TimelineSelector { Self::completed_actions_in_range(&[Action::ReplaceCommit], hudi_configs, start, end) } + /// Whether the selector has any time filter (start or end) applied. + pub fn has_time_filter(&self) -> bool { + self.start_datetime.is_some() || self.end_datetime.is_some() + } + pub fn should_include_action(&self, action: &Action) -> bool { self.actions.is_empty() || self.actions.contains(action) } @@ -306,6 +311,11 @@ mod tests { use super::*; use crate::config::table::HudiTableConfig; use crate::config::HudiConfigs; + use crate::storage::Storage; + use crate::timeline::builder::TimelineBuilder; + use crate::timeline::instant::{Action, Instant, State}; + use crate::timeline::Timeline; + use chrono::{DateTime, Utc}; use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; @@ -523,7 +533,25 @@ mod tests { } async fn create_test_timeline() -> Timeline { - let instants = vec![ + let storage = Storage::new( + Arc::new(HashMap::new()), + Arc::new(HudiConfigs::new([ + (HudiTableConfig::BasePath, "file:///tmp/base".to_string()), + (HudiTableConfig::TableVersion, "6".to_string()), + ])), + ) + .unwrap(); + let mut timeline = TimelineBuilder::new( + Arc::new(HudiConfigs::new([ + (HudiTableConfig::BasePath, "file:///tmp/base".to_string()), + (HudiTableConfig::TableVersion, "6".to_string()), + ])), + storage, + ) + .build() + .await + .unwrap(); + timeline.completed_commits = vec![ Instant::from_str("20240103153000.commit").unwrap(), Instant::from_str("20240103153010999.commit").unwrap(), Instant::from_str("20240103153020999.commit.requested").unwrap(), @@ -531,16 +559,7 @@ mod tests { Instant::from_str("20240103153020999.commit").unwrap(), Instant::from_str("20240103153030999.commit").unwrap(), ]; - Timeline::new_from_completed_commits( - Arc::new(HudiConfigs::new([( - HudiTableConfig::BasePath, - "file:///tmp/base", - )])), - Arc::new(HashMap::new()), - instants, - ) - .await - .unwrap() + timeline } #[tokio::test] diff --git a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/hoodie.properties b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/hoodie.properties new file mode 100644 index 00000000..dd1798cd --- /dev/null +++ b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/hoodie.properties @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +hoodie.table.name=commits_load_schema_from_base_file_cow +hoodie.table.type=COPY_ON_WRITE +hoodie.table.version=6 +hoodie.table.timeline.layout.version=1 diff --git a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/hoodie.properties b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/hoodie.properties new file mode 100644 index 00000000..51853f95 --- /dev/null +++ b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/hoodie.properties @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +hoodie.table.name=commits_load_schema_from_base_file_mor +hoodie.table.type=MERGE_ON_READ +hoodie.table.version=6 +hoodie.table.timeline.layout.version=1 diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index 60778a16..92f40da7 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -54,26 +54,29 @@ use hudi_core::table::Table as HudiTable; /// /// # Examples /// -/// ```rust +/// ```rust,no_run /// use std::sync::Arc; /// /// use datafusion::error::Result; /// use datafusion::prelude::{DataFrame, SessionContext}; /// use hudi_datafusion::HudiDataSource; /// -/// // Initialize a new DataFusion session context -/// let ctx = SessionContext::new(); +/// #[tokio::main] +/// async fn main() -> Result<()> { +/// // Initialize a new DataFusion session context +/// let ctx = SessionContext::new(); /// -/// // Create a new HudiDataSource with specific read options -/// let hudi = HudiDataSource::new_with_options( -/// "/tmp/trips_table", -/// [("hoodie.read.input.partitions", 5)]).await?; -/// -/// // Register the Hudi table with the session context -/// ctx.register_table("trips_table", Arc::new(hudi))?; -/// let df: DataFrame = ctx.sql("SELECT * from trips_table where city = 'san_francisco'").await?; -/// df.show().await?; +/// // Create a new HudiDataSource with specific read options +/// let hudi = HudiDataSource::new_with_options( +/// "/tmp/trips_table", +/// [("hoodie.read.input.partitions", "5")]).await?; /// +/// // Register the Hudi table with the session context +/// ctx.register_table("trips_table", Arc::new(hudi))?; +/// let df: DataFrame = ctx.sql("SELECT * from trips_table where city = 'san_francisco'").await?; +/// df.show().await?; +/// Ok(()) +/// } /// ``` #[derive(Clone, Debug)] pub struct HudiDataSource { @@ -254,30 +257,27 @@ impl TableProvider for HudiDataSource { /// /// Creating a new `HudiTableFactory` instance: /// -/// ```rust +/// ```rust,no_run /// use datafusion::prelude::SessionContext; +/// use datafusion::catalog::TableProviderFactory; +/// use datafusion::sql::parser::CreateExternalTable; /// use hudi_datafusion::HudiTableFactory; /// -/// // Initialize a new HudiTableFactory -/// let factory = HudiTableFactory::new(); +/// #[tokio::main] +/// async fn main() -> datafusion::error::Result<()> { +/// // Initialize a new HudiTableFactory +/// let factory = HudiTableFactory::new(); /// -/// // The factory can be used to create Hudi tables -/// let table = factory.create_table(...)?; -/// -/// -/// // Using `CREATE EXTERNAL TABLE` to register Hudi table: -/// let test_table = factory.create_table(...)?; // Table with path + url -/// let ctx = SessionContext::new(); -/// -/// // Register table in session using `CREATE EXTERNAL TABLE` command -/// let create_table_sql = format!( -/// "CREATE EXTERNAL TABLE {} STORED AS HUDI LOCATION '{}' {}", -/// test_table.as_ref(), -/// test_table.path(), -/// concat_as_sql_options(options) -/// ); -/// ctx.sql(create_table_sql.as_str()).await?; // Query against this table -/// +/// // Initialize a new DataFusion session context +/// let ctx = SessionContext::new(); +/// +/// // Register table using SQL command +/// let create_table_sql = +/// "CREATE EXTERNAL TABLE trips_table STORED AS HUDI LOCATION '/tmp/trips_table'"; +/// ctx.sql(create_table_sql).await?; +/// +/// Ok(()) +/// } /// ``` #[derive(Debug)] pub struct HudiTableFactory {} diff --git a/demo/infra/runner/Dockerfile b/demo/infra/runner/Dockerfile index 21c84193..0ff674bd 100644 --- a/demo/infra/runner/Dockerfile +++ b/demo/infra/runner/Dockerfile @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -FROM rust:1.84 +FROM rust:1.85 RUN apt-get update && apt-get install -y cmake curl ca-certificates diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 25c59591..f5dd0645 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -16,6 +16,6 @@ # under the License. [toolchain] -channel = "1.84" +channel = "1.85" components = ["rustfmt", "clippy"] profile = "minimal"