Skip to content

Commit c857f6a

Browse files
author
Anton Melnikov
committed
feat(raft): mv interior mutability from NodeInner to Node
1 parent 61f49b2 commit c857f6a

File tree

3 files changed

+33
-38
lines changed

3 files changed

+33
-38
lines changed

src/raft/inner.rs

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::cell::{Cell, RefCell};
21
use std::collections::{BTreeMap, HashSet, VecDeque};
32
use std::net::SocketAddr;
43

@@ -7,11 +6,11 @@ use crate::raft::net::ConnectionId;
76
use crate::raft::rpc;
87

98
pub struct NodeInner {
10-
state: Cell<State>,
9+
state: State,
1110
local_id: u64,
12-
peers: RefCell<BTreeMap<u64, Vec<SocketAddr>>>,
13-
responded_ids: RefCell<HashSet<u64>>,
14-
pending_actions_buffer: RefCell<VecDeque<NodeAction>>,
11+
peers: BTreeMap<u64, Vec<SocketAddr>>,
12+
responded_ids: HashSet<u64>,
13+
pending_actions_buffer: VecDeque<NodeAction>,
1514
}
1615

1716
#[derive(Debug, Copy, Clone)]
@@ -46,29 +45,28 @@ impl NodeInner {
4645
let mut peers = BTreeMap::new();
4746
peers.insert(local_id, local_addrs);
4847

49-
let bootstrap_controller = NodeInner {
50-
state: Cell::new(State::Cold),
48+
let mut node_inner = NodeInner {
49+
state: State::Cold,
5150
local_id,
52-
peers: RefCell::new(peers),
51+
peers,
5352
responded_ids: Default::default(),
5453
pending_actions_buffer: Default::default(),
5554
};
56-
bootstrap_controller.poll_seeds(bootstrap_addrs.into_iter());
57-
bootstrap_controller
55+
node_inner.poll_seeds(bootstrap_addrs.into_iter());
56+
node_inner
5857
}
5958

60-
pub fn pending_actions(&self) -> Vec<NodeAction> {
59+
pub fn pending_actions(&mut self) -> Vec<NodeAction> {
6160
self.pending_actions_buffer
62-
.borrow_mut()
6361
.drain(..)
6462
.collect::<Vec<NodeAction>>()
6563
}
6664

67-
pub fn handle_event(&self, event: NodeEvent) {
65+
pub fn handle_event(&mut self, event: NodeEvent) {
6866
use NodeEvent as E;
6967
use State as S;
7068

71-
let new_state = match (self.state.get(), event) {
69+
let new_state = match (self.state, event) {
7270
(S::Cold, E::Request(req))
7371
| (S::Cold, E::Response(req))
7472
| (S::Offline, E::Request(req)) => {
@@ -78,8 +76,8 @@ impl NodeInner {
7876
(S::Warm, E::Request(req)) | (S::Warm, E::Response(req)) => {
7977
self.handle_msg(req);
8078

81-
let num_peers = self.peers.borrow().len();
82-
let num_responded = self.responded_ids.borrow().len();
79+
let num_peers = self.peers.len();
80+
let num_responded = self.responded_ids.len();
8381
if num_peers == (num_responded + 1) {
8482
self.send(NodeAction::Completed);
8583
Some(S::Done)
@@ -93,29 +91,28 @@ impl NodeInner {
9391
};
9492

9593
if let Some(new_state) = new_state {
96-
self.state.set(new_state);
94+
self.state = new_state;
9795
}
9896
}
9997

100-
fn handle_msg(&self, req: rpc::BootstrapMsg) {
98+
fn handle_msg(&mut self, req: rpc::BootstrapMsg) {
10199
if req.from_id == self.local_id {
102100
return;
103101
}
104102

105-
let mut responded_ids = self.responded_ids.borrow_mut();
106-
if !responded_ids.contains(&req.from_id) {
103+
if !self.responded_ids.contains(&req.from_id) {
107104
let new_nodes = self.merge_nodes_list(&req.nodes);
108105
for (id, addrs) in new_nodes {
109106
let id = ConnectionId::Peer(id);
110107
self.send(NodeAction::Connect(id.clone(), addrs));
111108
self.send_bootstrap_request(id);
112109
}
113-
responded_ids.insert(req.from_id);
110+
self.responded_ids.insert(req.from_id);
114111
}
115112
}
116113

117114
#[inline]
118-
fn poll_seeds(&self, addrs: impl Iterator<Item = Vec<SocketAddr>>) {
115+
fn poll_seeds(&mut self, addrs: impl Iterator<Item = Vec<SocketAddr>>) {
119116
for (id, seed_addrs) in addrs.enumerate() {
120117
let id = ConnectionId::Seed(id);
121118
self.send(NodeAction::Connect(id.clone(), seed_addrs));
@@ -124,10 +121,9 @@ impl NodeInner {
124121
}
125122

126123
#[inline]
127-
fn send_bootstrap_request(&self, to: ConnectionId) {
124+
fn send_bootstrap_request(&mut self, to: ConnectionId) {
128125
let nodes = self
129126
.peers
130-
.borrow()
131127
.iter()
132128
.map(|(id, addrs)| (*id, addrs.clone()))
133129
.collect();
@@ -142,21 +138,20 @@ impl NodeInner {
142138
}
143139

144140
#[inline]
145-
fn send(&self, action: NodeAction) {
146-
self.pending_actions_buffer.borrow_mut().push_back(action)
141+
fn send(&mut self, action: NodeAction) {
142+
self.pending_actions_buffer.push_back(action)
147143
}
148144

149145
/// Merges `other` nodes list to already known. Returns new nodes count
150146
fn merge_nodes_list(
151-
&self,
147+
&mut self,
152148
nodes_from: &Vec<(u64, Vec<SocketAddr>)>,
153149
) -> Vec<(u64, Vec<SocketAddr>)> {
154150
let mut new_nodes = Vec::<(u64, Vec<SocketAddr>)>::with_capacity(nodes_from.len());
155151
{
156-
let mut nodes_into = self.peers.borrow_mut();
157152
for (id, addrs) in nodes_from.into_iter() {
158-
if !nodes_into.contains_key(id) {
159-
nodes_into.insert(*id, addrs.clone());
153+
if !self.peers.contains_key(id) {
154+
self.peers.insert(*id, addrs.clone());
160155
new_nodes.push((*id, addrs.clone()));
161156
}
162157
}

src/raft/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub mod rpc;
2020
mod storage;
2121

2222
pub struct Node {
23-
inner: NodeInner,
23+
inner: RefCell<NodeInner>,
2424
connections: RefCell<ConnectionPool>,
2525
rpc_function: String,
2626
options: NodeOptions,
@@ -63,7 +63,7 @@ impl Node {
6363
}
6464

6565
Ok(Node {
66-
inner: NodeInner::new(id, local_addrs, bootstrap_addrs_cfg),
66+
inner: RefCell::new(NodeInner::new(id, local_addrs, bootstrap_addrs_cfg)),
6767
connections: RefCell::new(ConnectionPool::new(options.connection_options.clone())),
6868
rpc_function: rpc_function.to_string(),
6969
options,
@@ -72,7 +72,7 @@ impl Node {
7272

7373
pub fn run(&self) -> Result<(), Error> {
7474
loop {
75-
for action in self.inner.pending_actions() {
75+
for action in self.inner.borrow_mut().pending_actions() {
7676
match action {
7777
NodeAction::Request(to, msg) => {
7878
let mut conn_pool = self.connections.borrow_mut();

tests/src/test_raft.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ pub fn test_bootstrap_2n() {
6161
let mut n1_ctrl = NodeInner::new(1, n1_addrs.clone(), vec![n2_addrs.clone()]);
6262
let mut n2_ctrl = NodeInner::new(2, n2_addrs.clone(), vec![n1_addrs.clone()]);
6363

64-
assert_eq!(communicate(&n1_ctrl, &n2_ctrl), (false, false));
65-
assert_eq!(communicate(&n1_ctrl, &n2_ctrl), (false, false));
66-
assert_eq!(communicate(&n1_ctrl, &n2_ctrl), (true, true));
64+
assert_eq!(communicate(&mut n1_ctrl, &mut n2_ctrl), (false, false));
65+
assert_eq!(communicate(&mut n1_ctrl, &mut n2_ctrl), (false, false));
66+
assert_eq!(communicate(&mut n1_ctrl, &mut n2_ctrl), (true, true));
6767
}
6868

69-
fn communicate(n1_ctrl: &NodeInner, n2_ctrl: &NodeInner) -> (bool, bool) {
69+
fn communicate(n1_ctrl: &mut NodeInner, n2_ctrl: &mut NodeInner) -> (bool, bool) {
7070
let n1_actions = n1_ctrl.pending_actions();
7171
let n2_actions = n2_ctrl.pending_actions();
7272

@@ -89,7 +89,7 @@ fn communicate(n1_ctrl: &NodeInner, n2_ctrl: &NodeInner) -> (bool, bool) {
8989
(n1_is_completed, n2_is_completed)
9090
}
9191

92-
fn forward_action(action: NodeAction, node_ctrl: &NodeInner) {
92+
fn forward_action(action: NodeAction, node_ctrl: &mut NodeInner) {
9393
match action {
9494
NodeAction::Request(_, msg) => node_ctrl.handle_event(NodeEvent::Request(msg)),
9595
NodeAction::Response(resp) => match resp.unwrap() {

0 commit comments

Comments
 (0)