-
Notifications
You must be signed in to change notification settings - Fork 0
Always apply per-file schema during parquet read #18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -91,7 +91,7 @@ impl FileOpener for ParquetOpener { | |
|
||
let metadata_size_hint = file_meta.metadata_size_hint.or(self.metadata_size_hint); | ||
|
||
let mut reader: Box<dyn AsyncFileReader> = | ||
let mut async_file_reader: Box<dyn AsyncFileReader> = | ||
self.parquet_file_reader_factory.create_reader( | ||
self.partition_index, | ||
file_meta, | ||
|
@@ -121,23 +121,40 @@ impl FileOpener for ParquetOpener { | |
let enable_page_index = self.enable_page_index; | ||
|
||
Ok(Box::pin(async move { | ||
// Don't load the page index yet - we will decide later if we need it | ||
let options = ArrowReaderOptions::new().with_page_index(false); | ||
|
||
// Don't load the page index yet. Since it is not stored inline in | ||
// the footer, loading the page index if it is not needed will do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good comments! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is some of the trickiest code in the parquet reader I think |
||
// unecessary I/O. We decide later if it is needed to evaluate the | ||
// pruning predicates. Thus default to not requesting if from the | ||
// underlying reader. | ||
let mut options = ArrowReaderOptions::new().with_page_index(false); | ||
let mut metadata_timer = file_metrics.metadata_load_time.timer(); | ||
let mut metadata = | ||
ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?; | ||
|
||
// Begin by loading the metadata from the underlying reader (note | ||
// the returned metadata may actually include page indexes as some | ||
// readers may return page indexes even when not requested -- for | ||
// example when they are cached) | ||
let mut reader_metadata = | ||
ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone()) | ||
.await?; | ||
|
||
// Note about schemas: we are actually dealing with **3 different schemas** here: | ||
// - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc. | ||
// - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to. | ||
// - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains. | ||
let mut physical_file_schema = Arc::clone(metadata.schema()); | ||
let mut physical_file_schema = Arc::clone(reader_metadata.schema()); | ||
|
||
// read with view types | ||
// The schema loaded from the file may not be the same as the | ||
// desired schema (for example if we want to instruct the parquet | ||
// reader to read strings using Utf8View instead). Update if necessary | ||
if let Some(merged) = | ||
apply_file_schema_type_coercions(&table_schema, &physical_file_schema) | ||
{ | ||
physical_file_schema = Arc::new(merged); | ||
options = options.with_schema(Arc::clone(&physical_file_schema)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ths is the actual fix -- to update |
||
reader_metadata = ArrowReaderMetadata::try_new( | ||
Arc::clone(reader_metadata.metadata()), | ||
options.clone(), | ||
)?; | ||
} | ||
|
||
// Build predicates for this specific file | ||
|
@@ -147,23 +164,25 @@ impl FileOpener for ParquetOpener { | |
&predicate_creation_errors, | ||
); | ||
|
||
// Now check if we should load the page index | ||
// The page index is not stored inline in the parquet footer so the | ||
// code above may not have raed the page index structures yet. If we | ||
// need them for reading and they aren't yet loaded, we need to load them now. | ||
if should_enable_page_index(enable_page_index, &page_pruning_predicate) { | ||
metadata = load_page_index( | ||
metadata, | ||
&mut reader, | ||
reader_metadata = load_page_index( | ||
reader_metadata, | ||
&mut async_file_reader, | ||
// Since we're manually loading the page index the option here should not matter but we pass it in for consistency | ||
ArrowReaderOptions::new() | ||
.with_page_index(true) | ||
.with_schema(Arc::clone(&physical_file_schema)), | ||
options.with_page_index(true), | ||
) | ||
.await?; | ||
} | ||
|
||
metadata_timer.stop(); | ||
|
||
let mut builder = | ||
ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata); | ||
let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( | ||
async_file_reader, | ||
reader_metadata, | ||
); | ||
|
||
let (schema_mapping, adapted_projections) = | ||
schema_adapter.map_schema(&physical_file_schema)?; | ||
|
@@ -372,12 +391,14 @@ fn build_pruning_predicates( | |
(pruning_predicate, Some(page_pruning_predicate)) | ||
} | ||
|
||
/// Returns a `ArrowReaderMetadata` with the page index loaded, loading | ||
/// it from the underlying `AsyncFileReader` if necessary. | ||
async fn load_page_index<T: AsyncFileReader>( | ||
arrow_reader: ArrowReaderMetadata, | ||
reader_metadata: ArrowReaderMetadata, | ||
input: &mut T, | ||
options: ArrowReaderOptions, | ||
) -> Result<ArrowReaderMetadata> { | ||
let parquet_metadata = arrow_reader.metadata(); | ||
let parquet_metadata = reader_metadata.metadata(); | ||
let missing_column_index = parquet_metadata.column_index().is_none(); | ||
let missing_offset_index = parquet_metadata.offset_index().is_none(); | ||
// You may ask yourself: why are we even checking if the page index is already loaded here? | ||
|
@@ -397,6 +418,6 @@ async fn load_page_index<T: AsyncFileReader>( | |
Ok(new_arrow_reader) | ||
} else { | ||
// No need to load the page index again, just return the existing metadata | ||
Ok(arrow_reader) | ||
Ok(reader_metadata) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is all comments and renaming variables so they better reflect what they are (
metadata
-->reader_metadata
for example to distinguish betweenParquetMetaData
)