Skip to content

Commit e352ac1

Browse files
committed
Merge branch 'feature/mpsc' into 'master'
MPSC (channels) See merge request picodata/brod/tarantool-module!39
2 parents d23045c + 87772a9 commit e352ac1

File tree

3 files changed

+145
-5
lines changed

3 files changed

+145
-5
lines changed

src/coio.rs

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,24 @@
22
//!
33
//! See also:
44
//! - [C API reference: Module coio](https://www.tarantool.io/en/doc/latest/dev_guide/reference_capi/coio/)
5+
use std::cell::{Cell, RefCell};
6+
use std::collections::VecDeque;
57
use std::convert::TryFrom;
68
use std::ffi::c_void;
7-
use std::io;
8-
use std::io::{Read, Write};
9+
use std::io::{self, Read, Write};
910
use std::mem::forget;
1011
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
1112
use std::os::raw::c_char;
1213
use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
14+
use std::rc::Rc;
1315
use std::time::Duration;
1416

1517
use failure::_core::ptr::null_mut;
18+
use num_traits::Zero;
1619

1720
use crate::error::{Error, TarantoolError};
1821
use crate::ffi::tarantool as ffi;
19-
use crate::fiber::unpack_callback;
22+
use crate::fiber::{unpack_callback, Cond};
2023

2124
const TIMEOUT_INFINITY: f64 = 365.0 * 86400.0 * 100.0;
2225

@@ -286,3 +289,91 @@ pub(crate) fn write(fd: RawFd, buf: &[u8], timeout: Option<Duration>) -> Result<
286289
Ok(result as usize)
287290
}
288291
}
292+
293+
/// Creates a new asynchronous channel, returning the sender/receiver halves.
294+
///
295+
/// All data sent on the Sender will become available on the [Receiver] in the same order as it was sent,
296+
/// and no `send` will block the calling fiber, `recv` will block until a message is available.
297+
pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
298+
let chan = Rc::new(Chan {
299+
buffer: RefCell::new(VecDeque::with_capacity(capacity)),
300+
cond: Cond::new(),
301+
tx_count: Cell::new(1),
302+
rx_is_active: Cell::new(true),
303+
});
304+
305+
(Sender(chan.clone()), Receiver(chan))
306+
}
307+
308+
/// The sending-half of channel.
309+
///
310+
/// Messages can be sent through this channel with `send`. Can be cloned.
311+
pub struct Sender<T>(Rc<Chan<T>>);
312+
313+
impl<T> Sender<T> {
314+
/// Attempts to send a value on this channel, returning it back if it could not be sent.
315+
/// This method will never block.
316+
pub fn send(&self, value: T) -> Result<(), io::Error> {
317+
if !self.0.rx_is_active.get() {
318+
return Err(io::ErrorKind::NotConnected.into());
319+
}
320+
321+
let was_empty = {
322+
let mut buffer = self.0.buffer.borrow_mut();
323+
let was_empty = buffer.len() == 0;
324+
buffer.push_back(value);
325+
was_empty
326+
};
327+
328+
if was_empty {
329+
self.0.cond.signal();
330+
}
331+
332+
Ok(())
333+
}
334+
}
335+
336+
impl<T> Clone for Sender<T> {
337+
fn clone(&self) -> Self {
338+
self.0.tx_count.set(self.0.tx_count.get() + 1);
339+
Sender(self.0.clone())
340+
}
341+
}
342+
343+
impl<T> Drop for Sender<T> {
344+
fn drop(&mut self) {
345+
self.0.tx_count.set(self.0.tx_count.get() - 1);
346+
self.0.cond.signal();
347+
}
348+
}
349+
350+
/// The receiving half of channel.
351+
pub struct Receiver<T>(Rc<Chan<T>>);
352+
353+
impl<T> Receiver<T> {
354+
/// Attempts to wait for a value on this receiver, returning `None` if the corresponding channel has hung up.
355+
pub fn recv(&self) -> Option<T> {
356+
if self.0.buffer.borrow().len() == 0 {
357+
if self.0.tx_count.get().is_zero() {
358+
return None;
359+
}
360+
361+
self.0.cond.wait();
362+
}
363+
364+
self.0.buffer.borrow_mut().pop_front()
365+
}
366+
}
367+
368+
impl<T> Drop for Receiver<T> {
369+
fn drop(&mut self) {
370+
self.0.rx_is_active.set(false);
371+
}
372+
}
373+
374+
struct Chan<T> {
375+
buffer: RefCell<VecDeque<T>>,
376+
cond: Cond,
377+
tx_count: Cell<usize>,
378+
rx_is_active: Cell<bool>,
379+
}

tests/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,9 @@ fn run_tests(cfg: TestConfig) -> Result<bool, io::Error> {
224224
test_coio::test_coio_accept,
225225
test_coio::test_coio_read_write,
226226
test_coio::test_coio_call,
227+
test_coio::test_channel,
228+
test_coio::test_channel_rx_closed,
229+
test_coio::test_channel_tx_closed,
227230
test_transaction::test_transaction_commit,
228231
test_transaction::test_transaction_rollback,
229232
test_log::test_log,

tests/src/test_coio.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use std::convert::TryInto;
2-
use std::io::{Read, Write};
2+
use std::io::{self, Read, Write};
33
use std::net::{TcpListener, TcpStream};
44
use std::os::unix::io::{AsRawFd, FromRawFd};
55
use std::os::unix::net::UnixStream;
66

7-
use tarantool::coio::{coio_call, CoIOListener, CoIOStream};
7+
use tarantool::coio::{channel, coio_call, CoIOListener, CoIOStream, Receiver, Sender};
88
use tarantool::fiber::{sleep, Fiber};
99

1010
pub fn test_coio_accept() {
@@ -59,3 +59,49 @@ pub fn test_coio_call() {
5959
);
6060
assert_eq!(res, 100)
6161
}
62+
63+
pub fn test_channel() {
64+
let (tx, rx) = channel::<i32>(1);
65+
66+
let mut fiber_a = Fiber::new("test_fiber_a", &mut |tx: Box<Sender<i32>>| {
67+
tx.send(99).unwrap();
68+
0
69+
});
70+
fiber_a.set_joinable(true);
71+
fiber_a.start(tx);
72+
73+
let mut fiber_b = Fiber::new("test_fiber_b", &mut |rx: Box<Receiver<i32>>| {
74+
let value = rx.recv().unwrap();
75+
assert_eq!(value, 99);
76+
0
77+
});
78+
fiber_b.set_joinable(true);
79+
fiber_b.start(rx);
80+
81+
fiber_a.join();
82+
fiber_b.join();
83+
}
84+
85+
pub fn test_channel_rx_closed() {
86+
let (tx, _) = channel::<i32>(1);
87+
88+
let mut fiber = Fiber::new("test_fiber", &mut |tx: Box<Sender<i32>>| {
89+
assert!(tx.send(99).is_err());
90+
0
91+
});
92+
fiber.set_joinable(true);
93+
fiber.start(tx);
94+
fiber.join();
95+
}
96+
97+
pub fn test_channel_tx_closed() {
98+
let (_, rx) = channel::<i32>(1);
99+
100+
let mut fiber = Fiber::new("test_fiber", &mut |rx: Box<Receiver<i32>>| {
101+
assert!(rx.recv().is_none());
102+
0
103+
});
104+
fiber.set_joinable(true);
105+
fiber.start(rx);
106+
fiber.join();
107+
}

0 commit comments

Comments
 (0)