Commit bdcf7a9

input format parquet.
committed · 1 parent 830b1b9 · commit bdcf7a9

File tree: 1 file changed (+194, -18 lines)

src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs

Lines changed: 194 additions & 18 deletions
@@ -12,17 +12,38 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
+use std::io::Cursor;
+use std::io::Read;
+use std::io::Seek;
+use std::mem;
 use std::sync::Arc;
 
+use common_arrow::arrow::array::Array;
+use common_arrow::arrow::chunk::Chunk;
+use common_arrow::arrow::datatypes::Field;
+use common_arrow::arrow::io::parquet::read;
+use common_arrow::arrow::io::parquet::read::read_columns;
+use common_arrow::arrow::io::parquet::read::to_deserializer;
+use common_arrow::arrow::io::parquet::read::RowGroupDeserializer;
+use common_arrow::parquet::metadata::ColumnChunkMetaData;
+use common_arrow::parquet::metadata::FileMetaData;
 use common_arrow::parquet::metadata::RowGroupMetaData;
+use common_arrow::parquet::read::read_metadata;
 use common_base::base::tokio::sync::mpsc::Receiver;
 use common_datablocks::DataBlock;
+use common_datavalues::remove_nullable;
+use common_datavalues::DataField;
+use common_datavalues::DataSchemaRef;
+use common_exception::ErrorCode;
 use common_exception::Result;
 use common_pipeline_core::Pipeline;
 use opendal::Object;
+use similar_asserts::traits::MakeDiff;
 
+use crate::processors::sources::input_formats::delimiter::RecordDelimiter;
 use crate::processors::sources::input_formats::input_context::InputContext;
 use crate::processors::sources::input_formats::input_format::FileInfo;
 use crate::processors::sources::input_formats::input_format::InputData;
@@ -33,32 +54,45 @@ use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe;
 use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch;
 use crate::processors::sources::input_formats::InputFormat;
 
-struct InputFormatParquet;
+pub struct InputFormatParquet;
 
 #[async_trait::async_trait]
 impl InputFormat for InputFormatParquet {
+    fn default_record_delimiter(&self) -> RecordDelimiter {
+        RecordDelimiter::Crlf
+    }
+
+    fn default_field_delimiter(&self) -> u8 {
+        b'_'
+    }
+
     async fn read_file_meta(
         &self,
-        obj: &Object,
-        size: usize,
+        _obj: &Object,
+        _size: usize,
     ) -> Result<Option<Arc<dyn InputData>>> {
-        todo!()
+        // todo(youngsofun): execute_copy_aligned
+        Ok(None)
     }
 
     async fn read_split_meta(
         &self,
-        obj: &Object,
-        split_info: &SplitInfo,
+        _obj: &Object,
+        _split_info: &SplitInfo,
     ) -> Result<Option<Box<dyn InputData>>> {
-        todo!()
+        Ok(None)
     }
 
-    fn split_files(&self, file_infos: Vec<FileInfo>, split_size: usize) -> Vec<SplitInfo> {
-        todo!()
+    fn split_files(&self, file_infos: Vec<FileInfo>, _split_size: usize) -> Vec<SplitInfo> {
+        file_infos
+            .into_iter()
+            .map(SplitInfo::from_file_info)
+            .collect()
     }
 
     fn exec_copy(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
-        todo!()
+        // todo(youngsofun): execute_copy_aligned
+        ParquetFormatPipe::execute_copy_with_aligner(ctx, pipeline)
    }
 
     fn exec_stream(
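
A note on this hunk: split_files maps each parquet file to exactly one split and ignores the requested split size. Unlike CSV or NDJSON, a parquet file cannot be cut at arbitrary byte offsets, because the row-group metadata needed to decode it lives in the file footer. A minimal sketch of the same 1:1 mapping, using hypothetical simplified FileInfo/SplitInfo types in place of the databend ones:

struct FileInfo {
    path: String,
    size: usize,
}

struct SplitInfo {
    file: FileInfo,
    offset: usize,
    len: usize,
}

// Each file becomes one whole-file split; the requested split size is ignored.
fn split_files(file_infos: Vec<FileInfo>, _split_size: usize) -> Vec<SplitInfo> {
    file_infos
        .into_iter()
        .map(|f| {
            let len = f.size;
            SplitInfo { file: f, offset: 0, len }
        })
        .collect()
}
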
@@ -67,7 +101,7 @@ impl InputFormat for InputFormatParquet {
         pipeline: &mut Pipeline,
         input: Receiver<StreamingReadBatch>,
     ) -> Result<()> {
-        todo!()
+        ParquetFormatPipe::execute_stream(ctx, pipeline, input)
     }
 }
 
@@ -81,11 +115,58 @@ impl InputFormatPipe for ParquetFormatPipe {
     type BlockBuilder = ParquetBlockBuilder;
 }
 
-pub struct SplitMeta {
-    row_groups: Vec<RowGroupMetaData>,
+pub struct RowGroupInMemory {
+    pub meta: RowGroupMetaData,
+    pub fields: Arc<Vec<Field>>,
+    pub field_meta_indexes: Vec<Vec<usize>>,
+    pub field_arrays: Vec<Vec<Vec<u8>>>,
 }
 
-pub struct RowGroupInMemory {}
+impl RowGroupInMemory {
+    fn read<R: Read + Seek>(
+        reader: &mut R,
+        meta: RowGroupMetaData,
+        fields: Arc<Vec<Field>>,
+    ) -> Result<Self> {
+        let field_names = fields.iter().map(|x| x.name.as_str()).collect::<Vec<_>>();
+        let field_meta_indexes = split_column_metas_by_field(meta.columns(), &field_names);
+        let mut filed_arrays = vec![];
+        for field_name in field_names {
+            let meta_data = read_columns(reader, meta.columns(), field_name)?;
+            let data = meta_data.into_iter().map(|t| t.1).collect::<Vec<_>>();
+            filed_arrays.push(data)
+        }
+        Ok(Self {
+            meta,
+            field_meta_indexes,
+            field_arrays: filed_arrays,
+            fields,
+        })
+    }
+
+    fn get_arrow_chunk(&mut self) -> Result<Chunk<Box<dyn Array>>> {
+        let mut column_chunks = vec![];
+        let field_arrays = mem::take(&mut self.field_arrays);
+        for (f, datas) in field_arrays.into_iter().enumerate() {
+            let meta_iters = self.field_meta_indexes[f]
+                .iter()
+                .map(|c| &self.meta.columns()[*c]);
+            let meta_data = meta_iters.zip(datas.into_iter()).collect::<Vec<_>>();
+            let array_iters = to_deserializer(
+                meta_data,
+                self.fields[f].clone(),
+                self.meta.num_rows() as usize,
+                None,
+            )?;
+            column_chunks.push(array_iters);
+        }
+        match RowGroupDeserializer::new(column_chunks, self.meta.num_rows(), None).next() {
+            None => Err(ErrorCode::ParquetError("fail to get a chunk")),
+            Some(Ok(chunk)) => Ok(chunk),
+            Some(Err(e)) => Err(ErrorCode::ParquetError(e.to_string())),
+        }
+    }
+}
 
 impl Debug for RowGroupInMemory {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
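
The read/get_arrow_chunk pair above is the heart of the format: read pulls each field's raw column-chunk bytes out of a row group, and get_arrow_chunk decodes them field by field into a single arrow Chunk. A minimal self-contained sketch of the same flow against a local file, assuming the plain arrow2 crate with the four-argument to_deserializer seen in the diff (the commit itself goes through the common_arrow re-exports):

use std::fs::File;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::io::parquet::read::{
    infer_schema, read_columns, read_metadata, to_deserializer, RowGroupDeserializer,
};

fn read_all_row_groups(
    path: &str,
) -> Result<Vec<Chunk<Box<dyn Array>>>, Box<dyn std::error::Error>> {
    let mut file = File::open(path)?;
    // Footer first: the row-group layout and the arrow schema both come from the metadata.
    let metadata = read_metadata(&mut file)?;
    let schema = infer_schema(&metadata)?;
    let mut chunks = vec![];
    for rg in metadata.row_groups {
        // One deserializer per field, fed with that field's raw column chunks.
        let mut field_iters = vec![];
        for field in schema.fields.iter() {
            let columns = read_columns(&mut file, rg.columns(), &field.name)?;
            field_iters.push(to_deserializer(columns, field.clone(), rg.num_rows(), None)?);
        }
        // With chunk_size = None, each row group yields a single chunk.
        for chunk in RowGroupDeserializer::new(field_iters, rg.num_rows(), None) {
            chunks.push(chunk?);
        }
    }
    Ok(chunks)
}
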
@@ -96,6 +177,7 @@ impl Debug for RowGroupInMemory {
 #[derive(Debug)]
 pub enum ReadBatch {
     Buffer(Vec<u8>),
+    #[allow(unused)]
     RowGroup(RowGroupInMemory),
 }
 
@@ -116,23 +198,117 @@ impl BlockBuilderTrait for ParquetBlockBuilder {
         ParquetBlockBuilder { ctx }
     }
 
-    fn deserialize(&mut self, batch: Option<RowGroupInMemory>) -> Result<Vec<DataBlock>> {
-        todo!()
+    fn deserialize(&mut self, mut batch: Option<RowGroupInMemory>) -> Result<Vec<DataBlock>> {
+        if let Some(rg) = batch.as_mut() {
+            let chunk = rg.get_arrow_chunk()?;
+            let block = DataBlock::from_chunk(&self.ctx.schema, &chunk)?;
+            Ok(vec![block])
+        } else {
+            Ok(vec![])
+        }
     }
 }
 
 pub struct AligningState {
+    ctx: Arc<InputContext>,
+    split_info: SplitInfo,
     buffers: Vec<Vec<u8>>,
 }
 
 impl AligningStateTrait for AligningState {
     type Pipe = ParquetFormatPipe;
 
     fn try_create(ctx: &Arc<InputContext>, split_info: &SplitInfo) -> Result<Self> {
-        todo!()
+        Ok(AligningState {
+            ctx: ctx.clone(),
+            split_info: split_info.clone(),
+            buffers: vec![],
+        })
     }
 
     fn align(&mut self, read_batch: Option<ReadBatch>) -> Result<Vec<RowGroupInMemory>> {
-        todo!()
+        if let Some(rb) = read_batch {
+            if let ReadBatch::Buffer(b) = rb {
+                self.buffers.push(b)
+            };
+            Ok(vec![])
+        } else {
+            let file_in_memory = self.buffers.concat();
+            let size = file_in_memory.len();
+            tracing::debug!(
+                "aligning parquet file {} of {} bytes",
+                self.split_info.file_info.path,
+                size,
+            );
+            let mut cursor = Cursor::new(file_in_memory);
+            let file_meta =
+                read_metadata(&mut cursor).map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
+            let read_fields = Arc::new(get_fields(&file_meta, &self.ctx.schema)?);
+
+            let mut row_batches = Vec::with_capacity(file_meta.row_groups.len());
+            for row_group in file_meta.row_groups.into_iter() {
+                row_batches.push(RowGroupInMemory::read(
+                    &mut cursor,
+                    row_group,
+                    read_fields.clone(),
+                )?)
+            }
+            tracing::info!(
+                "align parquet file {} of {} bytes to {} row groups",
+                self.split_info.file_info.path,
+                size,
+                row_batches.len()
+            );
+            Ok(row_batches)
+        }
+    }
+}
+
+fn get_fields(file_meta: &FileMetaData, schema: &DataSchemaRef) -> Result<Vec<Field>> {
+    let infer_schema = read::infer_schema(file_meta)?;
+    let mut read_fields = Vec::with_capacity(schema.num_fields());
+    for f in schema.fields().iter() {
+        if let Some(m) = infer_schema
+            .fields
+            .iter()
+            .filter(|c| c.name.eq_ignore_ascii_case(f.name()))
+            .last()
+        {
+            let tf = DataField::from(m);
+            if remove_nullable(tf.data_type()) != remove_nullable(f.data_type()) {
+                let pair = (f, m);
+                let diff = pair.make_diff("expected_field", "infer_field");
+                return Err(ErrorCode::ParquetError(format!(
+                    "parquet schema mismatch, differ: {}",
+                    diff
+                )));
+            }
+
+            read_fields.push(m.clone());
+        } else {
+            return Err(ErrorCode::ParquetError(format!(
+                "schema field size mismatch, expected to find column: {}",
+                f.name()
+            )));
+        }
     }
+    Ok(read_fields)
+}
+
+pub fn split_column_metas_by_field(
+    columns: &[ColumnChunkMetaData],
+    field_names: &[&str],
+) -> Vec<Vec<usize>> {
+    let mut r = field_names.iter().map(|_| vec![]).collect::<Vec<_>>();
+    let d = field_names
+        .iter()
+        .enumerate()
+        .map(|(i, name)| (name, i))
+        .collect::<HashMap<_, _>>();
+    columns.iter().enumerate().for_each(|(col_i, x)| {
+        if let Some(field_i) = d.get(&x.descriptor().path_in_schema[0].as_str()) {
+            r[*field_i].push(col_i);
+        }
+    });
+    r
 }
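
split_column_metas_by_field exists because a nested field (a struct, say) is stored as several parquet leaf columns, so the column chunks belonging to one top-level field must be regrouped before get_arrow_chunk can hand them to to_deserializer; grouping is keyed on path_in_schema[0], the top-level field name. A simplified, self-contained analogue with plain column paths standing in for ColumnChunkMetaData, purely for illustration (split_by_field is a hypothetical name):

use std::collections::HashMap;

// Group leaf-column indexes by their top-level field name.
fn split_by_field(column_paths: &[&[&str]], field_names: &[&str]) -> Vec<Vec<usize>> {
    let mut r: Vec<Vec<usize>> = field_names.iter().map(|_| vec![]).collect();
    let d: HashMap<&str, usize> = field_names
        .iter()
        .enumerate()
        .map(|(i, name)| (*name, i))
        .collect();
    for (col_i, path) in column_paths.iter().enumerate() {
        // path[0] is the top-level field name, e.g. ["t", "a"] for struct t { a, b }.
        if let Some(field_i) = d.get(path[0]) {
            r[*field_i].push(col_i);
        }
    }
    r
}

fn main() {
    // A file with leaf columns id, t.a, t.b, where t is a struct field.
    let cols: Vec<&[&str]> = vec![&["id"][..], &["t", "a"][..], &["t", "b"][..]];
    let groups = split_by_field(&cols, &["id", "t"]);
    assert_eq!(groups, vec![vec![0], vec![1, 2]]);
}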
