Skip to content

Commit cc67ae4

Browse files
committed
input format csv
1 parent a59259c commit cc67ae4

File tree

1 file changed

+296
-0
lines changed

1 file changed

+296
-0
lines changed
Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::mem;
16+
use std::sync::Arc;
17+
18+
use common_datavalues::TypeDeserializer;
19+
use common_exception::ErrorCode;
20+
use common_exception::Result;
21+
use common_formats::verbose_string;
22+
use common_io::prelude::FormatSettings;
23+
use common_io::prelude::NestedCheckpointReader;
24+
use common_meta_types::StageFileFormatType;
25+
use csv_core::ReadRecordResult;
26+
27+
use crate::processors::sources::input_formats::delimiter::RecordDelimiter;
28+
use crate::processors::sources::input_formats::input_format_text::AligningState;
29+
use crate::processors::sources::input_formats::input_format_text::BlockBuilder;
30+
use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase;
31+
use crate::processors::sources::input_formats::input_format_text::RowBatch;
32+
use crate::processors::sources::input_formats::InputContext;
33+
34+
/// CSV implementation of `InputFormatTextBase`. A unit struct: all parsing
/// state lives in `CsvReaderState` / `AligningState`, not in the format itself.
pub struct InputFormatCSV {}
35+
36+
impl InputFormatCSV {
37+
fn read_row(
38+
buf: &[u8],
39+
deserializers: &mut [common_datavalues::TypeDeserializerImpl],
40+
field_ends: &[usize],
41+
format_settings: &FormatSettings,
42+
path: &str,
43+
row_index: usize,
44+
) -> Result<()> {
45+
let mut field_start = 0;
46+
for (c, deserializer) in deserializers.iter_mut().enumerate() {
47+
let field_end = field_ends[c];
48+
let col_data = &buf[field_start..field_end];
49+
if col_data.is_empty() {
50+
deserializer.de_default(format_settings);
51+
} else {
52+
let mut reader = NestedCheckpointReader::new(col_data);
53+
// reader.ignores(|c: u8| c == b' ').expect("must success");
54+
// todo(youngsofun): do not need escape, already done in csv-core
55+
if let Err(e) = deserializer.de_text(&mut reader, format_settings) {
56+
let mut value = String::new();
57+
verbose_string(buf, &mut value);
58+
let err_msg = format!(
59+
"fail to decode column {}: {:?}, [column_data]=[{}]",
60+
c, e, value
61+
);
62+
return Err(csv_error(&err_msg, path, row_index));
63+
};
64+
}
65+
field_start = field_end;
66+
}
67+
Ok(())
68+
}
69+
}
70+
71+
impl InputFormatTextBase for InputFormatCSV {
72+
fn format_type() -> StageFileFormatType {
73+
StageFileFormatType::Csv
74+
}
75+
76+
fn default_field_delimiter() -> u8 {
77+
b','
78+
}
79+
80+
fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()> {
81+
let columns = &mut builder.mutable_columns;
82+
let n_column = columns.len();
83+
let mut start = 0usize;
84+
let start_row = batch.start_row.expect("must success");
85+
let mut field_end_idx = 0;
86+
for (i, end) in batch.row_ends.iter().enumerate() {
87+
let buf = &batch.data[start..*end];
88+
Self::read_row(
89+
buf,
90+
columns,
91+
&batch.field_ends[field_end_idx..field_end_idx + n_column],
92+
&builder.ctx.format_settings,
93+
&batch.path,
94+
start_row + i,
95+
)?;
96+
start = *end;
97+
field_end_idx += n_column;
98+
}
99+
Ok(())
100+
}
101+
102+
fn align(state: &mut AligningState<Self>, buf_in: &[u8]) -> Result<Vec<RowBatch>> {
103+
let num_fields = state.num_fields;
104+
let reader = state.csv_reader.as_mut().expect("must success");
105+
let field_ends = &mut reader.field_ends[..];
106+
let start_row = state.rows;
107+
state.offset += buf_in.len();
108+
109+
// assume n_out <= n_in for read_record
110+
let mut out_tmp = vec![0u8; buf_in.len()];
111+
let mut endlen = reader.n_end;
112+
let mut buf = buf_in;
113+
114+
while state.rows_to_skip > 0 {
115+
let (result, n_in, _, n_end) =
116+
reader
117+
.reader
118+
.read_record(buf, &mut out_tmp, &mut field_ends[endlen..]);
119+
buf = &buf[n_in..];
120+
endlen += n_end;
121+
122+
match result {
123+
ReadRecordResult::InputEmpty => {
124+
reader.n_end = endlen;
125+
return Ok(vec![]);
126+
}
127+
ReadRecordResult::OutputFull => {
128+
return Err(csv_error(
129+
"output more than input, in header",
130+
&state.path,
131+
state.rows,
132+
));
133+
}
134+
ReadRecordResult::OutputEndsFull => {
135+
return Err(csv_error(
136+
&format!(
137+
"too many fields, expect {}, got more than {}",
138+
num_fields,
139+
field_ends.len()
140+
),
141+
&state.path,
142+
state.rows,
143+
));
144+
}
145+
ReadRecordResult::Record => {
146+
if endlen < num_fields {
147+
return Err(csv_error(
148+
&format!("expect {} fields, only found {} ", num_fields, n_end),
149+
&state.path,
150+
state.rows,
151+
));
152+
} else if endlen > num_fields + 1 {
153+
return Err(csv_error(
154+
&format!("too many fields, expect {}, got {}", num_fields, n_end),
155+
&state.path,
156+
state.rows,
157+
));
158+
}
159+
160+
state.rows_to_skip -= 1;
161+
state.rows += 1;
162+
endlen = 0;
163+
}
164+
ReadRecordResult::End => {
165+
return Err(csv_error("unexpect eof in header", &state.path, state.rows));
166+
}
167+
}
168+
}
169+
170+
let mut out_pos = 0usize;
171+
let mut row_batch_end: usize = 0;
172+
173+
let last_batch_remain_len = reader.out.len();
174+
175+
let mut row_batch = RowBatch {
176+
data: vec![],
177+
row_ends: vec![],
178+
field_ends: vec![],
179+
path: state.path.to_string(),
180+
offset: 0,
181+
start_row: None,
182+
};
183+
184+
while !buf.is_empty() {
185+
let (result, n_in, n_out, n_end) =
186+
reader
187+
.reader
188+
.read_record(buf, &mut out_tmp[out_pos..], &mut field_ends[endlen..]);
189+
buf = &buf[n_in..];
190+
endlen += n_end;
191+
out_pos += n_out;
192+
match result {
193+
ReadRecordResult::InputEmpty => {
194+
break;
195+
}
196+
ReadRecordResult::OutputFull => {
197+
return Err(csv_error(
198+
"output more than input",
199+
&state.path,
200+
start_row + row_batch.row_ends.len(),
201+
));
202+
}
203+
ReadRecordResult::OutputEndsFull => {
204+
return Err(csv_error(
205+
&format!(
206+
"too many fields, expect {}, got more than {}",
207+
num_fields,
208+
field_ends.len()
209+
),
210+
&state.path,
211+
start_row + row_batch.row_ends.len(),
212+
));
213+
}
214+
ReadRecordResult::Record => {
215+
if endlen < num_fields {
216+
return Err(csv_error(
217+
&format!("expect {} fields, only found {} ", num_fields, n_end),
218+
&state.path,
219+
start_row + row_batch.row_ends.len(),
220+
));
221+
} else if endlen > num_fields + 1 {
222+
return Err(csv_error(
223+
&format!("too many fields, expect {}, got {}", num_fields, n_end),
224+
&state.path,
225+
start_row + row_batch.row_ends.len(),
226+
));
227+
}
228+
row_batch
229+
.field_ends
230+
.extend_from_slice(&field_ends[..num_fields]);
231+
row_batch.row_ends.push(last_batch_remain_len + out_pos);
232+
endlen = 0;
233+
row_batch_end = out_pos;
234+
}
235+
ReadRecordResult::End => {
236+
return Err(csv_error(
237+
"unexpect eof",
238+
&state.path,
239+
start_row + row_batch.row_ends.len(),
240+
));
241+
}
242+
}
243+
}
244+
245+
if row_batch.row_ends.is_empty() {
246+
reader.out.extend_from_slice(&out_tmp[..out_pos]);
247+
Ok(vec![])
248+
} else {
249+
state.rows += row_batch.row_ends.len();
250+
let last_remain = mem::take(&mut reader.out);
251+
reader.out.extend_from_slice(&out_tmp[row_batch_end..]);
252+
out_tmp.truncate(row_batch_end);
253+
row_batch.start_row = Some(state.rows);
254+
row_batch.data = if last_remain.is_empty() {
255+
out_tmp
256+
} else {
257+
vec![last_remain, out_tmp].concat()
258+
};
259+
Ok(vec![row_batch])
260+
}
261+
}
262+
}
263+
264+
/// Streaming-CSV parser state carried across calls to `align` (an input
/// buffer does not necessarily end on a row boundary).
pub struct CsvReaderState {
    /// Incremental csv-core parser; handles quoting/escaping across buffers.
    pub reader: csv_core::Reader,

