Skip to content

Commit a4a883d

Browse files
author
Anton Melnikov
committed
feat(coio): mpsc with closable rx and tx
1 parent 57a4594 commit a4a883d

File tree

3 files changed

+79
-35
lines changed

3 files changed

+79
-35
lines changed

src/coio.rs

Lines changed: 50 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,24 @@
22
//!
33
//! See also:
44
//! - [C API reference: Module coio](https://www.tarantool.io/en/doc/latest/dev_guide/reference_capi/coio/)
5+
use std::cell::{Cell, RefCell};
6+
use std::collections::VecDeque;
57
use std::convert::TryFrom;
68
use std::ffi::c_void;
7-
use std::io;
8-
use std::io::{Read, Write};
9+
use std::io::{self, Read, Write};
910
use std::mem::forget;
1011
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
1112
use std::os::raw::c_char;
1213
use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
14+
use std::rc::Rc;
1315
use std::time::Duration;
1416

1517
use failure::_core::ptr::null_mut;
18+
use num_traits::Zero;
1619

1720
use crate::error::{Error, TarantoolError};
1821
use crate::ffi::tarantool as ffi;
1922
use crate::fiber::{unpack_callback, Cond};
20-
use std::cell::RefCell;
21-
use std::collections::VecDeque;
22-
use std::rc::Rc;
2323

2424
const TIMEOUT_INFINITY: f64 = 365.0 * 86400.0 * 100.0;
2525

@@ -294,6 +294,8 @@ pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
294294
let chan = Rc::new(Chan {
295295
buffer: RefCell::new(VecDeque::with_capacity(capacity)),
296296
cond: Cond::new(),
297+
tx_count: Cell::new(1),
298+
rx_is_active: Cell::new(true),
297299
});
298300

299301
(Sender(chan.clone()), Receiver(chan))
@@ -302,49 +304,65 @@ pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
302304
pub struct Sender<T>(Rc<Chan<T>>);
303305

304306
impl<T> Sender<T> {
305-
pub fn send(&self, value: T) {
306-
self.0.send(value)
307+
pub fn send(&self, value: T) -> Result<(), io::Error> {
308+
if !self.0.rx_is_active.get() {
309+
return Err(io::ErrorKind::NotConnected.into());
310+
}
311+
312+
let was_empty = {
313+
let mut buffer = self.0.buffer.borrow_mut();
314+
let was_empty = buffer.len() == 0;
315+
buffer.push_back(value);
316+
was_empty
317+
};
318+
319+
if was_empty {
320+
self.0.cond.signal();
321+
}
322+
323+
Ok(())
307324
}
308325
}
309326

310327
impl<T> Clone for Sender<T> {
311328
fn clone(&self) -> Self {
329+
self.0.tx_count.set(self.0.tx_count.get() + 1);
312330
Sender(self.0.clone())
313331
}
314332
}
315333

316-
pub struct Receiver<T>(Rc<Chan<T>>);
317-
318-
impl<T> Receiver<T> {
319-
pub fn recv(&self) -> T {
320-
self.0.recv()
334+
impl<T> Drop for Sender<T> {
335+
fn drop(&mut self) {
336+
self.0.tx_count.set(self.0.tx_count.get() - 1);
337+
self.0.cond.signal();
321338
}
322339
}
323340

324-
struct Chan<T> {
325-
buffer: RefCell<VecDeque<T>>,
326-
cond: Cond,
327-
}
341+
pub struct Receiver<T>(Rc<Chan<T>>);
328342

329-
impl<T> Chan<T> {
330-
fn send(&self, value: T) {
331-
let was_empty = {
332-
let mut buffer = self.buffer.borrow_mut();
333-
let was_empty = buffer.len() == 0;
334-
buffer.push_back(value);
335-
was_empty
336-
};
343+
impl<T> Receiver<T> {
344+
pub fn recv(&self) -> Option<T> {
345+
if self.0.buffer.borrow().len() == 0 {
346+
if self.0.tx_count.get().is_zero() {
347+
return None;
348+
}
337349

338-
if was_empty {
339-
self.cond.signal();
350+
self.0.cond.wait();
340351
}
341-
}
342352

343-
fn recv(&self) -> T {
344-
while self.buffer.borrow().len() == 0 {
345-
self.cond.wait();
346-
}
353+
self.0.buffer.borrow_mut().pop_front()
354+
}
355+
}
347356

348-
self.buffer.borrow_mut().pop_front().unwrap()
357+
impl<T> Drop for Receiver<T> {
358+
fn drop(&mut self) {
359+
self.0.rx_is_active.set(false);
349360
}
350361
}
362+
363+
struct Chan<T> {
364+
buffer: RefCell<VecDeque<T>>,
365+
cond: Cond,
366+
tx_count: Cell<usize>,
367+
rx_is_active: Cell<bool>,
368+
}

tests/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ fn run_tests(cfg: TestConfig) -> Result<bool, io::Error> {
225225
test_coio::test_coio_read_write,
226226
test_coio::test_coio_call,
227227
test_coio::test_channel,
228+
test_coio::test_channel_rx_closed,
229+
test_coio::test_channel_tx_closed,
228230
test_transaction::test_transaction_commit,
229231
test_transaction::test_transaction_rollback,
230232
test_log::test_log,

tests/src/test_coio.rs

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::convert::TryInto;
2-
use std::io::{Read, Write};
2+
use std::io::{self, Read, Write};
33
use std::net::{TcpListener, TcpStream};
44
use std::os::unix::io::{AsRawFd, FromRawFd};
55
use std::os::unix::net::UnixStream;
@@ -64,14 +64,14 @@ pub fn test_channel() {
6464
let (tx, rx) = channel::<i32>(1);
6565

6666
let mut fiber_a = Fiber::new("test_fiber_a", &mut |tx: Box<Sender<i32>>| {
67-
tx.send(99);
67+
tx.send(99).unwrap();
6868
0
6969
});
7070
fiber_a.set_joinable(true);
7171
fiber_a.start(tx);
7272

7373
let mut fiber_b = Fiber::new("test_fiber_b", &mut |rx: Box<Receiver<i32>>| {
74-
let value = rx.recv();
74+
let value = rx.recv().unwrap();
7575
assert_eq!(value, 99);
7676
0
7777
});
@@ -81,3 +81,27 @@ pub fn test_channel() {
8181
fiber_a.join();
8282
fiber_b.join();
8383
}
84+
85+
pub fn test_channel_rx_closed() {
86+
let (tx, _) = channel::<i32>(1);
87+
88+
let mut fiber = Fiber::new("test_fiber", &mut |tx: Box<Sender<i32>>| {
89+
assert!(tx.send(99).is_err());
90+
0
91+
});
92+
fiber.set_joinable(true);
93+
fiber.start(tx);
94+
fiber.join();
95+
}
96+
97+
pub fn test_channel_tx_closed() {
98+
let (_, rx) = channel::<i32>(1);
99+
100+
let mut fiber = Fiber::new("test_fiber", &mut |rx: Box<Receiver<i32>>| {
101+
assert!(rx.recv().is_none());
102+
0
103+
});
104+
fiber.set_joinable(true);
105+
fiber.start(rx);
106+
fiber.join();
107+
}

0 commit comments

Comments
 (0)