Skip to content

Commit defa13f

Browse files
committed
Update c8y-firmare-manager to await download results
This is an intermediate step. The next one will be to spawn a task to handle the download and subsequent steps so a the manager is not blocked during that time. The failing test (handle_child_response_while_busy_downloading) is a witness of this missing requirement. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent d3abd9a commit defa13f

File tree

3 files changed

+40
-46
lines changed

3 files changed

+40
-46
lines changed

crates/extensions/c8y_firmware_manager/src/actor.rs

Lines changed: 29 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::os::unix::fs as unix_fs;
1919
use std::path::Path;
2020
use tedge_actors::fan_in_message_type;
2121
use tedge_actors::Actor;
22+
use tedge_actors::ClientMessageBox;
2223
use tedge_actors::DynSender;
2324
use tedge_actors::LoggingReceiver;
2425
use tedge_actors::MessageReceiver;
@@ -54,13 +55,11 @@ pub type OperationTimeout = Timeout<OperationKey>;
5455
pub type IdDownloadResult = (String, DownloadResult);
5556
pub type IdDownloadRequest = (String, DownloadRequest);
5657

57-
fan_in_message_type!(FirmwareInput[MqttMessage, OperationTimeout, IdDownloadResult] : Debug);
58-
fan_in_message_type!(FirmwareOutput[MqttMessage, OperationSetTimeout, IdDownloadRequest] : Debug);
58+
fan_in_message_type!(FirmwareInput[MqttMessage, OperationTimeout] : Debug);
5959

6060
pub struct FirmwareManagerActor {
6161
config: FirmwareManagerConfig,
6262
active_child_ops: HashMap<OperationKey, ActiveOperationState>,
63-
reqs_pending_download: HashMap<String, SmartRestFirmwareRequest>,
6463
message_box: FirmwareManagerMessageBox,
6564
}
6665

@@ -90,9 +89,6 @@ impl Actor for FirmwareManagerActor {
9089
FirmwareInput::OperationTimeout(timeout) => {
9190
self.process_operation_timeout(timeout).await?;
9291
}
93-
FirmwareInput::IdDownloadResult((id, result)) => {
94-
self.process_downloaded_firmware(&id, result).await?
95-
}
9692
}
9793
}
9894
Ok(())
@@ -104,7 +100,6 @@ impl FirmwareManagerActor {
104100
Self {
105101
config,
106102
active_child_ops: HashMap::new(),
107-
reqs_pending_download: HashMap::new(),
108103
message_box,
109104
}
110105
}
@@ -262,12 +257,13 @@ impl FirmwareManagerActor {
262257
DownloadRequest::new(firmware_url, cache_file_path.as_std_path())
263258
};
264259

