diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 6a88ad88e5d4..580fa4be47af 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -108,10 +108,10 @@ impl TableProviderFactory for ListingTableFactory { (Some(schema), table_partition_cols) }; - let table_path = ListingTableUrl::parse(&cmd.location)?; + let mut table_path = ListingTableUrl::parse(&cmd.location)?; let options = ListingOptions::new(file_format) - .with_file_extension(file_extension) + .with_file_extension(&file_extension) .with_session_config_options(session_state.config()) .with_table_partition_cols(table_partition_cols); @@ -125,6 +125,13 @@ impl TableProviderFactory for ListingTableFactory { // specifically for parquet file format. // See: https://github.com/apache/datafusion/issues/7317 None => { + // if the location is a folder then rewrite the path as 'path/*.parquet' + // so that only files the reader can understand are read + if table_path.is_folder() && table_path.get_glob().is_none() { + table_path = table_path.with_glob( + format!("*.{}", cmd.file_type.to_lowercase()).as_ref(), + )?; + } let schema = options.infer_schema(session_state, &table_path).await?; let df_schema = Arc::clone(&schema).to_dfschema()?; let column_refs: HashSet<_> = cmd diff --git a/datafusion/core/src/execution/context/parquet.rs b/datafusion/core/src/execution/context/parquet.rs index 2fb763bee495..731f7e59ecfa 100644 --- a/datafusion/core/src/execution/context/parquet.rs +++ b/datafusion/core/src/execution/context/parquet.rs @@ -107,11 +107,13 @@ mod tests { use crate::test_util::parquet_test_data; use arrow::util::pretty::pretty_format_batches; - use datafusion_common::assert_contains; use datafusion_common::config::TableParquetOptions; + use datafusion_common::{ + assert_batches_eq, assert_batches_sorted_eq, assert_contains, + }; use 
datafusion_execution::config::SessionConfig; - use tempfile::tempdir; + use tempfile::{tempdir, TempDir}; #[tokio::test] async fn read_with_glob_path() -> Result<()> { @@ -400,4 +402,124 @@ mod tests { assert_eq!(total_rows, 5); Ok(()) } + + #[tokio::test] + async fn read_from_parquet_folder() -> Result<()> { + let ctx = SessionContext::new(); + let tmp_dir = TempDir::new()?; + let test_path = tmp_dir.path().to_str().unwrap().to_string(); + + ctx.sql("SELECT 1 a") + .await? + .write_parquet(&test_path, DataFrameWriteOptions::default(), None) + .await?; + + ctx.sql("SELECT 2 a") + .await? + .write_parquet(&test_path, DataFrameWriteOptions::default(), None) + .await?; + + // Adding CSV to check it is not read with Parquet reader + ctx.sql("SELECT 3 a") + .await? + .write_csv(&test_path, DataFrameWriteOptions::default(), None) + .await?; + + let actual = ctx + .read_parquet(&test_path, ParquetReadOptions::default()) + .await? + .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_sorted_eq!(&[ + "+---+", + "| a |", + "+---+", + "| 2 |", + "| 1 |", + "+---+", + ], &actual); + + let actual = ctx + .read_parquet(test_path, ParquetReadOptions::default()) + .await? + .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_sorted_eq!(&[ + "+---+", + "| a |", + "+---+", + "| 2 |", + "| 1 |", + "+---+", + ], &actual); + + Ok(()) + } + + #[tokio::test] + async fn read_from_parquet_folder_table() -> Result<()> { + let ctx = SessionContext::new(); + let tmp_dir = TempDir::new()?; + let test_path = tmp_dir.path().to_str().unwrap().to_string(); + + ctx.sql("SELECT 1 a") + .await? + .write_parquet(&test_path, DataFrameWriteOptions::default(), None) + .await?; + + ctx.sql("SELECT 2 a") + .await? + .write_parquet(&test_path, DataFrameWriteOptions::default(), None) + .await?; + + // Adding CSV to check it is not read with Parquet reader + ctx.sql("SELECT 3 a") + .await? 
+ .write_csv(&test_path, DataFrameWriteOptions::default(), None) + .await?; + + ctx.sql(format!("CREATE EXTERNAL TABLE parquet_folder_t1 STORED AS PARQUET LOCATION '{test_path}'").as_ref()) + .await?; + + let actual = ctx + .sql("select * from parquet_folder_t1") + .await? + .collect() + .await?; + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_sorted_eq!(&[ + "+---+", + "| a |", + "+---+", + "| 2 |", + "| 1 |", + "+---+", + ], &actual); + + Ok(()) + } + + #[tokio::test] + async fn read_dummy_folder() -> Result<()> { + let ctx = SessionContext::new(); + let test_path = "/foo/"; + + let actual = ctx + .read_parquet(test_path, ParquetReadOptions::default()) + .await? + .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_eq!(&[ + "++", + "++", + ], &actual); + + Ok(()) + } } diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs index bddfdbcc06d1..348791be9828 100644 --- a/datafusion/datasource/src/url.rs +++ b/datafusion/datasource/src/url.rs @@ -282,6 +282,28 @@ impl ListingTableUrl { let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath]; ObjectStoreUrl::parse(url).unwrap() } + + /// Returns true if the [`ListingTableUrl`] points to a folder + pub fn is_folder(&self) -> bool { + self.url.scheme() == "file" && self.is_collection() + } + + /// Return the `url` for [`ListingTableUrl`] + pub fn get_url(&self) -> &Url { + &self.url + } + + /// Return the `glob` for [`ListingTableUrl`] + pub fn get_glob(&self) -> &Option<Pattern> { + &self.glob + } + + /// Returns a copy of the current [`ListingTableUrl`] with a specified `glob` + pub fn with_glob(self, glob: &str) -> Result<Self> { + let glob = + Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?; + Self::try_new(self.url, Some(glob)) + } } /// Creates a file URL from a potentially relative filesystem path diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index 
abc6fdab3c8a..33bb052baa51 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -304,6 +304,54 @@ select count(*) from listing_table; ---- 12 +# Test table pointing to the folder with parquet files(ends with /) +statement ok +CREATE EXTERNAL TABLE listing_table_folder_0 +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet/test_table/'; + +statement ok +set datafusion.execution.listing_table_ignore_subdirectory = true; + +# scan file: 0.parquet 1.parquet 2.parquet +query I +select count(*) from listing_table_folder_0; +---- +9 + +statement ok +set datafusion.execution.listing_table_ignore_subdirectory = false; + +# scan file: 0.parquet 1.parquet 2.parquet 3.parquet +query I +select count(*) from listing_table_folder_0; +---- +12 + +# Test table pointing to the folder with parquet files(doesn't end with /) +statement ok +CREATE EXTERNAL TABLE listing_table_folder_1 +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet/test_table'; + +statement ok +set datafusion.execution.listing_table_ignore_subdirectory = true; + +# scan file: 0.parquet 1.parquet 2.parquet +query I +select count(*) from listing_table_folder_1; +---- +9 + +statement ok +set datafusion.execution.listing_table_ignore_subdirectory = false; + +# scan file: 0.parquet 1.parquet 2.parquet 3.parquet +query I +select count(*) from listing_table_folder_1; +---- +12 + # Clean up statement ok DROP TABLE timestamp_with_tz;