Skip to content
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
cdc3685
Passthrough unary ops through Parquet predicate pushdown
mhaseeb123 Sep 26, 2025
da7cf9a
Merge branch 'branch-25.12' into fea/passthrough-unary-ops-in-pq-pred…
mhaseeb123 Sep 26, 2025
0d55bd4
Update gtests
mhaseeb123 Sep 29, 2025
d7ff34b
Merge branch 'branch-25.12' into fea/passthrough-unary-ops-in-pq-pred…
mhaseeb123 Sep 29, 2025
204d746
Propagate `_always_true` to parent expression when expr can't be eval…
mhaseeb123 Sep 29, 2025
71c3212
Merge branch 'branch-25.12' into fea/passthrough-unary-ops-in-pq-pred…
mhaseeb123 Sep 29, 2025
1dfb9a4
Merge branch 'branch-25.12' into fea/passthrough-unary-ops-in-pq-pred…
mhaseeb123 Sep 29, 2025
0638922
Small update
mhaseeb123 Sep 29, 2025
0733944
Minor updates
mhaseeb123 Sep 29, 2025
c18d5dc
Comments update
mhaseeb123 Sep 29, 2025
88be6f4
Similar changes in dictionary and bloom filter `collectors`
mhaseeb123 Sep 29, 2025
efa8b16
Merge branch 'branch-25.12' into fea/passthrough-unary-ops-in-pq-pred…
mhaseeb123 Sep 29, 2025
bd2b32c
Evaluate `IS_NULL` at rg and page levels in parquet filtering
mhaseeb123 Sep 30, 2025
b41d806
Propagate `_always_true` up to the expression tree
mhaseeb123 Sep 30, 2025
b56d27a
Avoid dynamic cast and directly compare `new_operands` and `_always_t…
mhaseeb123 Oct 2, 2025
bd36563
Merge branch 'branch-25.12' into fea/evaluate-is-null-using-stats
mhaseeb123 Oct 8, 2025
4719b73
Minor comment update
mhaseeb123 Oct 9, 2025
845f097
Merge branch 'branch-25.12' into fea/evaluate-is-null-using-stats
mhaseeb123 Oct 13, 2025
c04ca48
Merge branch 'branch-25.12' into fea/evaluate-is-null-using-stats
mhaseeb123 Oct 13, 2025
9e96d8f
Make `is_null` evaluation optional
mhaseeb123 Oct 14, 2025
74c4d96
Merge branch 'branch-25.12' into fea/evaluate-is-null-using-stats
mhaseeb123 Oct 14, 2025
ccdc317
Merge branch 'branch-25.12' into fea/evaluate-is-null-using-stats
mhaseeb123 Oct 17, 2025
de9a887
Address reviews
mhaseeb123 Oct 20, 2025
a5cc174
Add [[nodiscard]] attribute to to_device method
mhaseeb123 Oct 20, 2025
e6d5221
Apply suggestion from @mhaseeb123
mhaseeb123 Oct 21, 2025
f6dc421
Apply suggestion from @mhaseeb123
mhaseeb123 Oct 21, 2025
fdc5f33
Merge branch 'main' into fea/evaluate-is-null-using-stats
mhaseeb123 Oct 21, 2025
106d252
style plz
mhaseeb123 Oct 21, 2025
f0fdf8b
Use correct `mr` for internals
mhaseeb123 Oct 22, 2025
7e496af
fix docstring
mhaseeb123 Oct 22, 2025
54812f1
Optimize for no nulls in page stats
mhaseeb123 Oct 22, 2025
7cd8c26
Minor docstring update
mhaseeb123 Oct 22, 2025
34372f9
Minor: make var names consistent
mhaseeb123 Oct 22, 2025
df48f02
Merge branch 'main' into fea/evaluate-is-null-using-stats
mhaseeb123 Oct 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 107 additions & 38 deletions cpp/src/io/parquet/experimental/page_index_filter.cu
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,7 @@ struct page_stats_caster : public stats_caster_base {
cudf::size_type total_rows;
cudf::host_span<metadata_base const> per_file_metadata;
cudf::host_span<std::vector<size_type> const> row_group_indices;

page_stats_caster(size_type total_rows,
cudf::host_span<metadata_base const> per_file_metadata,
cudf::host_span<std::vector<size_type> const> row_group_indices)
: total_rows{total_rows},
per_file_metadata{per_file_metadata},
row_group_indices{row_group_indices}
{
}
bool const has_is_null_operator;

/**
* @brief Transforms a page-level stats column to a row-level stats column for non-string types
Expand Down Expand Up @@ -316,6 +308,42 @@ struct page_stats_caster : public stats_caster_base {
return {std::move(output_data), std::move(output_nullmask)};
}

/**
 * @brief Builds the row-level `is_null` statistics device column of a column from its
 * page-level `is_null` statistics.
 *
 * The page-level host column is copied to the device and handed, together with its host
 * null mask and the per-row page indices, to `build_data_and_nullmask`. The returned data
 * and nullmask buffers are wrapped in a BOOL8 column with `total_rows` rows.
 *
 * Fails via `CUDF_EXPECTS` if `has_is_null_operator` is false or `is_null` is empty.
 *
 * @param is_null Host column storing the page-level is_null statistics
 * @param page_indices Device span containing the page index for each row index
 * @param page_row_offsets Host span containing the row offsets of each page
 * @param stream CUDA stream
 * @param mr Device memory resource
 *
 * @return An optional holding the BOOL8 device column with one `is_null` entry per row
 */
[[nodiscard]] std::optional<std::unique_ptr<column>> build_is_null_device_column(
  std::optional<host_column<bool>>& is_null,
  cudf::device_span<size_type const> page_indices,
  cudf::host_span<size_type const> page_row_offsets,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr) const
{
  CUDF_EXPECTS(has_is_null_operator and is_null.has_value(),
               "is_null host column must be present");

  auto const bool_type = cudf::data_type{cudf::type_id::BOOL8};

  // Page-level statistics copied to the device
  auto page_level_col = is_null->to_device(bool_type, stream, mr);

  // Data buffer and nullmask produced from the page-level column and the per-row page indices
  auto [row_data, row_nullmask] = build_data_and_nullmask<bool>(page_level_col->mutable_view(),
                                                                is_null->null_mask.data(),
                                                                page_indices,
                                                                page_row_offsets,
                                                                bool_type,
                                                                stream,
                                                                mr);

  // Count the nulls before the nullmask buffer is moved into the output column
  auto const row_null_count = cudf::detail::null_count(
    reinterpret_cast<bitmask_type*>(row_nullmask.data()), 0, total_rows, stream);

  auto row_level_col = std::make_unique<column>(
    bool_type, total_rows, std::move(row_data), std::move(row_nullmask), row_null_count);
  return std::make_optional(std::move(row_level_col));
}

/**
* @brief Transforms a page-level stats column to a row-level stats column for string type
*
Expand Down Expand Up @@ -440,14 +468,16 @@ struct page_stats_caster : public stats_caster_base {
* @param stream CUDA stream
* @param mr Device memory resource
*
* @return A pair of device columns with min and max value from page statistics for each row
* @return A tuple of device columns with min, max and optionally is_null value from page
* statistics for each row
*/
template <typename T>
std::pair<std::unique_ptr<column>, std::unique_ptr<column>> operator()(
cudf::size_type schema_idx,
cudf::data_type dtype,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const
std::
tuple<std::unique_ptr<column>, std::unique_ptr<column>, std::optional<std::unique_ptr<column>>>
operator()(cudf::size_type schema_idx,
cudf::data_type dtype,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const
{
// List, Struct, Dictionary types are not supported
if constexpr (cudf::is_compound<T>() and not cuda::std::is_same_v<T, string_view>) {
Expand All @@ -463,9 +493,11 @@ struct page_stats_caster : public stats_caster_base {

auto const total_pages = col_chunk_page_offsets.back();

// Create host columns with page-level min, max values
// Create host columns with page-level min, max and optionally is_null statistics
host_column<T> min(total_pages, stream);
host_column<T> max(total_pages, stream);
std::optional<host_column<bool>> is_null;
if (has_is_null_operator) { is_null = host_column<bool>(total_pages, stream); }

// Populate the host columns with page-level min, max and (optionally) is_null statistics from
// the page index
auto page_offset_idx = 0;
Expand All @@ -491,18 +523,39 @@ struct page_stats_caster : public stats_caster_base {
auto const page_offset_in_colchunk = col_chunk_page_offsets[page_offset_idx++];

// For all pages in this column chunk
std::for_each(
thrust::counting_iterator<size_t>(0),
thrust::counting_iterator(num_pages_in_colchunk),
[&](auto page_idx) {
auto const& min_value = column_index.min_values[page_idx];
auto const& max_value = column_index.max_values[page_idx];
// Translate binary data to Type then to <T>
min.set_index(
page_offset_in_colchunk + page_idx, min_value, colchunk.meta_data.type);
max.set_index(
page_offset_in_colchunk + page_idx, max_value, colchunk.meta_data.type);
});
// Record the min/max statistics of every page in this column chunk and, when the filter
// contains an IS_NULL operator, the page's `is_null` statistic:
//   true  -> every value in the page is null
//   false -> the page contains no nulls
//   null  -> unknown (no null counts available, or the page mixes null and non-null values)
std::for_each(thrust::counting_iterator<size_t>(0),
              thrust::counting_iterator(num_pages_in_colchunk),
              [&](auto page_idx) {
                auto const& min_value      = column_index.min_values[page_idx];
                auto const& max_value      = column_index.max_values[page_idx];
                auto const column_page_idx = page_offset_in_colchunk + page_idx;
                // Translate binary data to Type then to <T>
                min.set_index(column_page_idx, min_value, colchunk.meta_data.type);
                max.set_index(column_page_idx, max_value, colchunk.meta_data.type);
                if (has_is_null_operator) {
                  // Check if the page is completely null
                  if (column_index.null_pages[page_idx]) {
                    is_null->val[column_page_idx] = true;
                    return;
                  }
                  // Check if the page doesn't have a null count
                  if (not column_index.null_counts.has_value()) {
                    is_null->set_index(column_page_idx, std::nullopt, {});
                    return;
                  }
                  // Use the null count to classify the page
                  auto const page_row_count =
                    page_row_offsets[column_page_idx + 1] - page_row_offsets[column_page_idx];
                  auto const& null_count = column_index.null_counts.value()[page_idx];
                  if (null_count == 0) {
                    // No nulls in this page (a zero null count is valid and must not fail)
                    is_null->val[column_page_idx] = false;
                  } else if (null_count > 0 and null_count < page_row_count) {
                    // Mix of null and non-null values: cannot be decided from statistics
                    is_null->set_index(column_page_idx, std::nullopt, {});
                  } else {
                    // Negative count, or a count >= the page's row count although the page
                    // was not flagged in `null_pages`
                    CUDF_FAIL("Invalid null count");
                  }
                }
              });
});
});

Expand All @@ -513,7 +566,7 @@ struct page_stats_caster : public stats_caster_base {
// For non-strings columns, directly gather the page-level column data and bitmask to the
// row-level.
if constexpr (not cuda::std::is_same_v<T, cudf::string_view>) {
// Move host columns to device
// Move host min/max columns to device
auto mincol = min.to_device(dtype, stream, mr);
auto maxcol = max.to_device(dtype, stream, mr);

Expand All @@ -540,11 +593,14 @@ struct page_stats_caster : public stats_caster_base {
auto const max_nulls = cudf::detail::null_count(
reinterpret_cast<bitmask_type*>(max_bitmask.data()), 0, total_rows, stream);

// Return min and max device columns
// Return min, max and is_null device columns
return {std::make_unique<column>(
dtype, total_rows, std::move(min_data), std::move(min_bitmask), min_nulls),
std::make_unique<column>(
dtype, total_rows, std::move(max_data), std::move(max_bitmask), max_nulls)};
dtype, total_rows, std::move(max_data), std::move(max_bitmask), max_nulls),
has_is_null_operator
? build_is_null_device_column(is_null, page_indices, page_row_offsets, stream, mr)
: std::nullopt};
}
// For strings columns, gather the page-level string offsets and bitmask to row-level
// directly and gather string chars using a batched memcpy.
Expand All @@ -560,7 +616,7 @@ struct page_stats_caster : public stats_caster_base {
auto const max_nulls = cudf::detail::null_count(
reinterpret_cast<bitmask_type*>(max_nullmask.data()), 0, total_rows, stream);

// Return min and max device strings columns
// Return min, max and is_null device strings columns
return {
cudf::make_strings_column(
total_rows,
Expand All @@ -573,7 +629,10 @@ struct page_stats_caster : public stats_caster_base {
std::make_unique<column>(std::move(max_offsets), rmm::device_buffer{0, stream, mr}, 0),
std::move(max_data),
max_nulls,
std::move(max_nullmask))};
std::move(max_nullmask)),
has_is_null_operator
? build_is_null_device_column(is_null, page_indices, page_row_offsets, stream, mr)
: std::nullopt};
}
}
}
Expand Down Expand Up @@ -622,7 +681,7 @@ std::unique_ptr<cudf::column> aggregate_reader_metadata::build_row_mask_with_pag
auto const num_columns = output_dtypes.size();

// Get a boolean mask indicating which columns will participate in stats based filtering
auto const stats_columns_mask =
auto const [stats_columns_mask, has_is_null_operator] =
parquet::detail::stats_columns_collector{filter.get(),
static_cast<size_type>(output_dtypes.size())}
.get_stats_columns_mask();
Expand All @@ -636,8 +695,10 @@ std::unique_ptr<cudf::column> aggregate_reader_metadata::build_row_mask_with_pag
// Convert page statistics to a table
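// where min(col[i]) = columns[i*2], max(col[i]) = columns[i*2+1] when the filter has no IS_NULL
// operator; otherwise each column contributes three entries in order: min, max and is_null.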
// For each column, it contains total number of rows from all row groups.
page_stats_caster const stats_col{
static_cast<size_type>(total_rows), per_file_metadata, row_group_indices};
page_stats_caster const stats_col{.total_rows = static_cast<size_type>(total_rows),
.per_file_metadata = per_file_metadata,
.row_group_indices = row_group_indices,
.has_is_null_operator = has_is_null_operator};

std::vector<std::unique_ptr<column>> columns;
std::for_each(
Expand All @@ -654,19 +715,27 @@ std::unique_ptr<cudf::column> aggregate_reader_metadata::build_row_mask_with_pag
data_type{cudf::type_id::BOOL8}, total_rows, rmm::device_buffer{}, 0, stream, mr));
columns.push_back(cudf::make_numeric_column(
data_type{cudf::type_id::BOOL8}, total_rows, rmm::device_buffer{}, 0, stream, mr));
if (has_is_null_operator) {
columns.push_back(cudf::make_numeric_column(
data_type{cudf::type_id::BOOL8}, total_rows, rmm::device_buffer{}, 0, stream, mr));
}
return;
}
auto [min_col, max_col] = cudf::type_dispatcher<dispatch_storage_type>(
auto [min_col, max_col, is_null_col] = cudf::type_dispatcher<dispatch_storage_type>(
dtype, stats_col, schema_idx, dtype, stream, mr);
columns.push_back(std::move(min_col));
columns.push_back(std::move(max_col));
if (has_is_null_operator) {
CUDF_EXPECTS(is_null_col.has_value(), "is_null host column must be present");
columns.push_back(std::move(is_null_col.value()));
}
});

auto stats_table = cudf::table(std::move(columns));

// Converts AST to StatsAST with reference to min, max columns in above `stats_table`.
parquet::detail::stats_expression_converter const stats_expr{
filter.get(), static_cast<size_type>(output_dtypes.size()), stream};
filter.get(), static_cast<size_type>(output_dtypes.size()), has_is_null_operator, stream};

// Filter the input table using AST expression and return the (BOOL8) predicate column.
return cudf::detail::compute_column(stats_table, stats_expr.get_stats_expr().get(), stream, mr);
Expand Down
66 changes: 47 additions & 19 deletions cpp/src/io/parquet/predicate_pushdown.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,30 +52,26 @@ struct row_group_stats_caster : public stats_caster_base {
size_type total_row_groups;
std::vector<metadata> const& per_file_metadata;
host_span<std::vector<size_type> const> row_group_indices;

row_group_stats_caster(size_type total_row_groups,
std::vector<metadata> const& per_file_metadata,
host_span<std::vector<size_type> const> row_group_indices)
: total_row_groups{total_row_groups},
per_file_metadata{per_file_metadata},
row_group_indices{row_group_indices}
{
}
bool has_is_null_operator;

// Creates device columns from column statistics (min, max and optionally is_null)
template <typename T>
std::pair<std::unique_ptr<column>, std::unique_ptr<column>> operator()(
int schema_idx,
cudf::data_type dtype,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const
std::
tuple<std::unique_ptr<column>, std::unique_ptr<column>, std::optional<std::unique_ptr<column>>>
operator()(int schema_idx,
cudf::data_type dtype,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const
{
// List, Struct, Dictionary types are not supported
if constexpr (cudf::is_compound<T>() && !std::is_same_v<T, string_view>) {
CUDF_FAIL("Compound types do not have statistics");
} else {
host_column<T> min(total_row_groups, stream);
host_column<T> max(total_row_groups, stream);
std::optional<host_column<bool>> is_null;
if (has_is_null_operator) { is_null = host_column<bool>(total_row_groups, stream); }

size_type stats_idx = 0;
for (size_t src_idx = 0; src_idx < row_group_indices.size(); ++src_idx) {
for (auto const rg_idx : row_group_indices[src_idx]) {
Expand All @@ -96,15 +92,35 @@ struct row_group_stats_caster : public stats_caster_base {
// translate binary data to Type then to <T>
min.set_index(stats_idx, min_value, colchunk.meta_data.type);
max.set_index(stats_idx, max_value, colchunk.meta_data.type);
// Check the nullability of this column chunk. The `is_null` statistic is:
//   false -> the chunk contains no nulls
//   true  -> every value in the chunk is null
//   null  -> unknown (no null count recorded, or a mix of null and non-null values)
if (has_is_null_operator) {
  if (colchunk.meta_data.statistics.null_count.has_value()) {
    auto const& null_count = colchunk.meta_data.statistics.null_count.value();
    if (null_count == 0) {
      is_null->val[stats_idx] = false;
    } else if (null_count < colchunk.meta_data.num_values) {
      is_null->set_index(stats_idx, std::nullopt, {});
    } else if (null_count == colchunk.meta_data.num_values) {
      is_null->val[stats_idx] = true;
    } else {
      CUDF_FAIL("Invalid null count");
    }
  } else {
    // No null count in the statistics: mark the entry as null (unknown) instead of leaving
    // it untouched, matching how the page-level filter treats missing null counts.
    is_null->set_index(stats_idx, std::nullopt, {});
  }
}
} else {
// Marking it null, if column present in row group
min.set_index(stats_idx, std::nullopt, {});
max.set_index(stats_idx, std::nullopt, {});
if (has_is_null_operator) { is_null->set_index(stats_idx, std::nullopt, {}); }
}
stats_idx++;
}
};
return {min.to_device(dtype, stream, mr), max.to_device(dtype, stream, mr)};
return {min.to_device(dtype, stream, mr),
max.to_device(dtype, stream, mr),
has_is_null_operator ? std::make_optional(is_null->to_device(
data_type{cudf::type_id::BOOL8}, stream, mr))
: std::nullopt};
}
}
};
Expand All @@ -122,7 +138,7 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
auto mr = cudf::get_current_device_resource_ref();

// Get a boolean mask indicating which columns can participate in stats based filtering
auto const stats_columns_mask =
auto const [stats_columns_mask, has_is_null_operator] =
stats_columns_collector{filter.get(), static_cast<size_type>(output_dtypes.size())}
.get_stats_columns_mask();

Expand All @@ -134,7 +150,11 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
// For each column, it contains #sources * #column_chunks_per_src rows
std::vector<std::unique_ptr<column>> columns;
row_group_stats_caster const stats_col{
static_cast<size_type>(total_row_groups), per_file_metadata, input_row_group_indices};
.total_row_groups = static_cast<size_type>(total_row_groups),
.per_file_metadata = per_file_metadata,
.row_group_indices = input_row_group_indices,
.has_is_null_operator = has_is_null_operator};

for (size_t col_idx = 0; col_idx < output_dtypes.size(); col_idx++) {
auto const schema_idx = output_column_schemas[col_idx];
auto const& dtype = output_dtypes[col_idx];
Expand All @@ -146,18 +166,26 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
data_type{cudf::type_id::BOOL8}, total_row_groups, rmm::device_buffer{}, 0, stream, mr));
columns.push_back(cudf::make_numeric_column(
data_type{cudf::type_id::BOOL8}, total_row_groups, rmm::device_buffer{}, 0, stream, mr));
if (has_is_null_operator) {
columns.push_back(cudf::make_numeric_column(
data_type{cudf::type_id::BOOL8}, total_row_groups, rmm::device_buffer{}, 0, stream, mr));
}
continue;
}
auto [min_col, max_col] =
auto [min_col, max_col, is_null_col] =
cudf::type_dispatcher<dispatch_storage_type>(dtype, stats_col, schema_idx, dtype, stream, mr);
columns.push_back(std::move(min_col));
columns.push_back(std::move(max_col));
if (has_is_null_operator) {
CUDF_EXPECTS(is_null_col.has_value(), "is_null column must be present");
columns.push_back(std::move(is_null_col.value()));
}
}
auto stats_table = cudf::table(std::move(columns));

// Converts AST to StatsAST with reference to min, max columns in above `stats_table`.
stats_expression_converter const stats_expr{
filter.get(), static_cast<size_type>(output_dtypes.size()), stream};
filter.get(), static_cast<size_type>(output_dtypes.size()), has_is_null_operator, stream};

// Filter stats table with StatsAST expression and collect filtered row group indices
return collect_filtered_row_group_indices(
Expand Down
Loading
Loading