    // remain from last read batch
    /// Unescaped bytes of the trailing partial row left over from the
    /// previous buffer; `align` prepends them to the next batch's data.
    pub out: Vec<u8>,
    /// Scratch buffer of field-end offsets for the record being parsed.
    pub field_ends: Vec<usize>,
    /// Number of entries of `field_ends` already filled for a partially-read
    /// row (carried across buffers).
    pub n_end: usize,
}
272+
273+
impl CsvReaderState {
274+
pub(crate) fn create(ctx: &Arc<InputContext>) -> Self {
275+
let reader = csv_core::ReaderBuilder::new()
276+
.delimiter(ctx.field_delimiter)
277+
.terminator(match ctx.record_delimiter {
278+
RecordDelimiter::Crlf => csv_core::Terminator::CRLF,
279+
RecordDelimiter::Any(v) => csv_core::Terminator::Any(v),
280+
})
281+
.build();
282+
Self {
283+
reader,
284+
out: vec![],
285+
field_ends: vec![0; ctx.schema.num_fields() + 6],
286+
n_end: 0,
287+
}
288+
}
289+
}
290+
291+
fn csv_error(msg: &str, path: &str, row: usize) -> ErrorCode {
292+
let row = row + 1;
293+
let msg = format!("fail to parse CSV {}:{} {} ", path, row, msg);
294+
295+
ErrorCode::BadBytes(msg)
296+
}

0 commit comments

Comments
 (0)