Skip to content

Commit 1d8ca94

Browse files
committed
Move out from FirmwareManagerActor to FirmwareManagerWorker all stateless methods
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent ea877dd commit 1d8ca94

File tree

2 files changed

+77
-73
lines changed

2 files changed

+77
-73
lines changed

crates/extensions/c8y_firmware_manager/src/actor.rs

Lines changed: 70 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,15 @@ impl Actor for FirmwareManagerActor {
6969
"FirmwareManager"
7070
}
7171

72-
// This actor handles 3 kinds of messages from its peer actors:
72+
// This actor handles 2 kinds of messages from its peer actors:
7373
//
7474
// 1. MQTT messages from the MqttActor for firmware update requests from the cloud and firmware update responses from the child devices
7575
// 2. Operation timeouts from the TimerActor for requests for which the child devices don't respond within the timeout window
76-
// 3. Download results from the DownloaderActor for firmware download requests
7776

7877
async fn run(mut self) -> Result<(), RuntimeError> {
7978
self.resend_operations_to_child_device().await?;
8079
// TODO: We need a dedicated actor to publish 500 later.
81-
self.get_pending_operations_from_cloud().await?;
80+
self.worker.get_pending_operations_from_cloud().await?;
8281

8382
info!("Ready to serve firmware requests.");
8483
while let Some(event) = self.input_receiver.recv().await {
@@ -207,23 +206,29 @@ impl FirmwareManagerActor {
207206
};
208207
}
209208

209+
// Addressing the new firmware operation to further step.
210210
let op_id = nanoid!();
211-
if let Err(err) =
212-
// Addressing the new firmware operation to further step.
213-
self
214-
.handle_firmware_download_request_child_device(
215-
smartrest_request.clone(),
216-
op_id.as_str(),
217-
)
218-
.await
211+
if let Err(err) = self
212+
.worker
213+
.handle_firmware_download_request_child_device(
214+
smartrest_request.clone(),
215+
op_id.as_str(),
216+
)
217+
.await
219218
{
220219
self.fail_operation_in_cloud(child_id, Some(&op_id), &err.to_string())
221220
.await?;
222221
}
223222

223+
let operation_key = OperationKey::new(child_id, &op_id);
224+
self.active_child_ops
225+
.insert(operation_key, ActiveOperationState::Pending);
226+
224227
Ok(())
225228
}
229+
}
226230

