Skip to content

Commit 38881e4

Browse files
committed
feat(query): new input format framework.
1 parent 75273b0 commit 38881e4

File tree

15 files changed

+1496
-0
lines changed

15 files changed

+1496
-0
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/pipeline/sources/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,22 @@ doctest = false
99
test = false
1010

1111
[dependencies]
12+
async-channel = "1.7.1"
13+
common-arrow = { path = "../../../common/arrow" }
1214
common-base = { path = "../../../common/base" }
1315
common-catalog = { path = "../../catalog" }
1416
common-datablocks = { path = "../../datablocks" }
17+
common-datavalues = { path = "../../datavalues" }
1518
common-exception = { path = "../../../common/exception" }
1619
common-formats = { path = "../../formats" }
1720
common-io = { path = "../../../common/io" }
1821
common-meta-types = { path = "../../../meta/types" }
1922
common-pipeline-core = { path = "../core" }
23+
common-settings = { path = "../../settings" }
2024
common-storage = { path = "../../../common/storage" }
2125
common-streams = { path = "../../streams" }
2226

27+
tracing = "0.1.35"
2328
async-trait = { version = "0.1.0", package = "async-trait-fn" }
2429
futures = "0.3.21"
2530
futures-util = "0.3.21"
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_exception::ErrorCode;
16+
use common_exception::Result;
17+
18+
/// Byte sequence that terminates a record in a text input.
///
/// Build one from a string with `RecordDelimiter::try_from`; the default is
/// [`RecordDelimiter::CRLF`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordDelimiter {
    /// The two-byte sequence `\r\n`.
    CRLF,
    /// A single delimiter byte.
    Any(u8),
}
22+
23+
impl RecordDelimiter {
24+
pub fn end(&self) -> u8 {
25+
match self {
26+
RecordDelimiter::CRLF => b'\n',
27+
RecordDelimiter::Any(b) => *b,
28+
}
29+
}
30+
}
31+
32+
impl TryFrom<&str> for RecordDelimiter {
33+
type Error = ErrorCode;
34+
fn try_from(s: &str) -> Result<Self> {
35+
match s.len() {
36+
1 => Ok(RecordDelimiter::Any(s.as_bytes()[0])),
37+
2 if s.eq("\r\n") => Ok(RecordDelimiter::CRLF),
38+
_ => Err(ErrorCode::InvalidArgument(format!(
39+
"bad RecordDelimiter: '{}'",
40+
s
41+
))),
42+
}
43+
}
44+
}
45+
46+
impl Default for RecordDelimiter {
47+
fn default() -> Self {
48+
RecordDelimiter::CRLF
49+
}
50+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt::Debug;
16+
use std::fmt::Formatter;
17+
use std::sync::Arc;
18+
19+
use common_arrow::parquet::metadata::RowGroupMetaData;
20+
use common_base::base::tokio::sync::mpsc::Receiver;
21+
use common_datablocks::DataBlock;
22+
use common_exception::Result;
23+
use common_pipeline_core::Pipeline;
24+
use opendal::Object;
25+
26+
use crate::processors::sources::input_formats::input_context::InputContext;
27+
use crate::processors::sources::input_formats::input_format::FileInfo;
28+
use crate::processors::sources::input_formats::input_format::InputData;
29+
use crate::processors::sources::input_formats::input_format::SplitInfo;
30+
use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait;
31+
use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait;
32+
use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe;
33+
use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch;
34+
use crate::processors::sources::input_formats::InputFormat;
35+
36+
/// Parquet implementation of `InputFormat` (unit struct, carries no state).
struct InputFormatParquet;
37+
38+
#[async_trait::async_trait]
39+
impl InputFormat for InputFormatParquet {
40+
async fn read_file_meta(
41+
&self,
42+
obj: &Object,
43+
size: usize,
44+
) -> Result<Option<Arc<dyn InputData>>> {
45+
todo!()
46+
}
47+
48+
async fn read_split_meta(
49+
&self,
50+
obj: &Object,
51+
split_info: &SplitInfo,
52+
) -> Result<Option<Box<dyn InputData>>> {
53+
todo!()
54+
}
55+
56+
fn split_files(&self, file_infos: Vec<FileInfo>, split_size: usize) -> Vec<SplitInfo> {
57+
todo!()
58+
}
59+
60+
fn exec_copy(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
61+
todo!()
62+
}
63+
64+
fn exec_stream(
65+
&self,
66+
ctx: Arc<InputContext>,
67+
pipeline: &mut Pipeline,
68+
input: Receiver<StreamingReadBatch>,
69+
) -> Result<()> {
70+
todo!()
71+
}
72+
}
73+
74+
/// Marker type that selects the parquet-specific associated types of `InputFormatPipe`.
pub struct ParquetFormatPipe;
75+
76+
#[async_trait::async_trait]
77+
impl InputFormatPipe for ParquetFormatPipe {
78+
type ReadBatch = ReadBatch;
79+
type RowBatch = RowGroupInMemory;
80+
type AligningState = AligningState;
81+
type BlockBuilder = ParquetBlockBuilder;
82+
}
83+
84+
pub struct SplitMeta {
85+
row_groups: Vec<RowGroupMetaData>,
86+
}
87+
88+
/// Row batch type of the parquet pipe; it has no fields yet.
pub struct RowGroupInMemory {}

impl Debug for RowGroupInMemory {
    /// Writes only the type name, since the struct has no fields.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("RowGroupInMemory")
    }
}
95+
96+
#[derive(Debug)]
97+
pub enum ReadBatch {
98+
Buffer(Vec<u8>),
99+
RowGroup(RowGroupInMemory),
100+
}
101+
102+
impl From<Vec<u8>> for ReadBatch {
103+
fn from(v: Vec<u8>) -> Self {
104+
Self::Buffer(v)
105+
}
106+
}
107+
108+
pub struct ParquetBlockBuilder {
109+
ctx: Arc<InputContext>,
110+
}
111+
112+
impl BlockBuilderTrait for ParquetBlockBuilder {
113+
type Pipe = ParquetFormatPipe;
114+
115+
fn create(ctx: Arc<InputContext>) -> Self {
116+
ParquetBlockBuilder { ctx }
117+
}
118+
119+
fn deserialize(&mut self, batch: Option<RowGroupInMemory>) -> Result<Vec<DataBlock>> {
120+
todo!()
121+
}
122+
}
123+
124+
/// Aligning state of the parquet pipe.
pub struct AligningState {
    // NOTE(review): presumably the byte buffers received so far; nothing in
    // this file reads or writes the field yet, so confirm once it is used.
    buffers: Vec<Vec<u8>>,
}
127+
128+
impl AligningStateTrait for AligningState {
129+
type Pipe = ParquetFormatPipe;
130+
131+
fn try_create(ctx: &Arc<InputContext>, split_info: &SplitInfo) -> Result<Self> {
132+
todo!()
133+
}
134+
135+
fn align(&mut self, read_batch: Option<ReadBatch>) -> Result<Vec<RowGroupInMemory>> {
136+
todo!()
137+
}
138+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_datavalues::TypeDeserializer;
16+
use common_exception::ErrorCode;
17+
use common_exception::Result;
18+
use common_io::prelude::BufferReadExt;
19+
use common_io::prelude::FormatSettings;
20+
use common_io::prelude::NestedCheckpointReader;
21+
22+
use crate::processors::sources::input_formats::input_format_text::AligningState;
23+
use crate::processors::sources::input_formats::input_format_text::BlockBuilder;
24+
use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase;
25+
use crate::processors::sources::input_formats::input_format_text::RowBatch;
26+
27+
/// Tab-separated-values text input format; implements `InputFormatTextBase`.
pub struct InputFormatTSV {}
28+
29+
impl InputFormatTSV {
30+
fn read_row(
31+
buf: &[u8],
32+
deserializers: &mut Vec<common_datavalues::TypeDeserializerImpl>,
33+
format_settings: &FormatSettings,
34+
path: &str,
35+
offset: usize,
36+
row_index: Option<usize>,
37+
) -> Result<()> {
38+
let len = deserializers.len();
39+
let mut n_col = 0;
40+
let mut field_start = 0;
41+
let mut pos = 0;
42+
let mut err_msg = None;
43+
for b in buf.iter() {
44+
if *b == b'\t' {
45+
let col_data = &buf[field_start..pos];
46+
if col_data.is_empty() {
47+
deserializers[n_col].de_default(format_settings);
48+
} else {
49+
let mut reader = NestedCheckpointReader::new(col_data);
50+
reader.ignores(|c: u8| c == b' ').expect("must success");
51+
if let Err(e) = deserializers[n_col].de_text(&mut reader, format_settings) {
52+
err_msg = Some(format!(
53+
"fail to decode column {}: {:?}, [column_data]=[{}]",
54+
n_col, e, ""
55+
));
56+
break;
57+
};
58+
// todo(youngsofun): check remaining data
59+
}
60+
n_col += 1;
61+
field_start = pos + 1;
62+
if n_col > len {
63+
err_msg = Some("too many columns".to_string());
64+
break;
65+
}
66+
}
67+
pos += 1;
68+
}
69+
if n_col < len {
70+
// todo(youngsofun): allow it optionally (set default)
71+
err_msg = Some(format!("need {} columns, find {} only", len, n_col).to_string());
72+
}
73+
if let Some(m) = err_msg {
74+
let row_info = if let Some(r) = row_index {
75+
format!("at row {},", r)
76+
} else {
77+
String::new()
78+
};
79+
let msg = format!(
80+
"fail to parse tsv {} at offset {}, {}, reason={}",
81+
path,
82+
offset + pos,
83+
row_info,
84+
m
85+
);
86+
Err(ErrorCode::BadBytes(msg))
87+
} else {
88+
Ok(())
89+
}
90+
}
91+
}
92+
93+
impl InputFormatTextBase for InputFormatTSV {
94+
fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()> {
95+
let columns = &mut builder.mutable_columns;
96+
let mut start = 0usize;
97+
let start_row = batch.start_row;
98+
for (i, end) in batch.row_ends.iter().enumerate() {
99+
let buf = &batch.data[start..*end];
100+
Self::read_row(
101+
buf,
102+
columns,
103+
&builder.ctx.format_settings,
104+
&batch.path,
105+
batch.offset + start,
106+
start_row.map(|n| n + i),
107+
)?;
108+
start = *end;
109+
}
110+
Ok(())
111+
}
112+
113+
fn align(state: &mut AligningState<Self>, buf: &[u8]) -> Result<Vec<RowBatch>> {
114+
Ok(state.align_by_row_delimiter(buf))
115+
}
116+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
pub mod input_format_parquet;
16+
pub mod input_format_tsv;

0 commit comments

Comments
 (0)