Skip to content

Commit e5f0c46

Browse files
author
Anton Melnikov
committed
feat(raft): readiness waiting + termination
1 parent c63c9e6 commit e5f0c46

File tree

3 files changed

+47
-22
lines changed

3 files changed

+47
-22
lines changed

src/raft/inner.rs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,19 @@ use crate::raft::net::ConnectionId;
66
use crate::raft::rpc;
77

88
pub struct NodeInner {
9-
state: State,
9+
state: NodeState,
1010
local_id: u64,
1111
peers: BTreeMap<u64, Vec<SocketAddr>>,
1212
responded_ids: HashSet<u64>,
1313
bootstrap_addrs: Vec<Vec<SocketAddr>>,
1414
}
1515

16-
#[derive(Debug, Copy, Clone)]
17-
enum State {
16+
#[derive(Debug, Copy, Clone, PartialEq)]
17+
pub enum NodeState {
1818
Init,
1919
ColdBootstrap,
2020
WarmBootstrap,
21+
Ready,
2122
Offline,
2223
Done,
2324
}
@@ -27,6 +28,7 @@ pub enum NodeEvent {
2728
Request(rpc::BootstrapMsg),
2829
Response(rpc::BootstrapMsg),
2930
Timeout,
31+
Stop,
3032
}
3133

3234
#[derive(Debug)]
@@ -35,7 +37,7 @@ pub enum NodeAction {
3537
UpgradeSeed(ConnectionId, u64),
3638
Request(ConnectionId, rpc::BootstrapMsg),
3739
Response(Result<rpc::Response, Error>),
38-
Completed,
40+
StateChangeNotification(NodeState),
3941
}
4042

4143
impl NodeInner {
@@ -48,7 +50,7 @@ impl NodeInner {
4850
peers.insert(local_id, local_addrs);
4951

5052
NodeInner {
51-
state: State::Init,
53+
state: NodeState::Init,
5254
local_id,
5355
peers,
5456
responded_ids: Default::default(),
@@ -57,7 +59,7 @@ impl NodeInner {
5759
}
5860

5961
pub fn update(&mut self, events: &mut VecDeque<NodeEvent>, actions: &mut VecDeque<NodeAction>) {
60-
if let State::Init = self.state {
62+
if let NodeState::Init = self.state {
6163
self.init(actions);
6264
}
6365

@@ -66,19 +68,23 @@ impl NodeInner {
6668
}
6769
}
6870

71+
pub fn state(&self) -> &NodeState {
72+
&self.state
73+
}
74+
6975
fn init(&mut self, actions_buf: &mut VecDeque<NodeAction>) {
7076
for (id, seed_addrs) in self.bootstrap_addrs.clone().into_iter().enumerate() {
7177
let id = ConnectionId::Seed(id);
7278
actions_buf.push_back(NodeAction::Connect(id.clone(), seed_addrs));
7379
self.send_bootstrap_request(id, actions_buf);
7480
}
7581

76-
self.state = State::ColdBootstrap;
82+
self.state = NodeState::ColdBootstrap;
7783
}
7884

7985
fn handle_event(&mut self, event: NodeEvent, actions_buf: &mut VecDeque<NodeAction>) {
8086
use NodeEvent as E;
81-
use State as S;
87+
use NodeState as S;
8288

8389
let new_state = match (self.state, event) {
8490
(S::ColdBootstrap, E::Request(req))
@@ -93,19 +99,22 @@ impl NodeInner {
9399
let num_peers = self.peers.len();
94100
let num_responded = self.responded_ids.len();
95101
if num_peers == (num_responded + 1) {
96-
actions_buf.push_back(NodeAction::Completed);
97-
Some(S::Done)
102+
Some(S::Ready)
98103
} else {
99104
None
100105
}
101106
}
102107
(S::ColdBootstrap, E::Timeout) => Some(S::Offline),
103108
(S::Offline, E::Timeout) => None,
109+
(_, E::Stop) => Some(S::Done),
104110
_ => panic!("invalid state"),
105111
};
106112

107113
if let Some(new_state) = new_state {
108-
self.state = new_state;
114+
if self.state != new_state {
115+
actions_buf.push_back(NodeAction::StateChangeNotification(new_state.clone()));
116+
self.state = new_state;
117+
}
109118
}
110119
}
111120

src/raft/mod.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22

33
use std::cell::RefCell;
44
use std::collections::VecDeque;
5+
use std::io;
56
use std::net::ToSocketAddrs;
67
use std::time::Duration;
78

89
use rand::random;
910

10-
use inner::{NodeAction, NodeInner};
11+
use inner::{NodeAction, NodeInner, NodeState};
1112
use net::{get_local_addrs, ConnectionPool};
1213

1314
use crate::error::{Error, TarantoolErrorCode};
@@ -29,6 +30,7 @@ pub struct Node {
2930
events_cond: Cond,
3031
events_buffer: RefCell<VecDeque<NodeEvent>>,
3132
actions_buffer: RefCell<VecDeque<NodeAction>>,
33+
ready_cond: Cond,
3234
options: NodeOptions,
3335
}
3436

@@ -75,6 +77,7 @@ impl Node {
7577
events_cond: Cond::new(),
7678
events_buffer: RefCell::new(VecDeque::with_capacity(options.recv_queue_size)),
7779
actions_buffer: RefCell::new(VecDeque::with_capacity(options.send_queue_size)),
80+
ready_cond: Cond::new(),
7881
options,
7982
})
8083
}
@@ -97,9 +100,13 @@ impl Node {
97100
self.send(conn_pool.get(&to).unwrap(), rpc::Request::Bootstrap(msg))?;
98101
}
99102
NodeAction::Response(_) => {}
100-
NodeAction::Completed => {
101-
return Ok(());
102-
}
103+
NodeAction::StateChangeNotification(state) => match state {
104+
NodeState::Ready => {
105+
self.ready_cond.signal();
106+
}
107+
NodeState::Done => return Ok(()),
108+
_ => {}
109+
},
103110
_ => {}
104111
};
105112
}
@@ -119,7 +126,7 @@ impl Node {
119126
self.events_buffer
120127
.borrow_mut()
121128
.push_back(NodeEvent::Request(msg));
122-
self.events_cond.wait();
129+
self.events_cond.signal();
123130
}
124131
_ => unimplemented!(),
125132
};
@@ -131,11 +138,17 @@ impl Node {
131138
}
132139

