Skip to content

Commit b36ce65

Browse files
committed
Persist workflow definition of pending commands on start
When the agent restarts and loads pending operations, these operations might not have been assign a workflow version. This is notably the case after a self update from a previous version of the agent that was oblivious of workflow versions. In such a case, the agent assumes the current user-defined version of the workflow is the version to be used for the pending command. The workflow definition is persisted in `workflows-in-use` directory and assigned to the command. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent d710940 commit b36ce65

File tree

3 files changed

+33
-22
lines changed

3 files changed

+33
-22
lines changed

crates/core/tedge_agent/src/operation_workflows/actor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,7 @@ impl WorkflowActor {
513513
for command in self
514514
.workflow_repository
515515
.load_pending_commands(pending_commands)
516+
.await
516517
{
517518
// Make sure the latest state is visible over MQTT
518519
self.mqtt_publisher

crates/core/tedge_agent/src/operation_workflows/persist.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,25 @@ impl WorkflowRepository {
346346
None
347347
}
348348

349-
pub fn load_pending_commands(&mut self, commands: CommandBoard) -> Vec<GenericCommandState> {
349+
pub async fn load_pending_commands(
350+
&mut self,
351+
mut commands: CommandBoard,
352+
) -> Vec<GenericCommandState> {
353+
// If the resumed commands have been triggered by an agent without workflow version management
354+
// then these commands are assigned the current version of the operation workflow.
355+
// These currents versions have also to be marked as in use and persisted.
356+
for (_, ref mut command) in commands.iter_mut() {
357+
if command.workflow_version().is_none() {
358+
if let Some(operation) = command.operation() {
359+
if let Some(current_version) = self.workflows.use_current_version(&operation) {
360+
self.persist_workflow_definition(&operation, &current_version)
361+
.await;
362+
*command = command.clone().set_workflow_version(&current_version);
363+
}
364+
}
365+
}
366+
}
367+
350368
self.workflows.load_pending_commands(commands)
351369
}
352370

crates/core/tedge_api/src/workflow/supervisor.rs

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -76,25 +76,7 @@ impl WorkflowSupervisor {
7676
}
7777

7878
/// Update on start the set of pending commands
79-
pub fn load_pending_commands(
80-
&mut self,
81-
mut commands: CommandBoard,
82-
) -> Vec<GenericCommandState> {
83-
// If the resumed commands have been triggered by an agent without workflow version management
84-
// then these commands are assigned the current version for the operation.
85-
// These currents versions have also to be marked as in use.
86-
for (_, ref mut command) in commands.iter_mut() {
87-
if command.workflow_version().is_none() {
88-
if let Some(versions) = command
89-
.operation()
90-
.and_then(|operation| self.workflows.get_mut(&operation.as_str().into()))
91-
{
92-
if let Some(current_version) = versions.use_current_version() {
93-
*command = command.clone().set_workflow_version(current_version);
94-
}
95-
}
96-
}
97-
}
79+
pub fn load_pending_commands(&mut self, commands: CommandBoard) -> Vec<GenericCommandState> {
9880
self.commands = commands;
9981
self.commands
10082
.iter()
@@ -150,6 +132,16 @@ impl WorkflowSupervisor {
150132
}
151133
}
152134

135+
/// Mark the current version of an operation workflow as being in use.
136+
///
137+
/// Return the current version if any.
138+
pub fn use_current_version(&mut self, operation: &OperationName) -> Option<WorkflowVersion> {
139+
match self.workflows.get_mut(&operation.as_str().into()) {
140+
Some(versions) => versions.use_current_version().cloned(),
141+
None => None,
142+
}
143+
}
144+
153145
/// Update the state of the command board on reception of a message sent by a peer over MQTT
154146
///
155147
/// Return the new CommandRequest state if any.
@@ -399,7 +391,7 @@ impl WorkflowVersions {
399391
}
400392
}
401393

402-
// Mark the current version as being in-use.
394+
/// Mark the current version as being in-use.
403395
fn use_current_version(&mut self) -> Option<&WorkflowVersion> {
404396
match self.current.as_ref() {
405397
Some((version, workflow)) => {
@@ -416,7 +408,7 @@ impl WorkflowVersions {
416408
}
417409
}
418410

419-
// Remove the current version from this list of versions, restoring the built-in version if any
411+
/// Remove the current version from this list of versions, restoring the built-in version if any
420412
fn remove(&mut self, version: &WorkflowVersion) {
421413
if self.current.as_ref().map(|(v, _)| v == version) == Some(true) {
422414
self.current = None;

0 commit comments

Comments
 (0)