Skip to content

Commit 99b38fa

Browse files
authored
Merge pull request #3307 from didier-wenzek/fix/agent-self-update-with-unversionned-pending-commands
fix: Persist workflow definition of pending commands on start
2 parents d710940 + b36ce65 commit 99b38fa

File tree

3 files changed

+33
-22
lines changed

3 files changed

+33
-22
lines changed

crates/core/tedge_agent/src/operation_workflows/actor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,7 @@ impl WorkflowActor {
513513
for command in self
514514
.workflow_repository
515515
.load_pending_commands(pending_commands)
516+
.await
516517
{
517518
// Make sure the latest state is visible over MQTT
518519
self.mqtt_publisher

crates/core/tedge_agent/src/operation_workflows/persist.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,25 @@ impl WorkflowRepository {
346346
None
347347
}
348348

349-
pub fn load_pending_commands(&mut self, commands: CommandBoard) -> Vec<GenericCommandState> {
349+
pub async fn load_pending_commands(
350+
&mut self,
351+
mut commands: CommandBoard,
352+
) -> Vec<GenericCommandState> {
353+
// If the resumed commands have been triggered by an agent without workflow version management
354+
// then these commands are assigned the current version of the operation workflow.
355+
// These currents versions have also to be marked as in use and persisted.
356+
for (_, ref mut command) in commands.iter_mut() {
357+
if command.workflow_version().is_none() {
358+
if let Some(operation) = command.operation() {
359+
if let Some(current_version) = self.workflows.use_current_version(&operation) {
360+
self.persist_workflow_definition(&operation, &current_version)
361+
.await;
362+
*command = command.clone().set_workflow_version(&current_version);
363+
}
364+
}
365+
}
366+
}
367+
350368
self.workflows.load_pending_commands(commands)
351369
}
352370

crates/core/tedge_api/src/workflow/supervisor.rs

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -76,25 +76,7 @@ impl WorkflowSupervisor {
7676
}
7777

7878
/// Update on start the set of pending commands
79-
pub fn load_pending_commands(
80-
&mut self,
81-
mut commands: CommandBoard,
82-
) -> Vec<GenericCommandState> {
83-
// If the resumed commands have been triggered by an agent without workflow version management
84-
// then these commands are assigned the current version for the operation.
85-
// These currents versions have also to be marked as in use.
86-
for (_, ref mut command) in commands.iter_mut() {
87-
if command.workflow_version().is_none() {
88-
if let Some(versions) = command
89-
.operation()
90-
.and_then(|operation| self.workflows.get_mut(&operation.as_str().into()))
91-
{
92-
if let Some(current_version) = versions.use_current_version() {
93-
*command = command.clone().set_workflow_version(current_version);
94-
}
95-
}
96-
}
97-
}
79+
pub fn load_pending_commands(&mut self, commands: CommandBoard) -> Vec<GenericCommandState> {
9880
self.commands = commands;
9981
self.commands
10082
.iter()
@@ -150,6 +132,16 @@ impl WorkflowSupervisor {
150132
}
151133
}
152134

135+
/// Mark the current version of an operation workflow as being in use.
136+
///
137+
/// Return the current version if any.
138+
pub fn use_current_version(&mut self, operation: &OperationName) -> Option<WorkflowVersion> {
139+
match self.workflows.get_mut(&operation.as_str().into()) {
140+
Some(versions) => versions.use_current_version().cloned(),
141+
None => None,
142+
}
143+
}
144+
153145
/// Update the state of the command board on reception of a message sent by a peer over MQTT
154146
///
155147
/// Return the new CommandRequest state if any.
@@ -399,7 +391,7 @@ impl WorkflowVersions {
399391
}
400392
}
401393

402-
// Mark the current version as being in-use.
394+
/// Mark the current version as being in-use.
403395
fn use_current_version(&mut self) -> Option<&WorkflowVersion> {
404396
match self.current.as_ref() {
405397
Some((version, workflow)) => {
@@ -416,7 +408,7 @@ impl WorkflowVersions {
416408
}
417409
}
418410

419-
// Remove the current version from this list of versions, restoring the built-in version if any
411+
/// Remove the current version from this list of versions, restoring the built-in version if any
420412
fn remove(&mut self, version: &WorkflowVersion) {
421413
if self.current.as_ref().map(|(v, _)| v == version) == Some(true) {
422414
self.current = None;

0 commit comments

Comments
 (0)