Skip to content

Commit 42f4957

Browse files
committed
feat(query): new input format framework.
1 parent 89b1367 commit 42f4957

File tree

16 files changed

+1867
-0
lines changed

16 files changed

+1867
-0
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/pipeline/sources/Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,30 @@ doctest = false
99
test = false
1010

1111
[dependencies]
12+
async-channel = "1.7.1"
13+
common-arrow = { path = "../../../common/arrow" }
1214
common-base = { path = "../../../common/base" }
1315
common-catalog = { path = "../../catalog" }
1416
common-datablocks = { path = "../../datablocks" }
17+
common-datavalues = { path = "../../datavalues" }
1518
common-exception = { path = "../../../common/exception" }
1619
common-formats = { path = "../../formats" }
1720
common-io = { path = "../../../common/io" }
1821
common-meta-types = { path = "../../../meta/types" }
1922
common-pipeline-core = { path = "../core" }
23+
common-settings = { path = "../../settings" }
2024
common-storage = { path = "../../../common/storage" }
2125
common-streams = { path = "../../streams" }
2226

2327
async-trait = { version = "0.1.0", package = "async-trait-fn" }
28+
bstr = "0.2.17"
29+
crossbeam-channel = "0.5.6"
30+
csv-core = "0.1.10"
2431
futures = "0.3.21"
2532
futures-util = "0.3.21"
2633
opendal = { version = "0.17.1", features = ["layers-retry", "compress"] }
2734
parking_lot = "0.12.1"
35+
serde_json = "1.0.81"
36+
similar-asserts = "1.2.0"
37+
tracing = "0.1.35"
38+
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_exception::ErrorCode;
16+
use common_exception::Result;
17+
18+
pub enum RecordDelimiter {
19+
Crlf,
20+
Any(u8),
21+
}
22+
23+
impl RecordDelimiter {
24+
pub fn end(&self) -> u8 {
25+
match self {
26+
RecordDelimiter::Crlf => b'\n',
27+
RecordDelimiter::Any(b) => *b,
28+
}
29+
}
30+
}
31+
32+
impl TryFrom<&str> for RecordDelimiter {
33+
type Error = ErrorCode;
34+
fn try_from(s: &str) -> Result<Self> {
35+
Self::try_from(s.as_bytes())
36+
}
37+
}
38+
39+
impl TryFrom<&[u8]> for RecordDelimiter {
40+
type Error = ErrorCode;
41+
fn try_from(s: &[u8]) -> Result<Self> {
42+
match s.len() {
43+
1 => Ok(RecordDelimiter::Any(s[0])),
44+
2 if s.eq(b"\r\n") => Ok(RecordDelimiter::Crlf),
45+
_ => Err(ErrorCode::InvalidArgument(format!(
46+
"bad RecordDelimiter: '{:?}'",
47+
s
48+
))),
49+
}
50+
}
51+
}
52+
53+
impl Default for RecordDelimiter {
54+
fn default() -> Self {
55+
RecordDelimiter::Crlf
56+
}
57+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt::Debug;
16+
use std::fmt::Formatter;
17+
use std::sync::Arc;
18+
19+
use common_arrow::parquet::metadata::RowGroupMetaData;
20+
use common_base::base::tokio::sync::mpsc::Receiver;
21+
use common_datablocks::DataBlock;
22+
use common_exception::Result;
23+
use common_pipeline_core::Pipeline;
24+
use opendal::Object;
25+
26+
use crate::processors::sources::input_formats::input_context::InputContext;
27+
use crate::processors::sources::input_formats::input_format::FileInfo;
28+
use crate::processors::sources::input_formats::input_format::InputData;
29+
use crate::processors::sources::input_formats::input_format::SplitInfo;
30+
use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait;
31+
use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait;
32+
use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe;
33+
use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch;
34+
use crate::processors::sources::input_formats::InputFormat;
35+
36+
/// Parquet implementation of `InputFormat`; every trait method is still a `todo!()` stub.
struct InputFormatParquet;
37+
38+
#[async_trait::async_trait]
impl InputFormat for InputFormatParquet {
    // NOTE(review): all methods below are `todo!()` stubs and panic if called;
    // the comments describe only what the signatures show.