133140
pub fn wait_ready(&self, timeout: Duration) -> Result<(), Error> {
134-
unimplemented!();
141+
if self.inner.borrow().state() != &NodeState::Ready {
142+
if !self.ready_cond.wait_timeout(timeout) {
143+
return Err(Error::IO(io::ErrorKind::TimedOut.into()));
144+
}
145+
}
146+
Ok(())
135147
}
136148

137149
pub fn close(&self) {
138-
unimplemented!();
150+
self.events_buffer.borrow_mut().push_back(NodeEvent::Stop);
151+
self.events_cond.signal();
139152
}
140153

141154
fn send(&self, conn: &Conn, request: rpc::Request) -> Result<Option<rpc::Response>, Error> {

tests/src/test_raft.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::collections::VecDeque;
22

3-
use tarantool::raft::inner::{NodeAction, NodeEvent, NodeInner};
3+
use tarantool::raft::inner::{NodeAction, NodeEvent, NodeInner, NodeState};
44
use tarantool::raft::net::ConnectionId;
55
use tarantool::raft::rpc;
66

@@ -34,7 +34,7 @@ pub fn test_bootstrap_solo() {
3434
}));
3535
node.update(&mut events, &mut actions);
3636

37-
assert_eq!(actions.len(), 2);
37+
assert_eq!(actions.len(), 3);
3838
assert!(matches!(
3939
&actions[0],
4040
NodeAction::Connect(ConnectionId::Peer(2), addrs) if addrs == &remote_addrs
@@ -58,7 +58,10 @@ pub fn test_bootstrap_solo() {
5858
node.update(&mut events, &mut actions);
5959

6060
assert_eq!(actions.len(), 1);
61-
assert!(matches!(&actions[0], NodeAction::Completed))
61+
assert!(matches!(
62+
&actions[0],
63+
NodeAction::StateChangeNotification(NodeState::Ready)
64+
))
6265
}
6366

6467
pub fn test_bootstrap_2n() {
@@ -91,7 +94,7 @@ pub fn test_bootstrap_2n() {
9194
fn communicate(from: &mut VecDeque<NodeAction>, to: &mut VecDeque<NodeEvent>) -> bool {
9295
let mut is_completed = false;
9396
for action in from.drain(..) {
94-
if let NodeAction::Completed = action {
97+
if let NodeAction::StateChangeNotification(NodeState::Ready) = action {
9598
is_completed = true;
9699
}
97100

0 commit comments

Comments
 (0)