
Commit 7452ffc

feat: refactor ndjson input format. (#14943)
1 parent 29f6bde commit 7452ffc

19 files changed (+683, -55 lines)

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default.

src/query/formats/src/field_decoder/json_ast.rs

Lines changed: 2 additions & 2 deletions
@@ -63,12 +63,12 @@ impl FieldDecoder for FieldJsonAstDecoder {
 }

 impl FieldJsonAstDecoder {
-    pub fn create(options: &FileFormatOptionsExt, rounding_mode: bool) -> Self {
+    pub fn create(options: &FileFormatOptionsExt) -> Self {
         FieldJsonAstDecoder {
             timezone: options.timezone,
             ident_case_sensitive: options.ident_case_sensitive,
             is_select: options.is_select,
-            is_rounding_mode: rounding_mode,
+            is_rounding_mode: options.is_rounding_mode,
         }
     }

src/query/pipeline/sources/src/input_formats/impls/input_format_ndjson.rs

Lines changed: 1 addition & 4 deletions
@@ -179,10 +179,7 @@ impl InputFormatTextBase for InputFormatNDJson {
         _params: &FileFormatParams,
         options: &FileFormatOptionsExt,
     ) -> Arc<dyn FieldDecoder> {
-        Arc::new(FieldJsonAstDecoder::create(
-            options,
-            options.is_rounding_mode,
-        ))
+        Arc::new(FieldJsonAstDecoder::create(options))
     }

     fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()> {

src/query/storages/stage/Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -28,12 +28,15 @@ databend-common-storages-parquet = { path = "../parquet" }

 async-backtrace = { workspace = true }
 async-trait = { workspace = true }
+bstr = "1.9.1"
 csv-core = "0.1.11"
 dashmap = { workspace = true }
+enum-as-inner = "0.6.0"
 log = { workspace = true }
 opendal = { workspace = true }
 serde = { workspace = true }
+serde_json = { workspace = true }
 typetag = { workspace = true }
 uuid = { workspace = true }
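Of the new dependencies, enum-as-inner is the one visible in this diff: it derives variant accessors on the new RowBatch enum in batch.rs below, which is how the CSV decoder later recovers its concrete batch type (batch.data.into_csv().unwrap()). A minimal standalone sketch of what the derive provides, using a reduced two-variant enum that is not the commit's code:

use enum_as_inner::EnumAsInner;

// Reduced, hypothetical stand-in for the RowBatch enum defined in
// batch.rs below; EnumAsInner derives as_*/into_* accessors per variant.
#[derive(EnumAsInner, Debug)]
enum Batch {
    Csv(Vec<u8>),
    NdJson(Vec<u8>),
}

fn demo() {
    let b = Batch::Csv(vec![1, 2, 3]);
    assert_eq!(b.as_csv(), Some(&vec![1, 2, 3]));
    // into_csv() yields Result<Vec<u8>, Batch>, mirroring the
    // batch.data.into_csv().unwrap() call in block_builder.rs below.
    assert_eq!(b.into_csv().unwrap(), vec![1, 2, 3]);
}

bstr and serde_json are presumably for the NDJSON reader itself, which lives in files not shown in this view.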

src/query/storages/stage/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+#![allow(internal_features)]
+#![feature(core_intrinsics)]
 #![feature(impl_trait_in_assoc_type)]
 #![feature(box_patterns)]
 #![allow(clippy::uninlined_format_args)]
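The two new crate attributes exist for batch.rs below, which calls std::intrinsics::unlikely; core_intrinsics is a compiler-internal feature on nightly, and allow(internal_features) silences the corresponding lint. A minimal sketch of the pattern (nightly only, not the commit's code):

#![allow(internal_features)]
#![feature(core_intrinsics)]

use std::intrinsics::unlikely;

// Hints the optimizer that `carry_over` is rarely true. In
// NdJsonRowBatchIter::next below, only the first call per batch can
// take the tail-of-last-batch branch.
fn pick(carry_over: bool) -> &'static str {
    if unlikely(carry_over) {
        "tail row carried over from the previous batch"
    } else {
        "row from the current batch"
    }
}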

src/query/storages/stage/src/read/row_based/batch.rs

Lines changed: 143 additions & 14 deletions
@@ -12,9 +12,39 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::intrinsics::unlikely;
+
 use databend_common_expression::BlockMetaInfo;
+use enum_as_inner::EnumAsInner;
+use serde::Deserialize;
+use serde::Serialize;

-#[derive(serde::Serialize, serde::Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct Position {
+    pub path: String,
+    pub rows: usize,
+    pub offset: usize,
+}
+
+impl Position {
+    pub fn new(path: String) -> Self {
+        Self {
+            path,
+            rows: 0,
+            offset: 0,
+        }
+    }
+
+    pub fn from_bytes_batch(batch: &BytesBatch, start_row_id: usize) -> Self {
+        Self {
+            path: batch.path.clone(),
+            rows: start_row_id,
+            offset: batch.offset,
+        }
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug)]
 pub struct BytesBatch {
     pub data: Vec<u8>,

@@ -45,32 +75,131 @@ impl BlockMetaInfo for BytesBatch {
     }
 }

+#[derive(serde::Serialize, serde::Deserialize, Debug)]
+pub struct RowBatchWithPosition {
+    pub data: RowBatch,
+    pub start_pos: Position,
+}
+
+impl RowBatchWithPosition {
+    pub fn new(data: RowBatch, start_pos: Position) -> Self {
+        Self { data, start_pos }
+    }
+}
+
+#[derive(serde::Serialize, serde::Deserialize, Debug, EnumAsInner)]
+pub enum RowBatch {
+    Csv(CSVRowBatch),
+    NDJson(NdjsonRowBatch),
+}
+
+impl RowBatch {
+    pub fn rows(&self) -> usize {
+        match self {
+            RowBatch::Csv(b) => b.rows(),
+            RowBatch::NDJson(b) => b.rows(),
+        }
+    }
+
+    pub fn size(&self) -> usize {
+        match self {
+            RowBatch::Csv(b) => b.size(),
+            RowBatch::NDJson(b) => b.size(),
+        }
+    }
+}
+
 #[derive(serde::Serialize, serde::Deserialize, Debug, Default)]
-pub struct RowBatch {
+pub struct CSVRowBatch {
     /// row[i] starts at row_ends[i-1] and ends at row_ends[i]
     /// has num_fields[i] fields
     /// field[j] starts at field_ends[i-1][j] and ends at field_ends[i-1][j]
     pub data: Vec<u8>,
     pub row_ends: Vec<usize>,
     pub field_ends: Vec<usize>,
     pub num_fields: Vec<usize>,
+}

-    pub path: String,
-    pub offset: usize,
-    // start from 0
-    pub start_row_id: usize,
+#[derive(serde::Serialize, serde::Deserialize, Debug, Default)]
+pub struct NdjsonRowBatch {
+    // as the first row of this batch
+    pub tail_of_last_batch: Option<Vec<u8>>,
+
+    // designed to use Vec of BytesBatch without realloc
+    // should ignore data[..start]
+    pub data: Vec<u8>,
+    pub start: usize,
+    pub row_ends: Vec<usize>,
+}
+
+pub struct NdJsonRowBatchIter<'a> {
+    first_row: &'a [u8],
+    data: &'a [u8],
+    row_ends: &'a [usize],
+    end_index: i32,
+    start: usize,
 }

-impl RowBatch {
-    pub fn new(raw: &BytesBatch, start_row_id: usize) -> Self {
-        Self {
-            path: raw.path.clone(),
-            offset: raw.offset,
-            start_row_id,
-            ..Default::default()
+impl<'a> NdjsonRowBatch {
+    pub fn iter(&'a self) -> NdJsonRowBatchIter<'a> {
+        let (end_index, first_row) = if let Some(row) = &self.tail_of_last_batch {
+            (-1, row)
+        } else {
+            (0, &self.data)
+        };
+        NdJsonRowBatchIter {
+            first_row,
+            end_index,
+            data: &self.data,
+            row_ends: &self.row_ends,
+            start: self.start,
         }
     }
+}
+
+impl<'a> Iterator for NdJsonRowBatchIter<'a> {
+    type Item = &'a [u8];
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if unlikely(self.end_index < 0) {
+            self.end_index = 0;
+            Some(self.first_row)
+        } else {
+            let end_index = self.end_index as usize;
+            if end_index >= self.row_ends.len() {
+                None
+            } else {
+                let end = self.row_ends[end_index];
+                let start = self.start;
+                self.start = end;
+                self.end_index += 1;
+                Some(&self.data[start..end])
+            }
+        }
+    }
+}
+
+impl NdjsonRowBatch {
+    pub fn rows(&self) -> usize {
+        self.row_ends.len()
+            + if self.tail_of_last_batch.is_none() {
+                0
+            } else {
+                1
+            }
+    }
+
+    pub fn size(&self) -> usize {
+        self.data.len()
+            + self
+                .tail_of_last_batch
+                .as_ref()
+                .map(|v| v.len())
+                .unwrap_or(0)
+            - self.start
+    }
+}

+impl CSVRowBatch {
     pub fn rows(&self) -> usize {
         self.row_ends.len()
     }
@@ -81,7 +210,7 @@ impl RowBatch {
 }

 #[typetag::serde(name = "row_batch")]
-impl BlockMetaInfo for RowBatch {
+impl BlockMetaInfo for RowBatchWithPosition {
     fn equals(&self, _info: &Box<dyn BlockMetaInfo>) -> bool {
         unreachable!("RowBatch as BlockMetaInfo is not expected to be compared.")
     }
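The subtle part of this file is the iterator contract: when tail_of_last_batch is set, that carried-over row is yielded first even though its bytes live outside data, and end_index starts at -1 to encode the extra step; rows() and size() account for the tail the same way. A small sketch exercising the semantics, assuming the definitions above are in scope (not part of the commit):

fn demo_ndjson_row_batch() {
    let batch = NdjsonRowBatch {
        // unterminated row left over from the previous BytesBatch
        tail_of_last_batch: Some(b"{\"a\":1}".to_vec()),
        // two complete rows; bytes before `start` are ignored
        data: b"{\"a\":2}\n{\"a\":3}\n".to_vec(),
        start: 0,
        row_ends: vec![8, 16],
    };
    assert_eq!(batch.rows(), 3); // row_ends.len() + 1 for the tail
    assert_eq!(batch.size(), 23); // data (16) + tail (7) - start (0)

    let rows: Vec<&[u8]> = batch.iter().collect();
    assert_eq!(rows[0], b"{\"a\":1}"); // the tail is yielded first
    assert_eq!(rows[1], b"{\"a\":2}\n"); // then data[start..row_ends[0]]
    assert_eq!(rows[2], b"{\"a\":3}\n"); // then data[row_ends[0]..row_ends[1]]
}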

src/query/storages/stage/src/read/row_based/format.rs

Lines changed: 9 additions & 4 deletions
@@ -21,18 +21,22 @@ use databend_common_meta_app::principal::FileFormatParams;
 use databend_common_storage::FileStatus;

 use super::batch::BytesBatch;
-use super::batch::RowBatch;
+use super::batch::RowBatchWithPosition;
 use super::processors::BlockBuilderState;
 use crate::read::load_context::LoadContext;
 use crate::read::row_based::formats::CsvInputFormat;
+use crate::read::row_based::formats::NdJsonInputFormat;

 pub trait SeparatorState: Send + Sync {
-    fn append(&mut self, batch: BytesBatch) -> Result<(Vec<RowBatch>, FileStatus)>;
+    fn append(&mut self, batch: BytesBatch) -> Result<(Vec<RowBatchWithPosition>, FileStatus)>;
 }

 pub trait RowDecoder: Send + Sync {
-    fn add(&self, block_builder: &mut BlockBuilderState, batch: RowBatch)
-        -> Result<Vec<DataBlock>>;
+    fn add(
+        &self,
+        block_builder: &mut BlockBuilderState,
+        batch: RowBatchWithPosition,
+    ) -> Result<Vec<DataBlock>>;

     fn flush(&self, columns: Vec<Column>, _num_rows: usize) -> Vec<Column> {
         columns
@@ -51,6 +55,7 @@ pub trait RowBasedFileFormat: Sync + Send {
 pub fn create_row_based_file_format(params: &FileFormatParams) -> Arc<dyn RowBasedFileFormat> {
     match params {
         FileFormatParams::Csv(p) => Arc::new(CsvInputFormat { params: p.clone() }),
+        FileFormatParams::NdJson(p) => Arc::new(NdJsonInputFormat { params: p.clone() }),
         _ => {
             unreachable!("Unsupported row based file format")
         }
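The NdJsonInputFormat registered here points at decoder and separator types defined in files not included in this view. As a rough, hypothetical skeleton of how an NDJSON RowDecoder would plug into the trait above (NdJsonDecoder and read_row are stand-in names, not the commit's):

impl RowDecoder for NdJsonDecoder {
    fn add(
        &self,
        state: &mut BlockBuilderState,
        batch: RowBatchWithPosition,
    ) -> Result<Vec<DataBlock>> {
        // downcast to the NDJSON variant of the RowBatch enum
        let data = match batch.data {
            RowBatch::NDJson(d) => d,
            _ => unreachable!("NdJsonDecoder expects an NDJson batch"),
        };
        // NdjsonRowBatch::iter() yields one &[u8] per row, tail first
        for (i, row) in data.iter().enumerate() {
            // absolute row number for error reporting, as in the CSV path
            let _abs_row = batch.start_pos.rows + i;
            // read_row is a stand-in for per-row JSON decoding into the
            // block builder's mutable columns
            self.read_row(row, &mut state.mutable_columns)?;
            state.num_rows += 1;
        }
        Ok(vec![])
    }
}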

src/query/storages/stage/src/read/row_based/formats/csv/block_builder.rs

Lines changed: 13 additions & 8 deletions
@@ -27,7 +27,7 @@ use databend_common_pipeline_sources::input_formats::error_utils::get_decode_err
 use databend_common_storage::FileParseError;

 use crate::read::load_context::LoadContext;
-use crate::read::row_based::batch::RowBatch;
+use crate::read::row_based::batch::RowBatchWithPosition;
 use crate::read::row_based::format::RowDecoder;
 use crate::read::row_based::formats::csv::CsvInputFormat;
 use crate::read::row_based::processors::BlockBuilderState;
@@ -157,24 +157,29 @@ impl CsvDecoder {
 }

 impl RowDecoder for CsvDecoder {
-    fn add(&self, state: &mut BlockBuilderState, batch: RowBatch) -> Result<Vec<DataBlock>> {
+    fn add(
+        &self,
+        state: &mut BlockBuilderState,
+        batch: RowBatchWithPosition,
+    ) -> Result<Vec<DataBlock>> {
+        let data = batch.data.into_csv().unwrap();
         let columns = &mut state.mutable_columns;
         let mut start = 0usize;
         let mut field_end_idx = 0;
-        for (i, end) in batch.row_ends.iter().enumerate() {
-            let num_fields = batch.num_fields[i];
-            let buf = &batch.data[start..*end];
+        for (i, end) in data.row_ends.iter().enumerate() {
+            let num_fields = data.num_fields[i];
+            let buf = &data.data[start..*end];
             if let Err(e) = self.read_row(
                 buf,
                 columns,
-                &batch.field_ends[field_end_idx..field_end_idx + num_fields],
+                &data.field_ends[field_end_idx..field_end_idx + num_fields],
             ) {
                 self.load_context.error_handler.on_error(
                     e,
                     Some((columns, state.num_rows)),
                     &mut state.file_status,
-                    &batch.path,
-                    i + batch.start_row_id,
+                    &batch.start_pos.path,
+                    i + batch.start_pos.rows,
                 )?
             } else {
                 state.num_rows += 1;
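With positions shared across formats, the decoder no longer carries path and start_row_id itself: the separator seeds a Position (presumably via Position::from_bytes_batch above) and the decoder adds the in-batch index to get the absolute row. A trivial sketch of the arithmetic, assuming batch.rs's Position (not the commit's code):

// absolute row number for error reporting = rows of this file already
// consumed when the batch started + index of the row within the batch
fn absolute_row(start_pos: &Position, index_in_batch: usize) -> usize {
    start_pos.rows + index_in_batch
}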

src/query/storages/stage/src/read/row_based/formats/csv/format.rs

Lines changed: 0 additions & 5 deletions
@@ -24,11 +24,6 @@ use crate::read::row_based::format::SeparatorState;
 use crate::read::row_based::formats::csv::block_builder::CsvDecoder;
 use crate::read::row_based::formats::csv::separator::CsvReader;

-pub struct Position {
-    pub path: String,
-    pub rows: usize,
-    pub offset: usize,
-}
 #[derive(Clone)]
 pub struct CsvInputFormat {
     pub(crate) params: CsvFileFormatParams,
