Skip to content

Commit d4c5554

Browse files
Merge pull request #3466 from didier-wenzek/refactor/async-tedge-cli
refactor: make tedge cli async
2 parents af23b46 + 35ebfc7 commit d4c5554

File tree

99 files changed

+1926
-1704
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

99 files changed

+1926
-1704
lines changed

.config/nextest.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,15 @@
22
retries = 0
33
test-threads = "num-cpus"
44

5+
[test-groups]
6+
serial = { max-threads = 1 }
7+
58
[[profile.default.overrides]]
69
# Remove once https://github.com/thin-edge/thin-edge.io/issues/3030 is resolved
710
filter = 'test(uploaded_file_can_be_downloaded_from_the_api)'
811
retries = 4
12+
13+
[[profile.default.overrides]]
14+
filter = 'package(c8y_firmware_manager)'
15+
test-group = 'serial'
16+

Cargo.lock

Lines changed: 13 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ collectd_ext = { path = "crates/extensions/collectd_ext" }
3838
download = { path = "crates/common/download" }
3939
flockfile = { path = "crates/common/flockfile" }
4040
json-writer = { path = "crates/common/json_writer" }
41-
log_manager = { path = "crates/common/log_manager" }
4241
mqtt_channel = { path = "crates/common/mqtt_channel" }
4342
mqtt_tests = { path = "crates/tests/mqtt_tests" }
4443
plugin_sm = { path = "crates/core/plugin_sm" }
@@ -77,7 +76,7 @@ assert-json-diff = "2.0"
7776
assert_cmd = "2.0"
7877
assert_matches = "1.5"
7978
async-compat = "0.2.1"
80-
async-log = "2.0"
79+
async-tempfile = "0.7"
8180
async-trait = "0.1"
8281
async-tungstenite = { version = "0.28", features = [
8382
"tokio-runtime",
@@ -110,7 +109,6 @@ figment = { version = "0.10" }
110109
filetime = "0.2"
111110
freedesktop_entry_parser = "1.3.0"
112111
futures = "0.3"
113-
futures-timer = "3.0"
114112
futures-util = "0.3.25"
115113
glob = "0.3"
116114
heck = "0.4.1"
@@ -171,8 +169,6 @@ serde_json = "1.0"
171169
sha-1 = "0.10"
172170
sha256 = "1.1"
173171
shell-words = "1.1"
174-
signal-hook = "0.3"
175-
signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] }
176172
strum = "0.24"
177173
strum_macros = "0.24"
178174
syn = { version = "2", features = ["full", "extra-traits"] }
@@ -189,7 +185,6 @@ tower = "0.4"
189185
tracing = { version = "0.1", features = ["attributes", "log"] }
190186
tracing-subscriber = { version = "0.3", features = ["time", "env-filter"] }
191187
try-traits = "0.1"
192-
tungstenite = "0.26.1"
193188
url = "2.3"
194189
uzers = "0.11"
195190
walkdir = "2"

crates/common/certificate/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ repository = { workspace = true }
1010

1111
[features]
1212
default = []
13-
reqwest-blocking = ["dep:reqwest", "reqwest/blocking"]
1413
reqwest = ["dep:reqwest"]
1514
cryptoki = ["dep:cryptoki"]
1615

crates/common/certificate/src/cloud_root_certificate.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,6 @@ impl CloudRootCerts {
2727
.build()
2828
.expect("Valid reqwest client builder configuration")
2929
}
30-
31-
#[allow(clippy::disallowed_types)]
32-
#[cfg(feature = "reqwest-blocking")]
33-
pub fn blocking_client_builder(&self) -> reqwest::blocking::ClientBuilder {
34-
self.certificates
35-
.iter()
36-
.cloned()
37-
.fold(reqwest::blocking::ClientBuilder::new(), |builder, cert| {
38-
builder.add_root_certificate(cert)
39-
})
40-
}
41-
42-
#[allow(clippy::disallowed_types)]
43-
#[cfg(feature = "reqwest-blocking")]
44-
pub fn blocking_client(&self) -> reqwest::blocking::Client {
45-
self.blocking_client_builder()
46-
.build()
47-
.expect("Valid reqwest client builder configuration")
48-
}
4930
}
5031

