Skip to content

Commit 6086725

Browse files
authored
brnach-3.0: [Test][Fix](parquet-reader) Add parquet decoder unit tests and fix bugs by these tests. (#50452)
Cherry-pick from #49182
1 parent 7167b16 commit 6086725

29 files changed

+4077
-243
lines changed

be/src/vec/exec/format/parquet/bool_plain_decoder.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,12 @@ class BoolPlainDecoder final : public Decoder {
4343
~BoolPlainDecoder() override = default;
4444

4545
// Set the data to be decoded
46-
void set_data(Slice* data) override {
46+
Status set_data(Slice* data) override {
4747
bool_values_.Reset((const uint8_t*)data->data, data->size);
4848
num_unpacked_values_ = 0;
4949
unpacked_value_idx_ = 0;
5050
_offset = 0;
51+
return Status::OK();
5152
}
5253

5354
Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,

be/src/vec/exec/format/parquet/bool_rle_decoder.cpp

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,29 +30,29 @@
3030
#include "vec/exec/format/parquet/parquet_common.h"
3131

3232
namespace doris::vectorized {
33-
void BoolRLEDecoder::set_data(Slice* slice) {
33+
Status BoolRLEDecoder::set_data(Slice* slice) {
3434
_data = slice;
3535
_num_bytes = slice->size;
3636
_offset = 0;
37-
_current_value_idx = 0;
3837
if (_num_bytes < 4) {
39-
LOG(FATAL) << "Received invalid length : " + std::to_string(_num_bytes) +
40-
" (corrupt data page?)";
38+
return Status::IOError("Received invalid length : " + std::to_string(_num_bytes) +
39+
" (corrupt data page?)");
4140
}
4241
// Load the first 4 bytes in little-endian, which indicates the length
4342
const uint8_t* data = reinterpret_cast<const uint8_t*>(_data->data);
4443
uint32_t num_bytes = decode_fixed32_le(data);
4544
if (num_bytes > static_cast<uint32_t>(_num_bytes - 4)) {
46-
LOG(FATAL) << ("Received invalid number of bytes : " + std::to_string(num_bytes) +
47-
" (corrupt data page?)");
45+
return Status::IOError("Received invalid number of bytes : " + std::to_string(num_bytes) +
46+
" (corrupt data page?)");
4847
}
4948
_num_bytes = num_bytes;
5049
auto decoder_data = data + 4;
5150
_decoder = RleDecoder<uint8_t>(decoder_data, num_bytes, 1);
51+
return Status::OK();
5252
}
5353

5454
Status BoolRLEDecoder::skip_values(size_t num_values) {
55-
_current_value_idx += num_values;
55+
_decoder.Skip(num_values);
5656
return Status::OK();
5757
}
5858

@@ -76,15 +76,16 @@ Status BoolRLEDecoder::_decode_values(MutableColumnPtr& doris_column, DataTypePt
7676
if (!_decoder.get_values(_values.data(), max_values)) {
7777
return Status::IOError("Can't read enough booleans in rle decoder");
7878
}
79+
size_t current_value_idx = 0;
7980
ColumnSelectVector::DataReadType read_type;
8081
while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) {
8182
switch (read_type) {
8283
case ColumnSelectVector::CONTENT: {
8384
bool value; // Can't use uint8_t directly, we should correct it.
8485
for (size_t i = 0; i < run_length; ++i) {
85-
DCHECK(_current_value_idx < max_values)
86-
<< _current_value_idx << " vs. " << max_values;
87-
value = _values[_current_value_idx++];
86+
DCHECK(current_value_idx < max_values)
87+
<< current_value_idx << " vs. " << max_values;
88+
value = _values[current_value_idx++];
8889
column_data[data_index++] = (UInt8)value;
8990
}
9091
break;
@@ -94,15 +95,14 @@ Status BoolRLEDecoder::_decode_values(MutableColumnPtr& doris_column, DataTypePt
9495
break;
9596
}
9697
case ColumnSelectVector::FILTERED_CONTENT: {
97-
_current_value_idx += run_length;
98+
current_value_idx += run_length;
9899
break;
99100
}
100101
case ColumnSelectVector::FILTERED_NULL: {
101102
break;
102103
}
103104
}
104105
}
105-
_current_value_idx = 0;
106106
return Status::OK();
107107
}
108108
} // namespace doris::vectorized

be/src/vec/exec/format/parquet/bool_rle_decoder.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class BoolRLEDecoder final : public Decoder {
4040
BoolRLEDecoder() = default;
4141
~BoolRLEDecoder() override = default;
4242

43-
void set_data(Slice* slice) override;
43+
Status set_data(Slice* slice) override;
4444

4545
Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
4646
ColumnSelectVector& select_vector, bool is_dict_filter) override;
@@ -55,6 +55,5 @@ class BoolRLEDecoder final : public Decoder {
5555
RleDecoder<uint8_t> _decoder;
5656
std::vector<uint8_t> _values;
5757
size_t _num_bytes;
58-
size_t _current_value_idx = 0;
5958
};
60-
} // namespace doris::vectorized
59+
} // namespace doris::vectorized

be/src/vec/exec/format/parquet/decoder.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,10 @@ class Decoder {
5959
void set_type_length(int32_t type_length) { _type_length = type_length; }
6060

6161
// Set the data to be decoded
62-
virtual void set_data(Slice* data) {
62+
virtual Status set_data(Slice* data) {
6363
_data = data;
6464
_offset = 0;
65+
return Status::OK();
6566
}
6667

6768
// Write the decoded values batch to doris's column
@@ -95,13 +96,14 @@ class BaseDictDecoder : public Decoder {
9596
~BaseDictDecoder() override = default;
9697

9798
// Set the data to be decoded
98-
void set_data(Slice* data) override {
99+
Status set_data(Slice* data) override {
99100
_data = data;
100101
_offset = 0;
101102
uint8_t bit_width = *data->data;
102103
_index_batch_decoder = std::make_unique<RleBatchDecoder<uint32_t>>(
103104
reinterpret_cast<uint8_t*>(data->data) + 1, static_cast<int>(data->size) - 1,
104105
bit_width);
106+
return Status::OK();
105107
}
106108

107109
protected:
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "vec/exec/format/parquet/delta_bit_pack_decoder.h"
19+
20+
namespace doris::vectorized {
21+
Status DeltaLengthByteArrayDecoder::_decode_lengths() {
22+
RETURN_IF_ERROR(_len_decoder.set_bit_reader(_bit_reader));
23+
// get the number of encoded lengths
24+
int num_length = _len_decoder.valid_values_count();
25+
_buffered_length.resize(num_length);
26+
27+
// decode all the lengths. all the lengths are buffered in buffered_length_.
28+
int ret;
29+
RETURN_IF_ERROR(_len_decoder.decode(_buffered_length.data(), num_length, &ret));
30+
DCHECK_EQ(ret, num_length);
31+
_length_idx = 0;
32+
_num_valid_values = num_length;
33+
return Status::OK();
34+
}
35+
36+
Status DeltaLengthByteArrayDecoder::_get_internal(Slice* buffer, int max_values,
37+
int* out_num_values) {
38+
// Decode up to `max_values` strings into an internal buffer
39+
// and reference them into `buffer`.
40+
max_values = std::min(max_values, _num_valid_values);
41+
if (max_values == 0) {
42+
*out_num_values = 0;
43+
return Status::OK();
44+
}
45+
46+
int32_t data_size = 0;
47+
const int32_t* length_ptr = _buffered_length.data() + _length_idx;
48+
for (int i = 0; i < max_values; ++i) {
49+
int32_t len = length_ptr[i];
50+
if (PREDICT_FALSE(len < 0)) {
51+
return Status::InvalidArgument("Negative string delta length");
52+
}
53+
buffer[i].size = len;
54+
if (common::add_overflow(data_size, len, data_size)) {
55+
return Status::InvalidArgument("Excess expansion in DELTA_(LENGTH_)BYTE_ARRAY");
56+
}
57+
}
58+
_length_idx += max_values;
59+
60+
_buffered_data.resize(data_size);
61+
char* data_ptr = _buffered_data.data();
62+
for (int j = 0; j < data_size; j++) {
63+
if (!_bit_reader->GetValue(8, data_ptr + j)) {
64+
return Status::IOError("Get length bytes EOF");
65+
}
66+
}
67+
68+
for (int i = 0; i < max_values; ++i) {
69+
buffer[i].data = data_ptr;
70+
data_ptr += buffer[i].size;
71+
}
72+
// this->num_values_ -= max_values;
73+
_num_valid_values -= max_values;
74+
*out_num_values = max_values;
75+
return Status::OK();
76+
}
77+
78+
Status DeltaByteArrayDecoder::_get_internal(Slice* buffer, int max_values, int* out_num_values) {
79+
// Decode up to `max_values` strings into an internal buffer
80+
// and reference them into `buffer`.
81+
max_values = std::min(max_values, _num_valid_values);
82+
if (max_values == 0) {
83+
*out_num_values = max_values;
84+
return Status::OK();
85+
}
86+
87+
int suffix_read;
88+
RETURN_IF_ERROR(_suffix_decoder.decode(buffer, max_values, &suffix_read));
89+
if (PREDICT_FALSE(suffix_read != max_values)) {
90+
return Status::IOError("Read {}, expecting {} from suffix decoder",
91+
std::to_string(suffix_read), std::to_string(max_values));
92+
}
93+
94+
int64_t data_size = 0;
95+
const int32_t* prefix_len_ptr = _buffered_prefix_length.data() + _prefix_len_offset;
96+
for (int i = 0; i < max_values; ++i) {
97+
if (PREDICT_FALSE(prefix_len_ptr[i] < 0)) {
98+
return Status::InvalidArgument("negative prefix length in DELTA_BYTE_ARRAY");
99+
}
100+
if (PREDICT_FALSE(common::add_overflow(data_size, static_cast<int64_t>(prefix_len_ptr[i]),
101+
data_size) ||
102+
common::add_overflow(data_size, static_cast<int64_t>(buffer[i].size),
103+
data_size))) {
104+
return Status::InvalidArgument("excess expansion in DELTA_BYTE_ARRAY");
105+
}
106+
}
107+
_buffered_data.resize(data_size);
108+
109+
std::string_view prefix {_last_value};
110+
111+
char* data_ptr = _buffered_data.data();
112+
for (int i = 0; i < max_values; ++i) {
113+
if (PREDICT_FALSE(static_cast<size_t>(prefix_len_ptr[i]) > prefix.length())) {
114+
return Status::InvalidArgument("prefix length too large in DELTA_BYTE_ARRAY");
115+
}
116+
memcpy(data_ptr, prefix.data(), prefix_len_ptr[i]);
117+
// buffer[i] currently points to the string suffix
118+
memcpy(data_ptr + prefix_len_ptr[i], buffer[i].data, buffer[i].size);
119+
buffer[i].data = data_ptr;
120+
buffer[i].size += prefix_len_ptr[i];
121+
data_ptr += buffer[i].size;
122+
prefix = std::string_view {buffer[i].data, buffer[i].size};
123+
}
124+
_prefix_len_offset += max_values;
125+
_num_valid_values -= max_values;
126+
_last_value = std::string {prefix};
127+
128+
if (_num_valid_values == 0) {
129+
_last_value_in_previous_page = _last_value;
130+
}
131+
*out_num_values = max_values;
132+
return Status::OK();
133+
}
134+
} // namespace doris::vectorized

0 commit comments

Comments
 (0)