@@ -55,7 +55,14 @@ pub type OperationTimeout = Timeout<OperationKey>;
55
55
pub type IdDownloadResult = ( String , DownloadResult ) ;
56
56
pub type IdDownloadRequest = ( String , DownloadRequest ) ;
57
57
58
- fan_in_message_type ! ( FirmwareInput [ MqttMessage , OperationTimeout ] : Debug ) ;
58
+ #[ derive( Debug ) ]
59
+ pub struct RequestForwardOutcome {
60
+ child_id : String ,
61
+ operation_id : String ,
62
+ result : Result < ( ) , FirmwareManagementError > ,
63
+ }
64
+
65
+ fan_in_message_type ! ( FirmwareInput [ MqttMessage , OperationTimeout , RequestForwardOutcome ] : Debug ) ;
59
66
60
67
pub struct FirmwareManagerActor {
61
68
input_receiver : LoggingReceiver < FirmwareInput > ,
@@ -69,11 +76,11 @@ impl Actor for FirmwareManagerActor {
69
76
"FirmwareManager"
70
77
}
71
78
72
- // This actor handles 2 kinds of messages from its peer actors:
79
+ // This actor handles 3 kinds of messages from its peer actors:
73
80
//
74
81
// 1. MQTT messages from the MqttActor for firmware update requests from the cloud and firmware update responses from the child devices
75
82
// 2. Operation timeouts from the TimerActor for requests for which the child devices don't respond within the timeout window
76
-
83
+ // 3. RequestForwardOutcome sent back by the background workers once the firmware request has been forwarded to the child device
77
84
async fn run ( mut self ) -> Result < ( ) , RuntimeError > {
78
85
self . resend_operations_to_child_device ( ) . await ?;
79
86
// TODO: We need a dedicated actor to publish 500 later.
@@ -88,6 +95,18 @@ impl Actor for FirmwareManagerActor {
88
95
FirmwareInput :: OperationTimeout ( timeout) => {
89
96
self . process_operation_timeout ( timeout) . await ?;
90
97
}
98
+ FirmwareInput :: RequestForwardOutcome ( outcome) => {
99
+ if let Err ( err) = outcome. result {
100
+ self . fail_operation_in_cloud (
101
+ & outcome. child_id ,
102
+ Some ( & outcome. operation_id ) ,
103
+ & err. to_string ( ) ,
104
+ )
105
+ . await ?;
106
+ }
107
+ // The firmware has been downloaded and the request forwarded to the child device.
108
+ // Simply waits for a response from the child device (over MQTT) or a timeout.
109
+ }
91
110
}
92
111
}
93
112
Ok ( ( ) )
@@ -102,6 +121,7 @@ impl FirmwareManagerActor {
102
121
jwt_retriever : JwtRetriever ,
103
122
timer_sender : DynSender < SetTimeout < OperationKey > > ,
104
123
download_sender : ClientMessageBox < IdDownloadRequest , IdDownloadResult > ,
124
+ progress_sender : DynSender < RequestForwardOutcome > ,
105
125
) -> Self {
106
126
Self {
107
127
input_receiver,
@@ -111,6 +131,7 @@ impl FirmwareManagerActor {
111
131
jwt_retriever,
112
132
download_sender,
113
133
timer_sender,
134
+ progress_sender,
114
135
} ,
115
136
active_child_ops : HashMap :: new ( ) ,
116
137
}
@@ -187,7 +208,7 @@ impl FirmwareManagerActor {
187
208
return Ok ( ( ) ) ;
188
209
}
189
210
190
- let child_id = smartrest_request. device . as_str ( ) ;
211
+ let child_id = smartrest_request. device . clone ( ) ;
191
212
192
213
if let Err ( err) = self
193
214
. validate_same_request_in_progress ( smartrest_request. clone ( ) )
@@ -199,7 +220,7 @@ impl FirmwareManagerActor {
199
220
Ok ( ( ) )
200
221
}
201
222
_ => {
202
- self . fail_operation_in_cloud ( child_id, None , & err. to_string ( ) )
223
+ self . fail_operation_in_cloud ( & child_id, None , & err. to_string ( ) )
203
224
. await ?;
204
225
Err ( err)
205
226
}
@@ -208,19 +229,29 @@ impl FirmwareManagerActor {
208
229
209
230
// Addressing the new firmware operation to further step.
210
231
let op_id = nanoid ! ( ) ;
211
- if let Err ( err) = self
212
- . worker
213
- . handle_firmware_download_request_child_device (
214
- smartrest_request. clone ( ) ,
215
- op_id. as_str ( ) ,
216
- )
217
- . await
218
- {
219
- self . fail_operation_in_cloud ( child_id, Some ( & op_id) , & err. to_string ( ) )
220
- . await ?;
221
- }
232
+ let operation_key = OperationKey :: new ( & child_id, & op_id) ;
233
+
234
+ let mut worker = self . worker . clone ( ) ;
235
+ tokio:: spawn ( async move {
236
+ let result = worker
237
+ . handle_firmware_download_request_child_device (
238
+ smartrest_request. clone ( ) ,
239
+ op_id. as_str ( ) ,
240
+ )
241
+ . await ;
242
+ if let Err ( err) = worker
243
+ . progress_sender
244
+ . send ( RequestForwardOutcome {
245
+ child_id,
246
+ operation_id : op_id,
247
+ result,
248
+ } )
249
+ . await
250
+ {
251
+ error ! ( "Fail to forward operation progress due to: {err}" ) ;
252
+ }
253
+ } ) ;
222
254
223
- let operation_key = OperationKey :: new ( child_id, & op_id) ;
224
255
self . active_child_ops
225
256
. insert ( operation_key, ActiveOperationState :: Pending ) ;
226
257
@@ -231,8 +262,10 @@ impl FirmwareManagerActor {
231
262
impl FirmwareManagerWorker {
232
263
// Check if the firmware file is already in cache.
233
264
// If yes, publish a firmware request to child device with that firmware in the cache.
234
- // Otherwise, send a download request to the DownloaderActor and return immediately without waiting for the download to complete so that other requests/responses can be processed while the download is in progress.
235
- // The download will be performed by the DownloaderActor asynchronously and the response will be processed by this actor later on, in the `run` method.
265
+ // Otherwise, send a download request to the DownloaderActor awaiting for the download to complete.
266
+ //
267
+ // This method has to be spawn in a task
268
+ // so other requests/responses can be processed while the download is in progress.
236
269
async fn handle_firmware_download_request_child_device (
237
270
& mut self ,
238
271
smartrest_request : SmartRestFirmwareRequest ,
@@ -290,7 +323,7 @@ impl FirmwareManagerWorker {
290
323
Ok ( ( ) )
291
324
}
292
325
293
- // This function is called on receiving a DownloadResult from the DownloaderActor or when the firmware file is already available in the cache .
326
+ // This function is called on receiving a DownloadResult from the DownloaderActor.
294
327
// If the download is successful, publish a firmware request to child device with it
295
328
// Otherwise, fail the operation in the cloud
296
329
async fn process_downloaded_firmware (
@@ -772,6 +805,7 @@ struct FirmwareManagerWorker {
772
805
jwt_retriever : JwtRetriever ,
773
806
download_sender : ClientMessageBox < IdDownloadRequest , IdDownloadResult > ,
774
807
timer_sender : DynSender < SetTimeout < OperationKey > > ,
808
+ progress_sender : DynSender < RequestForwardOutcome > ,
775
809
}
776
810
777
811
impl Clone for FirmwareManagerWorker {
@@ -782,6 +816,7 @@ impl Clone for FirmwareManagerWorker {
782
816
jwt_retriever : self . jwt_retriever . clone ( ) ,
783
817
download_sender : self . download_sender . clone ( ) ,
784
818
timer_sender : self . timer_sender . sender_clone ( ) ,
819
+ progress_sender : self . progress_sender . sender_clone ( ) ,
785
820
}
786
821
}
787
822
}
0 commit comments