Skip to content

Commit 57a4594

Browse files
author
Anton Melnikov
committed
feat(coio): mpsc
1 parent d23045c commit 57a4594

File tree

3 files changed

+87
-2
lines changed

3 files changed

+87
-2
lines changed

src/coio.rs

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ use failure::_core::ptr::null_mut;
1616

1717
use crate::error::{Error, TarantoolError};
1818
use crate::ffi::tarantool as ffi;
19-
use crate::fiber::unpack_callback;
19+
use crate::fiber::{unpack_callback, Cond};
20+
use std::cell::RefCell;
21+
use std::collections::VecDeque;
22+
use std::rc::Rc;
2023

2124
const TIMEOUT_INFINITY: f64 = 365.0 * 86400.0 * 100.0;
2225

@@ -286,3 +289,62 @@ pub(crate) fn write(fd: RawFd, buf: &[u8], timeout: Option<Duration>) -> Result<
286289
Ok(result as usize)
287290
}
288291
}
292+
293+
pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
294+
let chan = Rc::new(Chan {
295+
buffer: RefCell::new(VecDeque::with_capacity(capacity)),
296+
cond: Cond::new(),
297+
});
298+
299+
(Sender(chan.clone()), Receiver(chan))
300+
}
301+
302+
pub struct Sender<T>(Rc<Chan<T>>);
303+
304+
impl<T> Sender<T> {
305+
pub fn send(&self, value: T) {
306+
self.0.send(value)
307+
}
308+
}
309+
310+
impl<T> Clone for Sender<T> {
311+
fn clone(&self) -> Self {
312+
Sender(self.0.clone())
313+
}
314+
}
315+
316+
pub struct Receiver<T>(Rc<Chan<T>>);
317+
318+
impl<T> Receiver<T> {
319+
pub fn recv(&self) -> T {
320+
self.0.recv()
321+
}
322+
}
323+
324+
struct Chan<T> {
325+
buffer: RefCell<VecDeque<T>>,
326+
cond: Cond,
327+
}
328+
329+
impl<T> Chan<T> {
330+
fn send(&self, value: T) {
331+
let was_empty = {
332+
let mut buffer = self.buffer.borrow_mut();
333+
let was_empty = buffer.len() == 0;
334+
buffer.push_back(value);
335+
was_empty
336+
};
337+
338+
if was_empty {
339+
self.cond.signal();
340+
}
341+
}
342+
343+
fn recv(&self) -> T {
344+
while self.buffer.borrow().len() == 0 {
345+
self.cond.wait();
346+
}
347+
348+
self.buffer.borrow_mut().pop_front().unwrap()
349+
}
350+
}

tests/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ fn run_tests(cfg: TestConfig) -> Result<bool, io::Error> {
224224
test_coio::test_coio_accept,
225225
test_coio::test_coio_read_write,
226226
test_coio::test_coio_call,
227+
test_coio::test_channel,
227228
test_transaction::test_transaction_commit,
228229
test_transaction::test_transaction_rollback,
229230
test_log::test_log,

tests/src/test_coio.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::net::{TcpListener, TcpStream};
44
use std::os::unix::io::{AsRawFd, FromRawFd};
55
use std::os::unix::net::UnixStream;
66

7-
use tarantool::coio::{coio_call, CoIOListener, CoIOStream};
7+
use tarantool::coio::{channel, coio_call, CoIOListener, CoIOStream, Receiver, Sender};
88
use tarantool::fiber::{sleep, Fiber};
99

1010
pub fn test_coio_accept() {
@@ -59,3 +59,25 @@ pub fn test_coio_call() {
5959
);
6060
assert_eq!(res, 100)
6161
}
62+
63+
pub fn test_channel() {
64+
let (tx, rx) = channel::<i32>(1);
65+
66+
let mut fiber_a = Fiber::new("test_fiber_a", &mut |tx: Box<Sender<i32>>| {
67+
tx.send(99);
68+
0
69+
});
70+
fiber_a.set_joinable(true);
71+
fiber_a.start(tx);
72+
73+
let mut fiber_b = Fiber::new("test_fiber_b", &mut |rx: Box<Receiver<i32>>| {
74+
let value = rx.recv();
75+
assert_eq!(value, 99);
76+
0
77+
});
78+
fiber_b.set_joinable(true);
79+
fiber_b.start(rx);
80+
81+
fiber_a.join();
82+
fiber_b.join();
83+
}

0 commit comments

Comments
 (0)