Skip to content

Commit 722ae22

Browse files
authored
fix: wrong row id (#15018)
* chore: enable_new_copy_for_text_formats=0 by default. reason: #14983
* fix: wrong csv row id for the last row.
* Update src/query/settings/src/settings_default.rs
1 parent 22362e6 commit 722ae22

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

src/query/pipeline/sources/src/input_formats/impls/input_format_csv.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl InputFormatCSV {
8383
column_type: field.data_type.to_string(),
8484
empty_field_as: empty_filed_as.to_string(),
8585
remedy: format!(
86-
"one of the following options: 1. Modify the `{}` column to allow NULL values. 2. Set `EMPTY_FIELD_AS = FIELD_DEFAULT`.",
86+
"one of the following options: 1. Modify the `{}` column to allow NULL values. 2. Set EMPTY_FIELD_AS to FIELD_DEFAULT.",
8787
field.name()
8888
),
8989
});
@@ -394,14 +394,18 @@ impl AligningStateTextBased for CsvReaderState {
394394
let size_in = buf_in.len();
395395
let mut file_status = FileStatus::default();
396396
let mut buf_out = vec![0u8; buf_in.len()];
397-
while self.common.rows_to_skip > 0 {
397+
while self.common.rows_to_skip > 0 && !buf_in.is_empty() {
398398
let (res, n_in) = self.read_record(buf_in, &mut buf_out, &mut file_status)?;
399399
buf_in = &buf_in[n_in..];
400400
if matches!(res, ReadRecordOutput::Record { .. }) {
401401
self.common.rows_to_skip -= 1;
402402
}
403403
}
404404

405+
if buf_in.is_empty() {
406+
return Ok(vec![]);
407+
}
408+
405409
let mut buf_out_pos = 0usize;
406410
let mut buf_out_row_end: usize = 0;
407411

@@ -490,6 +494,7 @@ impl AligningStateTextBased for CsvReaderState {
490494
self.read_record(&in_tmp, &mut out_tmp, &mut file_status)?;
491495
} else {
492496
let last_batch_remain_len = self.out.len();
497+
let rows = self.common.rows;
493498
let (out, _n_in) = self.read_record(&in_tmp, &mut out_tmp, &mut file_status)?;
494499
if let ReadRecordOutput::Record { num_fields, bytes } = out {
495500
let data = mem::take(&mut self.out);
@@ -502,7 +507,7 @@ impl AligningStateTextBased for CsvReaderState {
502507
split_info: self.split_info.clone(),
503508
batch_id: self.common.batch_id,
504509
start_offset_in_split: self.common.offset,
505-
start_row_in_split: self.common.rows,
510+
start_row_in_split: rows,
506511
start_row_of_split: Some(0),
507512
};
508513
res.push(row_batch);

0 commit comments

Comments
 (0)