265-
self.message_box
260+
let (_, download_result) = self
261+
.message_box
266262
.download_sender
267-
.send((operation_id.to_string(), download_request))
263+
.await_response((operation_id.to_string(), download_request))
264+
.await?;
265+
self.process_downloaded_firmware(operation_id, smartrest_request, download_result)
268266
.await?;
269-
self.reqs_pending_download
270-
.insert(operation_id.to_string(), smartrest_request);
271267
}
272268
Ok(())
273269
}
@@ -278,39 +274,32 @@ impl FirmwareManagerActor {
278274
async fn process_downloaded_firmware(
279275
&mut self,
280276
operation_id: &str,
277+
smartrest_request: SmartRestFirmwareRequest,
281278
download_result: DownloadResult,
282279
) -> Result<(), FirmwareManagementError> {
283-
if let Some(smartrest_request) = self.reqs_pending_download.remove(operation_id) {
284-
let child_id = smartrest_request.device.clone();
285-
match download_result {
286-
Ok(response) => {
287-
if let Err(err) =
288-
// Publish a firmware update request to child device.
289-
self
290-
.handle_firmware_update_request_with_downloaded_file(
291-
smartrest_request,
292-
operation_id,
293-
&response.file_path,
294-
)
295-
.await
296-
{
297-
self.fail_operation_in_cloud(
298-
&child_id,
299-
Some(operation_id),
300-
&err.to_string(),
280+
let child_id = smartrest_request.device.clone();
281+
match download_result {
282+
Ok(response) => {
283+
if let Err(err) =
284+
// Publish a firmware update request to child device.
285+
self
286+
.handle_firmware_update_request_with_downloaded_file(
287+
smartrest_request,
288+
operation_id,
289+
&response.file_path,
301290
)
302-
.await?;
303-
}
304-
}
305-
Err(err) => {
306-
let firmware_url = smartrest_request.url;
307-
let failure_reason = format!("Download from {firmware_url} failed with {err}");
308-
self.fail_operation_in_cloud(&child_id, Some(operation_id), &failure_reason)
291+
.await
292+
{
293+
self.fail_operation_in_cloud(&child_id, Some(operation_id), &err.to_string())
309294
.await?;
310295
}
311296
}
312-
} else {
313-
error!("Unexpected: Download completed for unknown operation: {operation_id}");
297+
Err(err) => {
298+
let firmware_url = smartrest_request.url;
299+
let failure_reason = format!("Download from {firmware_url} failed with {err}");
300+
self.fail_operation_in_cloud(&child_id, Some(operation_id), &failure_reason)
301+
.await?;
302+
}
314303
}
315304
Ok(())
316305
}
@@ -743,7 +732,7 @@ pub struct FirmwareManagerMessageBox {
743732
mqtt_publisher: DynSender<MqttMessage>,
744733
jwt_retriever: JwtRetriever,
745734
timer_sender: DynSender<SetTimeout<OperationKey>>,
746-
download_sender: DynSender<IdDownloadRequest>,
735+
download_sender: ClientMessageBox<IdDownloadRequest, IdDownloadResult>,
747736
}
748737

749738
impl FirmwareManagerMessageBox {
@@ -752,7 +741,7 @@ impl FirmwareManagerMessageBox {
752741
mqtt_publisher: DynSender<MqttMessage>,
753742
jwt_retriever: JwtRetriever,
754743
timer_sender: DynSender<SetTimeout<OperationKey>>,
755-
download_sender: DynSender<IdDownloadRequest>,
744+
download_sender: ClientMessageBox<IdDownloadRequest, IdDownloadResult>,
756745
) -> Self {
757746
Self {
758747
input_receiver,

crates/extensions/c8y_firmware_manager/src/lib.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use c8y_http_proxy::credentials::JwtRetriever;
1919
pub use config::*;
2020
use tedge_actors::futures::channel::mpsc;
2121
use tedge_actors::Builder;
22+
use tedge_actors::ClientMessageBox;
2223
use tedge_actors::DynSender;
2324
use tedge_actors::LinkError;
2425
use tedge_actors::LoggingReceiver;
@@ -40,7 +41,7 @@ pub struct FirmwareManagerBuilder {
4041
mqtt_publisher: DynSender<MqttMessage>,
4142
jwt_retriever: JwtRetriever,
4243
timer_sender: DynSender<SetTimeout<OperationKey>>,
43-
download_sender: DynSender<IdDownloadRequest>,
44+
download_sender: ClientMessageBox<IdDownloadRequest, IdDownloadResult>,
4445
signal_sender: mpsc::Sender<RuntimeRequest>,
4546
}
4647

@@ -50,7 +51,11 @@ impl FirmwareManagerBuilder {
5051
mqtt_actor: &mut impl ServiceProvider<MqttMessage, MqttMessage, TopicFilter>,
5152
jwt_actor: &mut impl Service<(), JwtResult>,
5253
timer_actor: &mut impl ServiceProvider<OperationSetTimeout, OperationTimeout, NoConfig>,
53-
downloader_actor: &mut impl ServiceProvider<IdDownloadRequest, IdDownloadResult, NoConfig>,
54+
downloader_actor: &mut impl ServiceProvider<
55+
RequestEnvelope<IdDownloadRequest, IdDownloadResult>,
56+
NoMessage,
57+
ReplyToRequester,
58+
>,
5459
) -> Result<FirmwareManagerBuilder, FileError> {
5560
Self::init(&config.data_dir)?;
5661

@@ -66,7 +71,7 @@ impl FirmwareManagerBuilder {
6671
mqtt_actor.connect_consumer(Self::subscriptions(), input_sender.clone().into());
6772
let jwt_retriever = JwtRetriever::new(jwt_actor);
6873
let timer_sender = timer_actor.connect_consumer(NoConfig, input_sender.clone().into());
69-
let download_sender = downloader_actor.connect_consumer(NoConfig, input_sender.into());
74+
let download_sender = ClientMessageBox::new(downloader_actor);
7075
Ok(Self {
7176
config,
7277
input_receiver,

crates/extensions/c8y_firmware_manager/src/tests.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -699,7 +699,7 @@ async fn spawn_firmware_manager(
699699
TimedMessageBox<SimpleMessageBox<MqttMessage, MqttMessage>>,
700700
TimedMessageBox<FakeServerBox<JwtRequest, JwtResult>>,
701701
SimpleMessageBox<OperationSetTimeout, OperationTimeout>,
702-
TimedMessageBox<SimpleMessageBox<IdDownloadRequest, IdDownloadResult>>,
702+
TimedMessageBox<FakeServerBox<IdDownloadRequest, IdDownloadResult>>,
703703
),
704704
DynError,
705705
> {
@@ -726,8 +726,8 @@ async fn spawn_firmware_manager(
726726
let mut jwt_builder: FakeServerBoxBuilder<JwtRequest, JwtResult> = FakeServerBox::builder();
727727
let mut timer_builder: SimpleMessageBoxBuilder<OperationSetTimeout, OperationTimeout> =
728728
SimpleMessageBoxBuilder::new("Timer", 5);
729-
let mut downloader_builder: SimpleMessageBoxBuilder<IdDownloadRequest, IdDownloadResult> =
730-
SimpleMessageBoxBuilder::new("Downloader", 5);
729+
let mut downloader_builder: FakeServerBoxBuilder<IdDownloadRequest, IdDownloadResult> =
730+
FakeServerBox::builder();
731731

732732
let firmware_manager_builder = FirmwareManagerBuilder::try_new(
733733
config,

0 commit comments

Comments
 (0)