Skip to content
This repository was archived by the owner on Nov 9, 2019. It is now read-only.

Commit e4b2dbe

Browse files
committed
Added documentation for aktoro-channel.
1 parent 4229e02 commit e4b2dbe

File tree

9 files changed

+327
-1
lines changed

9 files changed

+327
-1
lines changed

aktoro-channel/src/builder.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,72 +10,134 @@ use crate::queue::Queue;
1010
use crate::receiver::Receiver;
1111
use crate::sender::Sender;
1212

13+
/// A configuration builder for a
14+
/// channel.
1315
pub struct Builder {
16+
/// The capacity of the future
17+
/// channel, or `None` if it
18+
/// should be unbounded.
1419
cap: Option<usize>,
20+
/// The limit of messages that
21+
/// can be send over the channel,
22+
/// or `None` if no limit should
23+
/// be set.
1524
msgs: Option<usize>,
25+
/// The limit of senders the
26+
/// channel should be allowed to
27+
/// have at the same time, or
28+
/// `None` if no limit should
29+
/// be set.
1630
senders: Option<usize>,
31+
/// The limit of receivers the
32+
/// channel should be allowed to
33+
/// have at the same time, or
34+
/// `None` if no limit should
35+
/// be set.
1736
recvers: Option<usize>,
1837
}
1938

2039
impl Builder {
40+
/// Creates a new builder with
41+
/// the default configuration:
42+
/// - unbounded
43+
/// - no limit of messages
44+
/// - no limit of senders
45+
/// - no limit of receivers
2146
pub fn new() -> Builder {
2247
Builder::default()
2348
}
2449

50+
/// Sets the channel to be created
51+
/// as bounded with `cap` as its
52+
/// buffer capacity.
2553
pub fn bounded(mut self, cap: usize) -> Builder {
2654
self.cap = Some(cap);
2755
self
2856
}
2957

58+
/// Sets the channel to be created
59+
/// as unbounded.
3060
pub fn unbounded(mut self) -> Builder {
3161
self.cap = None;
3262
self
3363
}
3464

65+
/// Sets the maximum number of
66+
/// messages that the channel will
67+
/// be able to pass.
3568
pub fn limited_msgs(mut self, limit: usize) -> Builder {
3669
self.msgs = Some(limit);
3770
self
3871
}
3972

73+
/// Allows an infinite number of
74+
/// messages to be sent over the
75+
/// channel.
4076
pub fn unlimited_msgs(mut self) -> Builder {
4177
self.msgs = None;
4278
self
4379
}
4480

81+
/// Sets the maximum number of
82+
/// senders that the channel will
83+
/// be able to have at the same
84+
/// time.
4585
pub fn limited_senders(mut self, limit: usize) -> Builder {
4686
self.senders = Some(limit);
4787
self
4888
}
4989

90+
/// Alows an inifinite number of
91+
/// senders to be connected to
92+
/// the channel at the same time.
5093
pub fn unlimited_senders(mut self) -> Builder {
5194
self.senders = None;
5295
self
5396
}
5497

98+
/// Sets the maximum number of
99+
/// receivers that will be allowed
100+
/// to be connected to the channel
101+
/// at the same time.
55102
pub fn limited_receivers(mut self, limit: usize) -> Builder {
56103
self.recvers = Some(limit);
57104
self
58105
}
59106

107+
/// Allows an inifinite number of
108+
/// receivers to be connected
109+
/// to the channel at the same time.
60110
pub fn unlimited_receivers(mut self) -> Builder {
61111
self.recvers = None;
62112
self
63113
}
64114

115+
/// Builds the channel using the
116+
/// specified configuration and
117+
/// returning a sender and receiver
118+
/// connected to it.
65119
pub fn build<T>(self) -> (Sender<T>, Receiver<T>) {
120+
// We create either a bounded or
121+
// unbounded queue.
66122
let queue = if let Some(cap) = self.cap {
67123
Queue::Bounded(ArrayQueue::new(cap))
68124
} else {
69125
Queue::Unbounded(SegQueue::new())
70126
};
71127

128+
// We create the channel and put
129+
// it inside an atomically
130+
// reference counted pointer.
72131
let channel = Arc::new(Channel {
73132
queue,
74133
closed: AtomicBool::new(false),
75134
counters: Counters::new(self.msgs, self.senders, self.recvers),
76135
wakers: SegQueue::new(),
77136
});
78137

138+
// We return a sender and a
139+
// receiver containing a copy
140+
// of the pointer.
79141
(Sender::new(channel.clone()), Receiver::new(channel))
80142
}
81143
}

