
Commit 8e4308d

Implement basic Parquet data file reading capability (#207)
* feat: TableScan parquet file read to RecordBatch stream
* chore: add inline hinting and fix incorrect comment
* refactor: extract record batch reader
* refactor: rename `FileRecordBatchReader` to `ArrowReader`
* refactor: rename file_record_batch_reader.rs to arrow.rs
* refactor: move `batch_size` param to `TableScanBuilder`
* refactor: rename `TableScan.execute` to `to_arrow`
* refactor: use builder pattern to create `ArrowReader`
1 parent f61d475 commit 8e4308d
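The headline change is the new read path from a `TableScan` to a stream of Arrow `RecordBatch`es. A minimal sketch of how the pieces fit together, assuming a `Table` already loaded from a catalog; the `with_batch_size` setter name on `TableScanBuilder` is an assumption here, while `to_arrow` (renamed from `execute`) and the `batch_size` parameter are named in the commit message:

use futures::TryStreamExt;
use iceberg::table::Table;

// Hypothetical end-to-end usage of the new scan-to-Arrow path.
async fn scan_to_arrow(table: &Table) -> iceberg::Result<()> {
    let stream = table
        .scan()
        .with_batch_size(1024) // assumed setter for the new `batch_size` param
        .build()?
        .to_arrow() // renamed from `TableScan::execute` in this commit
        .await?;

    // Drain the stream of Arrow record batches.
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}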

File tree

8 files changed: +312 −97 lines


Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -34,9 +34,11 @@ apache-avro = "0.16"
 arrow-arith = { version = ">=46" }
 arrow-array = { version = ">=46" }
 arrow-schema = { version = ">=46" }
+async-stream = "0.3.5"
 async-trait = "0.1"
 bimap = "0.6"
 bitvec = "1.0.1"
+bytes = "1.5"
 chrono = "0.4"
 derive_builder = "0.20.0"
 either = "1"
@@ -52,6 +54,7 @@ murmur3 = "0.5.2"
 once_cell = "1"
 opendal = "0.45"
 ordered-float = "4.0.0"
+parquet = "50"
 pretty_assertions = "1.4.0"
 port_scanner = "0.1.5"
 reqwest = { version = "^0.11", features = ["json"] }

crates/iceberg/Cargo.toml

Lines changed: 4 additions & 0 deletions
@@ -34,9 +34,11 @@ apache-avro = { workspace = true }
 arrow-arith = { workspace = true }
 arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
+async-stream = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
 bitvec = { workspace = true }
+bytes = { workspace = true }
 chrono = { workspace = true }
 derive_builder = { workspace = true }
 either = { workspace = true }
@@ -48,6 +50,7 @@ murmur3 = { workspace = true }
 once_cell = { workspace = true }
 opendal = { workspace = true }
 ordered-float = { workspace = true }
+parquet = { workspace = true, features = ["async"] }
 reqwest = { workspace = true }
 rust_decimal = { workspace = true }
 serde = { workspace = true }
@@ -56,6 +59,7 @@ serde_derive = { workspace = true }
 serde_json = { workspace = true }
 serde_repr = { workspace = true }
 serde_with = { workspace = true }
+tokio = { workspace = true }
 typed-builder = { workspace = true }
 url = { workspace = true }
 urlencoding = { workspace = true }

crates/iceberg/src/arrow.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Parquet file data reader
19+
20+
use async_stream::try_stream;
21+
use futures::stream::StreamExt;
22+
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
23+
24+
use crate::io::FileIO;
25+
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
26+
use crate::spec::SchemaRef;
27+
28+
/// Builder to create ArrowReader
29+
pub struct ArrowReaderBuilder {
30+
batch_size: Option<usize>,
31+
file_io: FileIO,
32+
schema: SchemaRef,
33+
}
34+
35+
impl ArrowReaderBuilder {
36+
/// Create a new ArrowReaderBuilder
37+
pub fn new(file_io: FileIO, schema: SchemaRef) -> Self {
38+
ArrowReaderBuilder {
39+
batch_size: None,
40+
file_io,
41+
schema,
42+
}
43+
}
44+
45+
/// Sets the desired size of batches in the response
46+
/// to something other than the default
47+
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
48+
self.batch_size = Some(batch_size);
49+
self
50+
}
51+
52+
/// Build the ArrowReader.
53+
pub fn build(self) -> ArrowReader {
54+
ArrowReader {
55+
batch_size: self.batch_size,
56+
schema: self.schema,
57+
file_io: self.file_io,
58+
}
59+
}
60+
}
61+
62+
/// Reads data from Parquet files
63+
pub struct ArrowReader {
64+
batch_size: Option<usize>,
65+
#[allow(dead_code)]
66+
schema: SchemaRef,
67+
file_io: FileIO,
68+
}
69+
70+
impl ArrowReader {
71+
/// Take a stream of FileScanTasks and reads all the files.
72+
/// Returns a stream of Arrow RecordBatches containing the data from the files
73+
pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
74+
let file_io = self.file_io.clone();
75+
76+
Ok(try_stream! {
77+
while let Some(Ok(task)) = tasks.next().await {
78+
79+
let projection_mask = self.get_arrow_projection_mask(&task);
80+
81+
let parquet_reader = file_io
82+
.new_input(task.data_file().file_path())?
83+
.reader()
84+
.await?;
85+
86+
let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(parquet_reader)
87+
.await?
88+
.with_projection(projection_mask);
89+
90+
if let Some(batch_size) = self.batch_size {
91+
batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
92+
}
93+
94+
let mut batch_stream = batch_stream_builder.build()?;
95+
96+
while let Some(batch) = batch_stream.next().await {
97+
yield batch?;
98+
}
99+
}
100+
}
101+
.boxed())
102+
}
103+
104+
fn get_arrow_projection_mask(&self, _task: &FileScanTask) -> ProjectionMask {
105+
// TODO: full implementation
106+
ProjectionMask::all()
107+
}
108+
}
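For callers that already have a stream of file scan tasks, the builder above can be used directly. A short usage sketch: the way `file_io`, `schema`, and `tasks` are obtained is elided, the types are taken from the imports at the top of this file, and the public `iceberg::scan` paths are assumed here.

use futures::TryStreamExt;
use iceberg::arrow::ArrowReaderBuilder;
use iceberg::io::FileIO;
use iceberg::scan::FileScanTaskStream;
use iceberg::spec::SchemaRef;

// Reads every task in the stream and prints per-batch row counts.
async fn read_all(file_io: FileIO, schema: SchemaRef, tasks: FileScanTaskStream) -> iceberg::Result<()> {
    let reader = ArrowReaderBuilder::new(file_io, schema)
        .with_batch_size(4096) // optional; otherwise the Parquet reader's default applies
        .build();

    let mut batches = reader.read(tasks)?;
    while let Some(batch) = batches.try_next().await? {
        println!("got a batch with {} rows", batch.num_rows());
    }
    Ok(())
}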

crates/iceberg/src/error.rs

Lines changed: 6 additions & 0 deletions
@@ -325,6 +325,12 @@ define_from_err!(
     "Failed to convert decimal literal to rust decimal"
 );

+define_from_err!(
+    parquet::errors::ParquetError,
+    ErrorKind::Unexpected,
+    "Failed to read a Parquet file"
+);
+
 define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed");

 /// Helper macro to check arguments.
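This is what lets `ArrowReader::read` use `?` directly on fallible Parquet calls: `define_from_err!` generates a `From` conversion from `parquet::errors::ParquetError` into the crate's `Error`. A rough hand-written equivalent of the generated impl (a sketch only; the macro's exact expansion lives elsewhere in error.rs):

// Approximate expansion of the macro invocation above (illustrative only).
impl From<parquet::errors::ParquetError> for Error {
    fn from(v: parquet::errors::ParquetError) -> Self {
        Error::new(ErrorKind::Unexpected, "Failed to read a Parquet file").with_source(v)
    }
}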

crates/iceberg/src/io.rs

Lines changed: 6 additions & 2 deletions
@@ -54,6 +54,7 @@ use crate::{error::Result, Error, ErrorKind};
 use futures::{AsyncRead, AsyncSeek, AsyncWrite};
 use once_cell::sync::Lazy;
 use opendal::{Operator, Scheme};
+use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek};
 use url::Url;

 /// Following are arguments for [s3 file io](https://py.iceberg.apache.org/configuration/#s3).
@@ -215,9 +216,12 @@ pub struct InputFile {
 }

 /// Trait for reading file.
-pub trait FileRead: AsyncRead + AsyncSeek {}
+pub trait FileRead: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek {}

-impl<T> FileRead for T where T: AsyncRead + AsyncSeek {}
+impl<T> FileRead for T where
+    T: AsyncRead + AsyncSeek + Send + Unpin + TokioAsyncRead + TokioAsyncSeek
+{
+}

 impl InputFile {
     /// Absolute path to root uri.
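The extra bounds are what make the output of `InputFile::reader()` usable with the Parquet crate: `parquet` provides a blanket `AsyncFileReader` implementation for types that are `tokio::io::AsyncRead + AsyncSeek + Send + Unpin`, so widening `FileRead` lets a reader be handed straight to `ParquetRecordBatchStreamBuilder::new`. A sketch of the bound in action (function and variable names are illustrative):

use parquet::arrow::ParquetRecordBatchStreamBuilder;

// Any `FileRead` now satisfies parquet's `AsyncFileReader` blanket impl.
async fn row_group_count<R>(reader: R) -> iceberg::Result<usize>
where
    R: iceberg::io::FileRead + 'static,
{
    let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
    Ok(builder.metadata().num_row_groups())
}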

crates/iceberg/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -52,4 +52,5 @@ pub mod expr;
 pub mod transaction;
 pub mod transform;

+pub mod arrow;
 pub mod writer;
