Skip to content

Commit b843d72

Browse files
Merge pull request #3309 from didier-wenzek/refactor/simplify_command_state_updates
refactor: Simplify command state update methods
2 parents b48c19c + b9fea1b commit b843d72

File tree

6 files changed

+59
-52
lines changed

6 files changed

+59
-52
lines changed

crates/core/tedge_agent/src/operation_workflows/actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl WorkflowActor {
159159
Ok(Some(new_state)) => {
160160
self.persist_command_board().await?;
161161
if new_state.is_init() {
162-
self.process_command_update(new_state.set_log_path(&log_file.path))
162+
self.process_command_update(new_state.with_log_path(&log_file.path))
163163
.await?;
164164
}
165165
}

crates/core/tedge_agent/src/operation_workflows/persist.rs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -261,11 +261,7 @@ impl WorkflowRepository {
261261

262262
/// Copy the workflow definition file to the persisted state directory,
263263
/// unless this has already been done.
264-
async fn persist_workflow_definition(
265-
&mut self,
266-
operation: &OperationName,
267-
version: &WorkflowVersion,
268-
) {
264+
async fn persist_workflow_definition(&mut self, operation: &str, version: &str) {
269265
if version_is_builtin(version) {
270266
return;
271267
}
@@ -279,16 +275,12 @@ impl WorkflowRepository {
279275
if let Err(err) = tokio::fs::copy(source.clone(), target.clone()).await {
280276
error!("Fail to persist a copy of {source} as {target}: {err}");
281277
} else {
282-
self.in_use_copies.insert(version.clone(), 1);
278+
self.in_use_copies.insert(version.to_owned(), 1);
283279
}
284280
}
285281
}
286282

287-
fn workflow_copy_path(
288-
&self,
289-
operation: &OperationName,
290-
version: &WorkflowVersion,
291-
) -> Utf8PathBuf {
283+
fn workflow_copy_path(&self, operation: &str, version: &str) -> Utf8PathBuf {
292284
let filename = format!("{operation}-{version}");
293285
self.state_dir.join(filename).with_extension("toml")
294286
}
@@ -305,7 +297,7 @@ impl WorkflowRepository {
305297
}
306298
}
307299

308-
async fn release_in_use_copy(&mut self, operation: &OperationName, version: &WorkflowVersion) {
300+
async fn release_in_use_copy(&mut self, operation: &str, version: &str) {
309301
if version_is_builtin(version) {
310302
return;
311303
}
@@ -359,7 +351,7 @@ impl WorkflowRepository {
359351
if let Some(current_version) = self.workflows.use_current_version(&operation) {
360352
self.persist_workflow_definition(&operation, &current_version)
361353
.await;
362-
*command = command.clone().set_workflow_version(&current_version);
354+
command.set_workflow_version(&current_version);
363355
}
364356
}
365357
}
@@ -410,14 +402,14 @@ impl WorkflowRepository {
410402
operation: &OperationType,
411403
command_state: GenericCommandState,
412404
) -> Result<Option<GenericCommandState>, WorkflowExecutionError> {
405+
let operation_name = operation.name();
413406
if command_state.is_init() {
414407
// A new command instance must use the latest on-disk version of the operation workflow
415-
self.load_latest_version(&operation.to_string()).await;
408+
self.load_latest_version(&operation_name).await;
416409
} else if command_state.is_finished() {
417410
// Clear the cache if this happens to be the latest instance using that version of the workflow
418411
if let Some(version) = command_state.workflow_version() {
419-
self.release_in_use_copy(&operation.to_string(), &version)
420-
.await;
412+
self.release_in_use_copy(&operation_name, version).await;
421413
}
422414
}
423415

@@ -429,7 +421,7 @@ impl WorkflowRepository {
429421

430422
Some(new_state) if new_state.is_init() => {
431423
if let Some(version) = new_state.workflow_version() {
432-
self.persist_workflow_definition(&operation.to_string(), &version)
424+
self.persist_workflow_definition(&operation_name, version)
433425
.await;
434426
}
435427
Ok(Some(new_state))

crates/core/tedge_api/src/mqtt_topics.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -737,12 +737,6 @@ impl<'a> From<&'a str> for OperationType {
737737
}
738738
}
739739

740-
impl From<&OperationType> for String {
741-
fn from(value: &OperationType) -> Self {
742-
format!("{value}")
743-
}
744-
}
745-
746740
impl Display for OperationType {
747741
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
748742
match self {
@@ -760,6 +754,12 @@ impl Display for OperationType {
760754
}
761755
}
762756

757+
impl OperationType {
758+
pub fn name(&self) -> String {
759+
format!("{self}")
760+
}
761+
}
762+
763763
#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)]
764764
pub enum ChannelError {
765765
#[error("Channel needs to have at least 2 segments")]

crates/core/tedge_api/src/workflow/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub type WorkflowVersion = String;
3232

3333
const BUILT_IN: &str = "builtin";
3434

35-
pub fn version_is_builtin(version: &WorkflowVersion) -> bool {
35+
pub fn version_is_builtin(version: &str) -> bool {
3636
version == BUILT_IN
3737
}
3838

@@ -342,7 +342,7 @@ impl OperationWorkflow {
342342
self.states
343343
.get(&command_state.status)
344344
.ok_or_else(|| WorkflowExecutionError::UnknownStep {
345-
operation: (&self.operation).into(),
345+
operation: self.operation.name(),
346346
step: command_state.status.clone(),
347347
})
348348
.map(|action| action.inject_state(command_state))

crates/core/tedge_api/src/workflow/state.rs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,18 @@ impl GenericCommandState {
186186
}
187187
}
188188

189-
pub fn update_with_key_value(self, key: &str, val: &str) -> Self {
190-
self.update_with_json(json!({ key: val }))
189+
pub fn with_key_value(mut self, key: &str, val: &str) -> Self {
190+
self.set_key_value(key, val);
191+
self
192+
}
193+
194+
fn set_key_value(&mut self, key: &str, val: &str) {
195+
if let Some(o) = self.payload.as_object_mut() {
196+
o.insert(key.into(), val.into());
197+
}
198+
if key == STATUS {
199+
self.status = val.to_owned();
200+
}
191201
}
192202

193203
pub fn get_log_path(&self) -> Option<Utf8PathBuf> {
@@ -197,19 +207,28 @@ impl GenericCommandState {
197207
.map(Utf8PathBuf::from)
198208
}
199209

200-
pub fn set_log_path<P: AsRef<Utf8Path>>(self, path: P) -> Self {
201-
self.update_with_key_value(OP_LOG_PATH_KEY, path.as_ref().as_str())
210+
pub fn with_log_path<P: AsRef<Utf8Path>>(mut self, path: P) -> Self {
211+
self.set_log_path(path);
212+
self
202213
}
203214

204-
pub fn workflow_version(&self) -> Option<String> {
215+
pub fn set_log_path<P: AsRef<Utf8Path>>(&mut self, path: P) {
216+
self.set_key_value(OP_LOG_PATH_KEY, path.as_ref().as_str())
217+
}
218+
219+
pub fn workflow_version(&self) -> Option<&str> {
205220
self.payload
206221
.get(OP_WORKFLOW_VERSION_KEY)
207222
.and_then(|val| val.as_str())
208-
.map(|str| str.to_string())
209223
}
210224

211-
pub fn set_workflow_version(self, version: &str) -> Self {
212-
self.update_with_key_value(OP_WORKFLOW_VERSION_KEY, version)
225+
pub fn with_workflow_version(mut self, version: &str) -> Self {
226+
self.set_workflow_version(version);
227+
self
228+
}
229+
230+
pub fn set_workflow_version(&mut self, version: &str) {
231+
self.set_key_value(OP_WORKFLOW_VERSION_KEY, version)
213232
}
214233

215234
/// Update the command state with the outcome of a script

crates/core/tedge_api/src/workflow/supervisor.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ impl WorkflowSupervisor {
8080
self.commands = commands;
8181
self.commands
8282
.iter()
83-
.filter_map(|(t, s)| self.resume_command(t, s))
83+
.filter_map(|(t, s)| self.resume_command(t, s.clone()))
8484
.collect()
8585
}
8686

@@ -136,10 +136,10 @@ impl WorkflowSupervisor {
136136
///
137137
/// Return the current version if any.
138138
pub fn use_current_version(&mut self, operation: &OperationName) -> Option<WorkflowVersion> {
139-
match self.workflows.get_mut(&operation.as_str().into()) {
140-
Some(versions) => versions.use_current_version().cloned(),
141-
None => None,
142-
}
139+
self.workflows
140+
.get_mut(&operation.as_str().into())?
141+
.use_current_version()
142+
.cloned()
143143
}
144144

145145
/// Update the state of the command board on reception of a message sent by a peer over MQTT
@@ -162,9 +162,9 @@ impl WorkflowSupervisor {
162162
} else if command_state.is_init() {
163163
// This is a new command request
164164
if let Some(current_version) = workflow_versions.use_current_version() {
165-
let command_state = command_state.set_workflow_version(current_version);
166-
self.commands.insert(command_state.clone())?;
167-
Ok(Some(command_state))
165+
let updated_state = command_state.with_workflow_version(current_version);
166+
self.commands.insert(updated_state.clone())?;
167+
Ok(Some(updated_state))
168168
} else {
169169
return Err(WorkflowExecutionError::DeprecatedOperation {
170170
operation: operation.to_string(),
@@ -192,7 +192,7 @@ impl WorkflowSupervisor {
192192
});
193193
};
194194

195-
let Some(version) = &command_state.workflow_version() else {
195+
let Some(version) = command_state.workflow_version() else {
196196
return Err(WorkflowExecutionError::MissingVersion);
197197
};
198198

@@ -287,21 +287,17 @@ impl WorkflowSupervisor {
287287
fn resume_command(
288288
&self,
289289
timestamp: &Timestamp,
290-
command: &GenericCommandState,
290+
command: GenericCommandState,
291291
) -> Option<GenericCommandState> {
292-
let action = match self.get_action(command) {
292+
let action = match self.get_action(&command) {
293293
Ok(action) => action,
294294
Err(err) => {
295-
return Some(
296-
command
297-
.clone()
298-
.fail_with(format!("Fail to resume on start: {err:?}")),
299-
);
295+
return Some(command.fail_with(format!("Fail to resume on start: {err:?}")));
300296
}
301297
};
302298

303299
let epoch = format!("{}.{}", timestamp.unix_timestamp(), timestamp.millisecond());
304-
let command = command.clone().update_with_key_value("resumed_at", &epoch);
300+
let command = command.with_key_value("resumed_at", &epoch);
305301
match action {
306302
OperationAction::AwaitingAgentRestart(handlers) => {
307303
Some(command.update(handlers.on_success))
@@ -425,7 +421,7 @@ impl WorkflowVersions {
425421
self.in_use.contains_key(BUILT_IN)
426422
}
427423

428-
fn get(&self, version: &WorkflowVersion) -> Result<&OperationWorkflow, WorkflowExecutionError> {
424+
fn get(&self, version: &str) -> Result<&OperationWorkflow, WorkflowExecutionError> {
429425
self.in_use
430426
.get(version)
431427
.ok_or(WorkflowExecutionError::UnknownVersion {

0 commit comments

Comments
 (0)