Skip to content

Commit 0209073

Browse files
authored
Merge branch 'main' into enlarge-sleep-limit
2 parents 8afa5fc + 74aa09a commit 0209073

File tree

12 files changed

+99
-48
lines changed

12 files changed

+99
-48
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/meta/api/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ common-proto-conv = { path = "../proto-conv" }
2222
common-protos = { path = "../protos" }
2323
common-tracing = { path = "../../common/tracing" }
2424

25-
anyerror = "=0.1.6"
2625
anyhow = "1.0.58"
2726
async-trait = "0.1.56"
2827
enumflags2 = { version = "0.7.5", features = ["serde"] }

src/meta/api/src/kv_api_utils.rs

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@
1414

1515
use std::fmt::Display;
1616

17-
use anyerror::AnyError;
1817
use common_meta_app::schema::DatabaseId;
1918
use common_meta_app::schema::DatabaseIdToName;
2019
use common_meta_app::schema::DatabaseMeta;
2120
use common_meta_app::schema::DatabaseNameIdent;
2221
use common_meta_app::schema::TableNameIdent;
2322
use common_meta_app::share::*;
23+
use common_meta_types::anyerror::AnyError;
2424
use common_meta_types::app_error::AppError;
2525
use common_meta_types::app_error::ShareHasNoGrantedDatabase;
2626
use common_meta_types::app_error::UnknownDatabase;
@@ -32,6 +32,7 @@ use common_meta_types::txn_condition::Target;
3232
use common_meta_types::txn_op::Request;
3333
use common_meta_types::ConditionResult;
3434
use common_meta_types::MatchSeq;
35+
use common_meta_types::MetaBytesError;
3536
use common_meta_types::MetaError;
3637
use common_meta_types::Operation;
3738
use common_meta_types::TxnCondition;
@@ -107,7 +108,7 @@ pub async fn list_keys<K: KVApiKey>(
107108
let mut structured_keys = Vec::with_capacity(n);
108109

109110
for (str_key, _seq_id) in res.iter() {
110-
let struct_key = K::from_key(str_key).map_err(meta_encode_err)?;
111+
let struct_key = K::from_key(str_key).map_err(to_bytes_err)?;
111112
structured_keys.push(struct_key);
112113
}
113114

@@ -133,24 +134,24 @@ pub async fn list_u64_value<K: KVApiKey>(
133134
let mut values = Vec::with_capacity(n);
134135

135136
for (str_key, seqv) in res.iter() {
136-
let id = *deserialize_u64(&seqv.data).map_err(meta_encode_err)?;
137+
let id = *deserialize_u64(&seqv.data)?;
137138
values.push(id);
138139

139140
// Parse key
140-
let struct_key = K::from_key(str_key).map_err(meta_encode_err)?;
141+
let struct_key = K::from_key(str_key).map_err(to_bytes_err)?;
141142
structured_keys.push(struct_key);
142143
}
143144

144145
Ok((structured_keys, values))
145146
}
146147

147-
pub fn serialize_u64(value: impl Into<Id>) -> Result<Vec<u8>, MetaError> {
148-
let v = serde_json::to_vec(&*value.into()).map_err(meta_encode_err)?;
148+
pub fn serialize_u64(value: impl Into<Id>) -> Result<Vec<u8>, MetaBytesError> {
149+
let v = serde_json::to_vec(&*value.into())?;
149150
Ok(v)
150151
}
151152

152-
pub fn deserialize_u64(v: &[u8]) -> Result<Id, MetaError> {
153-
let id = serde_json::from_slice(v).map_err(meta_encode_err)?;
153+
pub fn deserialize_u64(v: &[u8]) -> Result<Id, MetaBytesError> {
154+
let id = serde_json::from_slice(v)?;
154155
Ok(Id::new(id))
155156
}
156157

@@ -173,30 +174,30 @@ pub async fn fetch_id<T: KVApiKey>(kv_api: &impl KVApi, generator: T) -> Result<
173174
Ok(seq_v.seq)
174175
}
175176

176-
pub fn serialize_struct<T>(value: &T) -> Result<Vec<u8>, MetaError>
177+
pub fn serialize_struct<T>(value: &T) -> Result<Vec<u8>, MetaBytesError>
177178
where
178179
T: FromToProto + 'static,
179180
T::PB: common_protos::prost::Message,
180181
{
181-
let p = value.to_pb().map_err(meta_encode_err)?;
182+
let p = value.to_pb().map_err(to_bytes_err)?;
182183
let mut buf = vec![];
183-
common_protos::prost::Message::encode(&p, &mut buf).map_err(meta_encode_err)?;
184+
common_protos::prost::Message::encode(&p, &mut buf)?;
184185
Ok(buf)
185186
}
186187

187-
pub fn deserialize_struct<T>(buf: &[u8]) -> Result<T, MetaError>
188+
pub fn deserialize_struct<T>(buf: &[u8]) -> Result<T, MetaBytesError>
188189
where
189190
T: FromToProto,
190191
T::PB: common_protos::prost::Message + Default,
191192
{
192-
let p: T::PB = common_protos::prost::Message::decode(buf).map_err(meta_encode_err)?;
193-
let v: T = FromToProto::from_pb(p).map_err(meta_encode_err)?;
193+
let p: T::PB = common_protos::prost::Message::decode(buf)?;
194+
let v: T = FromToProto::from_pb(p).map_err(to_bytes_err)?;
194195

195196
Ok(v)
196197
}
197198

198-
pub fn meta_encode_err<E: std::error::Error + 'static>(e: E) -> MetaError {
199-
MetaError::EncodeError(AnyError::new(&e))
199+
pub fn to_bytes_err<E: std::error::Error + 'static>(e: E) -> MetaBytesError {
200+
MetaBytesError::new(&e)
200201
}
201202

202203
pub async fn send_txn(
@@ -485,6 +486,8 @@ where
485486
Ok((false, None))
486487
}
487488

489+
/// Get existing value by key. Panic if key is absent.
490+
/// This function is only used for testing.
488491
pub async fn get_kv_data<T>(
489492
kv_api: &(impl KVApi + ?Sized),
490493
key: &impl KVApiKey,
@@ -495,11 +498,12 @@ where
495498
{
496499
let res = kv_api.get_kv(&key.to_key()).await?;
497500
if let Some(res) = res {
498-
return deserialize_struct(&res.data);
501+
let s = deserialize_struct(&res.data)?;
502+
return Ok(s);
499503
};
500504

501-
Err(MetaError::SerdeError(AnyError::error(format!(
502-
"get_kv {:?} fail",
505+
Err(MetaError::Fatal(AnyError::error(format!(
506+
"failed to get {}",
503507
key.to_key()
504508
))))
505509
}

src/meta/api/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ pub use kv_api_utils::is_all_db_data_removed;
5757
pub use kv_api_utils::is_db_need_to_be_remove;
5858
pub use kv_api_utils::list_keys;
5959
pub use kv_api_utils::list_u64_value;
60-
pub use kv_api_utils::meta_encode_err;
6160
pub use kv_api_utils::send_txn;
6261
pub use kv_api_utils::serialize_struct;
6362
pub use kv_api_utils::serialize_u64;

src/meta/api/src/schema_api_impl.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ use crate::get_u64_value;
107107
use crate::is_db_need_to_be_remove;
108108
use crate::list_keys;
109109
use crate::list_u64_value;
110-
use crate::meta_encode_err;
111110
use crate::send_txn;
112111
use crate::serialize_struct;
113112
use crate::serialize_u64;
@@ -818,8 +817,7 @@ impl<KV: KVApi> SchemaApi for KV {
818817

819818
for (i, seq_meta_opt) in seq_metas.iter().enumerate() {
820819
if let Some(seq_meta) = seq_meta_opt {
821-
let db_meta: DatabaseMeta =
822-
deserialize_struct(&seq_meta.data).map_err(meta_encode_err)?;
820+
let db_meta: DatabaseMeta = deserialize_struct(&seq_meta.data)?;
823821

824822
let db_info = DatabaseInfo {
825823
ident: DatabaseIdent {
@@ -1665,8 +1663,7 @@ impl<KV: KVApi> SchemaApi for KV {
16651663

16661664
for (i, seq_meta_opt) in seq_tb_metas.iter().enumerate() {
16671665
if let Some(seq_meta) = seq_meta_opt {
1668-
let tb_meta: TableMeta =
1669-
deserialize_struct(&seq_meta.data).map_err(meta_encode_err)?;
1666+
let tb_meta: TableMeta = deserialize_struct(&seq_meta.data)?;
16701667

16711668
let tb_info = TableInfo {
16721669
ident: TableIdent {

src/meta/api/src/schema_api_test_suite.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2374,11 +2374,13 @@ impl SchemaApiTestSuite {
23742374
for db_id in old_id_list.iter() {
23752375
let id_key = DatabaseId { db_id: *db_id };
23762376
let id_mapping = DatabaseIdToName { db_id: *db_id };
2377+
23772378
let meta_res: Result<DatabaseMeta, MetaError> =
23782379
get_kv_data(mt.as_kv_api(), &id_key).await;
2380+
assert!(meta_res.is_err());
2381+
23792382
let mapping_res: Result<DatabaseNameIdent, MetaError> =
23802383
get_kv_data(mt.as_kv_api(), &id_mapping).await;
2381-
assert!(meta_res.is_err());
23822384
assert!(mapping_res.is_err());
23832385
}
23842386

src/meta/grpc/src/grpc_client.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -802,11 +802,11 @@ impl MetaGrpcClient {
802802
T: Into<MetaGrpcReadReq>,
803803
R: DeserializeOwned,
804804
{
805-
let act: MetaGrpcReadReq = v.into();
805+
let read_req: MetaGrpcReadReq = v.into();
806806

807-
debug!(req = debug(&act), "MetaGrpcClient::do_read request");
807+
debug!(req = debug(&read_req), "MetaGrpcClient::do_read request");
808808

809-
let req: Request<RaftRequest> = act.clone().try_into()?;
809+
let req: Request<RaftRequest> = read_req.clone().try_into()?;
810810

811811
debug!(
812812
req = debug(&req),
@@ -826,7 +826,7 @@ impl MetaGrpcClient {
826826
if status_is_retryable(&s) {
827827
self.mark_as_unhealthy().await;
828828
let mut client = self.make_client().await?;
829-
let req: Request<RaftRequest> = act.try_into()?;
829+
let req: Request<RaftRequest> = read_req.try_into()?;
830830
let req = common_tracing::inject_span_to_tonic_request(req);
831831
Ok(client.read_msg(req).await?.into_inner())
832832
} else {

src/meta/types/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ mod kv_message;
2727
mod log_entry;
2828
mod match_seq;
2929
mod message;
30+
mod meta_bytes_error;
3031
mod meta_errors;
3132
mod meta_errors_into;
3233
mod meta_management_error;
@@ -112,6 +113,7 @@ pub use message::ForwardRequestBody;
112113
pub use message::ForwardResponse;
113114
pub use message::JoinRequest;
114115
pub use message::LeaveRequest;
116+
pub use meta_bytes_error::MetaBytesError;
115117
pub use meta_errors::MetaError;
116118
pub use meta_errors::MetaResult;
117119
pub use meta_errors_into::ToMetaError;
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use anyerror::AnyError;
16+
17+
/// Errors that occur when encode/decode
18+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, thiserror::Error)]
19+
#[error("MetaBytesError: {source}")]
20+
pub struct MetaBytesError {
21+
#[source]
22+
pub source: AnyError,
23+
}
24+
25+
impl MetaBytesError {
26+
pub fn new(error: &(impl std::error::Error + 'static)) -> Self {
27+
Self {
28+
source: AnyError::new(error),
29+
}
30+
}
31+
}
32+
33+
impl From<serde_json::Error> for MetaBytesError {
34+
fn from(e: serde_json::Error) -> Self {
35+
Self::new(&e)
36+
}
37+
}
38+
39+
impl From<std::string::FromUtf8Error> for MetaBytesError {
40+
fn from(e: std::string::FromUtf8Error) -> Self {
41+
Self::new(&e)
42+
}
43+
}
44+
45+
impl From<prost::EncodeError> for MetaBytesError {
46+
fn from(e: prost::EncodeError) -> Self {
47+
Self::new(&e)
48+
}
49+
}
50+
51+
impl From<prost::DecodeError> for MetaBytesError {
52+
fn from(e: prost::DecodeError) -> Self {
53+
Self::new(&e)
54+
}
55+
}

src/meta/types/src/meta_errors.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use serde::Serialize;
1818
use thiserror::Error;
1919

2020
use crate::app_error::AppError;
21+
use crate::MetaBytesError;
2122
use crate::MetaNetworkError;
2223
use crate::MetaRaftError;
2324
use crate::MetaStorageError;
@@ -46,13 +47,8 @@ pub enum MetaError {
4647
#[error("{0}")]
4748
MetaServiceError(String),
4849

49-
/// type to represent serialize/deserialize errors
5050
#[error(transparent)]
51-
SerdeError(AnyError),
52-
53-
/// Error when encoding auth
54-
#[error(transparent)]
55-
EncodeError(AnyError),
51+
BytesError(#[from] MetaBytesError),
5652

5753
#[error(transparent)]
5854
AppError(#[from] AppError),

0 commit comments

Comments
 (0)