5132
impl From<Arc<[Certificate]>> for CloudRootCerts {

crates/common/mqtt_channel/src/config.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ impl Config {
267267
&mut self,
268268
ca_file: impl AsRef<Path>,
269269
) -> Result<&mut Self, certificate::CertificateError> {
270-
debug!("Using CA certificate: {}", ca_file.as_ref().display());
270+
debug!(target: "MQTT", "Using CA certificate: {}", ca_file.as_ref().display());
271271
let authentication_config = self.broker.authentication.get_or_insert(Default::default());
272272
let cert_store = &mut authentication_config.cert_store;
273273

@@ -282,7 +282,7 @@ impl Config {
282282
&mut self,
283283
ca_dir: impl AsRef<Path>,
284284
) -> Result<&mut Self, certificate::CertificateError> {
285-
debug!("Using CA directory: {}", ca_dir.as_ref().display());
285+
debug!(target: "MQTT", "Using CA directory: {}", ca_dir.as_ref().display());
286286
let authentication_config = self.broker.authentication.get_or_insert(Default::default());
287287
let cert_store = &mut authentication_config.cert_store;
288288

@@ -300,8 +300,8 @@ impl Config {
300300
cert_file: P,
301301
key_file: P,
302302
) -> Result<&mut Self, CertificateError> {
303-
debug!("Using client certificate: {}", cert_file.as_ref().display());
304-
debug!("Using client private key: {}", key_file.as_ref().display());
303+
debug!(target: "MQTT", "Using client certificate: {}", cert_file.as_ref().display());
304+
debug!(target: "MQTT", "Using client private key: {}", key_file.as_ref().display());
305305
let cert_chain = parse_root_certificate::read_cert_chain(cert_file)?;
306306
let key = parse_root_certificate::read_pvt_key(key_file)?;
307307

crates/common/mqtt_channel/src/connection.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use futures::SinkExt;
1010
use futures::StreamExt;
1111
use log::error;
1212
use log::info;
13+
use log::warn;
1314
use rumqttc::AsyncClient;
1415
use rumqttc::Event;
1516
use rumqttc::EventLoop;
@@ -132,17 +133,17 @@ impl Connection {
132133
const SECURE_MQTT_PORT: u16 = 8883;
133134

134135
if config.broker.port == INSECURE_MQTT_PORT && config.broker.authentication.is_some() {
135-
eprintln!("WARNING: Connecting on port 1883 for insecure MQTT using a TLS connection");
136+
warn!(target: "MQTT", "Connecting on port 1883 for insecure MQTT using a TLS connection");
136137
}
137138
if config.broker.port == SECURE_MQTT_PORT && config.broker.authentication.is_none() {
138-
eprintln!("WARNING: Connecting on port 8883 for secure MQTT without a CA file");
139+
warn!(target: "MQTT", "Connecting on port 8883 for secure MQTT without a CA file");
139140
}
140141

141142
let mqtt_options = config.rumqttc_options()?;
142143
let (mqtt_client, mut event_loop) = AsyncClient::new(mqtt_options, config.queue_capacity);
143144

144-
info!(
145-
"MQTT connecting to broker: host={}:{}, session_name={:?}",
145+
info!(target: "MQTT",
146+
"Connecting to broker: host={}:{}, session_name={:?}",
146147
config.broker.host, config.broker.port, config.session_name
147148
);
148149

@@ -152,7 +153,7 @@ impl Connection {
152153
if let Some(err) = MqttError::maybe_connection_error(&ack) {
153154
return Err(err);
154155
};
155-
info!("MQTT connection established");
156+
info!(target: "MQTT", "Connection established");
156157

157158
let subscriptions = config.subscriptions.filters();
158159

@@ -175,16 +176,16 @@ impl Connection {
175176
// Messages can be received before a sub ack
176177
// Errors on send are ignored: it just means the client has closed the receiving channel.
177178
if msg.payload.len() > config.max_packet_size {
178-
error!("Dropping message received on topic {} with payload size {} that exceeds the maximum packet size of {}",
179+
error!(target: "MQTT", "Dropping message received on topic {} with payload size {} that exceeds the maximum packet size of {}",
179180
msg.topic, msg.payload.len(), config.max_packet_size);
180181
continue;
181182
}
182183
let _ = message_sender.send(msg.into()).await;
183184
}
184185

185186
Err(err) => {
186-
error!(
187-
"MQTT: failed to connect to broker at '{host}:{port}': {err}",
187+
error!(target: "MQTT",
188+
"Failed to connect to broker at '{host}:{port}': {err}",
188189
host = config.broker.host,
189190
port = config.broker.port
190191
);
@@ -244,7 +245,7 @@ impl Connection {
244245
match event {
245246
Ok(Event::Incoming(Packet::Publish(msg))) => {
246247
if msg.payload.len() > config.max_packet_size {
247-
error!("Dropping message received on topic {} with payload size {} that exceeds the maximum packet size of {}",
248+
error!(target: "MQTT", "Dropping message received on topic {} with payload size {} that exceeds the maximum packet size of {}",
248249
msg.topic, msg.payload.len(), config.max_packet_size);
249250
continue;
250251
}
@@ -255,9 +256,9 @@ impl Connection {
255256

256257
Ok(Event::Incoming(Packet::ConnAck(ack))) => {
257258
if let Some(err) = MqttError::maybe_connection_error(&ack) {
258-
error!("MQTT connection Error {err}");
259+
error!(target: "MQTT", "Connection Error {err}");
259260
} else {
260-
info!("MQTT connection re-established");
261+
info!(target: "MQTT", "Connection re-established");
261262
if let Some(ref imsg_fn) = config.initial_message {
262263
// publish the initial message on connect
263264
let message = imsg_fn.new_init_message();
@@ -288,12 +289,11 @@ impl Connection {
288289

289290
Ok(Event::Incoming(Incoming::Disconnect))
290291
| Ok(Event::Outgoing(Outgoing::Disconnect)) => {
291-
info!("MQTT connection closed");
292292
break;
293293
}
294294

295295
Err(err) => {
296-
error!("MQTT connection error: {err}");
296+
error!(target: "MQTT", "Connection error: {err}");
297297

298298
// Errors on send are ignored: it just means the client has closed the receiving channel.
299299
let _ = error_sender.send(err.into()).await;
@@ -303,6 +303,16 @@ impl Connection {
303303
_ => (),
304304
}
305305
}
306+
307+
// Wait for Err(MqttState(ConnectionAborted))
308+
// to make sure the disconnect is effective
309+
loop {
310+
if (event_loop.poll().await).is_err() {
311+
info!(target: "MQTT", "Connection closed");
312+
break;
313+
}
314+
}
315+
306316
// No more messages will be forwarded to the client
307317
let _ = message_sender.close().await;
308318
let _ = error_sender.close().await;

crates/common/mqtt_channel/src/messages.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,10 @@ impl MqttMessage {
184184
}
185185
}
186186

187+
pub fn with_retain_flag(self, retain: bool) -> Self {
188+
Self { retain, ..self }
189+
}
190+
187191
/// The message payload
188192
pub fn payload(&self) -> &Payload {
189193
&self.payload.0

crates/common/mqtt_channel/src/session.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::Config;
22
use crate::MqttError;
3+
use log::error;
34
use log::warn;
45
use rumqttc::AsyncClient;
56
use rumqttc::ConnectReturnCode;
@@ -44,7 +45,7 @@ pub async fn init_session(config: &Config) -> Result<(), MqttError> {
4445
Err(err) => match err {
4546
rumqttc::ConnectionError::ConnectionRefused(ConnectReturnCode::Success) => {}
4647
_ => {
47-
warn!("{}", MqttError::from_connection_error(err));
48+
warn!(target: "MQTT", "{}", MqttError::from_connection_error(err));
4849
break;
4950
}
5051
},
@@ -85,7 +86,7 @@ pub async fn clear_session(config: &Config) -> Result<(), MqttError> {
8586
}
8687

8788
Err(err) => {
88-
eprintln!("Connection Error {}", err);
89+
error!(target: "MQTT", "Connection Error {}", err);
8990
break;
9091
}
9192
_ => (),

crates/common/tedge_config/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ strum_macros = { workspace = true }
3434
tedge_config_macros = { workspace = true }
3535
tedge_utils = { workspace = true, features = ["timestamp"] }
3636
thiserror = { workspace = true }
37+
tokio = { workspace = true }
3738
toml = { workspace = true }
3839
tracing = { workspace = true }
3940
tracing-subscriber = { workspace = true }

0 commit comments

Comments
 (0)