Skip to content

Commit d8df411

Browse files
committed
fix align_by_record_delimiter
1 parent 4e0aafc commit d8df411

File tree

3 files changed

+20
-11
lines changed

3 files changed

+20
-11
lines changed

src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_ndjson.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ impl InputFormatTextBase for InputFormatNDJson {
105105
return Err(ErrorCode::BadBytes(msg));
106106
}
107107
}
108-
start = *end + 1;
108+
start = *end;
109109
}
110110
Ok(())
111111
}

src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,15 +115,15 @@ impl InputFormatTextBase for InputFormatTSV {
115115
tracing::debug!(
116116
"tsv deserializing row batch {}, id={}, start_row={:?}, offset={}",
117117
batch.path,
118-
batch.id,
118+
batch.batch_id,
119119
batch.start_row,
120120
batch.offset
121121
);
122122
let columns = &mut builder.mutable_columns;
123123
let mut start = 0usize;
124124
let start_row = batch.start_row;
125125
for (i, end) in batch.row_ends.iter().enumerate() {
126-
let buf = &batch.data[start..*end];
126+
let buf = &batch.data[start..*end]; // include \n
127127
Self::read_row(
128128
buf,
129129
columns,
@@ -133,7 +133,7 @@ impl InputFormatTextBase for InputFormatTSV {
133133
batch.offset + start,
134134
start_row.map(|n| n + i),
135135
)?;
136-
start = *end + 1;
136+
start = *end;
137137
}
138138
Ok(())
139139
}

src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ pub struct AligningState<T> {
166166
impl<T: InputFormatTextBase> AligningState<T> {
167167
pub fn align_by_record_delimiter(&mut self, buf_in: &[u8]) -> Vec<RowBatch> {
168168
let record_delimiter_end = self.record_delimiter_end;
169+
let size_last_remain = self.tail_of_last_batch.len();
169170
let mut buf = buf_in;
170171
if self.rows_to_skip > 0 {
171172
let mut i = 0;
@@ -193,16 +194,17 @@ impl<T: InputFormatTextBase> AligningState<T> {
193194
let rows = &mut output.row_ends;
194195
for (i, b) in buf.iter().enumerate() {
195196
if *b == b'\n' {
196-
rows.push(i)
197+
rows.push(i + 1 + size_last_remain)
197198
}
198199
}
199-
let last = rows[rows.len() - 1];
200+
let batch_end = rows[rows.len() - 1] - size_last_remain;
200201
if rows.is_empty() {
201202
self.tail_of_last_batch.extend_from_slice(buf);
202203
vec![]
203204
} else {
204205
output.data = mem::take(&mut self.tail_of_last_batch);
205-
output.data.extend_from_slice(&buf[0..last + 1]);
206+
output.data.extend_from_slice(&buf[..batch_end]);
207+
self.tail_of_last_batch.extend_from_slice(&buf[batch_end..]);
206208
let size = output.data.len();
207209
output.path = self.path.to_string();
208210
output.start_row = Some(self.rows);
@@ -212,11 +214,12 @@ impl<T: InputFormatTextBase> AligningState<T> {
212214
self.rows += rows.len();
213215
self.batch_id += 1;
214216
tracing::debug!(
215-
"align {} bytes to {} rows: {} .. {}",
216-
size,
217+
"align batch {}, {} + {} + {} bytes to {} rows",
218+
output.batch_id,
219+
size_last_remain,
220+
batch_end,
221+
self.tail_of_last_batch.len(),
217222
rows.len(),
218-
rows[0],
219-
last
220223
);
221224
vec![output]
222225
}
@@ -238,6 +241,12 @@ impl<T: InputFormatTextBase> AligningState<T> {
238241
offset: self.offset,
239242
start_row: Some(self.rows),
240243
};
244+
tracing::debug!(
245+
"align flush batch {}, bytes = {}, start_row = {}",
246+
row_batch.batch_id,
247+
self.tail_of_last_batch.len(),
248+
self.rows
249+
);
241250
vec![row_batch]
242251
}
243252
}

0 commit comments

Comments
 (0)