34 commits
cdc3685
Passthrough unary ops through Parquet predicate pushdown
mhaseeb123 Sep 26, 2025
da7cf9a
Merge branch 'branch-25.12' into fea/passthrough-unary-ops-in-pq-pred…
mhaseeb123 Sep 26, 2025
0d55bd4
Update gtests
mhaseeb123 Sep 29, 2025
d7ff34b
Merge branch 'branch-25.12' into fea/passthrough-unary-ops-in-pq-pred…
mhaseeb123 Sep 29, 2025
204d746
Propagate `_always_true` to parent expression when expr can't be eval…
mhaseeb123 Sep 29, 2025
71c3212
Merge branch 'branch-25.12' into fea/passthrough-unary-ops-in-pq-pred…
mhaseeb123 Sep 29, 2025
1dfb9a4
Merge branch 'branch-25.12' into fea/passthrough-unary-ops-in-pq-pred…
mhaseeb123 Sep 29, 2025
0638922
Small update
mhaseeb123 Sep 29, 2025
0733944
Minor updates
mhaseeb123 Sep 29, 2025
c18d5dc
Comments update
mhaseeb123 Sep 29, 2025
88be6f4
Similar changes in dictionary and bloom filter `collectors`
mhaseeb123 Sep 29, 2025
efa8b16
Merge branch 'branch-25.12' into fea/passthrough-unary-ops-in-pq-pred…
mhaseeb123 Sep 29, 2025
bd2b32c
Evaluate `IS_NULL` at rg and page levels in parquet filtering
mhaseeb123 Sep 30, 2025
b41d806
Propagate `_always_true` up to the expression tree
mhaseeb123 Sep 30, 2025
b56d27a
Avoid dynamic cast and directly compare `new_operands` and `_always_t…
mhaseeb123 Oct 2, 2025
bd36563
Merge branch 'branch-25.12' into fea/evaluate-is-null-using-stats
mhaseeb123 Oct 8, 2025
4719b73
Minor comment update
mhaseeb123 Oct 9, 2025
845f097
Merge branch 'branch-25.12' into fea/evaluate-is-null-using-stats
mhaseeb123 Oct 13, 2025
c04ca48
Merge branch 'branch-25.12' into fea/evaluate-is-null-using-stats
mhaseeb123 Oct 13, 2025
9e96d8f
Make `is_null` evaluation optional
mhaseeb123 Oct 14, 2025
74c4d96
Merge branch 'branch-25.12' into fea/evaluate-is-null-using-stats
mhaseeb123 Oct 14, 2025
ccdc317
Merge branch 'branch-25.12' into fea/evaluate-is-null-using-stats
mhaseeb123 Oct 17, 2025
de9a887
Address reviews
mhaseeb123 Oct 20, 2025
a5cc174
Add [[nodiscard]] attribute to to_device method
mhaseeb123 Oct 20, 2025
e6d5221
Apply suggestion from @mhaseeb123
mhaseeb123 Oct 21, 2025
f6dc421
Apply suggestion from @mhaseeb123
mhaseeb123 Oct 21, 2025
fdc5f33
Merge branch 'main' into fea/evaluate-is-null-using-stats
mhaseeb123 Oct 21, 2025
106d252
style plz
mhaseeb123 Oct 21, 2025
f0fdf8b
Use correct `mr` for internals
mhaseeb123 Oct 22, 2025
7e496af
fix docstring
mhaseeb123 Oct 22, 2025
54812f1
Optimize for no nulls in page stats
mhaseeb123 Oct 22, 2025
7cd8c26
Minor docstring update
mhaseeb123 Oct 22, 2025
34372f9
Minor: make var names consistent
mhaseeb123 Oct 22, 2025
df48f02
Merge branch 'main' into fea/evaluate-is-null-using-stats
mhaseeb123 Oct 23, 2025
355 changes: 240 additions & 115 deletions cpp/src/io/parquet/experimental/page_index_filter.cu

Large diffs are not rendered by default.

66 changes: 47 additions & 19 deletions cpp/src/io/parquet/predicate_pushdown.cpp
@@ -52,30 +52,26 @@ struct row_group_stats_caster : public stats_caster_base {
size_type total_row_groups;
std::vector<metadata> const& per_file_metadata;
host_span<std::vector<size_type> const> row_group_indices;

row_group_stats_caster(size_type total_row_groups,
std::vector<metadata> const& per_file_metadata,
host_span<std::vector<size_type> const> row_group_indices)
: total_row_groups{total_row_groups},
per_file_metadata{per_file_metadata},
row_group_indices{row_group_indices}
{
}
bool has_is_null_operator;

// Creates device columns from column statistics (min, max)
template <typename T>
std::pair<std::unique_ptr<column>, std::unique_ptr<column>> operator()(
int schema_idx,
cudf::data_type dtype,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const
std::
tuple<std::unique_ptr<column>, std::unique_ptr<column>, std::optional<std::unique_ptr<column>>>
operator()(int schema_idx,
cudf::data_type dtype,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const
{
// List, Struct, Dictionary types are not supported
if constexpr (cudf::is_compound<T>() && !std::is_same_v<T, string_view>) {
CUDF_FAIL("Compound types do not have statistics");
} else {
host_column<T> min(total_row_groups, stream);
host_column<T> max(total_row_groups, stream);
std::optional<host_column<bool>> is_null;
if (has_is_null_operator) { is_null = host_column<bool>(total_row_groups, stream); }

size_type stats_idx = 0;
for (size_t src_idx = 0; src_idx < row_group_indices.size(); ++src_idx) {
for (auto const rg_idx : row_group_indices[src_idx]) {
@@ -96,15 +92,35 @@ struct row_group_stats_caster : public stats_caster_base {
// translate binary data to Type then to <T>
min.set_index(stats_idx, min_value, colchunk.meta_data.type);
max.set_index(stats_idx, max_value, colchunk.meta_data.type);
// Check the nullability of this column chunk
if (has_is_null_operator) {
if (colchunk.meta_data.statistics.null_count.has_value()) {
auto const& null_count = colchunk.meta_data.statistics.null_count.value();
if (null_count == 0) {
is_null->val[stats_idx] = false;
} else if (null_count < colchunk.meta_data.num_values) {
is_null->set_index(stats_idx, std::nullopt, {});
} else if (null_count == colchunk.meta_data.num_values) {
is_null->val[stats_idx] = true;
} else {
CUDF_FAIL("Invalid null count");
}
}
}
} else {
// Marking it null, if column present in row group
min.set_index(stats_idx, std::nullopt, {});
max.set_index(stats_idx, std::nullopt, {});
if (has_is_null_operator) { is_null->set_index(stats_idx, std::nullopt, {}); }
}
stats_idx++;
}
};
return {min.to_device(dtype, stream, mr), max.to_device(dtype, stream, mr)};
return {min.to_device(dtype, stream, mr),
max.to_device(dtype, stream, mr),
has_is_null_operator ? std::make_optional(is_null->to_device(
data_type{cudf::type_id::BOOL8}, stream, mr))
: std::nullopt};
}
}
};
@@ -122,7 +138,7 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
auto mr = cudf::get_current_device_resource_ref();

// Get a boolean mask indicating which columns can participate in stats based filtering
auto const stats_columns_mask =
auto const [stats_columns_mask, has_is_null_operator] =
stats_columns_collector{filter.get(), static_cast<size_type>(output_dtypes.size())}
.get_stats_columns_mask();

@@ -134,7 +150,11 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
// For each column, it contains #sources * #column_chunks_per_src rows
std::vector<std::unique_ptr<column>> columns;
row_group_stats_caster const stats_col{
static_cast<size_type>(total_row_groups), per_file_metadata, input_row_group_indices};
.total_row_groups = static_cast<size_type>(total_row_groups),
.per_file_metadata = per_file_metadata,
.row_group_indices = input_row_group_indices,
.has_is_null_operator = has_is_null_operator};

for (size_t col_idx = 0; col_idx < output_dtypes.size(); col_idx++) {
auto const schema_idx = output_column_schemas[col_idx];
auto const& dtype = output_dtypes[col_idx];
@@ -146,18 +166,26 @@ std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::ap
data_type{cudf::type_id::BOOL8}, total_row_groups, rmm::device_buffer{}, 0, stream, mr));
columns.push_back(cudf::make_numeric_column(
data_type{cudf::type_id::BOOL8}, total_row_groups, rmm::device_buffer{}, 0, stream, mr));
if (has_is_null_operator) {
columns.push_back(cudf::make_numeric_column(
data_type{cudf::type_id::BOOL8}, total_row_groups, rmm::device_buffer{}, 0, stream, mr));
}
continue;
}
auto [min_col, max_col] =
auto [min_col, max_col, is_null_col] =
cudf::type_dispatcher<dispatch_storage_type>(dtype, stats_col, schema_idx, dtype, stream, mr);
columns.push_back(std::move(min_col));
columns.push_back(std::move(max_col));
if (has_is_null_operator) {
CUDF_EXPECTS(is_null_col.has_value(), "is_null column must be present");
columns.push_back(std::move(is_null_col.value()));
}
}
auto stats_table = cudf::table(std::move(columns));

// Converts AST to StatsAST with reference to min, max columns in above `stats_table`.
stats_expression_converter const stats_expr{
filter.get(), static_cast<size_type>(output_dtypes.size()), stream};
filter.get(), static_cast<size_type>(output_dtypes.size()), has_is_null_operator, stream};

// Filter stats table with StatsAST expression and collect filtered row group indices
return collect_filtered_row_group_indices(
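The nullability handling added to `row_group_stats_caster::operator()` above reduces each column chunk to a tri-state value. A minimal standalone sketch of that mapping, using plain STL types instead of the PR's `host_column` machinery (the helper name is hypothetical):

```cpp
#include <cstdint>
#include <iostream>
#include <optional>
#include <stdexcept>

// Hypothetical helper: maps a column chunk's optional null_count to the
// tri-state value stored in the is_null stats column.
//   false        -> chunk provably contains no nulls
//   true         -> chunk is provably all nulls
//   std::nullopt -> unknown or mixed; the predicate cannot be decided here
std::optional<bool> chunk_is_null_state(std::optional<int64_t> null_count, int64_t num_values)
{
  if (!null_count.has_value()) { return std::nullopt; }   // no statistics recorded
  if (*null_count == 0) { return false; }
  if (*null_count == num_values) { return true; }
  if (*null_count < num_values) { return std::nullopt; }  // some, but not all, values are null
  throw std::logic_error("Invalid null count");            // mirrors the CUDF_FAIL above
}

int main()
{
  std::cout << chunk_is_null_state(0, 100).has_value() << '\n';   // 1: decidable (no nulls)
  std::cout << chunk_is_null_state(40, 100).has_value() << '\n';  // 0: mixed -> unknown
  std::cout << *chunk_is_null_state(100, 100) << '\n';            // 1: all values are null
}
```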
54 changes: 37 additions & 17 deletions cpp/src/io/parquet/stats_filter_helpers.cpp
@@ -73,13 +73,14 @@ std::reference_wrapper<ast::expression const> stats_columns_collector::visit(
v->accept(*this);

// Return early if this is a unary operation
if (operator_arity == 1) { return expr; }
if (operator_arity == 1 and op != ast_operator::IS_NULL) { return expr; }

// Else if this is a supported binary operation, mark the column as needed
if (op == ast_operator::EQUAL or op == ast_operator::NOT_EQUAL or op == ast_operator::LESS or
op == ast_operator::LESS_EQUAL or op == ast_operator::GREATER or
op == ast_operator::GREATER_EQUAL) {
op == ast_operator::GREATER_EQUAL or op == ast_operator::IS_NULL) {
_columns_mask[v->get_column_index()] = true;
if (op == ast_operator::IS_NULL) { _has_is_null_operator = true; }
} else {
CUDF_FAIL("Unsupported binary operation in Statistics AST");
}
@@ -91,9 +92,9 @@
return expr;
}

thrust::host_vector<bool> stats_columns_collector::get_stats_columns_mask() &&
std::pair<thrust::host_vector<bool>, bool> stats_columns_collector::get_stats_columns_mask() &&
{
return std::move(_columns_mask);
return {std::move(_columns_mask), _has_is_null_operator};
}

std::vector<std::reference_wrapper<ast::expression const>> stats_columns_collector::visit_operands(
@@ -110,11 +111,13 @@ std::vector<std::reference_wrapper<ast::expression const>> stats_columns_collect

stats_expression_converter::stats_expression_converter(ast::expression const& expr,
size_type num_columns,
bool has_is_null_operator,
rmm::cuda_stream_view stream)
: _always_true_scalar{std::make_unique<cudf::numeric_scalar<bool>>(true, true, stream)},
_always_true{std::make_unique<ast::literal>(*_always_true_scalar)}
{
_num_columns = num_columns;
_stats_cols_per_column = has_is_null_operator ? 3 : 2;
_num_columns = num_columns;
expr.accept(*this);
}

@@ -135,16 +138,27 @@ std::reference_wrapper<ast::expression const> stats_expression_converter::visit(
"Second operand of binary operation with column reference must be a literal");
v->accept(*this);

// For unary operators, push and return the `_always_true` expression
auto const col_index = v->get_column_index();

if (operator_arity == 1) {
_stats_expr.push(ast::operation{ast_operator::IDENTITY, *_always_true});
// Propagate the `_always_true` as expression to its unary operator parent
return *_always_true;
// Evaluate IS_NULL unary operator
if (op == ast_operator::IS_NULL) {
CUDF_EXPECTS(std::cmp_equal(_stats_cols_per_column, 3),
"IS_NULL operator cannot be evaluated without nullability information column");
auto const& vnull =
_stats_expr.push(ast::column_reference{col_index * _stats_cols_per_column + 2});
_stats_expr.push(ast::operation{ast_operator::IDENTITY, vnull});
return _stats_expr.back();
} // For all other unary operators, push and return the `_always_true` expression
else {
_stats_expr.push(ast::operation{ast_operator::IDENTITY, *_always_true});
// Propagate the `_always_true` as expression to its unary operator parent
return *_always_true;
}
}

// Push literal into the ast::tree
auto const& literal = _stats_expr.push(*dynamic_cast<ast::literal const*>(&operands[1].get()));
auto const col_index = v->get_column_index();
auto const& literal = _stats_expr.push(*dynamic_cast<ast::literal const*>(&operands[1].get()));
switch (op) {
/* transform to stats conditions. op(col, literal)
col1 == val --> vmin <= val && vmax >= val
@@ -155,17 +169,21 @@ std::reference_wrapper<ast::expression const> stats_expression_converter::visit(
col1 <= val --> vmin <= val
*/
case ast_operator::EQUAL: {
auto const& vmin = _stats_expr.push(ast::column_reference{col_index * 2});
auto const& vmax = _stats_expr.push(ast::column_reference{col_index * 2 + 1});
auto const& vmin =
_stats_expr.push(ast::column_reference{col_index * _stats_cols_per_column});
auto const& vmax =
_stats_expr.push(ast::column_reference{col_index * _stats_cols_per_column + 1});
_stats_expr.push(ast::operation{
ast::ast_operator::LOGICAL_AND,
_stats_expr.push(ast::operation{ast_operator::GREATER_EQUAL, vmax, literal}),
_stats_expr.push(ast::operation{ast_operator::LESS_EQUAL, vmin, literal})});
break;
}
case ast_operator::NOT_EQUAL: {
auto const& vmin = _stats_expr.push(ast::column_reference{col_index * 2});
auto const& vmax = _stats_expr.push(ast::column_reference{col_index * 2 + 1});
auto const& vmin =
_stats_expr.push(ast::column_reference{col_index * _stats_cols_per_column});
auto const& vmax =
_stats_expr.push(ast::column_reference{col_index * _stats_cols_per_column + 1});
_stats_expr.push(
ast::operation{ast_operator::LOGICAL_OR,
_stats_expr.push(ast::operation{ast_operator::NOT_EQUAL, vmin, vmax}),
@@ -174,13 +192,15 @@ std::reference_wrapper<ast::expression const> stats_expression_converter::visit(
}
case ast_operator::LESS: [[fallthrough]];
case ast_operator::LESS_EQUAL: {
auto const& vmin = _stats_expr.push(ast::column_reference{col_index * 2});
auto const& vmin =
_stats_expr.push(ast::column_reference{col_index * _stats_cols_per_column});
_stats_expr.push(ast::operation{op, vmin, literal});
break;
}
case ast_operator::GREATER: [[fallthrough]];
case ast_operator::GREATER_EQUAL: {
auto const& vmax = _stats_expr.push(ast::column_reference{col_index * 2 + 1});
auto const& vmax =
_stats_expr.push(ast::column_reference{col_index * _stats_cols_per_column + 1});
_stats_expr.push(ast::operation{op, vmax, literal});
break;
}
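The switch above rewrites each predicate into a conservative test over the per-row-group statistics, and `col IS NULL` now resolves to the new is_null column instead of the always-true fallback. A self-contained host-side sketch of what the generated StatsAST effectively evaluates (illustrative names; the handling of an unknown tri-state is an assumption, chosen here to conservatively keep the row group):

```cpp
#include <iostream>
#include <optional>

// "May match" == the row group cannot be pruned and must be kept.
template <typename T>
bool may_match_equal(T vmin, T vmax, T val)    // col == val --> vmin <= val && vmax >= val
{
  return vmin <= val && val <= vmax;
}

template <typename T>
bool may_match_less_equal(T vmin, T val)       // col <= val --> vmin <= val
{
  return vmin <= val;
}

template <typename T>
bool may_match_greater_equal(T vmax, T val)    // col >= val --> vmax >= val
{
  return vmax >= val;
}

// col IS NULL --> IDENTITY(is_null). An unknown tri-state (nullopt) is treated
// conservatively here and keeps the row group.
inline bool may_match_is_null(std::optional<bool> is_null) { return is_null.value_or(true); }

int main()
{
  std::cout << may_match_equal(10, 20, 25) << '\n';                    // 0: 25 outside [10, 20], prune
  std::cout << may_match_greater_equal(20, 15) << '\n';                // 1: max 20 >= 15, keep
  std::cout << may_match_is_null(std::optional<bool>{false}) << '\n';  // 0: chunk has no nulls, prune
  std::cout << may_match_is_null(std::nullopt) << '\n';                // 1: nullability unknown, keep
}
```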
16 changes: 10 additions & 6 deletions cpp/src/io/parquet/stats_filter_helpers.hpp
@@ -204,9 +204,9 @@ class stats_caster_base {
return {std::move(d_chars), std::move(d_offsets), std::move(d_sizes)};
}

std::unique_ptr<column> inline to_device(cudf::data_type dtype,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
[[nodiscard]] std::unique_ptr<column> inline to_device(cudf::data_type dtype,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const
{
if constexpr (std::is_same_v<T, string_view>) {
auto [d_chars, d_offsets, _] = make_strings_children(val, chars, stream, mr);
@@ -266,7 +266,7 @@ class stats_columns_collector : public ast::detail::expression_transformer {
*
* @return Boolean vector indicating input columns that can participate in stats based filtering
*/
thrust::host_vector<bool> get_stats_columns_mask() &&;
std::pair<thrust::host_vector<bool>, bool> get_stats_columns_mask() &&;

protected:
std::vector<std::reference_wrapper<ast::expression const>> visit_operands(
@@ -276,19 +276,22 @@

private:
thrust::host_vector<bool> _columns_mask;
bool _has_is_null_operator = false;
};

/**
* @brief Converts AST expression to StatsAST for comparing with column statistics
*
* This is used in row group filtering based on predicate.
* statistics min value of a column is referenced by column_index*2
* statistics max value of a column is referenced by column_index*2+1
* statistics min value of a column is referenced by column_index*3
* statistics max value of a column is referenced by column_index*3+1
* statistics is_null value of a column is referenced by column_index*3+2
*/
class stats_expression_converter : public stats_columns_collector {
public:
stats_expression_converter(ast::expression const& expr,
size_type num_columns,
bool has_is_null_operator,
rmm::cuda_stream_view stream);

// Bring all overrides of `visit` from stats_columns_collector into scope
@@ -313,6 +316,7 @@

private:
ast::tree _stats_expr;
cudf::size_type _stats_cols_per_column;
std::unique_ptr<cudf::numeric_scalar<bool>> _always_true_scalar;
std::unique_ptr<ast::literal> _always_true;
};
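The updated docstring above describes the stats-table layout when an IS_NULL predicate is present: each filter column owns three consecutive columns (min, max, is_null), indexed via `_stats_cols_per_column`. A small standalone illustration of that indexing for a hypothetical two-column filter (not cuDF code):

```cpp
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

int main()
{
  std::vector<std::string> const filter_columns{"a", "b"};
  bool const has_is_null_operator = true;  // an IS_NULL predicate was found in the filter
  std::size_t const stats_cols_per_column = has_is_null_operator ? 3 : 2;

  for (std::size_t col = 0; col < filter_columns.size(); ++col) {
    std::cout << filter_columns[col] << ": min -> " << col * stats_cols_per_column
              << ", max -> " << col * stats_cols_per_column + 1;
    if (has_is_null_operator) { std::cout << ", is_null -> " << col * stats_cols_per_column + 2; }
    std::cout << '\n';
  }
  // Prints: a: min -> 0, max -> 1, is_null -> 2
  //         b: min -> 3, max -> 4, is_null -> 5
}
```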
2 changes: 1 addition & 1 deletion cpp/tests/io/experimental/hybrid_scan_common.cpp
@@ -170,7 +170,7 @@ auto apply_parquet_filters(cudf::host_span<uint8_t const> file_buffer_span,
std::vector<rmm::device_buffer> dictionary_page_buffers =
fetch_byte_ranges(file_buffer_span, dict_page_byte_ranges, stream, mr);

// NOT YET IMPLEMENTED - Filter row groups with dictionary pages
// Filter row groups with dictionary pages
dictionary_page_filtered_row_group_indices = reader->filter_row_groups_with_dictionary_pages(
dictionary_page_buffers, current_row_group_indices, options, stream);
