Skip to content

Commit be4aeff

Browse files
committed
Merge branch 'feature/raft-bootstrap-part5' into 'master'
Feature/raft bootstrap part5 See merge request picodata/brod/tarantool-module!41
2 parents 466ab29 + e5f0c46 commit be4aeff

File tree

4 files changed

+94
-38
lines changed

4 files changed

+94
-38
lines changed

examples/cluster_node/src/lib.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::os::raw::c_int;
66
use std::rc::{Rc, Weak};
77

88
use tarantool::raft::Node;
9-
use tarantool::tuple::{FunctionArgs, FunctionCtx};
9+
use tarantool::tuple::{FunctionArgs, FunctionCtx, Tuple};
1010

1111
#[derive(Default)]
1212
struct Global {
@@ -21,15 +21,12 @@ lazy_static! {
2121
}
2222

2323
#[no_mangle]
24-
pub extern "C" fn run_node(_: FunctionCtx, _: FunctionArgs) -> c_int {
25-
let node = Rc::new(
26-
Node::new(
27-
"libcluster_node.rpc",
28-
vec!["127.0.0.1:3301", "127.0.0.1:3302", "127.0.0.1:3303"],
29-
Default::default(),
30-
)
31-
.unwrap(),
32-
);
24+
pub extern "C" fn run_node(_: FunctionCtx, args: FunctionArgs) -> c_int {
25+
let args: Tuple = args.into();
26+
let (bootstrap_addrs,) = args.into_struct::<(Vec<String>,)>().unwrap();
27+
28+
let node =
29+
Rc::new(Node::new("libcluster_node.rpc", bootstrap_addrs, Default::default()).unwrap());
3330
GLOBAL.node.replace(Rc::downgrade(&node));
3431
node.run().unwrap();
3532
0

src/raft/inner.rs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,19 @@ use crate::raft::net::ConnectionId;
66
use crate::raft::rpc;
77

88
pub struct NodeInner {
9-
state: State,
9+
state: NodeState,
1010
local_id: u64,
1111
peers: BTreeMap<u64, Vec<SocketAddr>>,
1212
responded_ids: HashSet<u64>,
1313
bootstrap_addrs: Vec<Vec<SocketAddr>>,
1414
}
1515

16-
#[derive(Debug, Copy, Clone)]
17-
enum State {
16+
#[derive(Debug, Copy, Clone, PartialEq)]
17+
pub enum NodeState {
1818
Init,
1919
ColdBootstrap,
2020
WarmBootstrap,
21+
Ready,
2122
Offline,
2223
Done,
2324
}
@@ -27,6 +28,7 @@ pub enum NodeEvent {
2728
Request(rpc::BootstrapMsg),
2829
Response(rpc::BootstrapMsg),
2930
Timeout,
31+
Stop,
3032
}
3133

3234
#[derive(Debug)]
@@ -35,7 +37,7 @@ pub enum NodeAction {
3537
UpgradeSeed(ConnectionId, u64),
3638
Request(ConnectionId, rpc::BootstrapMsg),
3739
Response(Result<rpc::Response, Error>),
38-
Completed,
40+
StateChangeNotification(NodeState),
3941
}
4042

4143
impl NodeInner {
@@ -48,7 +50,7 @@ impl NodeInner {
4850
peers.insert(local_id, local_addrs);
4951

5052
NodeInner {
51-
state: State::Init,
53+
state: NodeState::Init,
5254
local_id,
5355
peers,
5456
responded_ids: Default::default(),
@@ -57,7 +59,7 @@ impl NodeInner {
5759
}
5860

5961
pub fn update(&mut self, events: &mut VecDeque<NodeEvent>, actions: &mut VecDeque<NodeAction>) {
60-
if let State::Init = self.state {
62+
if let NodeState::Init = self.state {
6163
self.init(actions);
6264
}
6365

@@ -66,19 +68,23 @@ impl NodeInner {
6668
}
6769
}
6870

71+
pub fn state(&self) -> &NodeState {
72+
&self.state
73+
}
74+
6975
fn init(&mut self, actions_buf: &mut VecDeque<NodeAction>) {
7076
for (id, seed_addrs) in self.bootstrap_addrs.clone().into_iter().enumerate() {
7177
let id = ConnectionId::Seed(id);
7278
actions_buf.push_back(NodeAction::Connect(id.clone(), seed_addrs));
7379
self.send_bootstrap_request(id, actions_buf);
7480
}
7581

76-
self.state = State::ColdBootstrap;
82+
self.state = NodeState::ColdBootstrap;
7783
}
7884

7985
fn handle_event(&mut self, event: NodeEvent, actions_buf: &mut VecDeque<NodeAction>) {
8086
use NodeEvent as E;
81-
use State as S;
87+
use NodeState as S;
8288

8389
let new_state = match (self.state, event) {
8490
(S::ColdBootstrap, E::Request(req))
@@ -93,19 +99,22 @@ impl NodeInner {
9399
let num_peers = self.peers.len();
94100
let num_responded = self.responded_ids.len();
95101
if num_peers == (num_responded + 1) {
96-
actions_buf.push_back(NodeAction::Completed);
97-
Some(S::Done)
102+
Some(S::Ready)
98103
} else {
99104
None
100105
}
101106
}
102107
(S::ColdBootstrap, E::Timeout) => Some(S::Offline),
103108
(S::Offline, E::Timeout) => None,
109+
(_, E::Stop) => Some(S::Done),
104110
_ => panic!("invalid state"),
105111
};
106112

107113
if let Some(new_state) = new_state {
108-
self.state = new_state;
114+
if self.state != new_state {
115+
actions_buf.push_back(NodeAction::StateChangeNotification(new_state.clone()));
116+
self.state = new_state;
117+
}
109118
}
110119
}
111120

src/raft/mod.rs

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,20 @@
22

33
use std::cell::RefCell;
44
use std::collections::VecDeque;
5+
use std::io;
56
use std::net::ToSocketAddrs;
67
use std::time::Duration;
78

89
use rand::random;
910

10-
use inner::{NodeAction, NodeInner};
11+
use inner::{NodeAction, NodeInner, NodeState};
1112
use net::{get_local_addrs, ConnectionPool};
1213

13-
use crate::error::Error;
14+
use crate::error::{Error, TarantoolErrorCode};
15+
use crate::fiber::Cond;
1416
use crate::net_box::{Conn, ConnOptions, Options};
15-
use crate::tuple::{FunctionArgs, FunctionCtx};
17+
use crate::raft::inner::NodeEvent;
18+
use crate::tuple::{FunctionArgs, FunctionCtx, Tuple};
1619

1720
mod fsm;
1821
pub mod inner;
@@ -24,6 +27,10 @@ pub struct Node {
2427
inner: RefCell<NodeInner>,
2528
connections: RefCell<ConnectionPool>,
2629
rpc_function: String,
30+
events_cond: Cond,
31+
events_buffer: RefCell<VecDeque<NodeEvent>>,
32+
actions_buffer: RefCell<VecDeque<NodeAction>>,
33+
ready_cond: Cond,
2734
options: NodeOptions,
2835
}
2936

@@ -67,41 +74,81 @@ impl Node {
6774
inner: RefCell::new(NodeInner::new(id, local_addrs, bootstrap_addrs_cfg)),
6875
connections: RefCell::new(ConnectionPool::new(options.connection_options.clone())),
6976
rpc_function: rpc_function.to_string(),
77+
events_cond: Cond::new(),
78+
events_buffer: RefCell::new(VecDeque::with_capacity(options.recv_queue_size)),
79+
actions_buffer: RefCell::new(VecDeque::with_capacity(options.send_queue_size)),
80+
ready_cond: Cond::new(),
7081
options,
7182
})
7283
}
7384

7485
pub fn run(&self) -> Result<(), Error> {
75-
let mut events = VecDeque::new();
76-
let mut actions = VecDeque::new();
7786
loop {
78-
self.inner.borrow_mut().update(&mut events, &mut actions);
79-
for action in actions.drain(..) {
87+
{
88+
let mut actions = self.actions_buffer.borrow_mut();
89+
let mut events = self.events_buffer.borrow_mut();
90+
self.inner.borrow_mut().update(&mut events, &mut actions);
91+
}
92+
93+
for action in self.actions_buffer.borrow_mut().drain(..) {
8094
match action {
95+
NodeAction::Connect(id, addrs) => {
96+
self.connections.borrow_mut().connect(id, &addrs[..])?;
97+
}
8198
NodeAction::Request(to, msg) => {
8299
let mut conn_pool = self.connections.borrow_mut();
83100
self.send(conn_pool.get(&to).unwrap(), rpc::Request::Bootstrap(msg))?;
84101
}
85102
NodeAction::Response(_) => {}
86-
NodeAction::Completed => {
87-
return Ok(());
88-
}
103+
NodeAction::StateChangeNotification(state) => match state {
104+
NodeState::Ready => {
105+
self.ready_cond.signal();
106+
}
107+
NodeState::Done => return Ok(()),
108+
_ => {}
109+
},
89110
_ => {}
90111
};
91112
}
113+
114+
self.events_cond.wait();
92115
}
93116
}
94117

95118
pub fn handle_rpc(&self, ctx: FunctionCtx, args: FunctionArgs) -> i32 {
96-
unimplemented!();
119+
let args: Tuple = args.into();
120+
121+
match args.into_struct::<rpc::Request>() {
122+
Err(e) => set_error!(TarantoolErrorCode::Protocol, "{}", e),
123+
Ok(request) => {
124+
match request {
125+
rpc::Request::Bootstrap(msg) => {
126+
self.events_buffer
127+
.borrow_mut()
128+
.push_back(NodeEvent::Request(msg));
129+
self.events_cond.signal();
130+
}
131+
_ => unimplemented!(),
132+
};
133+
134+
ctx.return_mp(&rpc::Response::Ack)
135+
.unwrap_or_else(|e| set_error!(TarantoolErrorCode::ProcC, "{}", e))
136+
}
137+
}
97138
}
98139

99140
pub fn wait_ready(&self, timeout: Duration) -> Result<(), Error> {
100-
unimplemented!();
141+
if self.inner.borrow().state() != &NodeState::Ready {
142+
if !self.ready_cond.wait_timeout(timeout) {
143+
return Err(Error::IO(io::ErrorKind::TimedOut.into()));
144+
}
145+
}
146+
Ok(())
101147
}
102148

103149
pub fn close(&self) {
104-
unimplemented!();
150+
self.events_buffer.borrow_mut().push_back(NodeEvent::Stop);
151+
self.events_cond.signal();
105152
}
106153

107154
fn send(&self, conn: &Conn, request: rpc::Request) -> Result<Option<rpc::Response>, Error> {

tests/src/test_raft.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::collections::VecDeque;
22

3-
use tarantool::raft::inner::{NodeAction, NodeEvent, NodeInner};
3+
use tarantool::raft::inner::{NodeAction, NodeEvent, NodeInner, NodeState};
44
use tarantool::raft::net::ConnectionId;
55
use tarantool::raft::rpc;
66

@@ -34,7 +34,7 @@ pub fn test_bootstrap_solo() {
3434
}));
3535
node.update(&mut events, &mut actions);
3636

37-
assert_eq!(actions.len(), 2);
37+
assert_eq!(actions.len(), 3);
3838
assert!(matches!(
3939
&actions[0],
4040
NodeAction::Connect(ConnectionId::Peer(2), addrs) if addrs == &remote_addrs
@@ -58,7 +58,10 @@ pub fn test_bootstrap_solo() {
5858
node.update(&mut events, &mut actions);
5959

6060
assert_eq!(actions.len(), 1);
61-
assert!(matches!(&actions[0], NodeAction::Completed))
61+
assert!(matches!(
62+
&actions[0],
63+
NodeAction::StateChangeNotification(NodeState::Ready)
64+
))
6265
}
6366

6467
pub fn test_bootstrap_2n() {
@@ -91,7 +94,7 @@ pub fn test_bootstrap_2n() {
9194
fn communicate(from: &mut VecDeque<NodeAction>, to: &mut VecDeque<NodeEvent>) -> bool {
9295
let mut is_completed = false;
9396
for action in from.drain(..) {
94-
if let NodeAction::Completed = action {
97+
if let NodeAction::StateChangeNotification(NodeState::Ready) = action {
9598
is_completed = true;
9699
}
97100

0 commit comments

Comments
 (0)