Skip to content

Commit a06b538

Browse files
author
Anton Melnikov
committed
feat(raft): use external actions buffer
1 parent 46d4228 commit a06b538

File tree

1 file changed

+35
-40
lines changed

1 file changed

+35
-40
lines changed

src/raft/inner.rs

Lines changed: 35 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ pub struct NodeInner {
1010
local_id: u64,
1111
peers: BTreeMap<u64, Vec<SocketAddr>>,
1212
responded_ids: HashSet<u64>,
13-
pending_actions_buffer: VecDeque<NodeAction>,
13+
bootstrap_addrs: Vec<Vec<SocketAddr>>,
1414
}
1515

1616
#[derive(Debug, Copy, Clone)]
1717
enum State {
18-
Cold,
19-
Warm,
18+
Init,
19+
ColdBootstrap,
20+
WarmBootstrap,
2021
Offline,
2122
Done,
2223
}
@@ -46,51 +47,59 @@ impl NodeInner {
4647
let mut peers = BTreeMap::new();
4748
peers.insert(local_id, local_addrs);
4849

49-
let mut node_inner = NodeInner {
50-
state: State::Cold,
50+
NodeInner {
51+
state: State::Init,
5152
local_id,
5253
peers,
5354
responded_ids: Default::default(),
54-
pending_actions_buffer: Default::default(),
55-
};
56-
node_inner.poll_seeds(bootstrap_addrs.into_iter());
57-
node_inner
55+
bootstrap_addrs,
56+
}
5857
}
5958

6059
pub fn update(&mut self, events: &mut VecDeque<NodeEvent>, actions: &mut VecDeque<NodeAction>) {
60+
if let State::Init = self.state {
61+
self.init(actions);
62+
}
63+
6164
while let Some(event) = events.pop_front() {
62-
self.handle_event(event);
65+
self.handle_event(event, actions);
6366
}
67+
}
6468

65-
for action in self.pending_actions_buffer.drain(..) {
66-
actions.push_back(action);
69+
fn init(&mut self, actions_buf: &mut VecDeque<NodeAction>) {
70+
for (id, seed_addrs) in self.bootstrap_addrs.clone().into_iter().enumerate() {
71+
let id = ConnectionId::Seed(id);
72+
actions_buf.push_back(NodeAction::Connect(id.clone(), seed_addrs));
73+
self.send_bootstrap_request(id, actions_buf);
6774
}
75+
76+
self.state = State::ColdBootstrap;
6877
}
6978

70-
fn handle_event(&mut self, event: NodeEvent) {
79+
fn handle_event(&mut self, event: NodeEvent, actions_buf: &mut VecDeque<NodeAction>) {
7180
use NodeEvent as E;
7281
use State as S;
7382

7483
let new_state = match (self.state, event) {
75-
(S::Cold, E::Request(req))
76-
| (S::Cold, E::Response(req))
84+
(S::ColdBootstrap, E::Request(req))
85+
| (S::ColdBootstrap, E::Response(req))
7786
| (S::Offline, E::Request(req)) => {
78-
self.handle_msg(req);
79-
Some(S::Warm)
87+
self.handle_msg(req, actions_buf);
88+
Some(S::WarmBootstrap)
8089
}
81-
(S::Warm, E::Request(req)) | (S::Warm, E::Response(req)) => {
82-
self.handle_msg(req);
90+
(S::WarmBootstrap, E::Request(req)) | (S::WarmBootstrap, E::Response(req)) => {
91+
self.handle_msg(req, actions_buf);
8392

8493
let num_peers = self.peers.len();
8594
let num_responded = self.responded_ids.len();
8695
if num_peers == (num_responded + 1) {
87-
self.send(NodeAction::Completed);
96+
actions_buf.push_back(NodeAction::Completed);
8897
Some(S::Done)
8998
} else {
9099
None
91100
}
92101
}
93-
(S::Cold, E::Timeout) => Some(S::Offline),
102+
(S::ColdBootstrap, E::Timeout) => Some(S::Offline),
94103
(S::Offline, E::Timeout) => None,
95104
_ => panic!("invalid state"),
96105
};
@@ -100,7 +109,7 @@ impl NodeInner {
100109
}
101110
}
102111

103-
fn handle_msg(&mut self, req: rpc::BootstrapMsg) {
112+
fn handle_msg(&mut self, req: rpc::BootstrapMsg, actions_buf: &mut VecDeque<NodeAction>) {
104113
if req.from_id == self.local_id {
105114
return;
106115
}
@@ -109,31 +118,22 @@ impl NodeInner {
109118
let new_nodes = self.merge_nodes_list(&req.nodes);
110119
for (id, addrs) in new_nodes {
111120
let id = ConnectionId::Peer(id);
112-
self.send(NodeAction::Connect(id.clone(), addrs));
113-
self.send_bootstrap_request(id);
121+
actions_buf.push_back(NodeAction::Connect(id.clone(), addrs));
122+
self.send_bootstrap_request(id, actions_buf);
114123
}
115124
self.responded_ids.insert(req.from_id);
116125
}
117126
}
118127

119128
#[inline]
120-
fn poll_seeds(&mut self, addrs: impl Iterator<Item = Vec<SocketAddr>>) {
121-
for (id, seed_addrs) in addrs.enumerate() {
122-
let id = ConnectionId::Seed(id);
123-
self.send(NodeAction::Connect(id.clone(), seed_addrs));
124-
self.send_bootstrap_request(id);
125-
}
126-
}
127-
128-
#[inline]
129-
fn send_bootstrap_request(&mut self, to: ConnectionId) {
129+
fn send_bootstrap_request(&mut self, to: ConnectionId, actions_buf: &mut VecDeque<NodeAction>) {
130130
let nodes = self
131131
.peers
132132
.iter()
133133
.map(|(id, addrs)| (*id, addrs.clone()))
134134
.collect();
135135

136-
self.send(NodeAction::Request(
136+
actions_buf.push_back(NodeAction::Request(
137137
to,
138138
rpc::BootstrapMsg {
139139
from_id: self.local_id,
@@ -142,11 +142,6 @@ impl NodeInner {
142142
));
143143
}
144144

145-
#[inline]
146-
fn send(&mut self, action: NodeAction) {
147-
self.pending_actions_buffer.push_back(action)
148-
}
149-
150145
/// Merges `other` nodes list to already known. Returns new nodes count
151146
fn merge_nodes_list(
152147
&mut self,

0 commit comments

Comments
 (0)