Skip to content
This repository was archived by the owner on May 11, 2023. It is now read-only.

Commit de2b52d

Browse files
committed
zb: Named field for ConnectionInner in Connection struct
In a following commit, we'll be adding more fields in `Connection` directly.
1 parent 4d46b99 commit de2b52d

File tree

1 file changed

+38
-34
lines changed

1 file changed

+38
-34
lines changed

zbus/src/azync/connection.rs

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,9 @@ impl MessageReceiverTask<Box<dyn Socket>> {
305305
///
306306
/// [Monitor]: https://dbus.freedesktop.org/doc/dbus-specification.html#bus-messages-become-monitor
307307
#[derive(Clone, Debug)]
308-
pub struct Connection(Arc<ConnectionInner<Box<dyn Socket>>>);
308+
pub struct Connection {
309+
inner: Arc<ConnectionInner<Box<dyn Socket>>>,
310+
}
309311

310312
assert_impl_all!(Connection: Send, Sync, Unpin);
311313

@@ -367,14 +369,14 @@ impl Connection {
367369
/// Get a stream to receive incoming messages.
368370
pub async fn stream(&self) -> MessageStream {
369371
let msg_receiver = self
370-
.0
372+
.inner
371373
.msg_receiver
372374
.read()
373375
// SAFETY: Not much we can do about a poisoned mutex.
374376
.expect("poisoned lock")
375377
.activate_cloned()
376378
.map(Ok);
377-
let error_stream = self.0.error_receiver.clone().map(Err);
379+
let error_stream = self.inner.error_receiver.clone().map(Err);
378380
let stream = stream_select(error_stream, msg_receiver).boxed();
379381

380382
MessageStream { stream }
@@ -519,7 +521,7 @@ impl Connection {
519521
///
520522
/// This will return `false` for p2p connections.
521523
pub fn is_bus(&self) -> bool {
522-
self.0.bus_conn
524+
self.inner.bus_conn
523525
}
524526

525527
/// Assigns a serial number to `msg` that is unique to this connection.
@@ -537,12 +539,12 @@ impl Connection {
537539

538540
/// The unique name as assigned by the message bus or `None` if not a message bus connection.
539541
pub fn unique_name(&self) -> Option<&str> {
540-
self.0.unique_name.get().map(|s| s.as_str())
542+
self.inner.unique_name.get().map(|s| s.as_str())
541543
}
542544

543545
/// Max number of messages to queue.
544546
pub fn max_queued(&self) -> usize {
545-
self.0
547+
self.inner
546548
.msg_receiver
547549
.read()
548550
.expect("poisoned lock")
@@ -575,7 +577,7 @@ impl Connection {
575577
///# Ok::<_, Box<dyn Error + Send + Sync>>(())
576578
/// ```
577579
pub fn set_max_queued(self, max: usize) -> Self {
578-
self.0
580+
self.inner
579581
.msg_receiver
580582
.write()
581583
.expect("poisoned lock")
@@ -586,7 +588,7 @@ impl Connection {
586588

587589
/// The server's GUID.
588590
pub fn server_guid(&self) -> &str {
589-
self.0.server_guid.as_str()
591+
self.inner.server_guid.as_str()
590592
}
591593

592594
/// The underlying executor.
@@ -625,12 +627,12 @@ impl Connection {
625627
///
626628
/// [tte]: https://docs.rs/async-executor/1.4.1/async_executor/struct.Executor.html#method.tick
627629
pub fn executor(&self) -> &Executor<'static> {
628-
&self.0.executor
630+
&self.inner.executor
629631
}
630632

631633
/// Get the raw file descriptor of this connection.
632634
pub async fn as_raw_fd(&self) -> RawFd {
633-
(self.0.raw_in_conn.lock().await.socket()).as_raw_fd()
635+
(self.inner.raw_in_conn.lock().await.socket()).as_raw_fd()
634636
}
635637

636638
pub(crate) async fn subscribe_signal<'s, E>(
@@ -645,7 +647,7 @@ impl Connection {
645647
{
646648
let signal = SignalInfo::new(sender, path, interface, signal_name)?;
647649
let hash = signal.calc_hash();
648-
let mut subscriptions = self.0.signal_subscriptions.lock().await;
650+
let mut subscriptions = self.inner.signal_subscriptions.lock().await;
649651
match subscriptions.get_mut(&hash) {
650652
Some(subscription) => subscription.num_subscribers += 1,
651653
None => {
@@ -689,7 +691,7 @@ impl Connection {
689691
}
690692

691693
pub(crate) async fn unsubscribe_signal_by_id(&self, subscription_id: u64) -> Result<bool> {
692-
let mut subscriptions = self.0.signal_subscriptions.lock().await;
694+
let mut subscriptions = self.inner.signal_subscriptions.lock().await;
693695
match subscriptions.get_mut(&subscription_id) {
694696
Some(subscription) => {
695697
subscription.num_subscribers -= 1;
@@ -715,7 +717,7 @@ impl Connection {
715717

716718
pub(crate) fn queue_unsubscribe_signal(&self, subscription_id: u64) {
717719
let conn = self.clone();
718-
self.0
720+
self.inner
719721
.executor
720722
.spawn(async move {
721723
// FIXME: Ignoring the errors here. We should at least log a message when we've
@@ -742,7 +744,7 @@ impl Connection {
742744
let name = {
743745
use futures_util::future::{select, Either};
744746

745-
let executor = self.0.executor.clone();
747+
let executor = self.inner.executor.clone();
746748
let ticking_future = async move {
747749
// Keep running as long as this task/future is not cancelled.
748750
loop {
@@ -759,7 +761,7 @@ impl Connection {
759761
}
760762
};
761763

762-
self.0
764+
self.inner
763765
.unique_name
764766
.set(name)
765767
// programmer (probably our) error if this fails.
@@ -793,20 +795,22 @@ impl Connection {
793795
MessageReceiverTask::new(raw_in_conn.clone(), msg_sender, error_sender)
794796
.spawn(&executor);
795797

796-
let connection = Self(Arc::new(ConnectionInner {
797-
raw_in_conn,
798-
sink,
799-
error_receiver,
800-
server_guid: auth.server_guid,
801-
cap_unix_fd,
802-
bus_conn: bus_connection,
803-
serial: AtomicU32::new(1),
804-
unique_name: OnceCell::new(),
805-
signal_subscriptions: Mutex::new(HashMap::new()),
806-
msg_receiver: sync::RwLock::new(msg_receiver),
807-
executor: executor.clone(),
808-
msg_receiver_task: sync::Mutex::new(Some(msg_receiver_task)),
809-
}));
798+
let connection = Self {
799+
inner: Arc::new(ConnectionInner {
800+
raw_in_conn,
801+
sink,
802+
error_receiver,
803+
server_guid: auth.server_guid,
804+
cap_unix_fd,
805+
bus_conn: bus_connection,
806+
serial: AtomicU32::new(1),
807+
unique_name: OnceCell::new(),
808+
signal_subscriptions: Mutex::new(HashMap::new()),
809+
msg_receiver: sync::RwLock::new(msg_receiver),
810+
executor: executor.clone(),
811+
msg_receiver_task: sync::Mutex::new(Some(msg_receiver_task)),
812+
}),
813+
};
810814

811815
#[cfg(feature = "internal-executor")]
812816
std::thread::Builder::new()
@@ -831,7 +835,7 @@ impl Connection {
831835
}
832836

833837
fn next_serial(&self) -> u32 {
834-
self.0.serial.fetch_add(1, SeqCst)
838+
self.inner.serial.fetch_add(1, SeqCst)
835839
}
836840

837841
/// Create a `Connection` to the session/user message bus.
@@ -922,19 +926,19 @@ impl Sink<Message> for Connection {
922926
type Error = Error;
923927

924928
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
925-
Pin::new(&mut *self.0.sink.lock().expect("poisoned lock")).poll_ready(cx)
929+
Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).poll_ready(cx)
926930
}
927931

928932
fn start_send(self: Pin<&mut Self>, msg: Message) -> Result<()> {
929-
Pin::new(&mut *self.0.sink.lock().expect("poisoned lock")).start_send(msg)
933+
Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).start_send(msg)
930934
}
931935

932936
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
933-
Pin::new(&mut *self.0.sink.lock().expect("poisoned lock")).poll_flush(cx)
937+
Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).poll_flush(cx)
934938
}
935939

936940
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
937-
Pin::new(&mut *self.0.sink.lock().expect("poisoned lock")).poll_close(cx)
941+
Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).poll_close(cx)
938942
}
939943
}
940944

0 commit comments

Comments
 (0)