231+
impl FirmwareManagerWorker {
227232
// Check if the firmware file is already in cache.
228233
// If yes, publish a firmware request to child device with that firmware in the cache.
229234
// Otherwise, send a download request to the DownloaderActor and return immediately without waiting for the download to complete so that other requests/responses can be processed while the download is in progress.
@@ -236,7 +241,6 @@ impl FirmwareManagerActor {
236241
let firmware_url = smartrest_request.url.as_str();
237242
let file_cache_key = digest(firmware_url);
238243
let cache_file_path = self
239-
.worker
240244
.config
241245
.validate_and_get_cache_dir_path()?
242246
.join(&file_cache_key);
@@ -261,13 +265,12 @@ impl FirmwareManagerActor {
261265

262266
// Send a request to the Downloader to download the file asynchronously.
263267
let download_request = if self
264-
.worker
265268
.config
266269
.c8y_end_point
267270
.maybe_tenant_url(firmware_url)
268271
.is_some()
269272
{
270-
if let Ok(token) = self.worker.jwt_retriever.await_response(()).await? {
273+
if let Ok(token) = self.jwt_retriever.await_response(()).await? {
271274
DownloadRequest::new(firmware_url, cache_file_path.as_std_path())
272275
.with_auth(Auth::new_bearer(&token))
273276
} else {
@@ -278,7 +281,6 @@ impl FirmwareManagerActor {
278281
};
279282

280283
let (_, download_result) = self
281-
.worker
282284
.download_sender
283285
.await_response((operation_id.to_string(), download_request))
284286
.await?;
@@ -297,28 +299,21 @@ impl FirmwareManagerActor {
297299
smartrest_request: SmartRestFirmwareRequest,
298300
download_result: DownloadResult,
299301
) -> Result<(), FirmwareManagementError> {
300-
let child_id = smartrest_request.device.clone();
301302
match download_result {
302303
Ok(response) => {
303-
if let Err(err) =
304-
// Publish a firmware update request to child device.
305-
self
306-
.handle_firmware_update_request_with_downloaded_file(
307-
smartrest_request,
308-
operation_id,
309-
&response.file_path,
310-
)
311-
.await
312-
{
313-
self.fail_operation_in_cloud(&child_id, Some(operation_id), &err.to_string())
314-
.await?;
315-
}
304+
// Publish a firmware update request to child device.
305+
self.handle_firmware_update_request_with_downloaded_file(
306+
smartrest_request,
307+
operation_id,
308+
&response.file_path,
309+
)
310+
.await?
316311
}
317312
Err(err) => {
318-
let firmware_url = smartrest_request.url;
319-
let failure_reason = format!("Download from {firmware_url} failed with {err}");
320-
self.fail_operation_in_cloud(&child_id, Some(operation_id), &failure_reason)
321-
.await?;
313+
return Err(FirmwareManagementError::FromDownloadError {
314+
firmware_url: smartrest_request.url,
315+
err,
316+
});
322317
}
323318
}
324319
Ok(())
@@ -335,7 +330,7 @@ impl FirmwareManagerActor {
335330
let child_id = smartrest_request.device.as_str();
336331
let firmware_url = smartrest_request.url.as_str();
337332
let file_cache_key = digest(firmware_url);
338-
let cache_dir_path = self.worker.config.validate_and_get_cache_dir_path()?;
333+
let cache_dir_path = self.config.validate_and_get_cache_dir_path()?;
339334
let cache_file_path = cache_dir_path.join(&file_cache_key);
340335

341336
// If the downloaded firmware is not already in the cache, move it there
@@ -353,7 +348,7 @@ impl FirmwareManagerActor {
353348
self.create_file_transfer_symlink(child_id, &file_cache_key, &cache_file_path)?;
354349
let file_transfer_url = format!(
355350
"http://{}/tedge/file-transfer/{child_id}/firmware_update/{file_cache_key}",
356-
&self.worker.config.local_http_host
351+
&self.config.local_http_host
357352
);
358353
let file_sha256 = try_digest(symlink_path.as_path())?;
359354

@@ -368,27 +363,23 @@ impl FirmwareManagerActor {
368363
attempt: 1,
369364
};
370365

371-
operation_entry.create_status_file(self.worker.config.data_dir.firmware_dir())?;
366+
operation_entry.create_status_file(self.config.data_dir.firmware_dir())?;
372367

373368
self.publish_firmware_update_request(operation_entry)
374369
.await?;
375370

376371
let operation_key = OperationKey::new(child_id, operation_id);
377-
self.active_child_ops
378-
.insert(operation_key.clone(), ActiveOperationState::Pending);
379372

380373
// Start timer
381-
self.worker
382-
.timer_sender
383-
.send(SetTimeout::new(
384-
self.worker.config.timeout_sec,
385-
operation_key,
386-
))
374+
self.timer_sender
375+
.send(SetTimeout::new(self.config.timeout_sec, operation_key))
387376
.await?;
388377

389378
Ok(())
390379
}
380+
}
391381

382+
impl FirmwareManagerActor {
392383
// This is the start point function when receiving a firmware response from child device.
393384
async fn handle_child_device_firmware_operation_response(
394385
&mut self,
@@ -441,7 +432,7 @@ impl FirmwareManagerActor {
441432
match current_operation_state {
442433
Some(&ActiveOperationState::Executing) => {}
443434
Some(&ActiveOperationState::Pending) => {
444-
self.publish_c8y_executing_message(&child_id).await?;
435+
self.worker.publish_c8y_executing_message(&child_id).await?;
445436
self.active_child_ops
446437
.insert(operation_key.clone(), ActiveOperationState::Executing);
447438
}
@@ -462,20 +453,24 @@ impl FirmwareManagerActor {
462453
let operation_entry =
463454
FirmwareOperationEntry::read_from_file(status_file_path.as_path())?;
464455

465-
self.publish_c8y_installed_firmware_message(&operation_entry)
456+
self.worker
457+
.publish_c8y_installed_firmware_message(&operation_entry)
458+
.await?;
459+
self.worker
460+
.publish_c8y_successful_message(&child_id)
466461
.await?;
467-
self.publish_c8y_successful_message(&child_id).await?;
468462

469-
self.remove_status_file(operation_id)?;
463+
self.worker.remove_status_file(operation_id)?;
470464
self.remove_entry_from_active_operations(&operation_key);
471465
}
472466
OperationStatus::Failed => {
473-
self.publish_c8y_failed_message(
474-
&child_id,
475-
"No failure reason provided by child device.",
476-
)
477-
.await?;
478-
self.remove_status_file(operation_id)?;
467+
self.worker
468+
.publish_c8y_failed_message(
469+
&child_id,
470+
"No failure reason provided by child device.",
471+
)
472+
.await?;
473+
self.worker.remove_status_file(operation_id)?;
479474
self.remove_entry_from_active_operations(&operation_key);
480475
}
481476
OperationStatus::Executing => {
@@ -542,7 +537,8 @@ impl FirmwareManagerActor {
542537
);
543538

544539
new_operation_entry.overwrite_file(&firmware_dir_path)?;
545-
self.publish_firmware_update_request(new_operation_entry)
540+
self.worker
541+
.publish_firmware_update_request(new_operation_entry)
546542
.await?;
547543
// Add operation to hashmap
548544
self.active_child_ops
@@ -585,16 +581,17 @@ impl FirmwareManagerActor {
585581
) -> Result<(), FirmwareManagementError> {
586582
error!("{}", failure_reason);
587583
let op_state = if let Some(operation_id) = op_id {
588-
self.remove_status_file(operation_id)?;
584+
self.worker.remove_status_file(operation_id)?;
589585
self.remove_entry_from_active_operations(&OperationKey::new(child_id, operation_id))
590586
} else {
591587
ActiveOperationState::Pending
592588
};
593589

594590
if op_state == ActiveOperationState::Pending {
595-
self.publish_c8y_executing_message(child_id).await?;
591+
self.worker.publish_c8y_executing_message(child_id).await?;
596592
}
597-
self.publish_c8y_failed_message(child_id, failure_reason)
593+
self.worker
594+
.publish_c8y_failed_message(child_id, failure_reason)
598595
.await?;
599596

600597
Ok(())
@@ -616,7 +613,8 @@ impl FirmwareManagerActor {
616613
OperationKey::new(&operation_entry.child_id, &operation_entry.operation_id);
617614

618615
operation_entry.overwrite_file(&firmware_dir_path)?;
619-
self.publish_firmware_update_request(operation_entry)
616+
self.worker
617+
.publish_firmware_update_request(operation_entry)
620618
.await?;
621619
// Add operation to hashmap
622620
self.active_child_ops
@@ -633,10 +631,11 @@ impl FirmwareManagerActor {
633631
}
634632
Ok(())
635633
}
634+
}
636635

636+
impl FirmwareManagerWorker {
637637
fn remove_status_file(&mut self, operation_id: &str) -> Result<(), FirmwareManagementError> {
638638
let status_file_path = self
639-
.worker
640639
.config
641640
.validate_and_get_firmware_dir_path()?
642641
.join(operation_id);
@@ -652,7 +651,7 @@ impl FirmwareManagerActor {
652651
) -> Result<(), FirmwareManagementError> {
653652
let mqtt_message: MqttMessage =
654653
FirmwareOperationRequest::from(operation_entry.clone()).try_into()?;
655-
self.worker.mqtt_publisher.send(mqtt_message).await?;
654+
self.mqtt_publisher.send(mqtt_message).await?;
656655
info!(
657656
"Firmware update request is sent. operation_id={}, child={}",
658657
operation_entry.operation_id, operation_entry.child_id
@@ -671,7 +670,7 @@ impl FirmwareManagerActor {
671670
&c8y_child_topic,
672671
DownloadFirmwareStatusMessage::status_executing(),
673672
);
674-
self.worker.mqtt_publisher.send(executing_msg).await?;
673+
self.mqtt_publisher.send(executing_msg).await?;
675674
Ok(())
676675
}
677676

@@ -686,7 +685,7 @@ impl FirmwareManagerActor {
686685
&c8y_child_topic,
687686
DownloadFirmwareStatusMessage::status_successful(None),
688687
);
689-
self.worker.mqtt_publisher.send(successful_msg).await?;
688+
self.mqtt_publisher.send(successful_msg).await?;
690689
Ok(())
691690
}
692691

@@ -702,7 +701,7 @@ impl FirmwareManagerActor {
702701
&c8y_child_topic,
703702
DownloadFirmwareStatusMessage::status_failed(failure_reason),
704703
);
705-
self.worker.mqtt_publisher.send(failed_msg).await?;
704+
self.mqtt_publisher.send(failed_msg).await?;
706705
Ok(())
707706
}
708707

@@ -719,13 +718,12 @@ impl FirmwareManagerActor {
719718
);
720719
let installed_firmware_message =
721720
MqttMessage::new(&c8y_child_topic, installed_firmware_payload);
722-
self.worker
723-
.mqtt_publisher
724-
.send(installed_firmware_message)
725-
.await?;
721+
self.mqtt_publisher.send(installed_firmware_message).await?;
726722
Ok(())
727723
}
724+
}
728725

726+
impl FirmwareManagerActor {
729727
fn remove_entry_from_active_operations(
730728
&mut self,
731729
operation_key: &OperationKey,
@@ -736,18 +734,17 @@ impl FirmwareManagerActor {
736734
ActiveOperationState::Pending
737735
}
738736
}
737+
}
739738

739+
impl FirmwareManagerWorker {
740740
// The symlink path should be <tedge-data-dir>/file-transfer/<child-id>/firmware_update/<file_cache_key>
741741
fn create_file_transfer_symlink(
742742
&self,
743743
child_id: &str,
744744
file_cache_key: &str,
745745
original_file_path: impl AsRef<Path>,
746746
) -> Result<Utf8PathBuf, FirmwareManagementError> {
747-
let file_transfer_dir_path = self
748-
.worker
749-
.config
750-
.validate_and_get_file_transfer_dir_path()?;
747+
let file_transfer_dir_path = self.config.validate_and_get_file_transfer_dir_path()?;
751748

752749
let symlink_dir_path = file_transfer_dir_path
753750
.join(child_id)
@@ -764,7 +761,7 @@ impl FirmwareManagerActor {
764761
// Candidate to be removed since another actor should be in charge of this.
765762
async fn get_pending_operations_from_cloud(&mut self) -> Result<(), FirmwareManagementError> {
766763
let message = MqttMessage::new(&C8yTopic::SmartRestResponse.to_topic()?, "500");
767-
self.worker.mqtt_publisher.send(message).await?;
764+
self.mqtt_publisher.send(message).await?;
768765
Ok(())
769766
}
770767
}

crates/extensions/c8y_firmware_manager/src/error.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use tedge_actors::RuntimeError;
2+
use tedge_api::DownloadError;
23

34
#[derive(thiserror::Error, Debug)]
45
pub enum FirmwareManagementError {
@@ -42,6 +43,12 @@ pub enum FirmwareManagementError {
4243

4344
#[error(transparent)]
4445
FromMqttError(#[from] tedge_mqtt_ext::MqttError),
46+
47+
#[error("Download from {firmware_url} failed with {err}")]
48+
FromDownloadError {
49+
firmware_url: String,
50+
err: DownloadError,
51+
},
4552
}
4653

4754
impl From<FirmwareManagementError> for RuntimeError {

0 commit comments

Comments
 (0)