Speed up Parquet filter pushdown v4 (Predicate evaluation cache for async_reader) #7850
Conversation
#[derive(Clone)]
pub struct CacheOptions<'a> {
    pub projection_mask: &'a ProjectionMask,
    pub cache: Arc<Mutex<RowGroupCache>>,
}
Practically there's no contention because there's no parallelism in decoding a single row group; we add the Mutex here only because we need shared ownership through Arc.
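A tiny illustration of that choice (generic Rust, not code from this PR): Arc alone only hands out shared &T references, so the Mutex is what restores &mut access for cache inserts, and with no parallel decoding within a row group the lock is uncontended.

use std::sync::{Arc, Mutex};

fn main() {
    // Shared ownership across the per-column readers...
    let cache = Arc::new(Mutex::new(Vec::<u8>::new()));
    let handle = Arc::clone(&cache);
    // ...while the Mutex re-enables mutation through the shared handle
    handle.lock().unwrap().push(42);
    assert_eq!(cache.lock().unwrap().len(), 1);
}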
let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
    batch_size,
    // None,
    Some(1024 * 1024 * 100),
)));
This is currently hard-coded; making it configurable through user settings is left as future work.
@@ -613,8 +623,18 @@ where
    .fetch(&mut self.input, predicate.projection(), selection)
    .await?;

let mut cache_projection = predicate.projection().clone();
cache_projection.intersect(&projection);
A column is cached if and only if it appears in both the output projection and the filter projection.
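To illustrate that rule, here is a self-contained sketch (not the PR's code; the three-column schema is hypothetical) using the parquet crate's ProjectionMask::intersect, the same call shown in the diff above:

use std::sync::Arc;
use parquet::arrow::ProjectionMask;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() {
    // Three leaf columns: a, b, c
    let message = "message schema {
        required int64 a;
        required int64 b;
        required int64 c;
    }";
    let descr = SchemaDescriptor::new(Arc::new(parse_message_type(message).unwrap()));
    let output_projection = ProjectionMask::leaves(&descr, [0, 1]); // query returns a, b
    let mut cache_projection = ProjectionMask::leaves(&descr, [1, 2]); // filter reads b, c
    // Only b is both filtered on and returned, so only b is worth caching
    cache_projection.intersect(&output_projection);
}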
So one thing I didn't understand after reading this PR in detail was how the relative row positions are updated after applying a filter.
For example, if we are applying multiple filters, the first may reduce the original RowSelection down to [100->200], and now when the second filter runs it is only evaluated on the 100->200 rows, not the original selection.
In other words, I think there needs to be some sort of function equivalent to RowSelection::and_then that applies to the cache:
// Narrow the cache so that it only retains the results of evaluating the predicate
let row_group_cache = row_group_cache.and_then(resulting_selection)
Maybe this is the root cause of https://github.com/apache/datafusion/actions/runs/16302299778/job/46039904381?pr=16711
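For reference, RowSelection::and_then in the parquet crate already composes a relative selection back into absolute coordinates; the sketch below (illustrative row counts, not from the PR) shows the semantics a cache-side equivalent would need:

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // First predicate: of 1000 rows, keep rows 100..200
    let first = RowSelection::from(vec![
        RowSelector::skip(100),
        RowSelector::select(100),
        RowSelector::skip(800),
    ]);
    // Second predicate runs only over those 100 surviving rows
    // and keeps the first 10 of them
    let second = RowSelection::from(vec![
        RowSelector::select(10),
        RowSelector::skip(90),
    ]);
    // and_then maps the relative selection back onto the original
    // coordinates, i.e. rows 100..110 of the file
    let combined = first.and_then(&second);
    println!("{combined:?}");
}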
}

fn get_def_levels(&self) -> Option<&[i16]> {
    None // we don't allow nullable parent for now.
}
Nested columns are not supported yet.
😮 -- My brain is likely too fried at the moment to review this properly but it is on my list for first thing tomorrow
Thank you @XiangpengHao for the amazing work, I will try to review and test this PR!
TLDR is I think this is really clever - very nice @XiangpengHao. I left some structural comments / suggestions but nothing major.
I will run some more benchmarks, but it was showing very nice improvements for Q21 locally for me (129ms --> 90ms)
If that looks good I'll wire it up in DataFusion and run those benchmarks
Some thoughts:
- I would be happy to wire in the buffering limit / API
- As you say, there are many more improvements possible -- specifically I suspect the RowSelector representation is going to cause us pain and suffering for filters that have many short selections, when bitmaps would be a better choice
Buffering
I think buffering the intermediate filter results is unavoidable if we want to preserve the current behavior of minimizing the size of IO requests.
If we want to reduce buffering I think we can only really do it by increasing the number of IO requests (so we can incrementally produce the final output). I think we should proceed with buffering and then tune if/when needed.
CacheOptions {
    projection_mask: &cache_projection,
    cache: row_group_cache.clone(),
    role: crate::arrow::array_reader::CacheRole::Producer,
},
Structurally, both here and below, it might help to move the creation of the CacheOptions into the cache itself so a reader of this code doesn't have to understand the innards of the cache:
- CacheOptions {
-     projection_mask: &cache_projection,
-     cache: row_group_cache.clone(),
-     role: crate::arrow::array_reader::CacheRole::Producer,
- },
+ row_group_cache.producer_options(projection, predicate.projection())
let start_position = self.outer_position - row_count;

let selection_buffer = row_selection_to_boolean_buffer(row_count, self.selections.iter());
this is clever -- though it will likely suffer from the same "RowSelection is a crappy representation for small selection runs" problem
Yes, this is to alleviate that problem: if we have multiple small selection runs on the same cached batch, we first combine them into a boolean buffer and then do the boolean selection once.
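A minimal sketch of that idea, assuming the selectors exactly cover the cached batch (filter_cached is a hypothetical stand-in for the PR's row_selection_to_boolean_buffer path):

use arrow::array::builder::BooleanBufferBuilder;
use arrow::array::{Array, ArrayRef, BooleanArray};
use arrow::error::ArrowError;
use parquet::arrow::arrow_reader::RowSelector;

// Expand all selector runs covering one cached batch into a single boolean
// mask, then filter once instead of slicing per run. The selectors' row
// counts must sum to cached.len() for the mask lengths to line up.
fn filter_cached(cached: &ArrayRef, selectors: &[RowSelector]) -> Result<ArrayRef, ArrowError> {
    let mut builder = BooleanBufferBuilder::new(cached.len());
    for s in selectors {
        builder.append_n(s.row_count, !s.skip); // skip => false, select => true
    }
    let mask = BooleanArray::new(builder.finish(), None);
    arrow::compute::filter(cached.as_ref(), &mask)
}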
.expect("data must be already cached in the read_records call, this is a bug"); | ||
let cached = cached.slice(overlap_start - batch_start, selection_length); | ||
let filtered = arrow_select::filter::filter(&cached, &mask_array)?; | ||
selected_arrays.push(filtered); |
You can probably use the new BatchCoalescer here instead: https://docs.rs/arrow/latest/arrow/compute/struct.BatchCoalescer.html
It is definitely faster for primitive arrays and will save intermediate memory usage.
It might have some trouble with StringView as it also tries to gc internally too -- we may need to optimize the output to avoid gc'ing if we see the same buffer from call to call
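For reference, a small usage sketch of BatchCoalescer (a single hypothetical int64 column; the tiny 3-row batches stand in for the many small filtered slices):

use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array};
use arrow::compute::BatchCoalescer;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    // Accumulate many small inputs into ~4096-row output batches
    let mut coalescer = BatchCoalescer::new(schema.clone(), 4096);
    for i in 0..2000i64 {
        let col: ArrayRef = Arc::new(Int64Array::from_iter_values(i * 3..i * 3 + 3));
        coalescer.push_batch(RecordBatch::try_new(schema.clone(), vec![col])?)?;
        while let Some(batch) = coalescer.next_completed_batch() {
            println!("completed batch: {} rows", batch.num_rows());
        }
    }
    coalescer.finish_buffered_batch()?; // flush the final partial batch
    while let Some(batch) = coalescer.next_completed_batch() {
        println!("final batch: {} rows", batch.num_rows());
    }
    Ok(())
}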
🤖: Benchmark completed
😎 -- very nice
Great result! I am curious about the performance compared with the no-filter-pushdown case, because the previous attempt also improved performance on this benchmark, yet still showed some regression compared to no filter pushdown.
I will try and run this experiment later today
Thank you @alamb. If it shows no regression, I believe this PR will also resolve the adaptive selection cases; if it does regress, we can further combine it with adaptive selection as a final optimization.
🤖: Benchmark completed
Thank you -- I will get back to this tomorrow or Monday
I am beginning to look into this -- my planned contribution is to:
I started writing some tests but it got somewhat more complicated than I expected. Here is the WIP PR. Once that is in place, I hope to use the same pattern to verify the cache operations. I will continue tomorrow.
Update here: I have hooked up a configuration option and tests in a PR. The tests are failing because the sync reader does not use the predicate cache yet. I will fix that tomorrow and get that PR up for review.
@@ -1832,6 +1882,7 @@ mod tests {
    assert_eq!(total_rows, 730);
}

#[ignore]
I guess my point is we should either update the test or remove it -- leaving it ignored is likely not helping anything
@@ -597,11 +610,16 @@ where
    metadata: self.metadata.as_ref(),
};

let cache_options_builder =
I started looking into how to integrate this cache into the sync reader, and I got a bit stuck because the sync reader evaluates the predicates for all RowGroups first -- so if we put a predicate cache in the sync reader, it would end up having to cache results from all row groups, not just a single row group the way the async reader does
arrow-rs/parquet/src/arrow/arrow_reader/mod.rs (lines 793 to 818 in 8c75ad9):

let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
let reader = ReaderRowGroups {
    reader: Arc::new(input.0),
    metadata,
    row_groups,
};
let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
// Update selection based on any filters
if let Some(filter) = filter.as_mut() {
    for predicate in filter.predicates.iter_mut() {
        // break early if we have ruled out all rows
        if !plan_builder.selects_any() {
            break;
        }
        let mut cache_projection = predicate.projection().clone();
        cache_projection.intersect(&projection);
        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .build_array_reader(fields.as_deref(), predicate.projection())?;
        plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
    }
Update: @XiangpengHao and I think it is not worth integrating the cache into the sync reader, as the cache would have to hold all the rows across all row groups, which is likely not very useful in practice. We'll instead focus on a more flexible Push Decode #7983.
Update: Here is a proposed PR to add a config option for predicate evaluation. The only thing left is to figure out how to integrate this caching into the sync reader. Some options I can think of:
So now I am stumped 🤔 Maybe I'll try and hack out a ParquetDecoder and see what that would look like 🤔
Co-authored-by: Xiangpeng Hao <haoxiangpeng123@gmail.com>
Add option to control predicate cache, documentation, `ArrowReaderMetrics` and tests
I thought a lot about next steps last night and here is a summary:
Thus I propose the following steps for this PR:
Once we have the end-to-end tests and statistics broken out, then we can get this PR ready to merge.
I will do this once the above two PRs are merged
Revert backwards incompatible changes to the Parquet reader API
Clarify in documentation that cache is only for async decoder
I really like #8000, thank you @alamb for writing it up! I'll think about it over the next couple of days.
Thank you @XiangpengHao -- I think we should proceed with this PR
I broke out some of the infrastructure into a new PR in case that is easier for other reviewers
What I think we should do is wait until after we cut the next release (eta early next week) and then merge it in
This is my latest attempt to make pushdown faster. Prior art: #6921
cc @alamb @zhuqi-lucas
Related: Enable parquet filter pushdown (filter_pushdown) by default datafusion#3463

Problems of #6921
This PR takes a different approach: it does not change the decoding pipeline, so we avoid problem 1. It also caches the Arrow record batches, so we avoid problem 2.
But this means we need to use more memory to cache data.
How it works?
1. It wraps the array_readers with a transparent cached_array_reader.
2. The cached_array_reader consults the RowGroupCache to look for a batch, and only reads from the underlying reader on a cache miss. In a concurrent setup, not all readers may reach the peak point at the same time, so the peak system memory usage might be lower.
3. When the cache is over its memory budget, the cached_array_reader will fall back to reading and decoding from Parquet.
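A rough sketch of the wrapper idea (simplified, self-contained types, not the PR's real API: Batch stands in for a decoded Arrow ArrayRef, and a closure stands in for the inner ArrayReader):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Batch = Vec<i64>; // placeholder for a decoded Arrow array

struct RowGroupCache {
    batches: HashMap<usize, Batch>, // keyed by row offset within the row group
    bytes_used: usize,
    budget: usize,
}

impl RowGroupCache {
    fn get(&self, offset: usize) -> Option<Batch> {
        self.batches.get(&offset).cloned()
    }
    fn insert(&mut self, offset: usize, batch: Batch) -> bool {
        let size = batch.len() * std::mem::size_of::<i64>();
        if self.bytes_used + size > self.budget {
            return false; // over budget: callers fall back to re-decoding
        }
        self.bytes_used += size;
        self.batches.insert(offset, batch);
        true
    }
}

// Transparent wrapper: consult the cache first, decode only on a miss
struct CachedArrayReader<F: FnMut(usize) -> Batch> {
    decode: F,
    cache: Arc<Mutex<RowGroupCache>>,
    position: usize,
}

impl<F: FnMut(usize) -> Batch> CachedArrayReader<F> {
    fn read_batch(&mut self, batch_size: usize) -> Batch {
        let offset = self.position;
        self.position += batch_size;
        if let Some(hit) = self.cache.lock().unwrap().get(offset) {
            return hit; // produced by the filter phase, reused for the output
        }
        let batch = (self.decode)(batch_size);
        self.cache.lock().unwrap().insert(offset, batch.clone());
        batch
    }
}

fn main() {
    let cache = Arc::new(Mutex::new(RowGroupCache {
        batches: HashMap::new(),
        bytes_used: 0,
        budget: 1024 * 1024 * 100, // mirrors the PR's hard-coded 100MB
    }));
    let mut filter_pass = CachedArrayReader {
        decode: |n| vec![0i64; n], // pretend-decoder
        cache: Arc::clone(&cache),
        position: 0,
    };
    let first = filter_pass.read_batch(1024); // miss: decodes and caches
    let mut output_pass = CachedArrayReader {
        decode: |n| vec![0i64; n],
        cache,
        position: 0,
    };
    let second = output_pass.read_batch(1024); // hit: served from the cache
    assert_eq!(first, second);
}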
Other benefits

How does it perform?
My criterion run somehow won't produce a result from --save-baseline, so I asked an LLM to generate a table from this benchmark:
- Baseline is the implementation on the current main branch.
- New Unlimited is the new pushdown with an unlimited memory budget.
- New 100MB is the new pushdown with a 100MB memory budget for row-group caching.

Limitations
Next steps?
This PR is largely a proof of concept; I want to collect some feedback before sending a multi-thousand-line PR :)
Some items I can think of: