Skip to content

Commit 1b249b3

Browse files
committed
Move subscription mutex from config to connection
This fixes the handling of subscriptions when we clone mqtt_channel::Config. With the previous behaviour, any `Config` clones shared all their subscriptions.
1 parent 2292145 commit 1b249b3

File tree

2 files changed

+18
-10
lines changed

2 files changed

+18
-10
lines changed

crates/common/mqtt_channel/src/config.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use std::fmt::Formatter;
1111
use std::ops::Deref;
1212
use std::path::Path;
1313
use std::sync::Arc;
14-
use std::sync::Mutex;
1514
use zeroize::Zeroizing;
1615

1716
pub const MAX_PACKET_SIZE: usize = 268435455;
@@ -33,7 +32,7 @@ pub struct Config {
3332
/// The list of topics to subscribe to on connect
3433
///
3534
/// Default: An empty topic list
36-
pub subscriptions: Arc<Mutex<TopicFilter>>,
35+
pub subscriptions: TopicFilter,
3736

3837
/// Clean the MQTT session upon connect if set to `true`.
3938
///
@@ -164,7 +163,7 @@ impl Default for Config {
164163
authentication: None,
165164
},
166165
session_name: None,
167-
subscriptions: Arc::new(Mutex::new(TopicFilter::empty())),
166+
subscriptions: TopicFilter::empty(),
168167
clean_session: false,
169168
queue_capacity: 1024,
170169
max_packet_size: 16 * 1024 * 1024,
@@ -214,8 +213,8 @@ impl Config {
214213
/// Add a list of topics to subscribe to on connect
215214
///
216215
/// Can be called several times to subscribe to many topics.
217-
pub fn with_subscriptions(self, topics: TopicFilter) -> Self {
218-
self.subscriptions.lock().unwrap().add_all(topics);
216+
pub fn with_subscriptions(mut self, topics: TopicFilter) -> Self {
217+
self.subscriptions.add_all(topics);
219218
self
220219
}
221220

crates/common/mqtt_channel/src/connection.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,15 @@ impl Connection {
164164
let (published_sender, published_receiver) = mpsc::unbounded();
165165
let (error_sender, error_receiver) = mpsc::unbounded();
166166
let (pub_done_sender, pub_done_receiver) = oneshot::channel();
167+
let subscriptions = Arc::new(Mutex::new(config.subscriptions.clone()));
167168

168-
let (mqtt_client, event_loop) =
169-
Connection::open(config, received_sender.clone(), error_sender.clone()).await?;
169+
let (mqtt_client, event_loop) = Connection::open(
170+
config,
171+
received_sender.clone(),
172+
error_sender.clone(),
173+
subscriptions.clone(),
174+
)
175+
.await?;
170176
let permits = Arc::new(Semaphore::new(1));
171177
let permit = permits.clone().acquire_owned().await.unwrap();
172178
let pub_count = Arc::new(AtomicUsize::new(0));
@@ -179,6 +185,7 @@ impl Connection {
179185
pub_done_sender,
180186
permits,
181187
pub_count.clone(),
188+
subscriptions.clone(),
182189
));
183190
tokio::spawn(Connection::sender_loop(
184191
mqtt_client.clone(),
@@ -194,7 +201,7 @@ impl Connection {
194201
published: published_sender,
195202
errors: error_receiver,
196203
pub_done: pub_done_receiver,
197-
subscriptions: SubscriberHandle::new(mqtt_client, config.subscriptions.clone()),
204+
subscriptions: SubscriberHandle::new(mqtt_client, subscriptions),
198205
})
199206
}
200207

@@ -207,6 +214,7 @@ impl Connection {
207214
config: &Config,
208215
mut message_sender: mpsc::UnboundedSender<MqttMessage>,
209216
mut error_sender: mpsc::UnboundedSender<MqttError>,
217+
subscriptions: Arc<Mutex<TopicFilter>>,
210218
) -> Result<(AsyncClient, EventLoop), MqttError> {
211219
const INSECURE_MQTT_PORT: u16 = 1883;
212220
const SECURE_MQTT_PORT: u16 = 8883;
@@ -234,7 +242,7 @@ impl Connection {
234242
};
235243
info!(target: "MQTT", "Connection established");
236244

237-
let subscriptions = config.subscriptions.lock().unwrap().filters();
245+
let subscriptions = subscriptions.lock().unwrap().filters();
238246

239247
// Need check here otherwise it will hang waiting for a SubAck, and none will come when there is no subscription.
240248
if subscriptions.is_empty() {
@@ -291,6 +299,7 @@ impl Connection {
291299
done: oneshot::Sender<()>,
292300
permits: Arc<Semaphore>,
293301
pub_count: Arc<AtomicUsize>,
302+
subscriptions: Arc<Mutex<TopicFilter>>,
294303
) -> Result<(), MqttError> {
295304
let mut triggered_disconnect = false;
296305
let mut disconnect_permit = None;
@@ -363,7 +372,7 @@ impl Connection {
363372
// If session_name is not provided or if the broker session persistence
364373
// is not enabled or working, then re-subscribe
365374

366-
let subscriptions = config.subscriptions.lock().unwrap().filters();
375+
let subscriptions = subscriptions.lock().unwrap().filters();
367376
// Need check here otherwise it will hang waiting for a SubAck, and none will come when there is no subscription.
368377
if subscriptions.is_empty() {
369378
break;

0 commit comments

Comments
 (0)