Skip to content

Commit c63c9e6

Browse files
author
Anton Melnikov
committed
feat(raft): rpc handler
1 parent 466ab29 commit c63c9e6

File tree

2 files changed

+48
-17
lines changed

2 files changed

+48
-17
lines changed

examples/cluster_node/src/lib.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::os::raw::c_int;
66
use std::rc::{Rc, Weak};
77

88
use tarantool::raft::Node;
9-
use tarantool::tuple::{FunctionArgs, FunctionCtx};
9+
use tarantool::tuple::{FunctionArgs, FunctionCtx, Tuple};
1010

1111
#[derive(Default)]
1212
struct Global {
@@ -21,15 +21,12 @@ lazy_static! {
2121
}
2222

2323
#[no_mangle]
24-
pub extern "C" fn run_node(_: FunctionCtx, _: FunctionArgs) -> c_int {
25-
let node = Rc::new(
26-
Node::new(
27-
"libcluster_node.rpc",
28-
vec!["127.0.0.1:3301", "127.0.0.1:3302", "127.0.0.1:3303"],
29-
Default::default(),
30-
)
31-
.unwrap(),
32-
);
24+
pub extern "C" fn run_node(_: FunctionCtx, args: FunctionArgs) -> c_int {
25+
let args: Tuple = args.into();
26+
let (bootstrap_addrs,) = args.into_struct::<(Vec<String>,)>().unwrap();
27+
28+
let node =
29+
Rc::new(Node::new("libcluster_node.rpc", bootstrap_addrs, Default::default()).unwrap());
3330
GLOBAL.node.replace(Rc::downgrade(&node));
3431
node.run().unwrap();
3532
0

src/raft/mod.rs

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ use rand::random;
1010
use inner::{NodeAction, NodeInner};
1111
use net::{get_local_addrs, ConnectionPool};
1212

13-
use crate::error::Error;
13+
use crate::error::{Error, TarantoolErrorCode};
14+
use crate::fiber::Cond;
1415
use crate::net_box::{Conn, ConnOptions, Options};
15-
use crate::tuple::{FunctionArgs, FunctionCtx};
16+
use crate::raft::inner::NodeEvent;
17+
use crate::tuple::{FunctionArgs, FunctionCtx, Tuple};
1618

1719
mod fsm;
1820
pub mod inner;
@@ -24,6 +26,9 @@ pub struct Node {
2426
inner: RefCell<NodeInner>,
2527
connections: RefCell<ConnectionPool>,
2628
rpc_function: String,
29+
events_cond: Cond,
30+
events_buffer: RefCell<VecDeque<NodeEvent>>,
31+
actions_buffer: RefCell<VecDeque<NodeAction>>,
2732
options: NodeOptions,
2833
}
2934

@@ -67,17 +72,26 @@ impl Node {
6772
inner: RefCell::new(NodeInner::new(id, local_addrs, bootstrap_addrs_cfg)),
6873
connections: RefCell::new(ConnectionPool::new(options.connection_options.clone())),
6974
rpc_function: rpc_function.to_string(),
75+
events_cond: Cond::new(),
76+
events_buffer: RefCell::new(VecDeque::with_capacity(options.recv_queue_size)),
77+
actions_buffer: RefCell::new(VecDeque::with_capacity(options.send_queue_size)),
7078
options,
7179
})
7280
}
7381

7482
pub fn run(&self) -> Result<(), Error> {
75-
let mut events = VecDeque::new();
76-
let mut actions = VecDeque::new();
7783
loop {
78-
self.inner.borrow_mut().update(&mut events, &mut actions);
79-
for action in actions.drain(..) {
84+
{
85+
let mut actions = self.actions_buffer.borrow_mut();
86+
let mut events = self.events_buffer.borrow_mut();
87+
self.inner.borrow_mut().update(&mut events, &mut actions);
88+
}
89+
90+
for action in self.actions_buffer.borrow_mut().drain(..) {
8091
match action {
92+
NodeAction::Connect(id, addrs) => {
93+
self.connections.borrow_mut().connect(id, &addrs[..])?;
94+
}
8195
NodeAction::Request(to, msg) => {
8296
let mut conn_pool = self.connections.borrow_mut();
8397
self.send(conn_pool.get(&to).unwrap(), rpc::Request::Bootstrap(msg))?;
@@ -89,11 +103,31 @@ impl Node {
89103
_ => {}
90104
};
91105
}
106+
107+
self.events_cond.wait();
92108
}
93109
}
94110

95111
pub fn handle_rpc(&self, ctx: FunctionCtx, args: FunctionArgs) -> i32 {
96-
unimplemented!();
112+
let args: Tuple = args.into();
113+
114+
match args.into_struct::<rpc::Request>() {
115+
Err(e) => set_error!(TarantoolErrorCode::Protocol, "{}", e),
116+
Ok(request) => {
117+
match request {
118+
rpc::Request::Bootstrap(msg) => {
119+
self.events_buffer
120+
.borrow_mut()
121+
.push_back(NodeEvent::Request(msg));
122+
self.events_cond.wait();
123+
}
124+
_ => unimplemented!(),
125+
};
126+
127+
ctx.return_mp(&rpc::Response::Ack)
128+
.unwrap_or_else(|e| set_error!(TarantoolErrorCode::ProcC, "{}", e))
129+
}
130+
}
97131
}
98132

99133
pub fn wait_ready(&self, timeout: Duration) -> Result<(), Error> {

0 commit comments

Comments
 (0)