Skip to content

Commit 0826f5a

Browse files
author
Anton Melnikov
committed
feat(raft): refactor bootstrap fsm. Part 1
1 parent e352ac1 commit 0826f5a

File tree

5 files changed

+73
-179
lines changed

5 files changed

+73
-179
lines changed

src/raft/bootstrap.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::net::SocketAddr;
44

55
use crate::error::Error;
66

7+
use super::net::ConnectionId;
78
use super::rpc;
89

910
pub struct BoostrapController {
@@ -30,7 +31,9 @@ pub enum BootstrapEvent {
3031

3132
#[derive(Debug)]
3233
pub enum BootstrapAction {
33-
Request(rpc::BootstrapMsg, Vec<SocketAddr>),
34+
Connect(ConnectionId, Vec<SocketAddr>),
35+
UpgradeSeed(ConnectionId, u64),
36+
Request(ConnectionId, rpc::BootstrapMsg),
3437
Response(Result<rpc::Response, Error>),
3538
Completed,
3639
}
@@ -51,7 +54,7 @@ impl BoostrapController {
5154
responded_ids: Default::default(),
5255
pending_actions_buffer: Default::default(),
5356
};
54-
bootstrap_controller.broadcast(bootstrap_addrs.into_iter());
57+
bootstrap_controller.poll_seeds(bootstrap_addrs.into_iter());
5558
bootstrap_controller
5659
}
5760

@@ -103,38 +106,39 @@ impl BoostrapController {
103106
let mut responded_ids = self.responded_ids.borrow_mut();
104107
if !responded_ids.contains(&req.from_id) {
105108
let new_nodes = self.merge_nodes_list(&req.nodes);
106-
let new_nodes = new_nodes.iter().map(|(_, addrs)| addrs.clone());
107-
self.broadcast(new_nodes);
108-
109+
for (id, addrs) in new_nodes {
110+
let id = ConnectionId::Peer(id);
111+
self.send(BootstrapAction::Connect(id.clone(), addrs));
112+
self.send_bootstrap_request(id);
113+
}
109114
responded_ids.insert(req.from_id);
110115
}
111116
}
112117

113118
#[inline]
114-
fn broadcast(&self, addrs: impl Iterator<Item = Vec<SocketAddr>>) {
115-
for peer_addrs in addrs {
116-
self.send_request(peer_addrs.clone());
119+
fn poll_seeds(&self, addrs: impl Iterator<Item = Vec<SocketAddr>>) {
120+
for (id, seed_addrs) in addrs.enumerate() {
121+
let id = ConnectionId::Seed(id);
122+
self.send(BootstrapAction::Connect(id.clone(), seed_addrs));
123+
self.send_bootstrap_request(id);
117124
}
118125
}
119126

120127
#[inline]
121-
fn send_request(&self, to: Vec<SocketAddr>) {
128+
fn send_bootstrap_request(&self, to: ConnectionId) {
122129
let nodes = self
123130
.peers
124131
.borrow()
125132
.iter()
126-
.map(|(a, b)| (*a, b.clone()))
133+
.map(|(id, addrs)| (*id, addrs.clone()))
127134
.collect();
128135

129-
let peers = self.peers.borrow();
130-
let local_addrs = peers.get(&self.local_id).unwrap();
131136
self.send(BootstrapAction::Request(
137+
to,
132138
rpc::BootstrapMsg {
133139
from_id: self.local_id,
134-
from_addrs: local_addrs.clone(),
135140
nodes,
136141
},
137-
to,
138142
));
139143
}
140144

src/raft/mod.rs

Lines changed: 18 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,22 @@
11
#![cfg(feature = "raft_node")]
22

3-
use std::cell::{Cell, RefCell};
4-
use std::io;
3+
use std::cell::RefCell;
54
use std::net::ToSocketAddrs;
6-
use std::time::{Duration, Instant};
5+
use std::time::Duration;
76

8-
use protobuf::Message as _;
9-
use raft::prelude::Message;
107
use rand::random;
118

129
use bootstrap::{BoostrapController, BootstrapAction};
1310
use net::{get_local_addrs, ConnectionPoll};
1411

15-
use crate::error::{Error, TarantoolErrorCode};
16-
use crate::fiber::{Cond, Latch};
12+
use crate::error::Error;
1713
use crate::net_box::{Conn, ConnOptions, Options};
18-
use crate::tuple::{FunctionArgs, FunctionCtx, Tuple};
19-
20-
use self::inner::NodeInner;
14+
use crate::tuple::{FunctionArgs, FunctionCtx};
2115

2216
pub mod bootstrap;
2317
mod fsm;
2418
mod inner;
25-
mod net;
19+
pub mod net;
2620
pub mod rpc;
2721
mod storage;
2822

@@ -34,11 +28,7 @@ pub enum NodeState {
3428
}
3529

3630
pub struct Node {
37-
state: Cell<NodeState>,
38-
state_lock: Latch,
39-
state_cond: Cond,
4031
bootstrap_ctrl: BoostrapController,
41-
inner: RefCell<NodeInner>,
4232
connections: RefCell<ConnectionPoll>,
4333
rpc_function: String,
4434
options: NodeOptions,
@@ -81,111 +71,41 @@ impl Node {
8171
}
8272

8373
Ok(Node {
84-
state: Cell::new(NodeState::Bootstrapping),
85-
state_lock: Latch::new(),
86-
state_cond: Cond::new(),
8774
bootstrap_ctrl: BoostrapController::new(id, local_addrs, bootstrap_addrs_cfg),
88-
inner: RefCell::new(NodeInner::new(id, &options)?),
89-
connections: Default::default(),
75+
connections: RefCell::new(ConnectionPoll::new(options.connection_options.clone())),
9076
rpc_function: rpc_function.to_string(),
9177
options,
9278
})
9379
}
9480

9581
pub fn run(&self) -> Result<(), Error> {
9682
loop {
97-
let mut is_state_changed = false;
98-
{
99-
let _lock = self.state_lock.lock();
100-
let next_state = match self.state.get() {
101-
NodeState::Bootstrapping => {
102-
for action in self.bootstrap_ctrl.pending_actions() {
103-
match action {
104-
BootstrapAction::Request(msg, to) => {
105-
let mut conn_pool = self.connections.borrow_mut();
106-
self.send(
107-
conn_pool.connect_or_get(None, &to)?,
108-
rpc::Request::Bootstrap(msg),
109-
)?;
110-
}
111-
BootstrapAction::Response(_) => {}
112-
BootstrapAction::Completed => {}
113-
};
114-
}
115-
116-
Some(NodeState::Closed)
83+
for action in self.bootstrap_ctrl.pending_actions() {
84+
match action {
85+
BootstrapAction::Request(to, msg) => {
86+
let mut conn_pool = self.connections.borrow_mut();
87+
self.send(conn_pool.get(&to).unwrap(), rpc::Request::Bootstrap(msg))?;
11788
}
118-
NodeState::Active => {
119-
unimplemented!()
89+
BootstrapAction::Response(_) => {}
90+
BootstrapAction::Completed => {
91+
return Ok(());
12092
}
121-
NodeState::Closed => break,
93+
_ => {}
12294
};
123-
124-
if let Some(next_state) = next_state {
125-
self.state.replace(next_state);
126-
is_state_changed = true;
127-
}
128-
}
129-
130-
if is_state_changed {
131-
self.state_cond.broadcast();
13295
}
13396
}
134-
135-
Ok(())
13697
}
13798

13899
pub fn handle_rpc(&self, ctx: FunctionCtx, args: FunctionArgs) -> i32 {
139-
let args: Tuple = args.into();
140-
141-
match args.into_struct::<rpc::Request>() {
142-
Err(e) => set_error!(TarantoolErrorCode::Protocol, "{}", e),
143-
Ok(request) => {
144-
let response = match request {
145-
rpc::Request::Bootstrap(msg) => self.recv_bootstrap_request(msg),
146-
rpc::Request::Raft { data: msg_data } => {
147-
let mut msg = Message::default();
148-
match msg.merge_from_bytes(&msg_data) {
149-
Err(e) => {
150-
return set_error!(TarantoolErrorCode::Protocol, "{}", e);
151-
}
152-
Ok(()) => {
153-
let _lock = self.state_lock.lock();
154-
if let NodeState::Active = self.state.get() {
155-
self.inner.borrow().handle_msg(msg);
156-
}
157-
}
158-
}
159-
rpc::Response::Ack
160-
}
161-
_ => unimplemented!(),
162-
};
163-
164-
ctx.return_mp(&response)
165-
.unwrap_or_else(|e| set_error!(TarantoolErrorCode::ProcC, "{}", e))
166-
}
167-
}
100+
unimplemented!();
168101
}
169102

170103
pub fn wait_ready(&self, timeout: Duration) -> Result<(), Error> {
171-
let started_at = Instant::now();
172-
while self.state.get() != NodeState::Active {
173-
let is_timeout = !match timeout.checked_sub(started_at.elapsed()) {
174-
None => false,
175-
Some(timeout) => self.state_cond.wait_timeout(timeout),
176-
};
177-
178-
if is_timeout {
179-
return Err(io::Error::from(io::ErrorKind::TimedOut).into());
180-
}
181-
}
182-
183-
Ok(())
104+
unimplemented!();
184105
}
185106

186107
pub fn close(&self) {
187-
let _lock = self.state_lock.lock();
188-
self.state.replace(NodeState::Closed);
108+
unimplemented!();
189109
}
190110

191111
fn send(&self, conn: &Conn, request: rpc::Request) -> Result<Option<rpc::Response>, Error> {
@@ -207,12 +127,4 @@ impl Node {
207127
},
208128
}
209129
}
210-
211-
fn recv_bootstrap_request(&self, request: rpc::BootstrapMsg) -> rpc::Response {
212-
unimplemented!()
213-
}
214-
215-
fn send_raft_batch(&self, msgs: &mut dyn Iterator<Item = Message>) -> Result<(), Error> {
216-
unimplemented!()
217-
}
218130
}

src/raft/net.rs

Lines changed: 21 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,56 +7,36 @@ use std::ptr::null_mut;
77
use ipnetwork::{Ipv4Network, Ipv6Network};
88

99
use crate::error::Error;
10-
11-
use super::{Conn, ConnOptions};
10+
use crate::net_box::{Conn, ConnOptions};
1211

1312
#[derive(Default)]
1413
pub struct ConnectionPoll {
15-
connections: Vec<Conn>,
16-
ids_index: HashMap<u64, usize>,
17-
addrs_index: HashMap<SocketAddr, usize>,
1814
options: ConnOptions,
15+
inner: HashMap<ConnectionId, Conn>,
1916
}
2017

21-
impl ConnectionPoll {
22-
pub fn connect_or_get(
23-
&mut self,
24-
id: Option<u64>,
25-
addrs: &Vec<SocketAddr>,
26-
) -> Result<&Conn, Error> {
27-
let inner_id = if let Some(id) = id {
28-
self.ids_index.get(&id)
29-
} else {
30-
let mut inner_id = None;
31-
for addr in addrs {
32-
inner_id = self.addrs_index.get(addr);
33-
if inner_id.is_some() {
34-
break;
35-
}
36-
}
37-
inner_id
38-
};
39-
40-
let inner_id = match inner_id {
41-
Some(inner_id) => *inner_id,
42-
None => {
43-
let inner_id = self.connections.len();
44-
self.connections
45-
.push(Conn::new(addrs.as_slice(), self.options.clone(), None)?);
46-
47-
if let Some(id) = id {
48-
self.ids_index.insert(id, inner_id);
49-
}
18+
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
19+
pub enum ConnectionId {
20+
Seed(usize),
21+
Peer(u64),
22+
}
5023

51-
for addr in addrs {
52-
self.addrs_index.insert(addr.clone(), inner_id);
53-
}
24+
impl ConnectionPoll {
25+
pub fn new(options: ConnOptions) -> Self {
26+
ConnectionPoll {
27+
options,
28+
inner: HashMap::new(),
29+
}
30+
}
5431

55-
inner_id
56-
}
57-
};
32+
pub fn connect(&mut self, id: ConnectionId, addrs: impl ToSocketAddrs) -> Result<(), Error> {
33+
let conn = Conn::new(addrs, self.options.clone(), None)?;
34+
self.inner.insert(id, conn);
35+
Ok(())
36+
}
5837

59-
Ok(self.connections.get(inner_id).unwrap())
38+
pub fn get(&mut self, id: &ConnectionId) -> Option<&mut Conn> {
39+
self.inner.get_mut(id)
6040
}
6141
}
6242

src/raft/rpc.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ impl AsTuple for Response {}
3434
#[derive(Debug, PartialEq, Serialize, Deserialize)]
3535
pub struct BootstrapMsg {
3636
pub from_id: u64,
37-
pub from_addrs: Vec<SocketAddr>,
3837
pub nodes: Vec<(u64, Vec<SocketAddr>)>,
3938
}
4039

0 commit comments

Comments
 (0)