Skip to content

Commit fa2e9ca

Browse files
sqllqqgmoshkin
authored andcommitted
fix: use thread queue for cbus std
1 parent 926eb40 commit fa2e9ca

File tree

2 files changed

+68
-53
lines changed

2 files changed

+68
-53
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ restricting time connection establishment.
2020

2121
### Changed
2222
- `network::protocol::codec::IProtoType` uses C language representation
23+
- `cbus::sync::std::ThreadWaker` now uses internal thread FIFO queue when blocking threads on send.
2324

2425
### Fixed
2526
- `tlua::{Push, PushInto, LuaRead}` now work for HashSet & HashMap with custom hashers.

tarantool/src/cbus/sync/std.rs

Lines changed: 67 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -2,68 +2,87 @@ use crate::cbus::{LCPipe, RecvError, SendError};
22
use crate::fiber::Cond;
33
use std::cell::RefCell;
44
use std::num::NonZeroUsize;
5-
use std::sync;
65
use std::sync::atomic::{AtomicBool, Ordering};
7-
use std::sync::Condvar as StdCondvar;
8-
use std::sync::{Arc, Mutex, Weak};
6+
use std::sync::{self, Arc, Mutex, Weak};
7+
use std::thread;
98

109
type CordWaker = crate::cbus::unbounded::Waker;
1110

11+
/// Current thread process handler.
12+
#[derive(Clone)]
13+
struct Thread {
14+
inner: thread::Thread,
15+
flag: Arc<AtomicBool>,
16+
}
17+
18+
impl Thread {
19+
fn current() -> Self {
20+
Self {
21+
inner: thread::current(),
22+
flag: Arc::new(AtomicBool::new(true)),
23+
}
24+
}
25+
26+
fn park(&self) {
27+
if self
28+
.flag
29+
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
30+
.is_ok()
31+
{
32+
thread::park();
33+
};
34+
}
35+
36+
fn unpark(&self) {
37+
if self
38+
.flag
39+
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
40+
.is_ok()
41+
{
42+
self.inner.unpark();
43+
};
44+
}
45+
}
46+
1247
/// A synchronization component between producers (an OS thread) and a consumer (a cord).
1348
/// The responsibility of this component is to wake up a producer when it's blocked because
1449
/// channel internal buffer is full.
1550
struct ThreadWaker {
16-
lock: Mutex<bool>,
17-
cond: StdCondvar,
51+
/// A queue of threads that are waiting to send data.
52+
list: crossbeam_queue::SegQueue<Thread>,
1853
}
1954

2055
impl ThreadWaker {
2156
fn new() -> Self {
2257
Self {
23-
lock: Mutex::new(false),
24-
cond: StdCondvar::new(),
58+
list: crossbeam_queue::SegQueue::new(),
2559
}
2660
}
2761

2862
/// Lock until waker is woken up.
2963
/// In context of sync-channels, return from this function mean that there's some free
3064
/// space in message buffer, or receiver is disconnected.
3165
fn wait(&self, disconnected: &AtomicBool) {
32-
let mut started = self
33-
.lock
34-
.lock()
35-
.expect("unexpected panic in consumer thread");
36-
3766
if disconnected.load(Ordering::Acquire) {
3867
return;
3968
}
40-
41-
while !*started {
42-
started = self
43-
.cond
44-
.wait(started)
45-
.expect("unexpected panic in consumer thread");
46-
}
69+
let t = Thread::current();
70+
self.list.push(t.clone());
71+
t.park();
4772
}
4873

4974
/// Send wakeup signal to a single [`ThreadWaker::wait`] caller.
5075
fn wakeup_one(&self) {
51-
let mut started = self
52-
.lock
53-
.lock()
54-
.expect("unexpected panic in producer thread");
55-
*started = true;
56-
self.cond.notify_one();
76+
if let Some(thread) = self.list.pop() {
77+
thread.unpark();
78+
}
5779
}
5880

5981
/// Send wakeup signal to all [`ThreadWaker::wait`] callers.
6082
fn wakeup_all(&self) {
61-
let mut started = self
62-
.lock
63-
.lock()
64-
.expect("unexpected panic in producer thread");
65-
*started = true;
66-
self.cond.notify_all();
83+
while let Some(thread) = self.list.pop() {
84+
thread.unpark();
85+
}
6786
}
6887
}
6988

@@ -215,29 +234,21 @@ impl<T> Sender<T> {
215234
/// * `message`: message to send
216235
pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
217236
let mut msg = msg;
218-
// We assume that this lock has a minimal impact on performance, in most of situations
219-
// lock of mutex will take the fast path.
220-
let _crit_section = self.arc_guard.lock().unwrap();
221-
222-
// wake up a sleeping receiver
223-
if let Some(waker) = self.cord_waker.upgrade() {
224-
loop {
225-
let push_result = self.inner.chan.list.push(msg);
226-
if let Err(not_accepted_msg) = push_result {
227-
self.thread_waker.wait(&self.inner.chan.disconnected);
228-
if self.inner.chan.disconnected.load(Ordering::Acquire) {
229-
return Err(SendError(not_accepted_msg));
230-
}
231-
msg = not_accepted_msg;
232-
} else {
233-
break;
234-
}
237+
loop {
238+
if self.inner.chan.disconnected.load(Ordering::Acquire) {
239+
return Err(SendError(msg));
235240
}
236-
237-
waker.wakeup(&mut self.lcpipe.borrow_mut());
238-
Ok(())
239-
} else {
240-
Err(SendError(msg))
241+
let crit_section = self.arc_guard.lock().unwrap();
242+
let Some(waker) = self.cord_waker.upgrade() else {
243+
return Err(SendError(msg));
244+
};
245+
let Err(not_accepted_msg) = self.inner.chan.list.push(msg) else {
246+
waker.wakeup(&mut self.lcpipe.borrow_mut());
247+
return Ok(());
248+
};
249+
msg = not_accepted_msg;
250+
drop(crit_section);
251+
self.thread_waker.wait(&self.inner.chan.disconnected);
241252
}
242253
}
243254
}
@@ -277,6 +288,9 @@ impl<T> EndpointReceiver<T> {
277288
return Err(RecvError::Disconnected);
278289
}
279290

291+
// Need to wake thread so it can push message
292+
// FIXME: why cord waker waits it's cond for 1ms ?
293+
self.thread_waker.wakeup_one();
280294
self.cord_waker
281295
.as_ref()
282296
.expect("unreachable: waker must exists")

0 commit comments

Comments
 (0)