Skip to content
This repository was archived by the owner on May 11, 2023. It is now read-only.

Commit 3a0d099

Browse files
committed
zb: Drop internal MessageSink entirely
1 parent 26c433a commit 3a0d099

File tree

2 files changed

+50
-66
lines changed

2 files changed

+50
-66
lines changed

zbus/src/azync/connection.rs

Lines changed: 18 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ struct ConnectionInner<S> {
119119
raw_in_conn: Arc<Mutex<RawConnection<Async<S>>>>,
120120
// FIXME: We really should be using async_lock::Mutex here but `Sink::start_send is not very
121121
// async friendly. :(
122-
sink: Arc<sync::Mutex<MessageSink>>,
122+
raw_out_conn: Arc<sync::Mutex<DynSocketConnection>>,
123123
// Serial number for next outgoing message
124124
serial: AtomicU32,
125125

@@ -751,10 +751,7 @@ impl Connection {
751751
let out_socket = auth.conn.socket().get_ref().try_clone()?;
752752
let out_conn = RawConnection::wrap(Async::new(out_socket)?);
753753
let cap_unix_fd = auth.cap_unix_fd;
754-
let sink = Arc::new(sync::Mutex::new(MessageSink {
755-
raw_conn: out_conn,
756-
cap_unix_fd,
757-
}));
754+
let raw_out_conn = Arc::new(sync::Mutex::new(out_conn));
758755

759756
let (mut msg_sender, msg_receiver) = broadcast(DEFAULT_MAX_QUEUED);
760757
msg_sender.set_overflow(true);
@@ -772,7 +769,7 @@ impl Connection {
772769
msg_receiver,
773770
inner: Arc::new(ConnectionInner {
774771
raw_in_conn,
775-
sink,
772+
raw_out_conn,
776773
server_guid: auth.server_guid,
777774
cap_unix_fd,
778775
bus_conn: bus_connection,
@@ -828,39 +825,7 @@ impl Connection {
828825
}
829826
}
830827

831-
#[derive(Debug)]
832-
struct MessageSink {
833-
raw_conn: DynSocketConnection,
834-
cap_unix_fd: bool,
835-
}
836-
837-
assert_impl_all!(MessageSink: Send, Sync, Unpin);
838-
839-
impl MessageSink {
840-
fn flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
841-
loop {
842-
match self.raw_conn.try_flush() {
843-
Ok(()) => return Poll::Ready(Ok(())),
844-
Err(e) => {
845-
if e.kind() == ErrorKind::WouldBlock {
846-
let poll = self.raw_conn.socket().poll_writable(cx);
847-
848-
match poll {
849-
Poll::Pending => return Poll::Pending,
850-
// Guess socket became ready already so let's try it again.
851-
Poll::Ready(Ok(_)) => continue,
852-
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
853-
}
854-
} else {
855-
return Poll::Ready(Err(Error::Io(e)));
856-
}
857-
}
858-
}
859-
}
860-
}
861-
}
862-
863-
impl Sink<Message> for MessageSink {
828+
impl Sink<Message> for Connection {
864829
type Error = Error;
865830

866831
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
@@ -869,48 +834,36 @@ impl Sink<Message> for MessageSink {
869834
}
870835

871836
fn start_send(self: Pin<&mut Self>, msg: Message) -> Result<()> {
872-
if !msg.fds().is_empty() && !self.cap_unix_fd {
837+
if !msg.fds().is_empty() && !self.inner.cap_unix_fd {
873838
return Err(Error::Unsupported);
874839
}
875840

876-
self.get_mut().raw_conn.enqueue_message(msg);
841+
self.inner
842+
.raw_out_conn
843+
.lock()
844+
.expect("poisened lock")
845+
.enqueue_message(msg);
877846

878847
Ok(())
879848
}
880849

881850
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
882-
self.get_mut().flush(cx)
851+
self.inner
852+
.raw_out_conn
853+
.lock()
854+
.expect("poisened lock")
855+
.flush(cx)
883856
}
884857

885858
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
886-
let sink = self.get_mut();
887-
match sink.flush(cx) {
859+
let mut raw_out_conn = self.inner.raw_out_conn.lock().expect("poisened lock");
860+
match raw_out_conn.flush(cx) {
888861
Poll::Ready(Ok(_)) => (),
889862
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
890863
Poll::Pending => return Poll::Pending,
891864
}
892865

893-
Poll::Ready(sink.raw_conn.close())
894-
}
895-
}
896-
897-
impl Sink<Message> for Connection {
898-
type Error = Error;
899-
900-
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
901-
Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).poll_ready(cx)
902-
}
903-
904-
fn start_send(self: Pin<&mut Self>, msg: Message) -> Result<()> {
905-
Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).start_send(msg)
906-
}
907-
908-
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
909-
Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).poll_flush(cx)
910-
}
911-
912-
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
913-
Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).poll_close(cx)
866+
Poll::Ready(raw_out_conn.close())
914867
}
915868
}
916869

zbus/src/raw/connection.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
use std::{collections::VecDeque, io};
1+
use std::{
2+
collections::VecDeque,
3+
io::{self, ErrorKind},
4+
task::{Context, Poll},
5+
};
6+
7+
use async_io::Async;
28

39
use crate::{message::Message, message_header::MIN_MESSAGE_SIZE, raw::Socket, OwnedFd};
410

@@ -168,6 +174,31 @@ impl<S: Socket> Connection<S> {
168174
}
169175
}
170176

177+
impl Connection<Async<Box<dyn Socket>>> {
178+
/// Same as `try_flush` above, except it wraps the method for use in [`std::future::Future`] impls.
179+
pub(crate) fn flush(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
180+
loop {
181+
match self.try_flush() {
182+
Ok(()) => return Poll::Ready(Ok(())),
183+
Err(e) => {
184+
if e.kind() == ErrorKind::WouldBlock {
185+
let poll = self.socket().poll_writable(cx);
186+
187+
match poll {
188+
Poll::Pending => return Poll::Pending,
189+
// Guess socket became ready already so let's try it again.
190+
Poll::Ready(Ok(_)) => continue,
191+
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
192+
}
193+
} else {
194+
return Poll::Ready(Err(crate::Error::Io(e)));
195+
}
196+
}
197+
}
198+
}
199+
}
200+
}
201+
171202
#[cfg(test)]
172203
mod tests {
173204
use super::Connection;

0 commit comments

Comments
 (0)