
feat: show backtrace from iceberg::Error by introducing a newtype #21919


Merged · 1 commit · May 20, 2025
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -161,13 +161,13 @@ otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev =
prost = { version = "0.13" }
prost-build = { version = "0.13" }
# branch dev_rebase_main_20250325
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "ddef529", features = [
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bc7dec7fea2b6d00066e7acf2f399d4a85eb0d6f", features = [
"storage-s3",
"storage-gcs",
"storage-azblob",
] }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "ddef529" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "ddef529" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bc7dec7fea2b6d00066e7acf2f399d4a85eb0d6f" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bc7dec7fea2b6d00066e7acf2f399d4a85eb0d6f" }
opendal = "0.49"
arrow-udf-runtime = "0.8.0"
clap = { version = "4", features = ["cargo", "derive", "env"] }
1 change: 1 addition & 0 deletions clippy.toml
@@ -22,6 +22,7 @@ disallowed-types = [
{ path = "num_traits::ToPrimitive", reason = "Please use `From` or `TryFrom` with `OrderedFloat` instead." },
{ path = "num_traits::NumCast", reason = "Please use `From` or `TryFrom` with `OrderedFloat` instead." },
{ path = "aws_smithy_types::error::display::DisplayErrorContext", reason = "Please use `thiserror_ext::AsReport` instead." },
{ path = "iceberg::Error", reason = "Please use `risingwave_common::error::IcebergError` instead." },
]
disallowed-macros = [
{ path = "lazy_static::lazy_static", reason = "Please use `std::sync::LazyLock` instead." },
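As a rough illustration of what this lint entry steers callers toward (a sketch, not part of the diff; the function name is hypothetical): code outside the allow-listed modules converts at the boundary with `?`, relying on the `From<iceberg::Error> for IcebergError` impl added in this PR, rather than carrying `iceberg::Error` through its own signatures.

use risingwave_common::error::IcebergError;

// Hypothetical boundary function; the closure stands in for any fallible
// iceberg-rust call. `?` converts the raw error into the newtype, so the
// backtrace captured inside iceberg::Error is preserved for later reporting.
fn at_iceberg_boundary(call: impl FnOnce() -> Result<(), iceberg::Error>) -> Result<(), IcebergError> {
    call()?;
    Ok(())
}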
4 changes: 3 additions & 1 deletion src/batch/clippy.toml
@@ -1,6 +1,8 @@
disallowed-methods = []

disallowed-types = []
disallowed-types = [
{ path = "iceberg::Error", reason = "Please use `risingwave_common::error::IcebergError` instead." },
]

doc-valid-idents = [
"RisingWave",
12 changes: 8 additions & 4 deletions src/batch/src/error.rs
@@ -12,16 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![allow(clippy::disallowed_types, clippy::disallowed_methods)]

use std::sync::Arc;

pub use anyhow::anyhow;
use iceberg::Error as IcebergError;
use mysql_async::Error as MySqlError;
use parquet::errors::ParquetError;
use risingwave_common::array::ArrayError;
use risingwave_common::error::{BoxedError, def_anyhow_newtype, def_anyhow_variant};
use risingwave_common::error::{BoxedError, IcebergError, def_anyhow_newtype, def_anyhow_variant};
use risingwave_common::util::value_encoding::error::ValueEncodingError;
use risingwave_connector::error::ConnectorError;
use risingwave_dml::error::DmlError;
@@ -199,3 +196,10 @@ def_anyhow_variant! {
ParquetError => "Parquet error",
MySqlError => "MySQL error",
}

#[expect(clippy::disallowed_types)]
impl From<iceberg::Error> for BatchExternalSystemError {
fn from(value: iceberg::Error) -> Self {
risingwave_common::error::IcebergError::from(value).into()
}
}
1 change: 1 addition & 0 deletions src/common/src/error.rs
@@ -18,4 +18,5 @@
//! only re-export them here.

pub use risingwave_error::common::*;
pub use risingwave_error::wrappers::*;
pub use risingwave_error::*;
5 changes: 5 additions & 0 deletions src/connector/src/connector_common/iceberg/jni_catalog.rs
@@ -14,6 +14,11 @@

//! This module provide jni catalog.

#![expect(
clippy::disallowed_types,
reason = "construct iceberg::Error to implement the trait"
)]

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
5 changes: 5 additions & 0 deletions src/connector/src/connector_common/iceberg/storage_catalog.rs
@@ -14,6 +14,11 @@

//! This module provide storage catalog.

#![expect(
clippy::disallowed_types,
reason = "construct iceberg::Error to implement the trait"
)]

use std::collections::HashMap;

use async_trait::async_trait;
9 changes: 8 additions & 1 deletion src/connector/src/error.rs
@@ -70,7 +70,7 @@ def_anyhow_newtype! {
async_nats::error::Error<async_nats::jetstream::context::RequestErrorKind> => "Nats error",
NatsJetStreamError => "Nats error",

iceberg::Error => "IcebergV2 error",
risingwave_common::error::IcebergError => "IcebergV2 error",
redis::RedisError => "Redis error",
risingwave_common::array::arrow::arrow_schema_iceberg::ArrowError => "Arrow error",
google_cloud_pubsub::client::google_cloud_auth::error::Error => "Google Cloud error",
@@ -93,3 +93,10 @@ impl From<ConnectorError> for RpcError {
RpcError::Internal(value.0)
}
}

#[expect(clippy::disallowed_types)]
impl From<iceberg::Error> for ConnectorError {
fn from(value: iceberg::Error) -> Self {
risingwave_common::error::IcebergError::from(value).into()
}
}
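For orientation, a sketch (not part of the diff) of how the wrapped error surfaces at a logging site; `ConnectorResult` and the tracing call are assumptions based on the surrounding codebase. Because `IcebergError` forwards the original backtrace through `Error::provide`, a reporter that understands the provide API can show the capture point inside iceberg-rust instead of a fresh backtrace taken at the conversion site.

use risingwave_connector::error::ConnectorResult;
use thiserror_ext::AsReport;

// Hypothetical caller: the "IcebergV2 error" prefix comes from the
// def_anyhow_newtype entry above; the backtrace travels inside the error.
fn log_iceberg_failure(res: ConnectorResult<()>) {
    if let Err(e) = res {
        tracing::error!(error = %e.as_report(), "iceberg operation failed");
    }
}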
49 changes: 16 additions & 33 deletions src/connector/src/source/iceberg/parquet_file_handler.rs
@@ -18,14 +18,13 @@ use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

use anyhow::anyhow;
use anyhow::{Context, bail};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream, TryFutureExt};
use iceberg::io::{
FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY,
};
use iceberg::{Error, ErrorKind};
use itertools::Itertools;
use opendal::Operator;
use opendal::layers::{LoggingLayer, RetryLayer};
@@ -89,19 +88,16 @@ pub async fn create_parquet_stream_builder(
props.insert(S3_SECRET_ACCESS_KEY, s3_secret_key.clone());

let file_io_builder = FileIOBuilder::new("s3");
let file_io = file_io_builder
.with_props(props.into_iter())
.build()
.map_err(|e| anyhow!(e))?;
let parquet_file = file_io.new_input(&location).map_err(|e| anyhow!(e))?;
let file_io = file_io_builder.with_props(props.into_iter()).build()?;
let parquet_file = file_io.new_input(&location)?;

let parquet_metadata = parquet_file.metadata().await.map_err(|e| anyhow!(e))?;
let parquet_reader = parquet_file.reader().await.map_err(|e| anyhow!(e))?;
let parquet_metadata = parquet_file.metadata().await?;
let parquet_reader = parquet_file.reader().await?;
let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader);

ParquetRecordBatchStreamBuilder::new(parquet_file_reader)
.await
.map_err(|e| anyhow!(e))
.map_err(Into::into)
}

pub fn new_s3_operator(
@@ -173,12 +169,7 @@ pub fn extract_bucket_and_file_name(
let url = Url::parse(location)?;
let bucket = url
.host_str()
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Invalid url: {}, missing bucket", location),
)
})?
.with_context(|| format!("Invalid url: {}, missing bucket", location))?
.to_owned();
let prefix = match file_scan_backend {
FileScanBackend::S3 => format!("s3://{}/", bucket),
@@ -201,19 +192,13 @@ pub async fn list_data_directory(
FileScanBackend::Azblob => format!("azblob://{}/", bucket),
};
if dir.starts_with(&prefix) {
op.list(&file_name)
.await
.map_err(|e| anyhow!(e))
.map(|list| {
list.into_iter()
.map(|entry| prefix.clone() + entry.path())
.collect()
})
op.list(&file_name).await.map_err(Into::into).map(|list| {
list.into_iter()
.map(|entry| prefix.clone() + entry.path())
.collect()
})
} else {
Err(Error::new(
ErrorKind::DataInvalid,
format!("Invalid url: {}, should start with {}", dir, prefix),
))?
bail!("Invalid url: {}, should start with {}", dir, prefix)
}
}

@@ -317,8 +302,7 @@ pub async fn read_parquet_file(
let converted_arrow_schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)
.map_err(anyhow::Error::from)?;
)?;
let columns = match parser_columns {
Some(columns) => columns,
None => converted_arrow_schema
@@ -358,14 +342,13 @@ pub async fn get_parquet_fields(
.into_futures_async_read(..)
.await?
.compat();
let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;
let parquet_metadata = reader.get_metadata().await?;

let file_metadata = parquet_metadata.file_metadata();
let converted_arrow_schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)
.map_err(anyhow::Error::from)?;
)?;
let fields: risingwave_common::array::arrow::arrow_schema_iceberg::Fields =
converted_arrow_schema.fields;
Ok(fields)
1 change: 1 addition & 0 deletions src/error/Cargo.toml
@@ -11,6 +11,7 @@ repository = { workspace = true }
anyhow = "1"
bincode = "1"
easy-ext = "1"
iceberg = { workspace = true }
serde = "1"
serde-error = "0.1"
thiserror = "2"
1 change: 1 addition & 0 deletions src/error/src/lib.rs
@@ -29,6 +29,7 @@ pub mod anyhow;
pub mod common;
pub mod macros;
pub mod tonic;
pub mod wrappers;

// Re-export the `thiserror-ext` crate.
pub use thiserror_ext;
42 changes: 42 additions & 0 deletions src/error/src/wrappers.rs
@@ -0,0 +1,42 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![expect(clippy::disallowed_types)]

#[derive(::std::fmt::Debug)]
pub struct IcebergError(iceberg::Error);

impl std::fmt::Display for IcebergError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

impl std::error::Error for IcebergError {
fn provide<'a>(&'a self, request: &mut std::error::Request<'a>) {
request.provide_ref::<std::backtrace::Backtrace>(self.0.backtrace());
}

fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.0.source()
}
}

impl From<iceberg::Error> for IcebergError {
fn from(value: iceberg::Error) -> Self {
IcebergError(value)
}
}

// TODO: add opendal after opendal 0.54
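A minimal sketch (not part of the diff) of how the provided backtrace can be consumed downstream. `std::error::request_ref` sits behind the unstable `error_generic_member_access` feature, which the `provide` implementation above already assumes; on nightly toolchains, reporters such as anyhow can pick the provided backtrace up through the same mechanism instead of capturing their own.

#![feature(error_generic_member_access)]

use std::backtrace::Backtrace;
use std::error::request_ref;

// Given an error whose `provide` exposes a Backtrace (as IcebergError's does),
// the backtrace can be requested directly from a type-erased reference.
fn print_backtrace(err: &(dyn std::error::Error + 'static)) {
    if let Some(bt) = request_ref::<Backtrace>(err) {
        eprintln!("backtrace captured in iceberg-rust:\n{bt}");
    }
}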
4 changes: 3 additions & 1 deletion src/stream/clippy.toml
@@ -7,7 +7,9 @@ disallowed-methods = [
{ path = "risingwave_expr::expr::Expression::eval_row", reason = "Please use `NonStrictExpression::eval_row_infallible` instead." },
]

disallowed-types = []
disallowed-types = [
{ path = "iceberg::Error", reason = "Please use `risingwave_common::error::IcebergError` instead." },
]

doc-valid-idents = [
"RisingWave",