    /// Read per-file metadata for `obj` (file of `size` bytes). Stub.
    async fn read_file_meta(
        &self,
        obj: &Object,
        size: usize,
    ) -> Result<Option<Arc<dyn InputData>>> {
        todo!()
    }

    /// Read metadata for a single split of `obj`. Stub.
    async fn read_split_meta(
        &self,
        obj: &Object,
        split_info: &SplitInfo,
    ) -> Result<Option<Box<dyn InputData>>> {
        todo!()
    }

    /// Partition files into splits no larger than `split_size`. Stub.
    fn split_files(&self, file_infos: Vec<FileInfo>, split_size: usize) -> Vec<SplitInfo> {
        todo!()
    }

    /// Build the COPY pipeline for this format. Stub.
    fn exec_copy(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
        todo!()
    }

    /// Build the streaming-load pipeline fed by `input` batches. Stub.
    fn exec_stream(
        &self,
        ctx: Arc<InputContext>,
        pipeline: &mut Pipeline,
        input: Receiver<StreamingReadBatch>,
    ) -> Result<()> {
        todo!()
    }
}
73+
74+
/// Marker type wiring the parquet read/align/build stages together via `InputFormatPipe`.
pub struct ParquetFormatPipe;
75+
76+
#[async_trait::async_trait]
impl InputFormatPipe for ParquetFormatPipe {
    // Reader output: raw bytes or a whole row group (see `ReadBatch`).
    type ReadBatch = ReadBatch;
    // Unit handed from the aligner to the block builder.
    type RowBatch = RowGroupInMemory;
    type AligningState = AligningState;
    type BlockBuilder = ParquetBlockBuilder;
}
83+
84+
/// Parquet split metadata: the row groups belonging to one split.
/// NOTE(review): not read anywhere in this file yet — parquet support is a stub.
pub struct SplitMeta {
    row_groups: Vec<RowGroupMetaData>,
}
87+
88+
/// A parquet row group held in memory. Payload fields are not implemented yet;
/// the manual `Debug` impl just prints the type name.
pub struct RowGroupInMemory {}

impl Debug for RowGroupInMemory {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("RowGroupInMemory")
    }
}

/// One unit of reader output: either raw bytes from the source, or a row group
/// that was already assembled.
#[derive(Debug)]
pub enum ReadBatch {
    Buffer(Vec<u8>),
    RowGroup(RowGroupInMemory),
}

impl From<Vec<u8>> for ReadBatch {
    /// Wrap raw bytes as a `Buffer` batch.
    fn from(v: Vec<u8>) -> Self {
        ReadBatch::Buffer(v)
    }
}
107+
108+
/// Builds `DataBlock`s out of parquet row groups (deserialization is still a stub).
pub struct ParquetBlockBuilder {
    // Shared execution context for the job; contents opaque at this layer.
    ctx: Arc<InputContext>,
}
111+
112+
impl BlockBuilderTrait for ParquetBlockBuilder {
    type Pipe = ParquetFormatPipe;

    /// Capture the shared context; no other state is needed yet.
    fn create(ctx: Arc<InputContext>) -> Self {
        ParquetBlockBuilder { ctx }
    }

    /// Turn one row group into data blocks. Stub.
    /// NOTE(review): `None` presumably signals end-of-input flushing — confirm
    /// against the pipeline driver once implemented.
    fn deserialize(&mut self, batch: Option<RowGroupInMemory>) -> Result<Vec<DataBlock>> {
        todo!()
    }
}
123+
124+
/// Aligning state for parquet input.
/// NOTE(review): presumably accumulates raw read buffers until a whole row
/// group is available — stub, confirm when `align` is implemented.
pub struct AligningState {
    buffers: Vec<Vec<u8>>,
}
127+
128+
impl AligningStateTrait for AligningState {
    type Pipe = ParquetFormatPipe;

    /// Create the aligning state for one split. Stub.
    fn try_create(ctx: &Arc<InputContext>, split_info: &SplitInfo) -> Result<Self> {
        todo!()
    }

