
Commit b3d75e9

chore: polish error message when read AVRO. (#18329)
1 parent f7be028 commit b3d75e9

File tree

11 files changed: +109 -49 lines

src/common/storage/src/copy.rs

Lines changed: 27 additions & 15 deletions
@@ -57,13 +57,13 @@ impl FileStatus {
             None => {
                 self.error = Some(FileErrorsInfo {
                     num_errors: 1,
-                    first_error: FileErrorInfo { error, line },
+                    first_error: FileParseErrorAtLine { error, line },
                 });
             }
             Some(info) => {
                 info.num_errors += 1;
                 if info.first_error.line > line {
-                    info.first_error = FileErrorInfo { error, line };
+                    info.first_error = FileParseErrorAtLine { error, line };
                 }
             }
         };
@@ -82,7 +82,7 @@ impl FileStatus {
 #[derive(Clone, Serialize, Deserialize)]
 pub struct FileErrorsInfo {
     pub num_errors: usize,
-    pub first_error: FileErrorInfo,
+    pub first_error: FileParseErrorAtLine,
 }
 
 impl FileErrorsInfo {
@@ -94,20 +94,20 @@ impl FileErrorsInfo {
     }
 }
 
-#[derive(Clone, Serialize, Deserialize)]
-pub struct FileErrorInfo {
-    pub error: FileParseError,
-    pub line: usize,
-}
-
 #[derive(Error, Debug, Clone, Serialize, Deserialize)]
 pub enum FileParseError {
+    #[error("Not a {format} file ({message})")]
+    WrongFileType { format: String, message: String },
+    #[error("Bad {format} file ({message})")]
+    BadFile { format: String, message: String },
+    #[error("Wrong root type: {message}")]
+    WrongRootType { message: String },
+    #[error("Invalid {format} row: {message}")]
+    InvalidRow { format: String, message: String },
     #[error(
         "Number of columns in file ({file}) does not match that of the corresponding table ({table})"
     )]
     NumberOfColumnsMismatch { table: usize, file: usize },
-    #[error("Invalid JSON row: {message}")]
-    InvalidNDJsonRow { message: String },
     #[error(
         "Invalid value '{column_data}' for column {column_index} ({column_name} {column_type}): {decode_error}"
     )]
@@ -151,13 +151,25 @@ pub enum FileParseError {
 }
 
 impl FileParseError {
-    pub fn to_error_code(&self, mode: &OnErrorMode, file_path: &str, line: usize) -> ErrorCode {
-        let pos: String = format!("at file '{}', line {}", file_path, line);
+    pub fn with_row(self, line: usize) -> FileParseErrorAtLine {
+        FileParseErrorAtLine { line, error: self }
+    }
+}
+
+#[derive(Clone, Serialize, Deserialize)]
+pub struct FileParseErrorAtLine {
+    pub error: FileParseError,
+    pub line: usize,
+}
+
+impl FileParseErrorAtLine {
+    pub fn to_error_code(&self, mode: &OnErrorMode, file_path: &str) -> ErrorCode {
+        let pos: String = format!("at file '{}', line {}", file_path, self.line);
         let message = match mode {
             OnErrorMode::AbortNum(n) if *n > 1u64 => {
-                format!("abort after {n} errors! the last error: {self}",)
+                format!("abort after {n} errors! the last error: {}", self.error)
             }
-            _ => format!("{self}"),
+            _ => format!("{}", self.error),
         };
         ErrorCode::BadBytes(message).add_detail_back(pos)
     }
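
As a quick orientation for the new API in this file, here is a minimal sketch (not part of the commit; crate paths follow the imports visible elsewhere in this diff, and the message text mirrors the sqllogictest added at the bottom). A FileParseError renders its user-facing message through its #[error(...)] attribute, with_row attaches the row number, and to_error_code on the resulting FileParseErrorAtLine appends the "at file ..., line ..." detail:

    use databend_common_ast::ast::OnErrorMode;
    use databend_common_exception::ErrorCode;
    use databend_common_storage::FileParseError;

    // Hypothetical helper, not part of the commit: builds the error the AVRO
    // reader now reports for a non-AVRO file and renders it two ways.
    fn wrong_file_type_example() -> ErrorCode {
        let err = FileParseError::WrongFileType {
            format: "AVRO".to_string(),
            message: "wrong magic in header".to_string(),
        };
        // Display comes from the #[error(...)] attribute above.
        assert_eq!(err.to_string(), "Not a AVRO file (wrong magic in header)");
        // Attach the row, then render; the file/line position is appended as a
        // detail rather than baked into the message.
        err.with_row(0)
            .to_error_code(&OnErrorMode::AbortNum(1), "parquet/tuple.parquet")
    }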

src/common/storage/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -77,6 +77,7 @@ mod statistics;
 
 pub use copy::CopyStatus;
 pub use copy::FileParseError;
+pub use copy::FileParseErrorAtLine;
 pub use copy::FileStatus;
 pub use histogram::Histogram;
 pub use histogram::HistogramBucket;

src/query/storages/stage/src/read/avro/block_builder_processor.rs

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ impl AccumulatingTransform for BlockBuilderProcessor {
         self.state.file_path = data.path.clone();
         let num_rows = self.state.num_rows;
         self.state.file_full_path = format!("{}{}", self.ctx.stage_root, data.path);
-        self.decoder.add(&mut self.state, data)?;
+        self.decoder.read_file(&data.data, &mut self.state)?;
         self.state
             .add_internals_columns_batch(self.state.num_rows - num_rows);

src/query/storages/stage/src/read/avro/decoder.rs

Lines changed: 48 additions & 14 deletions
@@ -40,6 +40,7 @@ use databend_common_expression::ColumnBuilder;
 use databend_common_expression::TableSchemaRef;
 use databend_common_meta_app::principal::AvroFileFormatParams;
 use databend_common_meta_app::principal::NullAs;
+use databend_common_storage::FileParseError;
 use lexical_core::FromLexical;
 use num_bigint::BigInt;
 use num_traits::NumCast;
@@ -52,7 +53,6 @@ use crate::read::block_builder_state::BlockBuilderState;
 use crate::read::default_expr_evaluator::DefaultExprEvaluator;
 use crate::read::error_handler::ErrorHandler;
 use crate::read::load_context::LoadContext;
-use crate::read::whole_file_reader::WholeFileData;
 
 #[derive(Default, Debug)]
 struct Error {
@@ -79,6 +79,7 @@ impl Error {
 }
 
 type ReadFieldResult = std::result::Result<(), Error>;
+
 pub(super) struct AvroDecoder {
     pub is_rounding_mode: bool,
     pub params: AvroFileFormatParams,
@@ -130,10 +131,6 @@ impl AvroDecoder {
         }
     }
 
-    pub fn add(&self, state: &mut BlockBuilderState, data: WholeFileData) -> Result<()> {
-        self.read_file(&data.data, state)
-    }
-
     fn read_file_for_select(
         &self,
         reader: Reader<&[u8]>,
@@ -149,16 +146,47 @@
                     "invalid column builder when querying avro",
                 ));
             };
-            self.read_variant(column_builder, value.unwrap(), &schema)
-                .map_err(|e| {
-                    ErrorCode::BadBytes(format!("fail to read row {row}: {:?}", e.reason))
-                })?;
+            if let Err(e) = self.read_variant(column_builder, value.unwrap(), &schema) {
+                self.error_handler.on_error(
+                    FileParseError::InvalidRow {
+                        format: "AVRO".to_string(),
+                        message: e.reason.unwrap_or_default().to_string(),
+                    }
+                    .with_row(row),
+                    None,
+                    &mut state.file_status,
+                    &state.file_path,
+                )?;
+            }
             state.add_row(row)
         }
         Ok(())
     }
-    fn read_file(&self, file_data: &[u8], state: &mut BlockBuilderState) -> Result<()> {
-        let reader = Reader::new(file_data).unwrap();
+    pub fn read_file(&self, file_data: &[u8], state: &mut BlockBuilderState) -> Result<()> {
+        let reader = match Reader::new(file_data) {
+            Ok(r) => r,
+            Err(e) => {
+                let e = match e {
+                    apache_avro::Error::HeaderMagic | apache_avro::Error::ReadHeader(_) => {
+                        FileParseError::WrongFileType {
+                            format: "AVRO".to_string(),
+                            message: e.to_string(),
+                        }
+                    }
+                    _ => FileParseError::BadFile {
+                        format: "AVRO".to_string(),
+                        message: e.to_string(),
+                    },
+                };
+                return self.error_handler.on_error(
+                    e.with_row(0),
+                    None,
+                    &mut state.file_status,
+                    &state.file_path,
+                );
+            }
+        };
+
         let mut schema = reader.writer_schema().clone();
         if !self.params.use_logic_type {
             date_time_to_int(&mut schema);
@@ -169,9 +197,15 @@
         let src_schema = if let Schema::Record(record) = schema {
             record
         } else {
-            return Err(ErrorCode::BadArguments(
-                "only support avro with record as top level type",
-            ));
+            let e = FileParseError::WrongRootType {
+                message: "copy from AVRO only support record as root type".to_string(),
+            };
+            return self.error_handler.on_error(
+                e.with_row(0),
+                None,
+                &mut state.file_status,
+                &state.file_path,
+            );
         };
         let names = self
             .schema
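
Read in isolation, the new read_file entry point classifies apache_avro open errors before handing them to the error handler: a bad magic number or unreadable header means the input is not an AVRO object container file at all (WrongFileType), any other failure means a corrupt AVRO file (BadFile), and neither path panics on unwrap() any more. A sketch of just that classification, assuming the apache_avro variants matched above are the ones in use (the helper name is illustrative and not part of the commit):

    use databend_common_storage::FileParseError;

    // Hypothetical helper mirroring the match inside read_file above.
    fn classify_avro_open_error(e: apache_avro::Error) -> FileParseError {
        match e {
            // Missing or invalid magic bytes, or an unreadable header:
            // the input is not an AVRO object container file.
            apache_avro::Error::HeaderMagic | apache_avro::Error::ReadHeader(_) => {
                FileParseError::WrongFileType {
                    format: "AVRO".to_string(),
                    message: e.to_string(),
                }
            }
            // Everything else: recognized as AVRO, but unusable.
            _ => FileParseError::BadFile {
                format: "AVRO".to_string(),
                message: e.to_string(),
            },
        }
    }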

src/query/storages/stage/src/read/error_handler.rs

Lines changed: 6 additions & 6 deletions
@@ -18,7 +18,7 @@ use std::sync::atomic::Ordering;
 use databend_common_ast::ast::OnErrorMode;
 use databend_common_exception::Result;
 use databend_common_expression::ColumnBuilder;
-use databend_common_storage::FileParseError;
+use databend_common_storage::FileParseErrorAtLine;
 use databend_common_storage::FileStatus;
 
 pub struct ErrorHandler {
@@ -29,11 +29,10 @@ pub struct ErrorHandler {
 impl ErrorHandler {
     pub fn on_error(
         &self,
-        e: FileParseError,
+        e: FileParseErrorAtLine,
         columns: Option<(&mut [ColumnBuilder], usize)>,
         file_status: &mut FileStatus,
         file_path: &str,
-        line: usize,
     ) -> Result<()> {
         if let Some((columns, num_rows)) = columns {
             columns.iter_mut().for_each(|c| {
@@ -48,19 +47,20 @@ impl ErrorHandler {
 
         match &self.on_error_mode {
             OnErrorMode::Continue => {
-                file_status.add_error(e, line);
+                let line = e.line;
+                file_status.add_error(e.error, line);
                 Ok(())
             }
             OnErrorMode::AbortNum(abort_num) => {
                 if *abort_num <= 1
                     || self.on_error_count.fetch_add(1, Ordering::Relaxed) >= *abort_num - 1
                 {
-                    Err(e.to_error_code(&self.on_error_mode, file_path, line))
+                    Err(e.to_error_code(&self.on_error_mode, file_path))
                 } else {
                     Ok(())
                 }
             }
-            _ => Err(e.to_error_code(&self.on_error_mode, file_path, line)),
+            _ => Err(e.to_error_code(&self.on_error_mode, file_path)),
         }
     }
 }

src/query/storages/stage/src/read/row_based/formats/csv/block_builder.rs

Lines changed: 1 addition & 2 deletions
@@ -164,11 +164,10 @@ impl RowDecoder for CsvDecoder {
             &data.field_ends[field_end_idx..field_end_idx + num_fields],
         ) {
             self.load_context.error_handler.on_error(
-                e,
+                e.with_row(row_id),
                 Some((columns, state.num_rows)),
                 &mut state.file_status,
                 &batch.start_pos.path,
-                row_id,
             )?
         } else {
             state.add_row(row_id);

src/query/storages/stage/src/read/row_based/formats/csv/separator.rs

Lines changed: 1 addition & 2 deletions
@@ -164,11 +164,10 @@ impl CsvReader {
 
         if let Err(e) = self.check_num_field() {
             self.load_ctx.error_handler.on_error(
-                e,
+                e.with_row(self.pos.rows),
                 None,
                 file_status,
                 &self.pos.path,
-                self.pos.rows,
             )?;
             ReadRecordOutput::RecordSkipped
         } else {

src/query/storages/stage/src/read/row_based/formats/ndjson/block_builder.rs

Lines changed: 8 additions & 5 deletions
@@ -55,7 +55,8 @@ impl NdJsonDecoder {
         if self.field_decoder.is_select {
             self.field_decoder
                 .read_field(&mut columns[0], &json)
-                .map_err(|e| FileParseError::InvalidNDJsonRow {
+                .map_err(|e| FileParseError::InvalidRow {
+                    format: "NDJSON".to_string(),
                     message: e.to_string(),
                 })?;
         } else {
@@ -169,11 +170,10 @@ impl RowDecoder for NdJsonDecoder {
         if !row.is_empty() {
             if let Err(e) = self.read_row(row, columns, &null_if) {
                 self.load_context.error_handler.on_error(
-                    e,
+                    e.with_row(row_id),
                     Some((columns, state.num_rows)),
                     &mut state.file_status,
                     &batch.start_pos.path,
-                    row_id,
                 )?
             } else {
                 state.add_row(row_id);
@@ -206,7 +206,10 @@ fn map_json_error(err: serde_json::Error, data: &[u8]) -> FileParseError {
     if err.column() < len {
         message = format!("{message}, next byte is '{}'", data[pos] as char)
     }
-    FileParseError::InvalidNDJsonRow { message }
+    FileParseError::InvalidRow {
+        format: "NDJSON".to_string(),
+        message,
+    }
 }
 
 #[cfg(test)]
@@ -218,7 +221,7 @@ mod test {
         serde_json::from_slice::<serde_json::Value>(data.as_bytes())
             .map_err(|e| {
                 let e = map_json_error(e, data.as_bytes());
-                if let FileParseError::InvalidNDJsonRow { message } = e {
+                if let FileParseError::InvalidRow { message, .. } = e {
                     message
                 } else {
                     unreachable!()

src/query/storages/stage/src/read/row_based/formats/tsv/block_builder.rs

Lines changed: 1 addition & 2 deletions
@@ -198,11 +198,10 @@ impl RowDecoder for TsvDecoder {
         if !row.is_empty() {
             if let Err(e) = self.read_row(row, columns) {
                 self.load_context.error_handler.on_error(
-                    e,
+                    e.with_row(row_id),
                     Some((columns, state.num_rows)),
                     &mut state.file_status,
                     &batch.start_pos.path,
-                    row_id,
                 )?
             } else {
                 state.add_row(row_id);
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+statement ok
+create or replace table t1 (a variant);
+
+statement error
+select $1 from @data/parquet/tuple.parquet (file_format=>'avro')
+
+query ??
+copy into t1 from @data/parquet/tuple.parquet file_format = (type=avro) on_error = continue
+----
+parquet/tuple.parquet 0 1 Not a AVRO file (wrong magic in header) 1
+
+statement error Not a AVRO file
+copy into t1 from @data/parquet/variant.parquet file_format = (type=avro)
