12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
- use std:: collections:: BTreeSet ;
16
-
17
15
use common_meta_api:: KVApi ;
18
- use common_meta_sled_store:: openraft:: error:: ChangeMembershipError ;
19
- use common_meta_sled_store:: openraft:: error:: ClientWriteError ;
20
- use common_meta_sled_store:: openraft:: error:: InProgress ;
21
16
use common_meta_types:: AppliedState ;
22
17
use common_meta_types:: Cmd ;
23
18
use common_meta_types:: ForwardRequest ;
24
19
use common_meta_types:: ForwardResponse ;
25
- use common_meta_types:: ForwardToLeader ;
26
20
use common_meta_types:: LogEntry ;
27
21
use common_meta_types:: MetaError ;
28
- use common_meta_types:: MetaRaftError ;
29
22
use common_meta_types:: Node ;
30
- use common_meta_types:: NodeId ;
23
+ use common_meta_types:: RaftChangeMembershipError ;
24
+ use common_meta_types:: RaftWriteError ;
31
25
use common_meta_types:: SeqV ;
26
+ use common_metrics:: counter:: WithCount ;
32
27
use tracing:: debug;
33
28
use tracing:: info;
34
29
35
30
use crate :: meta_service:: ForwardRequestBody ;
36
31
use crate :: meta_service:: JoinRequest ;
37
32
use crate :: meta_service:: LeaveRequest ;
38
33
use crate :: meta_service:: MetaNode ;
34
+ use crate :: metrics:: ProposalPending ;
39
35
40
36
/// The container of APIs of a metasrv leader in a metasrv cluster.
41
37
///
@@ -98,7 +94,7 @@ impl<'a> MetaLeader<'a> {
98
94
///
99
95
/// If the node is already in cluster membership, it still returns Ok.
100
96
#[ tracing:: instrument( level = "debug" , skip( self ) ) ]
101
- pub async fn join ( & self , req : JoinRequest ) -> Result < ( ) , MetaError > {
97
+ pub async fn join ( & self , req : JoinRequest ) -> Result < ( ) , RaftChangeMembershipError > {
102
98
let node_id = req. node_id ;
103
99
let endpoint = req. endpoint ;
104
100
let grpc_api_addr = req. grpc_api_addr ;
@@ -109,35 +105,28 @@ impl<'a> MetaLeader<'a> {
109
105
return Ok ( ( ) ) ;
110
106
}
111
107
112
- // deal with joint config,If the cluster is in joint config,
113
- // we need to return an Inprogress error with membership log id.
114
- match membership. get_ith_config ( 1 ) {
115
- Some ( _membership) => Err ( MetaRaftError :: ChangeMembershipError (
116
- ChangeMembershipError :: InProgress ( InProgress {
117
- membership_log_id : metrics. membership_config . log_id ,
118
- } ) ,
119
- )
120
- . into ( ) ) ,
121
- None => {
122
- // safe unwrap: if the first config is None, panic is the expected behavior here.
123
- let mut membership = membership. get_ith_config ( 0 ) . unwrap ( ) . clone ( ) ;
124
- membership. insert ( node_id) ;
125
- let ent = LogEntry {
126
- txid : None ,
127
- time_ms : None ,
128
- cmd : Cmd :: AddNode {
129
- node_id,
130
- node : Node {
131
- name : node_id. to_string ( ) ,
132
- endpoint,
133
- grpc_api_addr : Some ( grpc_api_addr) ,
134
- } ,
135
- } ,
136
- } ;
137
- self . write ( ent) . await ?;
138
- self . change_membership ( membership) . await
139
- }
140
- }
108
+ // safe unwrap: if the first config is None, panic is the expected behavior here.
109
+ let mut membership = membership. get_ith_config ( 0 ) . unwrap ( ) . clone ( ) ;
110
+ membership. insert ( node_id) ;
111
+ let ent = LogEntry {
112
+ txid : None ,
113
+ time_ms : None ,
114
+ cmd : Cmd :: AddNode {
115
+ node_id,
116
+ node : Node {
117
+ name : node_id. to_string ( ) ,
118
+ endpoint,
119
+ grpc_api_addr : Some ( grpc_api_addr) ,
120
+ } ,
121
+ } ,
122
+ } ;
123
+ self . write ( ent) . await ?;
124
+
125
+ self . meta_node
126
+ . raft
127
+ . change_membership ( membership, true )
128
+ . await ?;
129
+ Ok ( ( ) )
141
130
}
142
131
143
132
/// A node leave the cluster.
@@ -147,7 +136,7 @@ impl<'a> MetaLeader<'a> {
147
136
///
148
137
/// If the node is not in cluster membership, it still returns Ok.
149
138
#[ tracing:: instrument( level = "debug" , skip( self ) ) ]
150
- pub async fn leave ( & self , req : LeaveRequest ) -> Result < ( ) , MetaError > {
139
+ pub async fn leave ( & self , req : LeaveRequest ) -> Result < ( ) , RaftChangeMembershipError > {
151
140
let node_id = req. node_id ;
152
141
let metrics = self . meta_node . raft . metrics ( ) . borrow ( ) . clone ( ) ;
153
142
let membership = metrics. membership_config . membership . clone ( ) ;
@@ -156,93 +145,42 @@ impl<'a> MetaLeader<'a> {
156
145
return Ok ( ( ) ) ;
157
146
}
158
147
159
- // deal with joint config,If the cluster is in joint config,
160
- // we need to return an Inprogress error with membership log id.
161
- match membership. get_ith_config ( 1 ) {
162
- Some ( _membership) => Err ( MetaRaftError :: ChangeMembershipError (
163
- ChangeMembershipError :: InProgress ( InProgress {
164
- membership_log_id : metrics. membership_config . log_id ,
165
- } ) ,
166
- )
167
- . into ( ) ) ,
168
- None => {
169
- // safe unwrap: if the first config is None, panic is the expected behavior here.
170
- let mut membership = membership. get_ith_config ( 0 ) . unwrap ( ) . clone ( ) ;
171
- membership. remove ( & node_id) ;
172
-
173
- self . change_membership ( membership) . await ?;
174
- let ent = LogEntry {
175
- txid : None ,
176
- time_ms : None ,
177
- cmd : Cmd :: RemoveNode { node_id } ,
178
- } ;
179
- self . write ( ent) . await ?;
180
- Ok ( ( ) )
181
- }
182
- }
183
- }
148
+ // safe unwrap: if the first config is None, panic is the expected behavior here.
149
+ let mut membership = membership. get_ith_config ( 0 ) . unwrap ( ) . clone ( ) ;
150
+ membership. remove ( & node_id) ;
184
151
185
- #[ tracing:: instrument( level = "debug" , skip( self ) ) ]
186
- pub async fn change_membership ( & self , membership : BTreeSet < NodeId > ) -> Result < ( ) , MetaError > {
187
- let res = self
188
- . meta_node
152
+ self . meta_node
189
153
. raft
190
154
. change_membership ( membership, true )
191
- . await ;
155
+ . await ? ;
192
156
193
- let err = match res {
194
- Ok ( _) => return Ok ( ( ) ) ,
195
- Err ( e) => e,
157
+ let ent = LogEntry {
158
+ txid : None ,
159
+ time_ms : None ,
160
+ cmd : Cmd :: RemoveNode { node_id } ,
196
161
} ;
197
-
198
- match err {
199
- ClientWriteError :: ChangeMembershipError ( e) => {
200
- Err ( MetaRaftError :: ChangeMembershipError ( e) . into ( ) )
201
- }
202
- // TODO(xp): enable MetaNode::RaftError when RaftError impl Serialized
203
- ClientWriteError :: Fatal ( fatal) => Err ( MetaRaftError :: RaftFatal ( fatal) . into ( ) ) ,
204
- ClientWriteError :: ForwardToLeader ( to_leader) => {
205
- Err ( MetaRaftError :: ForwardToLeader ( ForwardToLeader {
206
- leader_id : to_leader. leader_id ,
207
- } )
208
- . into ( ) )
209
- }
210
- }
162
+ self . write ( ent) . await ?;
163
+ Ok ( ( ) )
211
164
}
212
165
213
166
/// Write a log through local raft node and return the states before and after applying the log.
214
167
///
215
168
/// If the raft node is not a leader, it returns MetaRaftError::ForwardToLeader.
216
169
#[ tracing:: instrument( level = "debug" , skip( self , entry) ) ]
217
- pub async fn write ( & self , mut entry : LogEntry ) -> Result < AppliedState , MetaError > {
170
+ pub async fn write ( & self , mut entry : LogEntry ) -> Result < AppliedState , RaftWriteError > {
218
171
// Add consistent clock time to log entry.
219
172
entry. time_ms = Some ( SeqV :: < ( ) > :: now_ms ( ) ) ;
220
173
174
+ // report metrics
175
+ let _guard = WithCount :: new ( ( ) , ProposalPending ) ;
176
+
221
177
info ! ( "write LogEntry: {:?}" , entry) ;
222
178
let write_rst = self . meta_node . raft . client_write ( entry) . await ;
223
-
224
179
info ! ( "raft.client_write rst: {:?}" , write_rst) ;
225
180
226
181
match write_rst {
227
- Ok ( resp) => {
228
- let data = resp. data ;
229
- Ok ( data)
230
- }
231
-
232
- Err ( cli_write_err) => match cli_write_err {
233
- // fatal error
234
- ClientWriteError :: Fatal ( fatal) => Err ( MetaRaftError :: RaftFatal ( fatal) . into ( ) ) ,
235
- // retryable error
236
- ClientWriteError :: ForwardToLeader ( to_leader) => {
237
- Err ( MetaRaftError :: ForwardToLeader ( ForwardToLeader {
238
- leader_id : to_leader. leader_id ,
239
- } )
240
- . into ( ) )
241
- }
242
- ClientWriteError :: ChangeMembershipError ( _) => {
243
- unreachable ! ( "there should not be a ChangeMembershipError for client_write" )
244
- }
245
- } ,
182
+ Ok ( resp) => Ok ( resp. data ) ,
183
+ Err ( cli_write_err) => Err ( RaftWriteError :: from_raft_err ( cli_write_err) ) ,
246
184
}
247
185
}
248
186
}
0 commit comments