Skip to content

Commit 121667e

Browse files
committed
add RowBatch.batch_id to debug aligner.
1 parent c5764f3 commit 121667e

File tree

3 files changed

+23
-2
lines changed

3 files changed

+23
-2
lines changed

src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ impl InputFormatTextBase for InputFormatCSV {
177177
row_ends: vec![],
178178
field_ends: vec![],
179179
path: state.path.to_string(),
180+
batch_id: state.batch_id,
180181
offset: 0,
181182
start_row: None,
182183
};
@@ -256,6 +257,7 @@ impl InputFormatTextBase for InputFormatCSV {
256257
} else {
257258
vec![last_remain, out_tmp].concat()
258259
};
260+
state.batch_id += 1;
259261
Ok(vec![row_batch])
260262
}
261263
}

src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ impl InputFormatTSV {
3434
deserializers: &mut Vec<common_datavalues::TypeDeserializerImpl>,
3535
format_settings: &FormatSettings,
3636
path: &str,
37+
batch_id: usize,
3738
offset: usize,
3839
row_index: Option<usize>,
3940
) -> Result<()> {
@@ -86,8 +87,9 @@ impl InputFormatTSV {
8687
String::new()
8788
};
8889
let mut msg = format!(
89-
"fail to parse tsv {} at offset {}, {} reason={}, row data: ",
90+
"fail to parse tsv {} batch {} at offset {}, {} reason={}, row data: ",
9091
path,
92+
batch_id,
9193
offset + pos,
9294
row_info,
9395
m
@@ -110,6 +112,13 @@ impl InputFormatTextBase for InputFormatTSV {
110112
}
111113

112114
fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()> {
115+
tracing::debug!(
116+
"tsv deserializing row batch {}, id={}, start_row={:?}, offset={}",
117+
batch.path,
118+
batch.id,
119+
batch.start_row,
120+
batch.offset
121+
);
113122
let columns = &mut builder.mutable_columns;
114123
let mut start = 0usize;
115124
let start_row = batch.start_row;
@@ -120,6 +129,7 @@ impl InputFormatTextBase for InputFormatTSV {
120129
columns,
121130
&builder.ctx.format_settings,
122131
&batch.path,
132+
batch.batch_id,
123133
batch.offset + start,
124134
start_row.map(|n| n + i),
125135
)?;

src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ pub struct RowBatch {
143143

144144
// for error info
145145
pub path: String,
146+
pub batch_id: usize,
146147
pub offset: usize,
147148
pub start_row: Option<usize>,
148149
}
@@ -151,6 +152,7 @@ pub struct AligningState<T> {
151152
pub path: String,
152153
pub record_delimiter_end: u8,
153154
pub field_delimiter: u8,
155+
pub batch_id: usize,
154156
pub rows: usize,
155157
pub offset: usize,
156158
pub rows_to_skip: usize,
@@ -203,7 +205,12 @@ impl<T: InputFormatTextBase> AligningState<T> {
203205
output.data.extend_from_slice(&buf[0..last + 1]);
204206
let size = output.data.len();
205207
output.path = self.path.to_string();
208+
output.start_row = Some(self.rows);
209+
output.offset = self.offset;
210+
output.batch_id = self.batch_id;
206211
self.offset += size;
212+
self.rows += rows.len();
213+
self.batch_id += 1;
207214
tracing::debug!(
208215
"align {} bytes to {} rows: {} .. {}",
209216
size,
@@ -227,8 +234,9 @@ impl<T: InputFormatTextBase> AligningState<T> {
227234
row_ends: vec![end],
228235
field_ends: vec![],
229236
path: self.path.to_string(),
237+
batch_id: self.batch_id,
230238
offset: self.offset,
231-
start_row: None,
239+
start_row: Some(self.rows),
232240
};
233241
vec![row_batch]
234242
}
@@ -260,6 +268,7 @@ impl<T: InputFormatTextBase> AligningStateTrait for AligningState<T> {
260268
csv_reader,
261269
tail_of_last_batch: vec![],
262270
rows: 0,
271+
batch_id: 0,
263272
num_fields: ctx.schema.num_fields(),
264273
offset: split_info.offset,
265274
record_delimiter_end: ctx.record_delimiter.end(),

0 commit comments

Comments
 (0)