aktoro-channel/src/channel.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ use crate::error::*;
99
use crate::message::Message;
1010
use crate::queue::Queue;
1111

12+
/// A channel allowing senders to pass
13+
/// messages over it, and receivers to
14+
/// retrieve them.
1215
pub(crate) struct Channel<T> {
1316
pub(crate) queue: Queue<Message<T>>,
1417
pub(crate) closed: AtomicBool,
@@ -17,54 +20,87 @@ pub(crate) struct Channel<T> {
1720
}
1821

1922
impl<T> Channel<T> {
23+
/// Tries to send a message over the
24+
/// channel.
2025
pub(crate) fn try_send(&self, msg: Message<T>) -> Result<(), TrySendError<T>> {
26+
// If the channel has already
27+
// been closed, we return an
28+
// error.
2129
if self.is_closed() {
2230
return Err(TrySendError::closed(msg.msg));
2331
}
2432

33+
// If we couldn't increase the
34+
// number of messages inside the
35+
// inner counters, we return an
36+
// error.
2537
if self.counters.add_msg().is_err() {
2638
return Err(TrySendError::limit(msg.msg));
2739
}
2840

41+
// We try to push the message
42+
// over the queue.
2943
self.queue
3044
.push(msg)
3145
.map_err(|msg| TrySendError::full(msg.msg))?;
3246

47+
// We notify a receiver that a
48+
// new message is available.
3349
self.notify();
3450

3551
Ok(())
3652
}
3753

54+
/// Tries to receive a message from the
55+
/// channel if one is available.
3856
pub(crate) fn try_recv(&self) -> Result<Option<Message<T>>, TryRecvError> {
57+
// If the queue is empty, we
58+
// return an error if it's closed.
3959
if self.queue.is_empty() {
4060
if self.check_is_closed() {
4161
Err(TryRecvError::closed())
4262
} else {
4363
Ok(None)
4464
}
65+
// Otherwise, we pop try to
66+
// pop a message from it (it
67+
// could return `None` if the
68+
// message was already poped).
4569
} else {
4670
Ok(self.queue.pop())
4771
}
4872
}
4973

74+
/// Registers a new waker to be
75+
/// notified when a new message is
76+
/// available.
5077
pub(crate) fn register(&self, waker: Waker) {
5178
self.wakers.push(waker);
5279
}
5380

81+
/// Notifies a waker if one is
82+
/// available.
5483
fn notify(&self) {
5584
if let Ok(waker) = self.wakers.pop() {
5685
waker.wake();
5786
}
5887
}
5988

89+
/// Whether the queue is empty.
6090
pub(crate) fn is_empty(&self) -> bool {
6191
self.queue.is_empty()
6292
}
6393

94+
/// Whether the channel has been
95+
/// closed.
6496
pub(crate) fn is_closed(&self) -> bool {
6597
self.closed.load(Ordering::SeqCst)
6698
}
6799

100+
/// Verifies whether the channel has
101+
/// been closed and checks if senders
102+
/// are still connected to it (closing
103+
/// the channel if not).
68104
pub(crate) fn check_is_closed(&self) -> bool {
69105
if self.is_closed() {
70106
return true;
@@ -78,6 +114,7 @@ impl<T> Channel<T> {
78114
}
79115
}
80116

117+
/// Closes the channel.
81118
pub(crate) fn close(&self) {
82119
self.closed.store(true, Ordering::SeqCst);
83120
}

aktoro-channel/src/counters.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,37 @@
11
use std::sync::atomic::AtomicUsize;
22
use std::sync::atomic::Ordering;
33

4+
/// Counters used to store the number
5+
/// of senders, receivers, messages
6+
/// sent and the channel's limits.
47
pub(crate) struct Counters {
8+
/// The number of messages sent
9+
/// over the channel.
510
cmsgs: Option<AtomicUsize>,
11+
/// The number of senders
12+
/// connected to the channel.
613
csenders: AtomicUsize,
14+
/// The number of receivers
15+
/// connected to the channel.
716
crecvers: AtomicUsize,
817

18+
/// The number of messages that
19+
/// can be sent over the channel
20+
/// (in total).
921
lmsgs: Option<usize>,
22+
/// The maximum number of senders
23+
/// that can be connected to the
24+
/// channel.
1025
lsenders: Option<usize>,
26+
/// The maximum number of
27+
/// receivers that can be connected
28+
/// to the channel.
1129
lrecvers: Option<usize>,
1230
}
1331

1432
impl Counters {
33+
/// Creates counters with the
34+
/// specified limits.
1535
pub(crate) fn new(msgs: Option<usize>, senders: Option<usize>, recvers: Option<usize>) -> Self {
1636
Counters {
1737
cmsgs: msgs.map(|_| AtomicUsize::new(0)),
@@ -24,14 +44,23 @@ impl Counters {
2444
}
2545
}
2646

47+
/// Gets the current number of
48+
/// senders connected to the channel.
2749
pub(crate) fn senders(&self) -> usize {
2850
self.csenders.load(Ordering::SeqCst)
2951
}
3052

53+
/// Increases the total number of
54+
/// messages sent over the channel if
55+
/// necessary.
3156
pub(crate) fn add_msg(&self) -> Result<(), ()> {
3257
if let Some(counter) = &self.cmsgs {
3358
let limit = self.lmsgs.unwrap();
3459

60+
// We CAS the sent messages
61+
// counter to increase it of
62+
// 1 or return an error if it
63+
// is above the limit.
3564
loop {
3665
let cur = counter.load(Ordering::SeqCst);
3766
let new = cur + 1;
@@ -49,7 +78,15 @@ impl Counters {
4978
Ok(())
5079
}
5180

81+
/// Increases the counter for the
82+
/// number of senders connected to the
83+
/// channel.
5284
pub(crate) fn add_sender(&self) -> Result<(), ()> {
85+
// If there is a limit for the number
86+
// of senders connected to the channel,
87+
// we CAS the counter to increase it of
88+
// 1 and return an error if it is
89+
// above the limit.
5390
if let Some(limit) = &self.lsenders {
5491
loop {
5592
let cur = self.csenders.load(Ordering::SeqCst);
@@ -70,7 +107,15 @@ impl Counters {
70107
Ok(())
71108
}
72109

110+
/// Increases the counter for the
111+
/// number of receivers connected to
112+
/// the channel.
73113
pub(crate) fn add_recver(&self) -> Result<(), ()> {
114+
// If there is a limit for the number
115+
// or receivers that can be connected
116+
// to the channel, we CAS the counter
117+
// to increase it of 1 and return an
118+
// error if it is above the limit.
74119
if let Some(limit) = &self.lrecvers {
75120
loop {
76121
let cur = self.crecvers.load(Ordering::SeqCst);
@@ -91,6 +136,8 @@ impl Counters {
91136
Ok(())
92137
}
93138

139+
/// Decreases the number of senders,
140+
/// returning the updated number.
94141
pub(crate) fn sub_sender(&self) -> usize {
95142
loop {
96143
let cur = self.csenders.load(Ordering::SeqCst);
@@ -105,6 +152,8 @@ impl Counters {
105152
}
106153
}
107154

155+
/// Decreases the number of receivers,
156+
/// returning the updated number.
108157
pub(crate) fn sub_recver(&self) -> usize {
109158
loop {
110159
let cur = self.crecvers.load(Ordering::SeqCst);

0 commit comments

Comments
 (0)