    /// Consume a read batch and emit any row groups that became complete. Stub.
    fn align(&mut self, read_batch: Option<ReadBatch>) -> Result<Vec<RowGroupInMemory>> {
        todo!()
    }
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_datavalues::TypeDeserializer;
16+
use common_exception::ErrorCode;
17+
use common_exception::Result;
18+
use common_formats::verbose_string;
19+
use common_io::prelude::BufferReadExt;
20+
use common_io::prelude::FormatSettings;
21+
use common_io::prelude::NestedCheckpointReader;
22+
use common_meta_types::StageFileFormatType;
23+
24+
use crate::processors::sources::input_formats::input_format_text::AligningState;
25+
use crate::processors::sources::input_formats::input_format_text::BlockBuilder;
26+
use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase;
27+
use crate::processors::sources::input_formats::input_format_text::RowBatch;
28+
29+
/// Tab-separated text input format; row parsing lives in `read_row` below.
pub struct InputFormatTSV {}
30+
31+
impl InputFormatTSV {
32+
fn read_row(
33+
buf: &[u8],
34+
deserializers: &mut Vec<common_datavalues::TypeDeserializerImpl>,
35+
format_settings: &FormatSettings,
36+
path: &str,
37+
offset: usize,
38+
row_index: Option<usize>,
39+
) -> Result<()> {
40+
let num_columns = deserializers.len();
41+
let mut column_index = 0;
42+
let mut field_start = 0;
43+
let mut pos = 0;
44+
let mut err_msg = None;
45+
let buf_len = buf.len();
46+
while pos <= buf_len {
47+
if pos == buf_len || buf[pos] == b'\t' {
48+
let col_data = &buf[field_start..pos];
49+
if col_data.is_empty() {
50+
deserializers[column_index].de_default(format_settings);
51+
} else {
52+
let mut reader = NestedCheckpointReader::new(col_data);
53+
reader.ignores(|c: u8| c == b' ').expect("must success");
54+
if let Err(e) =
55+
deserializers[column_index].de_text(&mut reader, format_settings)
56+
{
57+
err_msg = Some(format!(
58+
"fail to decode column {}: {:?}, [column_data]=[{}]",
59+
column_index, e, ""
60+
));
61+
break;
62+
};
63+
// todo(youngsofun): check remaining data
64+
}
65+
column_index += 1;
66+
field_start = pos + 1;
67+
if column_index > num_columns {
68+
err_msg = Some("too many columns".to_string());
69+
break;
70+
}
71+
}
72+
pos += 1;
73+
}
74+
if column_index < num_columns - 1 {
75+
// todo(youngsofun): allow it optionally (set default)
76+
err_msg = Some(format!(
77+
"need {} columns, find {} only",
78+
num_columns,
79+
column_index + 1
80+
));
81+
}
82+
if let Some(m) = err_msg {
83+
let row_info = if let Some(r) = row_index {
84+
format!("at row {},", r)
85+
} else {
86+
String::new()
87+
};
88+
let mut msg = format!(
89+
"fail to parse tsv {} at offset {}, {} reason={}, row data: ",
90+
path,
91+
offset + pos,
92+
row_info,
93+
m
94+
);
95+
verbose_string(buf, &mut msg);
96+
Err(ErrorCode::BadBytes(msg))
97+
} else {
98+
Ok(())
99+
}
100+
}
101+
}
102+
103+
impl InputFormatTextBase for InputFormatTSV {
104+
fn format_type() -> StageFileFormatType {
105+
StageFileFormatType::Tsv
106+
}
107+
108+
fn default_field_delimiter() -> u8 {
109+
b'\t'
110+
}
111+
112+
fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()> {
113+
let columns = &mut builder.mutable_columns;
114+
let mut start = 0usize;
115+
let start_row = batch.start_row;
116+
for (i, end) in batch.row_ends.iter().enumerate() {
117+
let buf = &batch.data[start..*end];
118+
Self::read_row(
119+
buf,
120+
columns,
121+
&builder.ctx.format_settings,
122+
&batch.path,
123+
batch.offset + start,
124+
start_row.map(|n| n + i),
125+
)?;
126+
start = *end + 1;
127+
}
128+
Ok(())
129+
}
130+
131+
fn align(state: &mut AligningState<Self>, buf: &[u8]) -> Result<Vec<RowBatch>> {
132+
Ok(state.align_by_record_delimiter(buf))
133+
}
134+
}

0 commit comments

Comments
 (0)