Skip to content

Always apply per-file schema during parquet read #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 4, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 41 additions & 20 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl FileOpener for ParquetOpener {

let metadata_size_hint = file_meta.metadata_size_hint.or(self.metadata_size_hint);

let mut reader: Box<dyn AsyncFileReader> =
let mut async_file_reader: Box<dyn AsyncFileReader> =
self.parquet_file_reader_factory.create_reader(
self.partition_index,
file_meta,
Expand Down Expand Up @@ -121,23 +121,40 @@ impl FileOpener for ParquetOpener {
let enable_page_index = self.enable_page_index;

Ok(Box::pin(async move {
// Don't load the page index yet - we will decide later if we need it
let options = ArrowReaderOptions::new().with_page_index(false);

// Don't load the page index yet. Since it is not stored inline in
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is all comments and renaming variables so they better reflect what they are (metadata --> reader_metadata for example to distinguish between ParquetMetaData)

// the footer, loading the page index if it is not needed will do

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good comments!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is some of the trickiest code in the parquet reader I think

// unecessary I/O. We decide later if it is needed to evaluate the
// pruning predicates. Thus default to not requesting if from the
// underlying reader.
let mut options = ArrowReaderOptions::new().with_page_index(false);
let mut metadata_timer = file_metrics.metadata_load_time.timer();
let mut metadata =
ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;

// Begin by loading the metadata from the underlying reader (note
// the returned metadata may actually include page indexes as some
// readers may return page indexes even when not requested -- for
// example when they are cached)
let mut reader_metadata =
ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone())
.await?;

// Note about schemas: we are actually dealing with **3 different schemas** here:
// - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
// - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to.
// - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
let mut physical_file_schema = Arc::clone(metadata.schema());
let mut physical_file_schema = Arc::clone(reader_metadata.schema());

// read with view types
// The schema loaded from the file may not be the same as the
// desired schema (for example if we want to instruct the parquet
// reader to read strings using Utf8View instead). Update if necessary
if let Some(merged) =
apply_file_schema_type_coercions(&table_schema, &physical_file_schema)
{
physical_file_schema = Arc::new(merged);
options = options.with_schema(Arc::clone(&physical_file_schema));
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ths is the actual fix -- to update reader_metadata here

reader_metadata = ArrowReaderMetadata::try_new(
Arc::clone(reader_metadata.metadata()),
options.clone(),
)?;
}

// Build predicates for this specific file
Expand All @@ -147,23 +164,25 @@ impl FileOpener for ParquetOpener {
&predicate_creation_errors,
);

// Now check if we should load the page index
// The page index is not stored inline in the parquet footer so the
// code above may not have raed the page index structures yet. If we
// need them for reading and they aren't yet loaded, we need to load them now.
if should_enable_page_index(enable_page_index, &page_pruning_predicate) {
metadata = load_page_index(
metadata,
&mut reader,
reader_metadata = load_page_index(
reader_metadata,
&mut async_file_reader,
// Since we're manually loading the page index the option here should not matter but we pass it in for consistency
ArrowReaderOptions::new()
.with_page_index(true)
.with_schema(Arc::clone(&physical_file_schema)),
options.with_page_index(true),
)
.await?;
}

metadata_timer.stop();

let mut builder =
ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);
let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
async_file_reader,
reader_metadata,
);

let (schema_mapping, adapted_projections) =
schema_adapter.map_schema(&physical_file_schema)?;
Expand Down Expand Up @@ -372,12 +391,14 @@ fn build_pruning_predicates(
(pruning_predicate, Some(page_pruning_predicate))
}

/// Returns a `ArrowReaderMetadata` with the page index loaded, loading
/// it from the underlying `AsyncFileReader` if necessary.
async fn load_page_index<T: AsyncFileReader>(
arrow_reader: ArrowReaderMetadata,
reader_metadata: ArrowReaderMetadata,
input: &mut T,
options: ArrowReaderOptions,
) -> Result<ArrowReaderMetadata> {
let parquet_metadata = arrow_reader.metadata();
let parquet_metadata = reader_metadata.metadata();
let missing_column_index = parquet_metadata.column_index().is_none();
let missing_offset_index = parquet_metadata.offset_index().is_none();
// You may ask yourself: why are we even checking if the page index is already loaded here?
Expand All @@ -397,6 +418,6 @@ async fn load_page_index<T: AsyncFileReader>(
Ok(new_arrow_reader)
} else {
// No need to load the page index again, just return the existing metadata
Ok(arrow_reader)
Ok(reader_metadata)
}
}