Skip to content

Commit a59259c

Browse files
committed
input format ndjson
1 parent bdcf7a9 commit a59259c

File tree

1 file changed

+129
-0
lines changed

1 file changed

+129
-0
lines changed
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::borrow::Cow;
16+
17+
use bstr::ByteSlice;
18+
use common_datavalues::DataSchemaRef;
19+
use common_datavalues::TypeDeserializer;
20+
use common_datavalues::TypeDeserializerImpl;
21+
use common_exception::ErrorCode;
22+
use common_exception::Result;
23+
use common_io::prelude::FormatSettings;
24+
use common_meta_types::StageFileFormatType;
25+
26+
use crate::processors::sources::input_formats::input_format_text::AligningState;
27+
use crate::processors::sources::input_formats::input_format_text::BlockBuilder;
28+
use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase;
29+
use crate::processors::sources::input_formats::input_format_text::RowBatch;
30+
31+
pub struct InputFormatNDJson {}
32+
33+
impl InputFormatNDJson {
34+
fn read_row(
35+
buf: &[u8],
36+
deserializers: &mut [TypeDeserializerImpl],
37+
format_settings: &FormatSettings,
38+
schema: &DataSchemaRef,
39+
) -> Result<()> {
40+
let mut json: serde_json::Value = serde_json::from_reader(buf)?;
41+
// if it's not case_sensitive, we convert to lowercase
42+
if !format_settings.ident_case_sensitive {
43+
if let serde_json::Value::Object(x) = json {
44+
let y = x.into_iter().map(|(k, v)| (k.to_lowercase(), v)).collect();
45+
json = serde_json::Value::Object(y);
46+
}
47+
}
48+
49+
for (f, deser) in schema.fields().iter().zip(deserializers.iter_mut()) {
50+
let value = if format_settings.ident_case_sensitive {
51+
&json[f.name().to_owned()]
52+
} else {
53+
&json[f.name().to_lowercase()]
54+
};
55+
56+
deser.de_json(value, format_settings).map_err(|e| {
57+
let value_str = format!("{:?}", value);
58+
ErrorCode::BadBytes(format!(
59+
"{}. column={} value={}",
60+
e,
61+
f.name(),
62+
maybe_truncated(&value_str, 1024),
63+
))
64+
})?;
65+
}
66+
Ok(())
67+
}
68+
}
69+
70+
impl InputFormatTextBase for InputFormatNDJson {
71+
fn format_type() -> StageFileFormatType {
72+
StageFileFormatType::NdJson
73+
}
74+
75+
fn default_field_delimiter() -> u8 {
76+
b','
77+
}
78+
79+
fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()> {
80+
let columns = &mut builder.mutable_columns;
81+
let mut start = 0usize;
82+
let start_row = batch.start_row;
83+
for (i, end) in batch.row_ends.iter().enumerate() {
84+
let buf = &batch.data[start..*end];
85+
let buf = buf.trim();
86+
if !buf.is_empty() {
87+
if let Err(e) = Self::read_row(
88+
buf,
89+
columns,
90+
&builder.ctx.format_settings,
91+
&builder.ctx.schema,
92+
) {
93+
let row_info = if let Some(r) = start_row {
94+
format!("row={},", r + i)
95+
} else {
96+
String::new()
97+
};
98+
let msg = format!(
99+
"fail to parse NDJSON: {}, path={}, offset={}, {}",
100+
&batch.path,
101+
e,
102+
batch.offset + start,
103+
row_info,
104+
);
105+
return Err(ErrorCode::BadBytes(msg));
106+
}
107+
}
108+
start = *end + 1;
109+
}
110+
Ok(())
111+
}
112+
113+
fn align(state: &mut AligningState<Self>, buf: &[u8]) -> Result<Vec<RowBatch>> {
114+
Ok(state.align_by_record_delimiter(buf))
115+
}
116+
}
117+
118+
/// Returns `s` as-is when it is at most `limit` bytes long; otherwise returns
/// a prefix of `s` preceded by a note of the form `(first <n>B of <total>B): `.
///
/// The cut never splits a UTF-8 character: if `limit` falls inside a
/// multi-byte character the prefix is shortened to the previous character
/// boundary, and `<n>` reports the number of bytes actually shown.
fn maybe_truncated(s: &str, limit: usize) -> Cow<'_, str> {
    if s.len() <= limit {
        return Cow::Borrowed(s);
    }

    // Slicing a `str` at a non-boundary index panics, so back off to the
    // nearest boundary at or below `limit`. Index 0 is always a boundary, so
    // the loop terminates.
    let mut end = limit;
    while !s.is_char_boundary(end) {
        end -= 1;
    }

    Cow::Owned(format!(
        "(first {}B of {}B): {}",
        end,
        s.len(),
        &s[..end]
    ))
}

0 commit comments

Comments
 (0)