Skip to content

Commit 46d4228

Browse files
author
Anton Melnikov
committed
feat(raft): single entry point for inner (update method)
1 parent c857f6a commit 46d4228

File tree

3 files changed

+58
-36
lines changed

3 files changed

+58
-36
lines changed

src/raft/inner.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ enum State {
2121
Done,
2222
}
2323

24+
#[derive(Debug)]
2425
pub enum NodeEvent {
2526
Request(rpc::BootstrapMsg),
2627
Response(rpc::BootstrapMsg),
@@ -56,13 +57,17 @@ impl NodeInner {
5657
node_inner
5758
}
5859

59-
pub fn pending_actions(&mut self) -> Vec<NodeAction> {
60-
self.pending_actions_buffer
61-
.drain(..)
62-
.collect::<Vec<NodeAction>>()
60+
pub fn update(&mut self, events: &mut VecDeque<NodeEvent>, actions: &mut VecDeque<NodeAction>) {
61+
while let Some(event) = events.pop_front() {
62+
self.handle_event(event);
63+
}
64+
65+
for action in self.pending_actions_buffer.drain(..) {
66+
actions.push_back(action);
67+
}
6368
}
6469

65-
pub fn handle_event(&mut self, event: NodeEvent) {
70+
fn handle_event(&mut self, event: NodeEvent) {
6671
use NodeEvent as E;
6772
use State as S;
6873

src/raft/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#![cfg(feature = "raft_node")]
22

33
use std::cell::RefCell;
4+
use std::collections::VecDeque;
45
use std::net::ToSocketAddrs;
56
use std::time::Duration;
67

@@ -71,8 +72,11 @@ impl Node {
7172
}
7273

7374
pub fn run(&self) -> Result<(), Error> {
75+
let mut events = VecDeque::new();
76+
let mut actions = VecDeque::new();
7477
loop {
75-
for action in self.inner.borrow_mut().pending_actions() {
78+
self.inner.borrow_mut().update(&mut events, &mut actions);
79+
for action in actions.drain(..) {
7680
match action {
7781
NodeAction::Request(to, msg) => {
7882
let mut conn_pool = self.connections.borrow_mut();

tests/src/test_raft.rs

Lines changed: 43 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::VecDeque;
2+
13
use tarantool::raft::inner::{NodeAction, NodeEvent, NodeInner};
24
use tarantool::raft::net::ConnectionId;
35
use tarantool::raft::rpc;
@@ -6,9 +8,12 @@ pub fn test_bootstrap_solo() {
68
let local_addrs = vec!["127.0.0.1:3301".parse().unwrap()];
79
let remote_addrs = vec!["127.0.0.1:3302".parse().unwrap()];
810

11+
let mut events = VecDeque::new();
12+
let mut actions = VecDeque::new();
13+
914
let mut node = NodeInner::new(1, local_addrs.clone(), vec![remote_addrs.clone()]);
15+
node.update(&mut events, &mut actions);
1016

11-
let actions = node.pending_actions();
1217
assert_eq!(actions.len(), 2);
1318
assert!(matches!(
1419
&actions[0],
@@ -21,13 +26,14 @@ pub fn test_bootstrap_solo() {
2126
nodes: vec![(1, local_addrs.clone())],
2227
}
2328
));
29+
actions.clear();
2430

25-
node.handle_event(NodeEvent::Response(rpc::BootstrapMsg {
31+
events.push_back(NodeEvent::Response(rpc::BootstrapMsg {
2632
from_id: 2,
2733
nodes: vec![(2, remote_addrs.clone())],
2834
}));
35+
node.update(&mut events, &mut actions);
2936

30-
let actions = node.pending_actions();
3137
assert_eq!(actions.len(), 2);
3238
assert!(matches!(
3339
&actions[0],
@@ -43,13 +49,14 @@ pub fn test_bootstrap_solo() {
4349
],
4450
}
4551
));
52+
actions.clear();
4653

47-
node.handle_event(NodeEvent::Response(rpc::BootstrapMsg {
54+
events.push_back(NodeEvent::Response(rpc::BootstrapMsg {
4855
from_id: 2,
4956
nodes: vec![(1, local_addrs.clone()), (2, remote_addrs.clone())],
5057
}));
58+
node.update(&mut events, &mut actions);
5159

52-
let actions = node.pending_actions();
5360
assert_eq!(actions.len(), 1);
5461
assert!(matches!(&actions[0], NodeAction::Completed))
5562
}
@@ -58,44 +65,50 @@ pub fn test_bootstrap_2n() {
5865
let n1_addrs = vec!["127.0.0.1:3301".parse().unwrap()];
5966
let n2_addrs = vec!["127.0.0.1:3302".parse().unwrap()];
6067

68+
let mut n1_events = VecDeque::new();
69+
let mut n1_actions = VecDeque::new();
6170
let mut n1_ctrl = NodeInner::new(1, n1_addrs.clone(), vec![n2_addrs.clone()]);
71+
72+
let mut n2_events = VecDeque::new();
73+
let mut n2_actions = VecDeque::new();
6274
let mut n2_ctrl = NodeInner::new(2, n2_addrs.clone(), vec![n1_addrs.clone()]);
6375

64-
assert_eq!(communicate(&mut n1_ctrl, &mut n2_ctrl), (false, false));
65-
assert_eq!(communicate(&mut n1_ctrl, &mut n2_ctrl), (false, false));
66-
assert_eq!(communicate(&mut n1_ctrl, &mut n2_ctrl), (true, true));
67-
}
76+
let mut n1_is_completed = false;
77+
let mut n2_is_completed = false;
6878

69-
fn communicate(n1_ctrl: &mut NodeInner, n2_ctrl: &mut NodeInner) -> (bool, bool) {
70-
let n1_actions = n1_ctrl.pending_actions();
71-
let n2_actions = n2_ctrl.pending_actions();
79+
for _ in 0..3 {
80+
n1_ctrl.update(&mut n1_events, &mut n1_actions);
81+
n2_ctrl.update(&mut n2_events, &mut n2_actions);
7282

73-
let mut n1_is_completed = false;
74-
for action in n1_actions {
75-
if let NodeAction::Completed = action {
76-
n1_is_completed = true;
77-
}
78-
forward_action(action, n2_ctrl);
83+
n1_is_completed = n1_is_completed || communicate(&mut n1_actions, &mut n2_events);
84+
n2_is_completed = n2_is_completed || communicate(&mut n2_actions, &mut n1_events);
7985
}
8086

81-
let mut n2_is_completed = false;
82-
for action in n2_actions {
87+
assert!(n1_is_completed);
88+
assert!(n2_is_completed);
89+
}
90+
91+
fn communicate(from: &mut VecDeque<NodeAction>, to: &mut VecDeque<NodeEvent>) -> bool {
92+
let mut is_completed = false;
93+
for action in from.drain(..) {
8394
if let NodeAction::Completed = action {
84-
n2_is_completed = true;
95+
is_completed = true;
8596
}
86-
forward_action(action, n1_ctrl);
87-
}
8897

89-
(n1_is_completed, n2_is_completed)
98+
if let Some(event) = forward_action(action) {
99+
to.push_back(event);
100+
}
101+
}
102+
is_completed
90103
}
91104

92-
fn forward_action(action: NodeAction, node_ctrl: &mut NodeInner) {
105+
fn forward_action(action: NodeAction) -> Option<NodeEvent> {
93106
match action {
94-
NodeAction::Request(_, msg) => node_ctrl.handle_event(NodeEvent::Request(msg)),
107+
NodeAction::Request(_, msg) => Some(NodeEvent::Request(msg)),
95108
NodeAction::Response(resp) => match resp.unwrap() {
96-
rpc::Response::Bootstrap(msg) => node_ctrl.handle_event(NodeEvent::Response(msg)),
97-
_ => {}
109+
rpc::Response::Bootstrap(msg) => Some(NodeEvent::Response(msg)),
110+
_ => None,
98111
},
99-
_ => {}
100-
};
112+
_ => None,
113+
}
101114
}

0 commit comments

Comments
 (0)