(This is based on discussions with @crepererum and @XiangpengHao over the last few days)
Is your feature request related to a problem or challenge?
After working with the Parquet Reader, I have concluded it is not realistic to implement smart prefetching of row groups or pages because the knowledge of what data is needed next is not available from the reader.
This means in practice, users either have to
- buffer the entire file in memory or local SSD (which is what we do in InfluxDB)
- read the data serially with multiple smaller requests while decoding.
Buffering the entire file consumes significant resources and forgoes some of the benefits inherent in columnar IO formats.
Reading the data with multiple smaller requests is fine on a low-latency local file system, or when there are many outstanding requests (where the latency can be hidden by other work), but it is pretty bad for remote object stores or remote filesystems (e.g. S3, HDFS, etc.) where the latency of each request is high.
There are also several other things which I find annoying about the current design which I will rant about:
- Row filters are evaluated eagerly (before the reader is created), see the docs ("Note: this will eagerly evaluate any RowFilter before returning"), which can "front load" significant IO and CPU operations before client code might expect it.
- There are different sync and async readers, which makes it hard (impossible?) to write tests that cover both without duplicating code (see Add option to control predicate cache, documentation, `ArrowReaderMetrics` and tests XiangpengHao/arrow-rs#7).
- The internal implementation (e.g. for evaluating filters) is duplicated between the sync and async readers, which makes it hard to maintain. Example: the initial predicate cache in Speed up Parquet filter pushdown v4 (Predicate evaluation cache for async_reader) #7850 does not work for the sync reader 😬
- The `ArrowReaderBuilder` is a templated typedef, so the innocent sounding `ParquetRecordBatchReaderBuilder` is actually `ArrowReaderBuilder<SyncReader<T>>`, and to actually use it you need to figure out `T: ChunkReader`. The multiple template levels make it challenging to use: why is the underlying type of the IO reader needed to configure things like a `RowSelection`? (See the sketch after this list.)
- I found the async API confusing: you have to implement `AsyncFileReader`, which combines IO and optional Parquet metadata caching. It is quite tricky to implement if you want to read from some different remote source.
- It is also a pain to test the async reader (as you can't easily do so with in-memory `Bytes`). You can see this pain in the fact that there are at least two implementations of `AsyncFileReader` for `Bytes` in the codebase. (I also had to add a third in Add option to control predicate cache, documentation, `ArrowReaderMetrics` and tests XiangpengHao/arrow-rs#7.)
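To make the builder-generics point concrete, here is a minimal sketch assuming the current `parquet::arrow::arrow_reader` API (`apply_selection` is a made-up helper, not something in the crate), showing how the IO reader type leaks into code that only wants to configure which rows to read:

```rust
use parquet::arrow::arrow_reader::{ParquetRecordBatchReaderBuilder, RowSelection};
use parquet::file::reader::ChunkReader;

// ParquetRecordBatchReaderBuilder<T> is really ArrowReaderBuilder<SyncReader<T>>,
// so even a helper that only configures *rows* must be generic over the IO
// reader type `T: ChunkReader`, which has nothing to do with row selection.
fn apply_selection<T: ChunkReader>(
    builder: ParquetRecordBatchReaderBuilder<T>,
    selection: RowSelection,
) -> ParquetRecordBatchReaderBuilder<T> {
    builder.with_row_selection(selection)
}
```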
Root Cause: Pull-Based Decoder?
I think the core reason for many of the above challenges is the current design of the Parquet Reader as a "pull" based decoder, where the CPU operations (decoding) drive the IO operations (e.g. reading from the file).
The way the reader works is:
- the caller asks for the next batch,
- the reader internally may make an I/O call to the underlying reader (either async or sync) if it needs more data
- the reader then decodes from buffered data
let reader = ParquetRecordBatchReaderBuilder::new(io_reader)
// more configuration here
.build();
// The call to `reader.next()` MAY do IO, depending on the reader's state, but
// the outer loop has no way to know what IO will be needed next
while let Some(batch) = reader.next() {
// process
}
This design makes it challenging to add more sophisticated strategies. For example, @masonh22 proposed adding pre-fetching of row groups to the reader in #6676. Among other challenges with that PR, automatically pre-fetching row group data helps in some cases (e.g. reading from a remote object store) but is pure cost in others (e.g. it increases memory usage when the data is already in memory).
Given the pull-based design, I do not think it is feasible to implement different strategies in a reasonable way with the current API.
Describe the solution you'd like
I would like a way for advanced users of the parquet crate to optimize the I/O patterns more carefully.
Describe alternatives you've considered
Push Based Decoder
One potential solution (again, thanks to @crepererum and @XiangpengHao) is to make a "push based" decoder, similar to the JSON decoder and the CSV decoder.
A (simplified) example of how the JSON `Decoder` works illustrates the difference:
let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
let mut next = move || {
loop {
// load some more bytes from somewhere
let buf = read_input();
if buf.is_empty() {
break; // Input exhausted
}
let read = buf.len();
let decoded = decoder.decode(buf)?;
// If the decoder couldn't read the whole buffer, it made an output batch
if decoded != read {
break;
}
}
decoder.flush() // retrieve completed batch
};
So the idea is that we could make a `ParquetDecoder` that is "push" based and does not have an underlying IO reader, but instead is given the data to decode. We could then implement the current sync and async APIs on top of that underlying decoder.
The challenge is likely with the API design: unlike a stream of JSON / CSV, the data the parquet reader will need next is not easy to predict, as it depends on the filters, the row groups, which columns are requested, etc.
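As a purely hypothetical illustration (not a proposed API; `DataRequest` and its variants are made up), the request such a decoder hands back might be little more than a set of byte ranges, so the caller stays in full control of how the IO is performed:

```rust
use std::ops::Range;

/// Hypothetical shape of what a push-based decoder might ask for next.
/// The decoder knows which row groups / column chunks / pages it needs,
/// but expresses that only as byte ranges; the caller decides how to
/// fetch, coalesce, or prefetch them.
enum DataRequest {
    /// The footer / metadata, if it was not supplied up front
    Metadata,
    /// Byte ranges covering the column chunk or page data needed next
    Ranges(Vec<Range<u64>>),
}
```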
Here is a high level straw man design:
// Create a decoder for decoding parquet data (note it does NOT have any IO / readers)
let mut decoder = ParquetDecoderBuilder::new()
    // provide metadata, if known (avoids having to read it each time)
    .with_file_metadata(metadata)
    .with_row_groups(row_groups)
    .with_projection(projection)
    .build();
// In a loop, ask the decoder what it needs next, and provide it with the required data
while let Some(next_request) = decoder.next_request() {
// next_request will tell us what data ranges it needs (details TBD)
let data: Bytes = fetch_data(&next_request);
// provide the data to the decoder
decoder.push_data(next_request, data);
// Now the decoder may be able to decode a batch
// maybe it will return one or more batches, or it will ask for more data
while let Some(batch) = decoder.next_batch() {
// process the batch
}
}
Note that the above example is not async, but it could easily be used by an async API.
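For example, here is a sketch (entirely hypothetical: it reuses the made-up `ParquetDecoder` / `DataRequest` names from the straw man above and adds a made-up `FetchData` trait for the IO source) of an async wrapper where all of the awaiting happens outside the decoder:

```rust
use arrow_array::RecordBatch;
use bytes::Bytes;
use parquet::errors::Result;

/// Hypothetical source of byte ranges (could be backed by object_store, HDFS, etc.)
trait FetchData {
    async fn fetch(&self, request: &DataRequest) -> Result<Bytes>;
}

/// Drive the synchronous, IO-free decoder from async IO: await the bytes it
/// asks for, push them in, and collect the decoded batches.
async fn read_all(mut decoder: ParquetDecoder, io: &impl FetchData) -> Result<Vec<RecordBatch>> {
    let mut batches = vec![];
    while let Some(request) = decoder.next_request() {
        // all IO happens here, in the async caller
        let data = io.fetch(&request).await?;
        decoder.push_data(request, data);
        // drain whatever the decoder can now decode
        while let Some(batch) = decoder.next_batch() {
            batches.push(batch);
        }
    }
    Ok(batches)
}
```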
With such a `ParquetDecoder`, I think we could then add things like prefetching of row groups, etc., with new APIs on the decoder:
// Create a decoder for decoding parquet data as above
let mut decoder: ParquetDecoder = ...;
// Ask the decoder up front what data it will need, and start prefetching if desired
while let Some(pre_request) = decoder.peek_next_requests() {
// note that this is a peek and if we call peek again in the
// future, we may get a different set of pre_requests (for example
// if the decoder has applied a row filter and ruled out
// some row groups or data pages)
start_prefetch(pre_request);
}
// push data to the decoder as before, but hopefully the needed data
// will have already been prefetched
Additional context
I have now hit this while trying to write end-to-end tests for the parquet reader (also see #7971), and it was very annoying to have to duplicate the code for the sync and async readers.
- @tustvold's suggestion: Push-Based Parquet Reader #1605
- Here is another proposal for a low-level API: Low-Level Arrow Parquet Reader #5522