Skip to content

Commit 16d8b84

Browse files
committed
refactor(meta-service): clean up MetaError
MetaError sub errors: Add `MetaStartupError` as a collection of errors raised when starting up meta-service. These errors do not have to be part of MetaError. Remove `MetaRaftError`. This error is not clear about when a error occurs. Raft related errors could occur locally or on a remote peer. Add `MetaAPIError`, which is the root error for a meta-service API. `MetaNetworkError`: add InvalidArgument and InvalidReply.
1 parent 02c3618 commit 16d8b84

File tree

19 files changed

+510
-209
lines changed

19 files changed

+510
-209
lines changed

src/meta/api/src/kv_api_utils.rs

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,12 @@ use common_meta_types::app_error::UnknownTable;
3131
use common_meta_types::txn_condition::Target;
3232
use common_meta_types::txn_op::Request;
3333
use common_meta_types::ConditionResult;
34+
use common_meta_types::InvalidArgument;
35+
use common_meta_types::InvalidReply;
3436
use common_meta_types::MatchSeq;
3537
use common_meta_types::MetaBytesError;
3638
use common_meta_types::MetaError;
39+
use common_meta_types::MetaNetworkError;
3740
use common_meta_types::Operation;
3841
use common_meta_types::TxnCondition;
3942
use common_meta_types::TxnDeleteRequest;
@@ -108,7 +111,10 @@ pub async fn list_keys<K: KVApiKey>(
108111
let mut structured_keys = Vec::with_capacity(n);
109112

110113
for (str_key, _seq_id) in res.iter() {
111-
let struct_key = K::from_key(str_key).map_err(to_bytes_err)?;
114+
let struct_key = K::from_key(str_key).map_err(to_bytes_err).map_err(|e| {
115+
let inv = InvalidReply::new("fail to list_keys", &e);
116+
MetaNetworkError::InvalidReply(inv)
117+
})?;
112118
structured_keys.push(struct_key);
113119
}
114120

@@ -138,20 +144,29 @@ pub async fn list_u64_value<K: KVApiKey>(
138144
values.push(id);
139145

140146
// Parse key
141-
let struct_key = K::from_key(str_key).map_err(to_bytes_err)?;
147+
let struct_key = K::from_key(str_key).map_err(to_bytes_err).map_err(|e| {
148+
let inv = InvalidReply::new("list_u64_value", &e);
149+
MetaNetworkError::InvalidReply(inv)
150+
})?;
142151
structured_keys.push(struct_key);
143152
}
144153

145154
Ok((structured_keys, values))
146155
}
147156

