TableProvider to skip files in the folder which are not relevant to the selected reader #16487


Merged: 7 commits, Jun 24, 2025
11 changes: 9 additions & 2 deletions datafusion/core/src/datasource/listing_table_factory.rs
@@ -108,10 +108,10 @@ impl TableProviderFactory for ListingTableFactory {
             (Some(schema), table_partition_cols)
         };

-        let table_path = ListingTableUrl::parse(&cmd.location)?;
+        let mut table_path = ListingTableUrl::parse(&cmd.location)?;

         let options = ListingOptions::new(file_format)
-            .with_file_extension(file_extension)
+            .with_file_extension(&file_extension)
             .with_session_config_options(session_state.config())
             .with_table_partition_cols(table_partition_cols);

@@ -125,6 +125,13 @@ impl TableProviderFactory for ListingTableFactory {
             // specifically for parquet file format.
             // See: https://github.com/apache/datafusion/issues/7317
             None => {
+                // if the location is a folder, rewrite the file path as 'path/*.parquet'
+                // to only read the files the reader can understand
+                if table_path.is_folder() && table_path.get_glob().is_none() {
+                    table_path = table_path.with_glob(
+                        format!("*.{}", cmd.file_type.to_lowercase()).as_ref(),
+                    )?;
+                }
                 let schema = options.infer_schema(session_state, &table_path).await?;
                 let df_schema = Arc::clone(&schema).to_dfschema()?;
                 let column_refs: HashSet<_> = cmd

Review discussion on the folder-to-glob rewrite:

Contributor Author: this is an actual fix.

Contributor: This will mean a directory of files like foo/my_file.parquet.snappy would not be readable anymore -- I think that Spark creates files like my_file.snappy.parquet, so it should be ok.

Contributor Author: It should be ok; compressed files are usually named *.codec.parquet, and the broader wildcard *.parquet should read them. My local test ran against part-00000-9b95f137-d11f-44b6-84b7-d49c95bc7c5b-c000.snappy.parquet.
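
As a quick sanity check of the wildcard reasoning in this thread, here is a small standalone sketch (not part of the PR) using the glob crate, the same crate url.rs below uses for its Pattern type; the file names are the ones discussed above:

use glob::Pattern;

fn main() {
    // The folder rewrite produces a pattern of the form "*.<file_type>",
    // e.g. "*.parquet" for STORED AS PARQUET.
    let pattern = Pattern::new("*.parquet").unwrap();

    // Spark-style names put the codec before the .parquet suffix,
    // so the broad wildcard still matches them.
    assert!(pattern.matches(
        "part-00000-9b95f137-d11f-44b6-84b7-d49c95bc7c5b-c000.snappy.parquet"
    ));

    // The edge case from the review: a codec suffix after .parquet
    // no longer matches, so such a file would be skipped.
    assert!(!pattern.matches("my_file.parquet.snappy"));

    // Non-Parquet files in the same folder are skipped, which is the
    // point of the fix.
    assert!(!pattern.matches("data.csv"));
}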
126 changes: 124 additions & 2 deletions datafusion/core/src/execution/context/parquet.rs
@@ -107,11 +107,13 @@ mod tests {
 use crate::test_util::parquet_test_data;

 use arrow::util::pretty::pretty_format_batches;
-use datafusion_common::assert_contains;
 use datafusion_common::config::TableParquetOptions;
+use datafusion_common::{
+    assert_batches_eq, assert_batches_sorted_eq, assert_contains,
+};
 use datafusion_execution::config::SessionConfig;

-use tempfile::tempdir;
+use tempfile::{tempdir, TempDir};

 #[tokio::test]
 async fn read_with_glob_path() -> Result<()> {
Expand Down Expand Up @@ -400,4 +402,124 @@ mod tests {
        assert_eq!(total_rows, 5);
        Ok(())
    }

    #[tokio::test]
    async fn read_from_parquet_folder() -> Result<()> {
        let ctx = SessionContext::new();
        let tmp_dir = TempDir::new()?;
        let test_path = tmp_dir.path().to_str().unwrap().to_string();

        ctx.sql("SELECT 1 a")
            .await?
            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        ctx.sql("SELECT 2 a")
            .await?
            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        // Adding CSV to check it is not read with the Parquet reader
        ctx.sql("SELECT 3 a")
            .await?
            .write_csv(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        let actual = ctx
            .read_parquet(&test_path, ParquetReadOptions::default())
            .await?
            .collect()
            .await?;

        #[cfg_attr(any(), rustfmt::skip)]
        assert_batches_sorted_eq!(&[
            "+---+",
            "| a |",
            "+---+",
            "| 2 |",
            "| 1 |",
            "+---+",
        ], &actual);

        let actual = ctx
            .read_parquet(test_path, ParquetReadOptions::default())
            .await?
            .collect()
            .await?;

        #[cfg_attr(any(), rustfmt::skip)]
        assert_batches_sorted_eq!(&[
            "+---+",
            "| a |",
            "+---+",
            "| 2 |",
            "| 1 |",
            "+---+",
        ], &actual);

        Ok(())
    }

    #[tokio::test]
    async fn read_from_parquet_folder_table() -> Result<()> {
        let ctx = SessionContext::new();
        let tmp_dir = TempDir::new()?;
        let test_path = tmp_dir.path().to_str().unwrap().to_string();

        ctx.sql("SELECT 1 a")
            .await?
            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        ctx.sql("SELECT 2 a")
            .await?
            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        // Adding CSV to check it is not read with the Parquet reader
        ctx.sql("SELECT 3 a")
            .await?
            .write_csv(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        ctx.sql(format!("CREATE EXTERNAL TABLE parquet_folder_t1 STORED AS PARQUET LOCATION '{test_path}'").as_ref())
            .await?;

        let actual = ctx
            .sql("select * from parquet_folder_t1")
            .await?
            .collect()
            .await?;

        #[cfg_attr(any(), rustfmt::skip)]
        assert_batches_sorted_eq!(&[
            "+---+",
            "| a |",
            "+---+",
            "| 2 |",
            "| 1 |",
            "+---+",
        ], &actual);

        Ok(())
    }

    #[tokio::test]
    async fn read_dummy_folder() -> Result<()> {
        let ctx = SessionContext::new();
        let test_path = "/foo/";

        let actual = ctx
            .read_parquet(test_path, ParquetReadOptions::default())
            .await?
            .collect()
            .await?;

        #[cfg_attr(any(), rustfmt::skip)]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &actual);

        Ok(())
    }
}
22 changes: 22 additions & 0 deletions datafusion/datasource/src/url.rs
@@ -282,6 +282,28 @@ impl ListingTableUrl {
        let url = &self.url[url::Position::BeforeScheme..url::Position::BeforePath];
        ObjectStoreUrl::parse(url).unwrap()
    }

    /// Returns true if the [`ListingTableUrl`] points to a local folder
    pub fn is_folder(&self) -> bool {
        self.url.scheme() == "file" && self.is_collection()
    }

    /// Return the `url` of this [`ListingTableUrl`]
    pub fn get_url(&self) -> &Url {
        &self.url
    }

    /// Return the `glob` of this [`ListingTableUrl`]
    pub fn get_glob(&self) -> &Option<Pattern> {
        &self.glob
    }

    /// Returns a copy of the current [`ListingTableUrl`] with the specified `glob`
    pub fn with_glob(self, glob: &str) -> Result<Self> {
        let glob =
            Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
        Self::try_new(self.url, Some(glob))
    }
}

/// Creates a file URL from a potentially relative filesystem path
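
A minimal usage sketch of the new ListingTableUrl accessors, mirroring the factory change above. The helper narrow_to_file_type, the /tmp/data/ locations, and the main wrapper are illustrative assumptions, not part of the PR; ListingTableUrl here is the re-export at datafusion::datasource::listing:

use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::Result;

// Hypothetical helper mirroring listing_table_factory.rs: narrow a local
// folder URL to a single file type unless the user already gave a glob.
fn narrow_to_file_type(location: &str, file_type: &str) -> Result<ListingTableUrl> {
    let mut table_path = ListingTableUrl::parse(location)?;
    if table_path.is_folder() && table_path.get_glob().is_none() {
        table_path =
            table_path.with_glob(format!("*.{}", file_type.to_lowercase()).as_ref())?;
    }
    Ok(table_path)
}

fn main() -> Result<()> {
    // A trailing slash makes the URL a collection, so is_folder() holds
    // and the glob "*.parquet" is attached.
    let narrowed = narrow_to_file_type("/tmp/data/", "PARQUET")?;
    println!("{:?}", narrowed.get_glob());

    // An explicit glob in the location is left untouched.
    let explicit = narrow_to_file_type("/tmp/data/*.snappy.parquet", "PARQUET")?;
    println!("{:?}", explicit.get_glob());
    Ok(())
}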
48 changes: 48 additions & 0 deletions datafusion/sqllogictest/test_files/parquet.slt
@@ -304,6 +304,54 @@
----
12

# Test table pointing to a folder with parquet files (ends with /)
statement ok
CREATE EXTERNAL TABLE listing_table_folder_0
STORED AS PARQUET
LOCATION 'test_files/scratch/parquet/test_table/';

statement ok
set datafusion.execution.listing_table_ignore_subdirectory = true;

# scan files: 0.parquet 1.parquet 2.parquet
query I
select count(*) from listing_table_folder_0;
----
9

statement ok
set datafusion.execution.listing_table_ignore_subdirectory = false;

# scan files: 0.parquet 1.parquet 2.parquet 3.parquet
query I
select count(*) from listing_table_folder_0;
----
12

# Test table pointing to a folder with parquet files (doesn't end with /)
statement ok
CREATE EXTERNAL TABLE listing_table_folder_1
STORED AS PARQUET
LOCATION 'test_files/scratch/parquet/test_table';

statement ok
set datafusion.execution.listing_table_ignore_subdirectory = true;

# scan files: 0.parquet 1.parquet 2.parquet
query I
select count(*) from listing_table_folder_1;
----
9

statement ok
set datafusion.execution.listing_table_ignore_subdirectory = false;

# scan files: 0.parquet 1.parquet 2.parquet 3.parquet
query I
select count(*) from listing_table_folder_1;
----
12

# Clean up
statement ok
DROP TABLE timestamp_with_tz;
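
For completeness, the listing_table_ignore_subdirectory toggle exercised above can also be set from Rust. A hedged sketch, not part of the PR; the scratch path below is the one the slt script uses and only exists inside the DataFusion sqllogictest setup:

use datafusion::error::Result;
use datafusion::execution::config::SessionConfig;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // false = also list files in subdirectories of the table folder,
    // matching the 12-row counts in the tests above.
    let config = SessionConfig::new().set_bool(
        "datafusion.execution.listing_table_ignore_subdirectory",
        false,
    );
    let ctx = SessionContext::new_with_config(config);

    let df = ctx
        .read_parquet(
            "test_files/scratch/parquet/test_table",
            ParquetReadOptions::default(),
        )
        .await?;
    println!("{}", df.count().await?);
    Ok(())
}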