Skip to content

Commit da8754d

Browse files
committed
fix CSV aligner.
1 parent 1f30087 commit da8754d

File tree

1 file changed

+26
-8
lines changed

1 file changed

+26
-8
lines changed

src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,10 @@ impl InputFormatTextBase for InputFormatCSV {
158158
}
159159

160160
state.rows_to_skip -= 1;
161+
tracing::debug!(
162+
"csv aligner: skip a header row, remain {}",
163+
state.rows_to_skip
164+
);
161165
state.rows += 1;
162166
endlen = 0;
163167
}
@@ -179,7 +183,7 @@ impl InputFormatTextBase for InputFormatCSV {
179183
path: state.path.to_string(),
180184
batch_id: state.batch_id,
181185
offset: 0,
182-
start_row: None,
186+
start_row: Some(state.rows),
183187
};
184188

185189
while !buf.is_empty() {
@@ -191,9 +195,7 @@ impl InputFormatTextBase for InputFormatCSV {
191195
endlen += n_end;
192196
out_pos += n_out;
193197
match result {
194-
ReadRecordResult::InputEmpty => {
195-
break;
196-
}
198+
ReadRecordResult::InputEmpty => break,
197199
ReadRecordResult::OutputFull => {
198200
return Err(csv_error(
199201
"output more than input",
@@ -243,21 +245,37 @@ impl InputFormatTextBase for InputFormatCSV {
243245
}
244246
}
245247

248+
reader.n_end = endlen;
249+
out_tmp.truncate(out_pos);
246250
if row_batch.row_ends.is_empty() {
247-
reader.out.extend_from_slice(&out_tmp[..out_pos]);
251+
tracing::debug!(
252+
"csv aligner: {} + {} bytes => 0 rows",
253+
reader.out.len(),
254+
buf_in.len(),
255+
);
256+
reader.out.extend_from_slice(&out_tmp);
248257
Ok(vec![])
249258
} else {
250-
state.rows += row_batch.row_ends.len();
251259
let last_remain = mem::take(&mut reader.out);
260+
261+
state.batch_id += 1;
262+
state.rows += row_batch.row_ends.len();
252263
reader.out.extend_from_slice(&out_tmp[row_batch_end..]);
264+
265+
tracing::debug!(
266+
"csv aligner: {} + {} bytes => {} rows + {} bytes remain",
267+
last_remain.len(),
268+
buf_in.len(),
269+
row_batch.row_ends.len(),
270+
reader.out.len()
271+
);
272+
253273
out_tmp.truncate(row_batch_end);
254-
row_batch.start_row = Some(state.rows);
255274
row_batch.data = if last_remain.is_empty() {
256275
out_tmp
257276
} else {
258277
vec![last_remain, out_tmp].concat()
259278
};
260-
state.batch_id += 1;
261279
Ok(vec![row_batch])
262280
}
263281
}

0 commit comments

Comments
 (0)