Skip to content

Commit b62118e

Browse files
committed
feature(meta/error): add MetaBytesError
Add `MetaBytesError` as a container of errors when encoding/decoding bytes. Narrow down error usage: codec function only needs `MetaBytesError`, not `MetaError` Merge `MetaError::SerdeError` to `MetaError::BytesError`: they both represent a codec error. Do not need to distinguish.
1 parent 7de3d3e commit b62118e

File tree

11 files changed

+93
-49
lines changed

11 files changed

+93
-49
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/meta/api/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ common-proto-conv = { path = "../proto-conv" }
2222
common-protos = { path = "../protos" }
2323
common-tracing = { path = "../../common/tracing" }
2424

25-
anyerror = "=0.1.6"
2625
anyhow = "1.0.58"
2726
async-trait = "0.1.56"
2827
enumflags2 = { version = "0.7.5", features = ["serde"] }

src/meta/api/src/kv_api_utils.rs

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
use std::fmt::Display;
1616

17-
use anyerror::AnyError;
1817
use common_meta_app::schema::DatabaseId;
1918
use common_meta_app::schema::DatabaseIdToName;
2019
use common_meta_app::schema::DatabaseMeta;
@@ -32,6 +31,7 @@ use common_meta_types::txn_condition::Target;
3231
use common_meta_types::txn_op::Request;
3332
use common_meta_types::ConditionResult;
3433
use common_meta_types::MatchSeq;
34+
use common_meta_types::MetaBytesError;
3535
use common_meta_types::MetaError;
3636
use common_meta_types::Operation;
3737
use common_meta_types::TxnCondition;
@@ -107,7 +107,7 @@ pub async fn list_keys<K: KVApiKey>(
107107
let mut structured_keys = Vec::with_capacity(n);
108108

109109
for (str_key, _seq_id) in res.iter() {
110-
let struct_key = K::from_key(str_key).map_err(meta_encode_err)?;
110+
let struct_key = K::from_key(str_key).map_err(to_bytes_err)?;
111111
structured_keys.push(struct_key);
112112
}
113113

@@ -133,24 +133,24 @@ pub async fn list_u64_value<K: KVApiKey>(
133133
let mut values = Vec::with_capacity(n);
134134

135135
for (str_key, seqv) in res.iter() {
136-
let id = *deserialize_u64(&seqv.data).map_err(meta_encode_err)?;
136+
let id = *deserialize_u64(&seqv.data)?;
137137
values.push(id);
138138

139139
// Parse key
140-
let struct_key = K::from_key(str_key).map_err(meta_encode_err)?;
140+
let struct_key = K::from_key(str_key).map_err(to_bytes_err)?;
141141
structured_keys.push(struct_key);
142142
}
143143

144144
Ok((structured_keys, values))
145145
}
146146

147-
pub fn serialize_u64(value: impl Into<Id>) -> Result<Vec<u8>, MetaError> {
148-
let v = serde_json::to_vec(&*value.into()).map_err(meta_encode_err)?;
147+
pub fn serialize_u64(value: impl Into<Id>) -> Result<Vec<u8>, MetaBytesError> {
148+
let v = serde_json::to_vec(&*value.into())?;
149149
Ok(v)
150150
}
151151

152-
pub fn deserialize_u64(v: &[u8]) -> Result<Id, MetaError> {
153-
let id = serde_json::from_slice(v).map_err(meta_encode_err)?;
152+
pub fn deserialize_u64(v: &[u8]) -> Result<Id, MetaBytesError> {
153+
let id = serde_json::from_slice(v)?;
154154
Ok(Id::new(id))
155155
}
156156

@@ -173,30 +173,30 @@ pub async fn fetch_id<T: KVApiKey>(kv_api: &impl KVApi, generator: T) -> Result<
173173
Ok(seq_v.seq)
174174
}
175175

176-
pub fn serialize_struct<T>(value: &T) -> Result<Vec<u8>, MetaError>
176+
pub fn serialize_struct<T>(value: &T) -> Result<Vec<u8>, MetaBytesError>
177177
where
178178
T: FromToProto + 'static,
179179
T::PB: common_protos::prost::Message,
180180
{
181-
let p = value.to_pb().map_err(meta_encode_err)?;
181+
let p = value.to_pb().map_err(to_bytes_err)?;
182182
let mut buf = vec![];
183-
common_protos::prost::Message::encode(&p, &mut buf).map_err(meta_encode_err)?;
183+
common_protos::prost::Message::encode(&p, &mut buf)?;
184184
Ok(buf)
185185
}
186186

187-
pub fn deserialize_struct<T>(buf: &[u8]) -> Result<T, MetaError>
187+
pub fn deserialize_struct<T>(buf: &[u8]) -> Result<T, MetaBytesError>
188188
where
189189
T: FromToProto,
190190
T::PB: common_protos::prost::Message + Default,
191191
{
192-
let p: T::PB = common_protos::prost::Message::decode(buf).map_err(meta_encode_err)?;
193-
let v: T = FromToProto::from_pb(p).map_err(meta_encode_err)?;
192+
let p: T::PB = common_protos::prost::Message::decode(buf)?;
193+
let v: T = FromToProto::from_pb(p).map_err(to_bytes_err)?;
194194

195195
Ok(v)
196196
}
197197

198-
pub fn meta_encode_err<E: std::error::Error + 'static>(e: E) -> MetaError {
199-
MetaError::EncodeError(AnyError::new(&e))
198+
pub fn to_bytes_err<E: std::error::Error + 'static>(e: E) -> MetaBytesError {
199+
MetaBytesError::new(&e)
200200
}
201201

202202
pub async fn send_txn(
@@ -485,6 +485,7 @@ where
485485
Ok((false, None))
486486
}
487487

488+
/// Get existing value by key. Panic if key is absent.
488489
pub async fn get_kv_data<T>(
489490
kv_api: &(impl KVApi + ?Sized),
490491
key: &impl KVApiKey,
@@ -495,13 +496,11 @@ where
495496
{
496497
let res = kv_api.get_kv(&key.to_key()).await?;
497498
if let Some(res) = res {
498-
return deserialize_struct(&res.data);
499+
let s = deserialize_struct(&res.data)?;
500+
return Ok(s);
499501
};
500502

501-
Err(MetaError::SerdeError(AnyError::error(format!(
502-
"get_kv {:?} fail",
503-
key.to_key()
504-
))))
503+
unreachable!("failed to get {}", key.to_key())
505504
}
506505

507506
pub async fn get_object_shared_by_share_ids(

src/meta/api/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ pub use kv_api_utils::is_all_db_data_removed;
5757
pub use kv_api_utils::is_db_need_to_be_remove;
5858
pub use kv_api_utils::list_keys;
5959
pub use kv_api_utils::list_u64_value;
60-
pub use kv_api_utils::meta_encode_err;
6160
pub use kv_api_utils::send_txn;
6261
pub use kv_api_utils::serialize_struct;
6362
pub use kv_api_utils::serialize_u64;

src/meta/api/src/schema_api_impl.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ use crate::get_u64_value;
107107
use crate::is_db_need_to_be_remove;
108108
use crate::list_keys;
109109
use crate::list_u64_value;
110-
use crate::meta_encode_err;
111110
use crate::send_txn;
112111
use crate::serialize_struct;
113112
use crate::serialize_u64;
@@ -818,8 +817,7 @@ impl<KV: KVApi> SchemaApi for KV {
818817

819818
for (i, seq_meta_opt) in seq_metas.iter().enumerate() {
820819
if let Some(seq_meta) = seq_meta_opt {
821-
let db_meta: DatabaseMeta =
822-
deserialize_struct(&seq_meta.data).map_err(meta_encode_err)?;
820+
let db_meta: DatabaseMeta = deserialize_struct(&seq_meta.data)?;
823821

824822
let db_info = DatabaseInfo {
825823
ident: DatabaseIdent {
@@ -1665,8 +1663,7 @@ impl<KV: KVApi> SchemaApi for KV {
16651663

16661664
for (i, seq_meta_opt) in seq_tb_metas.iter().enumerate() {
16671665
if let Some(seq_meta) = seq_meta_opt {
1668-
let tb_meta: TableMeta =
1669-
deserialize_struct(&seq_meta.data).map_err(meta_encode_err)?;
1666+
let tb_meta: TableMeta = deserialize_struct(&seq_meta.data)?;
16701667

16711668
let tb_info = TableInfo {
16721669
ident: TableIdent {

src/meta/grpc/src/grpc_client.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -802,11 +802,11 @@ impl MetaGrpcClient {
802802
T: Into<MetaGrpcReadReq>,
803803
R: DeserializeOwned,
804804
{
805-
let act: MetaGrpcReadReq = v.into();
805+
let read_req: MetaGrpcReadReq = v.into();
806806

807-
debug!(req = debug(&act), "MetaGrpcClient::do_read request");
807+
debug!(req = debug(&read_req), "MetaGrpcClient::do_read request");
808808

809-
let req: Request<RaftRequest> = act.clone().try_into()?;
809+
let req: Request<RaftRequest> = read_req.clone().try_into()?;
810810

811811
debug!(
812812
req = debug(&req),
@@ -826,7 +826,7 @@ impl MetaGrpcClient {
826826
if status_is_retryable(&s) {
827827
self.mark_as_unhealthy().await;
828828
let mut client = self.make_client().await?;
829-
let req: Request<RaftRequest> = act.try_into()?;
829+
let req: Request<RaftRequest> = read_req.try_into()?;
830830
let req = common_tracing::inject_span_to_tonic_request(req);
831831
Ok(client.read_msg(req).await?.into_inner())
832832
} else {

src/meta/types/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ mod kv_message;
2727
mod log_entry;
2828
mod match_seq;
2929
mod message;
30+
mod meta_bytes_error;
3031
mod meta_errors;
3132
mod meta_errors_into;
3233
mod meta_management_error;
@@ -112,6 +113,7 @@ pub use message::ForwardRequestBody;
112113
pub use message::ForwardResponse;
113114
pub use message::JoinRequest;
114115
pub use message::LeaveRequest;
116+
pub use meta_bytes_error::MetaBytesError;
115117
pub use meta_errors::MetaError;
116118
pub use meta_errors::MetaResult;
117119
pub use meta_errors_into::ToMetaError;
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use anyerror::AnyError;
16+
17+
/// Errors that occur when encode/decode
18+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, thiserror::Error)]
19+
#[error("MetaBytesError: {source}")]
20+
pub struct MetaBytesError {
21+
#[source]
22+
pub source: AnyError,
23+
}
24+
25+
impl MetaBytesError {
26+
pub fn new(error: &(impl std::error::Error + 'static)) -> Self {
27+
Self {
28+
source: AnyError::new(error),
29+
}
30+
}
31+
}
32+
33+
impl From<serde_json::Error> for MetaBytesError {
34+
fn from(e: serde_json::Error) -> Self {
35+
Self::new(&e)
36+
}
37+
}
38+
39+
impl From<std::string::FromUtf8Error> for MetaBytesError {
40+
fn from(e: std::string::FromUtf8Error) -> Self {
41+
Self::new(&e)
42+
}
43+
}
44+
45+
impl From<prost::EncodeError> for MetaBytesError {
46+
fn from(e: prost::EncodeError) -> Self {
47+
Self::new(&e)
48+
}
49+
}
50+
51+
impl From<prost::DecodeError> for MetaBytesError {
52+
fn from(e: prost::DecodeError) -> Self {
53+
Self::new(&e)
54+
}
55+
}

src/meta/types/src/meta_errors.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use serde::Serialize;
1818
use thiserror::Error;
1919

2020
use crate::app_error::AppError;
21+
use crate::MetaBytesError;
2122
use crate::MetaNetworkError;
2223
use crate::MetaRaftError;
2324
use crate::MetaStorageError;
@@ -46,13 +47,8 @@ pub enum MetaError {
4647
#[error("{0}")]
4748
MetaServiceError(String),
4849

49-
/// type to represent serialize/deserialize errors
5050
#[error(transparent)]
51-
SerdeError(AnyError),
52-
53-
/// Error when encoding auth
54-
#[error(transparent)]
55-
EncodeError(AnyError),
51+
BytesError(#[from] MetaBytesError),
5652

5753
#[error(transparent)]
5854
AppError(#[from] AppError),

src/meta/types/src/meta_errors_into.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
use std::error::Error;
1616
use std::fmt::Display;
1717

18-
use anyerror::AnyError;
1918
use common_exception::ErrorCode;
2019
use prost::EncodeError;
2120
use tonic::Code;
2221

2322
use crate::meta_network_errors::InvalidArgument;
2423
use crate::ConnectionError;
24+
use crate::MetaBytesError;
2525
use crate::MetaError;
2626
use crate::MetaNetworkError;
2727
use crate::MetaResult;
@@ -45,10 +45,7 @@ impl From<MetaError> for ErrorCode {
4545
}
4646
MetaError::MetaStoreNotFound => ErrorCode::MetaServiceError("MetaStoreNotFound"),
4747
MetaError::MetaServiceError(err_str) => ErrorCode::MetaServiceError(err_str),
48-
MetaError::SerdeError(ae) => {
49-
ErrorCode::MetaServiceError(ae.to_string()).set_backtrace(ae.backtrace())
50-
}
51-
MetaError::EncodeError(ae) => {
48+
MetaError::BytesError(ae) => {
5249
ErrorCode::MetaServiceError(ae.to_string()).set_backtrace(ae.backtrace())
5350
}
5451
MetaError::Fatal(ae) => {
@@ -129,12 +126,12 @@ impl From<tonic::Status> for MetaError {
129126

130127
impl From<serde_json::Error> for MetaError {
131128
fn from(error: serde_json::Error) -> MetaError {
132-
MetaError::SerdeError(AnyError::new(&error))
129+
MetaError::BytesError(MetaBytesError::new(&error))
133130
}
134131
}
135132

136133
impl From<EncodeError> for MetaError {
137134
fn from(e: EncodeError) -> MetaError {
138-
MetaError::EncodeError(AnyError::new(&e))
135+
MetaError::BytesError(MetaBytesError::new(&e))
139136
}
140137
}

0 commit comments

Comments
 (0)