Skip to content

Commit 7c027a1

Browse files
committed
Make more consistent tedge mqtt logging
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent b8d0e03 commit 7c027a1

File tree

5 files changed

+26
-24
lines changed

5 files changed

+26
-24
lines changed

crates/common/mqtt_channel/src/config.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ impl Config {
267267
&mut self,
268268
ca_file: impl AsRef<Path>,
269269
) -> Result<&mut Self, certificate::CertificateError> {
270-
debug!("Using CA certificate: {}", ca_file.as_ref().display());
270+
debug!(target: "MQTT", "Using CA certificate: {}", ca_file.as_ref().display());
271271
let authentication_config = self.broker.authentication.get_or_insert(Default::default());
272272
let cert_store = &mut authentication_config.cert_store;
273273

@@ -282,7 +282,7 @@ impl Config {
282282
&mut self,
283283
ca_dir: impl AsRef<Path>,
284284
) -> Result<&mut Self, certificate::CertificateError> {
285-
debug!("Using CA directory: {}", ca_dir.as_ref().display());
285+
debug!(target: "MQTT", "Using CA directory: {}", ca_dir.as_ref().display());
286286
let authentication_config = self.broker.authentication.get_or_insert(Default::default());
287287
let cert_store = &mut authentication_config.cert_store;
288288

@@ -300,8 +300,8 @@ impl Config {
300300
cert_file: P,
301301
key_file: P,
302302
) -> Result<&mut Self, CertificateError> {
303-
debug!("Using client certificate: {}", cert_file.as_ref().display());
304-
debug!("Using client private key: {}", key_file.as_ref().display());
303+
debug!(target: "MQTT", "Using client certificate: {}", cert_file.as_ref().display());
304+
debug!(target: "MQTT", "Using client private key: {}", key_file.as_ref().display());
305305
let cert_chain = parse_root_certificate::read_cert_chain(cert_file)?;
306306
let key = parse_root_certificate::read_pvt_key(key_file)?;
307307

crates/common/mqtt_channel/src/connection.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use futures::SinkExt;
1010
use futures::StreamExt;
1111
use log::error;
1212
use log::info;
13+
use log::warn;
1314
use rumqttc::AsyncClient;
1415
use rumqttc::Event;
1516
use rumqttc::EventLoop;
@@ -132,17 +133,17 @@ impl Connection {
132133
const SECURE_MQTT_PORT: u16 = 8883;
133134

134135
if config.broker.port == INSECURE_MQTT_PORT && config.broker.authentication.is_some() {
135-
eprintln!("WARNING: Connecting on port 1883 for insecure MQTT using a TLS connection");
136+
warn!(target: "MQTT", "Connecting on port 1883 for insecure MQTT using a TLS connection");
136137
}
137138
if config.broker.port == SECURE_MQTT_PORT && config.broker.authentication.is_none() {
138-
eprintln!("WARNING: Connecting on port 8883 for secure MQTT without a CA file");
139+
warn!(target: "MQTT", "Connecting on port 8883 for secure MQTT without a CA file");
139140
}
140141

141142
let mqtt_options = config.rumqttc_options()?;
142143
let (mqtt_client, mut event_loop) = AsyncClient::new(mqtt_options, config.queue_capacity);
143144

144-
info!(
145-
"MQTT connecting to broker: host={}:{}, session_name={:?}",
145+
info!(target: "MQTT",
146+
"Connecting to broker: host={}:{}, session_name={:?}",
146147
config.broker.host, config.broker.port, config.session_name
147148
);
148149

@@ -152,7 +153,7 @@ impl Connection {
152153
if let Some(err) = MqttError::maybe_connection_error(&ack) {
153154
return Err(err);
154155
};
155-
info!("MQTT connection established");
156+
info!(target: "MQTT", "Connection established");
156157

157158
let subscriptions = config.subscriptions.filters();
158159

@@ -175,16 +176,16 @@ impl Connection {
175176
// Messages can be received before a sub ack
176177
// Errors on send are ignored: it just means the client has closed the receiving channel.
177178
if msg.payload.len() > config.max_packet_size {
178-
error!("Dropping message received on topic {} with payload size {} that exceeds the maximum packet size of {}",
179+
error!(target: "MQTT", "Dropping message received on topic {} with payload size {} that exceeds the maximum packet size of {}",
179180
msg.topic, msg.payload.len(), config.max_packet_size);
180181
continue;
181182
}
182183
let _ = message_sender.send(msg.into()).await;
183184
}
184185

185186
Err(err) => {
186-
error!(
187-
"MQTT: failed to connect to broker at '{host}:{port}': {err}",
187+
error!(target: "MQTT",
188+
"Failed to connect to broker at '{host}:{port}': {err}",
188189
host = config.broker.host,
189190
port = config.broker.port
190191
);
@@ -244,7 +245,7 @@ impl Connection {
244245
match event {
245246
Ok(Event::Incoming(Packet::Publish(msg))) => {
246247
if msg.payload.len() > config.max_packet_size {
247-
error!("Dropping message received on topic {} with payload size {} that exceeds the maximum packet size of {}",
248+
error!(target: "MQTT", "Dropping message received on topic {} with payload size {} that exceeds the maximum packet size of {}",
248249
msg.topic, msg.payload.len(), config.max_packet_size);
249250
continue;
250251
}
@@ -255,9 +256,9 @@ impl Connection {
255256

256257
Ok(Event::Incoming(Packet::ConnAck(ack))) => {
257258
if let Some(err) = MqttError::maybe_connection_error(&ack) {
258-
error!("MQTT connection Error {err}");
259+
error!(target: "MQTT", "Connection Error {err}");
259260
} else {
260-
info!("MQTT connection re-established");
261+
info!(target: "MQTT", "Connection re-established");
261262
if let Some(ref imsg_fn) = config.initial_message {
262263
// publish the initial message on connect
263264
let message = imsg_fn.new_init_message();
@@ -292,7 +293,7 @@ impl Connection {
292293
}
293294

294295
Err(err) => {
295-
error!("MQTT connection error: {err}");
296+
error!(target: "MQTT", "Connection error: {err}");
296297

297298
// Errors on send are ignored: it just means the client has closed the receiving channel.
298299
let _ = error_sender.send(err.into()).await;
@@ -307,7 +308,7 @@ impl Connection {
307308
// to make sure the disconnect is effective
308309
loop {
309310
if (event_loop.poll().await).is_err() {
310-
info!("MQTT connection closed");
311+
info!(target: "MQTT", "Connection closed");
311312
break;
312313
}
313314
}

crates/common/mqtt_channel/src/session.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::Config;
22
use crate::MqttError;
3+
use log::error;
34
use log::warn;
45
use rumqttc::AsyncClient;
56
use rumqttc::ConnectReturnCode;
@@ -44,7 +45,7 @@ pub async fn init_session(config: &Config) -> Result<(), MqttError> {
4445
Err(err) => match err {
4546
rumqttc::ConnectionError::ConnectionRefused(ConnectReturnCode::Success) => {}
4647
_ => {
47-
warn!("{}", MqttError::from_connection_error(err));
48+
warn!(target: "MQTT", "{}", MqttError::from_connection_error(err));
4849
break;
4950
}
5051
},
@@ -85,7 +86,7 @@ pub async fn clear_session(config: &Config) -> Result<(), MqttError> {
8586
}
8687

8788
Err(err) => {
88-
eprintln!("Connection Error {}", err);
89+
error!(target: "MQTT", "Connection Error {}", err);
8990
break;
9091
}
9192
_ => (),

crates/core/tedge/src/cli/mqtt/publish.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ async fn publish(cmd: &MqttPublishCommand) -> Result<(), anyhow::Error> {
7171
{
7272
Ok(Ok(())) => (),
7373
Ok(err) => err?,
74-
Err(signal) => info!("{signal:?}"),
74+
Err(signal) => info!(target: "MQTT", "{signal:?}"),
7575
}
7676
mqtt.close().await;
7777

crates/core/tedge/src/cli/mqtt/subscribe.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ use mqtt_channel::TopicFilter;
77
use std::time::Duration;
88
use tedge_config::tedge_toml::MqttAuthClientConfig;
99
use tokio::io::AsyncWriteExt;
10+
use tracing::error;
1011
use tracing::info;
1112

1213
const DEFAULT_QUEUE_CAPACITY: usize = 10;
1314
use super::MAX_PACKET_SIZE;
14-
use crate::error;
1515

1616
pub struct MqttSubscribeCommand {
1717
pub host: String,
@@ -76,7 +76,7 @@ async fn subscribe(cmd: &MqttSubscribeCommand) -> Result<(), anyhow::Error> {
7676
Ok(Some(message)) => message,
7777
Ok(None) => break,
7878
Err(signal) => {
79-
info!("{signal:?}");
79+
info!(target: "MQTT", "{signal:?}");
8080
break;
8181
}
8282
};
@@ -92,11 +92,11 @@ async fn subscribe(cmd: &MqttSubscribeCommand) -> Result<(), anyhow::Error> {
9292
let _ = stdout.flush().await;
9393
n_messages += 1;
9494
if matches!(cmd.count, Some(count) if count > 0 && n_messages >= count) {
95-
info!("Received {n_messages} message/s");
95+
info!(target: "MQTT", "Received {n_messages} message/s");
9696
break;
9797
}
9898
}
99-
Err(err) => error!("{err}"),
99+
Err(err) => error!(target: "MQTT", "{err}"),
100100
}
101101
}
102102

0 commit comments

Comments
 (0)