148-
pub fn serialize_u64(value: impl Into<Id>) -> Result<Vec<u8>, MetaBytesError> {
149-
let v = serde_json::to_vec(&*value.into())?;
157+
pub fn serialize_u64(value: impl Into<Id>) -> Result<Vec<u8>, MetaNetworkError> {
158+
let v = serde_json::to_vec(&*value.into()).map_err(|e| {
159+
let inv = InvalidArgument::new(e, "");
160+
MetaNetworkError::InvalidArgument(inv)
161+
})?;
150162
Ok(v)
151163
}
152164

153-
pub fn deserialize_u64(v: &[u8]) -> Result<Id, MetaBytesError> {
154-
let id = serde_json::from_slice(v)?;
165+
pub fn deserialize_u64(v: &[u8]) -> Result<Id, MetaNetworkError> {
166+
let id = serde_json::from_slice(v).map_err(|e| {
167+
let inv = InvalidReply::new("", &e);
168+
MetaNetworkError::InvalidReply(inv)
169+
})?;
155170
Ok(Id::new(id))
156171
}
157172

@@ -174,24 +189,36 @@ pub async fn fetch_id<T: KVApiKey>(kv_api: &impl KVApi, generator: T) -> Result<
174189
Ok(seq_v.seq)
175190
}
176191

177-
pub fn serialize_struct<T>(value: &T) -> Result<Vec<u8>, MetaBytesError>
192+
pub fn serialize_struct<T>(value: &T) -> Result<Vec<u8>, MetaNetworkError>
178193
where
179194
T: FromToProto + 'static,
180195
T::PB: common_protos::prost::Message,
181196
{
182-
let p = value.to_pb().map_err(to_bytes_err)?;
197+
let p = value.to_pb().map_err(to_bytes_err).map_err(|e| {
198+
let inv = InvalidArgument::new(e, "");
199+
MetaNetworkError::InvalidArgument(inv)
200+
})?;
183201
let mut buf = vec![];
184-
common_protos::prost::Message::encode(&p, &mut buf)?;
202+
common_protos::prost::Message::encode(&p, &mut buf).map_err(|e| {
203+
let inv = InvalidArgument::new(e, "");
204+
MetaNetworkError::InvalidArgument(inv)
205+
})?;
185206
Ok(buf)
186207
}
187208

188-
pub fn deserialize_struct<T>(buf: &[u8]) -> Result<T, MetaBytesError>
209+
pub fn deserialize_struct<T>(buf: &[u8]) -> Result<T, MetaNetworkError>
189210
where
190211
T: FromToProto,
191212
T::PB: common_protos::prost::Message + Default,
192213
{
193-
let p: T::PB = common_protos::prost::Message::decode(buf)?;
194-
let v: T = FromToProto::from_pb(p).map_err(to_bytes_err)?;
214+
let p: T::PB = common_protos::prost::Message::decode(buf).map_err(|e| {
215+
let inv = InvalidReply::new("", &e);
216+
MetaNetworkError::InvalidReply(inv)
217+
})?;
218+
let v: T = FromToProto::from_pb(p).map_err(to_bytes_err).map_err(|e| {
219+
let inv = InvalidReply::new("", &e);
220+
MetaNetworkError::InvalidReply(inv)
221+
})?;
195222

196223
Ok(v)
197224
}

src/meta/client/src/grpc_client.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use common_meta_types::protobuf::RaftRequest;
5050
use common_meta_types::protobuf::WatchRequest;
5151
use common_meta_types::protobuf::WatchResponse;
5252
use common_meta_types::ConnectionError;
53+
use common_meta_types::InvalidArgument;
5354
use common_meta_types::MetaClientError;
5455
use common_meta_types::MetaError;
5556
use common_meta_types::MetaHandshakeError;
@@ -492,7 +493,7 @@ impl MetaGrpcClient {
492493
if is_last {
493494
// reach to last addr
494495
let cli_err = MetaClientError::HandshakeError(handshake_err);
495-
return Err(MetaError::MetaClientError(cli_err));
496+
return Err(MetaError::ClientError(cli_err));
496497
}
497498
continue;
498499
}
@@ -785,7 +786,9 @@ impl MetaGrpcClient {
785786

786787
debug!(req = debug(&act), "MetaGrpcClient::do_write request");
787788

788-
let req: Request<RaftRequest> = act.clone().try_into()?;
789+
let req: Request<RaftRequest> = act.clone().try_into().map_err(|e| {
790+
MetaNetworkError::InvalidArgument(InvalidArgument::new(e, "fail to encode request"))
791+
})?;
789792

790793
debug!(
791794
req = debug(&req),
@@ -802,7 +805,12 @@ impl MetaGrpcClient {
802805
if status_is_retryable(&s) {
803806
self.mark_as_unhealthy().await;
804807
let mut client = self.make_client().await?;
805-
let req: Request<RaftRequest> = act.try_into()?;
808+
let req: Request<RaftRequest> = act.try_into().map_err(|e| {
809+
MetaNetworkError::InvalidArgument(InvalidArgument::new(
810+
e,
811+
"fail to encode request",
812+
))
813+
})?;
806814
let req = common_tracing::inject_span_to_tonic_request(req);
807815
Ok(client.write_msg(req).await?.into_inner())
808816
} else {
@@ -829,7 +837,9 @@ impl MetaGrpcClient {
829837

830838
debug!(req = debug(&read_req), "MetaGrpcClient::do_read request");
831839

832-
let req: Request<RaftRequest> = read_req.clone().try_into()?;
840+
let req: Request<RaftRequest> = read_req.clone().try_into().map_err(|e| {
841+
MetaNetworkError::InvalidArgument(InvalidArgument::new(e, "fail to encode request"))
842+
})?;
833843

834844
debug!(
835845
req = debug(&req),
@@ -849,7 +859,12 @@ impl MetaGrpcClient {
849859
if status_is_retryable(&s) {
850860
self.mark_as_unhealthy().await;
851861
let mut client = self.make_client().await?;
852-
let req: Request<RaftRequest> = read_req.try_into()?;
862+
let req: Request<RaftRequest> = read_req.try_into().map_err(|e| {
863+
MetaNetworkError::InvalidArgument(InvalidArgument::new(
864+
e,
865+
"fail to encode request",
866+
))
867+
})?;
853868
let req = common_tracing::inject_span_to_tonic_request(req);
854869
Ok(client.read_msg(req).await?.into_inner())
855870
} else {

src/meta/raft-store/src/config.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ use std::net::Ipv4Addr;
1717
use common_exception::Result;
1818
use common_grpc::DNSResolver;
1919
use common_meta_types::Endpoint;
20-
use common_meta_types::MetaError;
21-
use common_meta_types::MetaResult;
20+
use common_meta_types::MetaStartupError;
2221
use common_meta_types::NodeId;
2322
use once_cell::sync::Lazy;
2423

@@ -195,7 +194,7 @@ impl RaftConfig {
195194
!self.no_sync
196195
}
197196

198-
pub fn check(&self) -> MetaResult<()> {
197+
pub fn check(&self) -> std::result::Result<(), MetaStartupError> {
199198
// If just leaving, does not need to check other config
200199
if !self.leave_via.is_empty() {
201200
return Ok(());
@@ -205,14 +204,14 @@ impl RaftConfig {
205204
// - both join and single is set
206205
// - neither join nor single is set
207206
if self.join.is_empty() != self.single {
208-
return Err(MetaError::InvalidConfig(String::from(
207+
return Err(MetaStartupError::InvalidConfig(String::from(
209208
"at least one of `single` and `join` needs to be enabled",
210209
)));
211210
}
212211

213212
let self_addr = self.raft_api_listen_host_string();
214213
if self.join.contains(&self_addr) {
215-
return Err(MetaError::InvalidConfig(String::from(
214+
return Err(MetaStartupError::InvalidConfig(String::from(
216215
"--join must not be set to itself",
217216
)));
218217
}

src/meta/raft-store/src/state/raft_state.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ use common_meta_sled_store::openraft;
1616
use common_meta_sled_store::sled;
1717
use common_meta_sled_store::AsKeySpace;
1818
use common_meta_sled_store::SledTree;
19-
use common_meta_types::MetaError;
20-
use common_meta_types::MetaResult;
19+
use common_meta_types::MetaStartupError;
20+
use common_meta_types::MetaStorageError;
2121
use common_meta_types::MetaStorageResult;
2222
use common_meta_types::NodeId;
2323
use openraft::storage::HardState;
@@ -64,7 +64,7 @@ impl RaftState {
6464
config: &RaftConfig,
6565
open: Option<()>,
6666
create: Option<()>,
67-
) -> MetaResult<RaftState> {
67+
) -> Result<RaftState, MetaStartupError> {
6868
info!(?config);
6969
info!("open: {:?}, create: {:?}", open, create);
7070

@@ -80,15 +80,15 @@ impl RaftState {
8080
match (open, create) {
8181
(Some(_), _) => (curr_id, true),
8282
(None, Some(_)) => {
83-
return Err(MetaError::MetaStoreAlreadyExists(curr_id));
83+
return Err(MetaStartupError::MetaStoreAlreadyExists(curr_id));
8484
}
8585
(None, None) => panic!("no open no create"),
8686
}
8787
} else {
8888
match (open, create) {
8989
(Some(_), Some(_)) => (config.id, false),
9090
(Some(_), None) => {
91-
return Err(MetaError::MetaStoreNotFound);
91+
return Err(MetaStartupError::MetaStoreNotFound);
9292
}
9393
(None, Some(_)) => (config.id, false),
9494
(None, None) => panic!("no open no create"),
@@ -105,7 +105,7 @@ impl RaftState {
105105
}
106106

107107
#[tracing::instrument(level = "debug", skip(self))]
108-
pub async fn set_node_id(&self, id: NodeId) -> MetaResult<()> {
108+
pub async fn set_node_id(&self, id: NodeId) -> Result<(), MetaStorageError> {
109109
let state = self.state();
110110
state
111111
.insert(&RaftStateKey::Id, &RaftStateValue::NodeId(id))
@@ -116,7 +116,7 @@ impl RaftState {
116116
/// Initialize a raft state. The only thing to do is to persist the node id
117117
/// so that next time opening it the caller knows it is initialized.
118118
#[tracing::instrument(level = "debug", skip(self))]
119-
async fn init(&self) -> MetaResult<()> {
119+
async fn init(&self) -> Result<(), MetaStorageError> {
120120
self.set_node_id(self.id).await
121121
}
122122

src/meta/raft-store/tests/it/config.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
use common_meta_raft_store::config::RaftConfig;
16-
use common_meta_types::MetaError;
16+
use common_meta_types::MetaStartupError;
1717

1818
#[test]
1919
fn test_raft_config() -> anyhow::Result<()> {
@@ -27,7 +27,7 @@ fn test_raft_config() -> anyhow::Result<()> {
2727

2828
assert_eq!(
2929
r,
30-
Err(MetaError::InvalidConfig(String::from(
30+
Err(MetaStartupError::InvalidConfig(String::from(
3131
"at least one of `single` and `join` needs to be enabled",
3232
)))
3333
)
@@ -47,7 +47,7 @@ fn test_raft_config() -> anyhow::Result<()> {
4747

4848
assert_eq!(
4949
r,
50-
Err(MetaError::InvalidConfig(String::from(
50+
Err(MetaStartupError::InvalidConfig(String::from(
5151
"--join must not be set to itself",
5252
)))
5353
)

src/meta/service/src/configs/inner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
use common_meta_raft_store::config::RaftConfig;
16-
use common_meta_types::MetaResult;
16+
use common_meta_types::MetaStartupError;
1717
use common_meta_types::Node;
1818
use common_tracing::Config as LogConfig;
1919

@@ -65,7 +65,7 @@ impl Config {
6565
/// As requires by [RFC: Config Backward Compatibility](https://github.com/datafuselabs/databend/pull/5324), we will load user's config via wrapper [`OuterV0Config`] and then convert from [`OuterV0Config`] to [`Config`].
6666
///
6767
/// In the future, we could have `ConfigV1` and `ConfigV2`.
68-
pub fn load() -> MetaResult<Self> {
68+
pub fn load() -> Result<Self, MetaStartupError> {
6969
let cfg = OuterV0Config::load()?.into();
7070

7171
Ok(cfg)

src/meta/service/src/configs/outer_v0.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ use clap::Args;
1818
use clap::Parser;
1919
use common_meta_raft_store::config::get_default_raft_advertise_host;
2020
use common_meta_raft_store::config::RaftConfig as InnerRaftConfig;
21-
use common_meta_types::MetaError;
22-
use common_meta_types::MetaResult;
21+
use common_meta_types::MetaStartupError;
2322
use common_tracing::Config as InnerLogConfig;
2423
use common_tracing::FileConfig as InnerFileLogConfig;
2524
use common_tracing::StderrConfig as InnerStderrLogConfig;
@@ -164,7 +163,7 @@ impl Config {
164163
/// - Load from file as default.
165164
/// - Load from env, will override config from file.
166165
/// - Load from args as finally override
167-
pub fn load() -> MetaResult<Self> {
166+
pub fn load() -> Result<Self, MetaStartupError> {
168167
let arg_conf = Self::parse();
169168

170169
let mut builder: serfig::Builder<Self> = serfig::Builder::default();
@@ -186,15 +185,15 @@ impl Config {
186185
let cfg_via_env: ConfigViaEnv = serfig::Builder::default()
187186
.collect(from_env())
188187
.build()
189-
.map_err(|e| MetaError::InvalidConfig(e.to_string()))?;
188+
.map_err(|e| MetaStartupError::InvalidConfig(e.to_string()))?;
190189
builder = builder.collect(from_self(cfg_via_env.into()));
191190

192191
// Finally, load from args.
193192
builder = builder.collect(from_self(arg_conf));
194193

195194
builder
196195
.build()
197-
.map_err(|e| MetaError::InvalidConfig(e.to_string()))
196+
.map_err(|e| MetaStartupError::InvalidConfig(e.to_string()))
198197
}
199198
}
200199

src/meta/service/src/meta_service/meta_leader.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ use common_meta_types::Cmd;
1818
use common_meta_types::ForwardRequest;
1919
use common_meta_types::ForwardResponse;
2020
use common_meta_types::LogEntry;
21-
use common_meta_types::MetaError;
21+
use common_meta_types::MetaDataReadError;
22+
use common_meta_types::MetaOperationError;
2223
use common_meta_types::Node;
2324
use common_meta_types::RaftChangeMembershipError;
2425
use common_meta_types::RaftWriteError;
@@ -50,7 +51,7 @@ impl<'a> MetaLeader<'a> {
5051
pub async fn handle_forwardable_req(
5152
&self,
5253
req: ForwardRequest,
53-
) -> Result<ForwardResponse, MetaError> {
54+
) -> Result<ForwardResponse, MetaOperationError> {
5455
debug!("handle_forwardable_req: {:?}", req);
5556

5657
match req.body {
@@ -71,17 +72,26 @@ impl<'a> MetaLeader<'a> {
7172

7273
ForwardRequestBody::GetKV(req) => {
7374
let sm = self.meta_node.get_state_machine().await;
74-
let res = sm.get_kv(&req.key).await?;
75+
let res = sm
76+
.get_kv(&req.key)
77+
.await
78+
.map_err(|meta_err| MetaDataReadError::new("get_kv", "", &meta_err))?;
7579
Ok(ForwardResponse::GetKV(res))
7680
}
7781
ForwardRequestBody::MGetKV(req) => {
7882
let sm = self.meta_node.get_state_machine().await;
79-
let res = sm.mget_kv(&req.keys).await?;
83+
let res = sm
84+
.mget_kv(&req.keys)
85+
.await
86+
.map_err(|meta_err| MetaDataReadError::new("mget_kv", "", &meta_err))?;
8087
Ok(ForwardResponse::MGetKV(res))
8188
}
8289
ForwardRequestBody::ListKV(req) => {
8390
let sm = self.meta_node.get_state_machine().await;
84-
let res = sm.prefix_list_kv(&req.prefix).await?;
91+
let res = sm
92+
.prefix_list_kv(&req.prefix)
93+
.await
94+
.map_err(|meta_err| MetaDataReadError::new("list_kv", "", &meta_err))?;
8595
Ok(ForwardResponse::ListKV(res))
8696
}
8797
}

0 commit comments

Comments
 (0)