Skip to content

Commit 33444ba

Browse files
authored
High level interface for experimental PQ reader and implementation of metadata APIs (#18480)
Contributes to #17896. Part of #18011. This PR adds the high level interface (APIs) to a new experimental Parquet reader optimized for highly selective (hybrid scan) queries. The PR also adds implementations for the basic metadata related APIs of the new reader such as reading the file footer and PageIndex. Authors: - Muhammad Haseeb (https://github.com/mhaseeb123) Approvers: - David Wendt (https://github.com/davidwendt) - Vyas Ramasubramani (https://github.com/vyasr) - Vukasin Milovanovic (https://github.com/vuule) - https://github.com/nvdbaranec URL: #18480
1 parent 4a1c501 commit 33444ba

13 files changed

+1845
-19
lines changed

cpp/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,9 @@ add_library(
521521
src/io/parquet/compact_protocol_reader.cpp
522522
src/io/parquet/compact_protocol_writer.cpp
523523
src/io/parquet/decode_preprocess.cu
524+
src/io/parquet/experimental/hybrid_scan.cpp
525+
src/io/parquet/experimental/hybrid_scan_helpers.cpp
526+
src/io/parquet/experimental/hybrid_scan_impl.cpp
524527
src/io/parquet/page_data.cu
525528
src/io/parquet/chunk_dict.cu
526529
src/io/parquet/page_enc.cu

cpp/include/cudf/io/experimental/hybrid_scan.hpp

Lines changed: 457 additions & 0 deletions
Large diffs are not rendered by default.

cpp/include/cudf/io/parquet_schema.hpp

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018-2025, NVIDIA CORPORATION.
2+
* Copyright (c) 2025, NVIDIA CORPORATION.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,6 +14,11 @@
1414
* limitations under the License.
1515
*/
1616

17+
/**
18+
* @file parquet_schema.hpp
19+
* @brief Parquet footer schema structs
20+
*/
21+
1722
#pragma once
1823

1924
#include <cudf/types.hpp>
@@ -26,8 +31,12 @@
2631
#include <vector>
2732

2833
namespace CUDF_EXPORT cudf {
29-
3034
namespace io::parquet {
35+
/**
36+
* @addtogroup io_types
37+
* @{
38+
* @file
39+
*/
3140

3241
/**
3342
* @brief Basic data types in Parquet, determines how data is physically stored
@@ -195,7 +204,7 @@ struct DecimalType {
195204
*/
196205
struct TimeUnit {
197206
/// Available time units
198-
enum Type { UNDEFINED, MILLIS, MICROS, NANOS };
207+
enum Type : uint8_t { UNDEFINED, MILLIS, MICROS, NANOS };
199208
/// Time unit type
200209
Type type;
201210
};
@@ -210,7 +219,7 @@ struct TimeType {
210219
/// Writer option overrides this to default
211220
bool isAdjustedToUTC = true;
212221
/// Time unit type
213-
TimeUnit unit = {TimeUnit::MILLIS};
222+
TimeUnit unit = {TimeUnit::Type::MILLIS};
214223
};
215224

216225
/**
@@ -223,7 +232,7 @@ struct TimestampType {
223232
/// Writer option overrides this to default
224233
bool isAdjustedToUTC = true;
225234
/// Timestamp's time unit
226-
TimeUnit unit = {TimeUnit::MILLIS};
235+
TimeUnit unit = {TimeUnit::Type::MILLIS};
227236
};
228237

229238
/**
@@ -243,7 +252,7 @@ struct IntType {
243252
*/
244253
struct LogicalType {
245254
/// Logical type annotations to replace ConvertedType.
246-
enum Type {
255+
enum Type : uint8_t {
247256
UNDEFINED,
248257
STRING,
249258
MAP,
@@ -276,7 +285,7 @@ struct LogicalType {
276285
*
277286
* @param tp Logical type
278287
*/
279-
LogicalType(Type tp = UNDEFINED) : type(tp) {}
288+
LogicalType(Type tp = Type::UNDEFINED) : type(tp) {}
280289

281290
/**
282291
* @brief Constructor for Decimal logical type
@@ -409,7 +418,7 @@ struct LogicalType {
409418
*/
410419
struct ColumnOrder {
411420
/// Available column order types
412-
enum Type { UNDEFINED, TYPE_ORDER };
421+
enum Type : uint8_t { UNDEFINED, TYPE_ORDER };
413422
/// Column order type
414423
Type type;
415424
};
@@ -712,7 +721,7 @@ struct BloomFilterAlgorithm {
712721
/// Available bloom filter algorithms
713722
enum Algorithm : uint8_t { UNDEFINED, SPLIT_BLOCK };
714723
/// Bloom filter algorithm
715-
Algorithm algorithm{SPLIT_BLOCK};
724+
Algorithm algorithm{Algorithm::SPLIT_BLOCK};
716725
};
717726

718727
/**
@@ -722,7 +731,7 @@ struct BloomFilterHash {
722731
/// Available bloom filter hashers
723732
enum Hash : uint8_t { UNDEFINED, XXHASH };
724733
/// Bloom filter hasher
725-
Hash hash{XXHASH};
734+
Hash hash{Hash::XXHASH};
726735
};
727736

728737
/**
@@ -732,7 +741,7 @@ struct BloomFilterCompression {
732741
/// Available bloom filter compression types
733742
enum Compression : uint8_t { UNDEFINED, UNCOMPRESSED };
734743
/// Bloom filter compression type
735-
Compression compression{UNCOMPRESSED};
744+
Compression compression{Compression::UNCOMPRESSED};
736745
};
737746

738747
/**
@@ -924,5 +933,6 @@ struct PageHeader {
924933
DataPageHeaderV2 data_page_header_v2;
925934
};
926935

936+
/** @} */ // end of group
927937
} // namespace io::parquet
928938
} // namespace CUDF_EXPORT cudf

cpp/src/io/parquet/bloom_filter_reader.cu

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ std::vector<Type> aggregate_reader_metadata::get_parquet_types(
508508
}
509509

510510
std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::apply_bloom_filters(
511-
std::vector<rmm::device_buffer>& bloom_filter_data,
511+
cudf::host_span<rmm::device_buffer> bloom_filter_data,
512512
host_span<std::vector<size_type> const> input_row_group_indices,
513513
host_span<std::vector<ast::literal*> const> literals,
514514
size_type total_row_groups,
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Copyright (c) 2025, NVIDIA CORPORATION.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "hybrid_scan_impl.hpp"
18+
19+
#include <cudf/io/experimental/hybrid_scan.hpp>
20+
#include <cudf/utilities/error.hpp>
21+
22+
#include <thrust/host_vector.h>
23+
24+
namespace cudf::io::parquet::experimental {
25+
26+
hybrid_scan_reader::hybrid_scan_reader(cudf::host_span<uint8_t const> footer_bytes,
27+
parquet_reader_options const& options)
28+
: _impl{std::make_unique<detail::hybrid_scan_reader_impl>(footer_bytes, options)}
29+
{
30+
}
31+
32+
hybrid_scan_reader::~hybrid_scan_reader() = default;
33+
34+
[[nodiscard]] text::byte_range_info hybrid_scan_reader::page_index_byte_range() const
35+
{
36+
return _impl->page_index_byte_range();
37+
}
38+
39+
[[nodiscard]] FileMetaData hybrid_scan_reader::parquet_metadata() const
40+
{
41+
return _impl->parquet_metadata();
42+
}
43+
44+
void hybrid_scan_reader::setup_page_index(cudf::host_span<uint8_t const> page_index_bytes) const
45+
{
46+
return _impl->setup_page_index(page_index_bytes);
47+
}
48+
49+
std::vector<cudf::size_type> hybrid_scan_reader::all_row_groups(
50+
parquet_reader_options const& options) const
51+
{
52+
CUDF_EXPECTS(options.get_row_groups().size() <= 1,
53+
"Encountered invalid size of row group indices in parquet reader options");
54+
55+
// If row groups are specified in parquet reader options, return them as is
56+
if (options.get_row_groups().size() == 1) { return options.get_row_groups().front(); }
57+
58+
return _impl->all_row_groups(options);
59+
}
60+
61+
std::vector<cudf::size_type> hybrid_scan_reader::filter_row_groups_with_stats(
62+
cudf::host_span<size_type const> row_group_indices,
63+
parquet_reader_options const& options,
64+
rmm::cuda_stream_view stream) const
65+
{
66+
// Temporary vector with row group indices from the first source
67+
auto const input_row_group_indices =
68+
std::vector<std::vector<size_type>>{{row_group_indices.begin(), row_group_indices.end()}};
69+
70+
return _impl->filter_row_groups_with_stats(input_row_group_indices, options, stream).front();
71+
}
72+
73+
std::pair<std::vector<text::byte_range_info>, std::vector<text::byte_range_info>>
74+
hybrid_scan_reader::secondary_filters_byte_ranges(
75+
cudf::host_span<size_type const> row_group_indices, parquet_reader_options const& options) const
76+
{
77+
// Temporary vector with row group indices from the first source
78+
auto const input_row_group_indices =
79+
std::vector<std::vector<size_type>>{{row_group_indices.begin(), row_group_indices.end()}};
80+
81+
return _impl->secondary_filters_byte_ranges(input_row_group_indices, options);
82+
}
83+
84+
std::vector<cudf::size_type> hybrid_scan_reader::filter_row_groups_with_dictionary_pages(
85+
cudf::host_span<rmm::device_buffer> dictionary_page_data,
86+
cudf::host_span<size_type const> row_group_indices,
87+
parquet_reader_options const& options,
88+
rmm::cuda_stream_view stream) const
89+
{
90+
CUDF_EXPECTS(row_group_indices.size() == dictionary_page_data.size(),
91+
"Mismatch in size of input row group indices and dictionary page device buffers");
92+
93+
// Temporary vector with row group indices from the first source
94+
auto const input_row_group_indices =
95+
std::vector<std::vector<size_type>>{{row_group_indices.begin(), row_group_indices.end()}};
96+
97+
return _impl
98+
->filter_row_groups_with_dictionary_pages(
99+
dictionary_page_data, input_row_group_indices, options, stream)
100+
.front();
101+
}
102+
103+
std::vector<cudf::size_type> hybrid_scan_reader::filter_row_groups_with_bloom_filters(
104+
cudf::host_span<rmm::device_buffer> bloom_filter_data,
105+
cudf::host_span<size_type const> row_group_indices,
106+
parquet_reader_options const& options,
107+
rmm::cuda_stream_view stream) const
108+
{
109+
CUDF_EXPECTS(row_group_indices.size() == bloom_filter_data.size(),
110+
"Mismatch in size of input row group indices and bloom filter device buffers");
111+
112+
// Temporary vector with row group indices from the first source
113+
auto const input_row_group_indices =
114+
std::vector<std::vector<size_type>>{{row_group_indices.begin(), row_group_indices.end()}};
115+
116+
return _impl
117+
->filter_row_groups_with_bloom_filters(
118+
bloom_filter_data, input_row_group_indices, options, stream)
119+
.front();
120+
}
121+
122+
std::pair<std::unique_ptr<cudf::column>, std::vector<thrust::host_vector<bool>>>
123+
hybrid_scan_reader::filter_data_pages_with_stats(cudf::host_span<size_type const> row_group_indices,
124+
parquet_reader_options const& options,
125+
rmm::cuda_stream_view stream,
126+
rmm::device_async_resource_ref mr) const
127+
{
128+
// Temporary vector with row group indices from the first source
129+
auto const input_row_group_indices =
130+
std::vector<std::vector<size_type>>{{row_group_indices.begin(), row_group_indices.end()}};
131+
132+
return _impl->filter_data_pages_with_stats(input_row_group_indices, options, stream, mr);
133+
}
134+
135+
[[nodiscard]] std::vector<text::byte_range_info>
136+
hybrid_scan_reader::filter_column_chunks_byte_ranges(
137+
cudf::host_span<size_type const> row_group_indices, parquet_reader_options const& options) const
138+
{
139+
// Temporary vector with row group indices from the first source
140+
auto const input_row_group_indices =
141+
std::vector<std::vector<size_type>>{{row_group_indices.begin(), row_group_indices.end()}};
142+
143+
return _impl->filter_column_chunks_byte_ranges(input_row_group_indices, options).first;
144+
}
145+
146+
table_with_metadata hybrid_scan_reader::materialize_filter_columns(
147+
cudf::host_span<thrust::host_vector<bool> const> data_page_mask,
148+
cudf::host_span<size_type const> row_group_indices,
149+
std::vector<rmm::device_buffer> column_chunk_buffers,
150+
cudf::mutable_column_view row_mask,
151+
parquet_reader_options const& options,
152+
rmm::cuda_stream_view stream) const
153+
{
154+
// Temporary vector with row group indices from the first source
155+
auto const input_row_group_indices =
156+
std::vector<std::vector<size_type>>{{row_group_indices.begin(), row_group_indices.end()}};
157+
158+
return _impl->materialize_filter_columns(data_page_mask,
159+
input_row_group_indices,
160+
std::move(column_chunk_buffers),
161+
row_mask,
162+
options,
163+
stream);
164+
}
165+
166+
[[nodiscard]] std::vector<text::byte_range_info>
167+
hybrid_scan_reader::payload_column_chunks_byte_ranges(
168+
cudf::host_span<size_type const> row_group_indices, parquet_reader_options const& options) const
169+
{
170+
auto const input_row_group_indices =
171+
std::vector<std::vector<size_type>>{{row_group_indices.begin(), row_group_indices.end()}};
172+
173+
return _impl->payload_column_chunks_byte_ranges(input_row_group_indices, options).first;
174+
}
175+
176+
table_with_metadata hybrid_scan_reader::materialize_payload_columns(
177+
cudf::host_span<size_type const> row_group_indices,
178+
std::vector<rmm::device_buffer> column_chunk_buffers,
179+
cudf::column_view row_mask,
180+
parquet_reader_options const& options,
181+
rmm::cuda_stream_view stream) const
182+
{
183+
// Temporary vector with row group indices from the first source
184+
auto const input_row_group_indices =
185+
std::vector<std::vector<size_type>>{{row_group_indices.begin(), row_group_indices.end()}};
186+
187+
return _impl->materialize_payload_columns(
188+
input_row_group_indices, std::move(column_chunk_buffers), row_mask, options, stream);
189+
}
190+
191+
} // namespace cudf::io::parquet::experimental

0 commit comments

Comments
 (0)