Skip to content

Commit 08577c9

Browse files
Merge pull request #2844 from didier-wenzek/feat/config-and-log-management-workflows
feat: Config and log management workflows
2 parents 3a236f0 + 15b7e26 commit 08577c9

File tree

40 files changed

+1199
-480
lines changed

40 files changed

+1199
-480
lines changed

crates/common/mqtt_channel/src/topics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ impl Topic {
4141
/// An MQTT topic filter
4242
#[derive(Debug, Clone, Eq, PartialEq)]
4343
pub struct TopicFilter {
44-
pub patterns: Vec<String>,
45-
pub qos: QoS,
44+
patterns: Vec<String>,
45+
qos: QoS,
4646
}
4747

4848
impl Default for TopicFilter {

crates/core/c8y_api/src/json_c8y.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ pub struct C8ySoftwareModuleItem {
8282
pub url: Option<DownloadInfo>,
8383
}
8484

85-
impl<'a> Jsonify<'a> for C8ySoftwareModuleItem {}
85+
impl Jsonify for C8ySoftwareModuleItem {}
8686

8787
impl From<SoftwareModule> for C8ySoftwareModuleItem {
8888
fn from(module: SoftwareModule) -> Self {
@@ -108,7 +108,7 @@ pub struct C8yUpdateSoftwareListResponse {
108108
c8y_software_list: Option<Vec<C8ySoftwareModuleItem>>,
109109
}
110110

111-
impl<'a> Jsonify<'a> for C8yUpdateSoftwareListResponse {}
111+
impl Jsonify for C8yUpdateSoftwareListResponse {}
112112

113113
impl From<&SoftwareListCommand> for C8yUpdateSoftwareListResponse {
114114
fn from(list: &SoftwareListCommand) -> Self {
@@ -163,7 +163,7 @@ impl From<ThinEdgeEvent> for C8yCreateEvent {
163163
}
164164
}
165165

166-
impl<'a> Jsonify<'a> for C8yCreateEvent {}
166+
impl Jsonify for C8yCreateEvent {}
167167

168168
fn update_the_external_source_event(extras: &mut HashMap<String, Value>, source: &str) {
169169
let mut value = serde_json::Map::new();

crates/core/tedge_actors/src/builders.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@ pub trait MessageSink<M: Message> {
165165
cast: MessageMapper,
166166
) where
167167
N: Message,
168-
MS: Iterator<Item = M> + Send,
168+
MS: IntoIterator<Item = M> + Send,
169+
MS::IntoIter: Send,
169170
MessageMapper: Fn(N) -> MS,
170171
MessageMapper: 'static + Send + Sync,
171172
{
@@ -193,6 +194,26 @@ pub trait MessageSource<M: Message, Config> {
193194
/// A peer can subscribe to a subset of the messages produced by this source.
194195
/// This subset of messages expected by the peer is defined by the `config` parameter.
195196
fn connect_sink(&mut self, config: Config, peer: &impl MessageSink<M>);
197+
198+
/// Connect a peer actor that will consume transformed messages produced by this actor.
199+
///
200+
/// The transformation function will be applied to the messages sent by the source,
201+
/// to convert them in a sequence, possibly empty, of messages forwarded to the sink.
202+
fn connect_mapped_sink<N, NS, MessageMapper>(
203+
&mut self,
204+
config: Config,
205+
peer: &impl MessageSink<N>,
206+
cast: MessageMapper,
207+
) where
208+
N: Message,
209+
NS: IntoIterator<Item = N> + Send,
210+
NS::IntoIter: Send,
211+
MessageMapper: Fn(M) -> NS,
212+
MessageMapper: 'static + Send + Sync,
213+
{
214+
let sender: DynSender<M> = MappingSender::new(peer.get_sender(), cast).into();
215+
self.connect_sink(config, &sender)
216+
}
196217
}
197218

198219
/// The [Builder] of an [Actor](crate::Actor) must implement this trait

crates/core/tedge_actors/src/channels.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@ impl<M, N, NS, F> Sender<M> for MappingSender<F, N>
146146
where
147147
M: Message,
148148
N: Message,
149-
NS: Iterator<Item = N> + Send,
149+
NS: IntoIterator<Item = N> + Send,
150+
NS::IntoIter: Send,
150151
F: Fn(M) -> NS,
151152
F: 'static + Sync + Send,
152153
{

crates/core/tedge_agent/src/agent.rs

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -249,14 +249,14 @@ impl Agent {
249249
let mut software_update_builder = SoftwareManagerBuilder::new(self.config.sw_update_config);
250250

251251
// Converter actor
252-
let converter_actor_builder = TedgeOperationConverterBuilder::new(
252+
let mut converter_actor_builder = TedgeOperationConverterBuilder::new(
253253
self.config.operation_config,
254254
workflows,
255-
&mut software_update_builder,
256-
&mut restart_actor_builder,
257255
&mut mqtt_actor_builder,
258256
&mut script_runner,
259257
);
258+
converter_actor_builder.register_builtin_operation(&mut restart_actor_builder);
259+
converter_actor_builder.register_builtin_operation(&mut software_update_builder);
260260

261261
// Shutdown on SIGINT
262262
let signal_actor_builder = SignalActor::builder(&runtime.get_handle());
@@ -297,16 +297,15 @@ impl Agent {
297297
is_sudo_enabled: self.config.is_sudo_enabled,
298298
config_update_enabled: self.config.capabilities.config_update,
299299
})?;
300-
Some(
301-
ConfigManagerBuilder::try_new(
302-
manager_config,
303-
&mut mqtt_actor_builder,
304-
&mut fs_watch_actor_builder,
305-
&mut downloader_actor_builder,
306-
&mut uploader_actor_builder,
307-
)
308-
.await?,
300+
let mut config_manager = ConfigManagerBuilder::try_new(
301+
manager_config,
302+
&mut fs_watch_actor_builder,
303+
&mut downloader_actor_builder,
304+
&mut uploader_actor_builder,
309305
)
306+
.await?;
307+
converter_actor_builder.register_builtin_operation(&mut config_manager);
308+
Some(config_manager)
310309
} else if self.config.capabilities.config_update {
311310
warn!("Config_snapshot operation must be enabled to run config_update!");
312311
None
@@ -323,15 +322,14 @@ impl Agent {
323322
mqtt_schema: mqtt_schema.clone(),
324323
mqtt_device_topic_id: self.config.mqtt_device_topic_id.clone(),
325324
})?;
326-
Some(
327-
LogManagerBuilder::try_new(
328-
log_manager_config,
329-
&mut mqtt_actor_builder,
330-
&mut fs_watch_actor_builder,
331-
&mut uploader_actor_builder,
332-
)
333-
.await?,
325+
let mut log_actor = LogManagerBuilder::try_new(
326+
log_manager_config,
327+
&mut fs_watch_actor_builder,
328+
&mut uploader_actor_builder,
334329
)
330+
.await?;
331+
converter_actor_builder.register_builtin_operation(&mut log_actor);
332+
Some(log_actor)
335333
} else {
336334
None
337335
};

crates/core/tedge_agent/src/restart_manager/builder.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,17 @@ use crate::restart_manager::config::RestartManagerConfig;
33
use tedge_actors::Builder;
44
use tedge_actors::DynSender;
55
use tedge_actors::LinkError;
6+
use tedge_actors::MappingSender;
67
use tedge_actors::MessageSink;
78
use tedge_actors::MessageSource;
89
use tedge_actors::NoConfig;
910
use tedge_actors::RuntimeRequest;
1011
use tedge_actors::RuntimeRequestSink;
1112
use tedge_actors::SimpleMessageBoxBuilder;
13+
use tedge_api::mqtt_topics::OperationType;
14+
use tedge_api::workflow::GenericCommandData;
15+
use tedge_api::workflow::GenericCommandState;
16+
use tedge_api::workflow::OperationName;
1217
use tedge_api::RestartCommand;
1318

1419
pub struct RestartManagerBuilder {
@@ -39,6 +44,25 @@ impl MessageSource<RestartCommand, NoConfig> for RestartManagerBuilder {
3944
}
4045
}
4146

47+
impl MessageSource<GenericCommandData, NoConfig> for RestartManagerBuilder {
48+
fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink<GenericCommandData>) {
49+
self.message_box.connect_sink(config, &peer.get_sender())
50+
}
51+
}
52+
53+
impl IntoIterator for &RestartManagerBuilder {
54+
type Item = (OperationName, DynSender<GenericCommandState>);
55+
type IntoIter = std::vec::IntoIter<Self::Item>;
56+
57+
fn into_iter(self) -> Self::IntoIter {
58+
let sender =
59+
MappingSender::new(self.message_box.get_sender(), |msg: GenericCommandState| {
60+
msg.try_into().ok()
61+
});
62+
vec![(OperationType::Restart.to_string(), sender.into())].into_iter()
63+
}
64+
}
65+
4266
impl RuntimeRequestSink for RestartManagerBuilder {
4367
fn get_signal_sender(&self) -> DynSender<RuntimeRequest> {
4468
self.message_box.get_signal_sender()

crates/core/tedge_agent/src/software_manager/actor.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ use tedge_api::commands::CommandStatus;
2727
use tedge_api::commands::SoftwareCommandMetadata;
2828
use tedge_api::commands::SoftwareListCommand;
2929
use tedge_api::commands::SoftwareUpdateCommand;
30+
use tedge_api::mqtt_topics::OperationType;
31+
use tedge_api::workflow::GenericCommandData;
32+
use tedge_api::workflow::GenericCommandMetadata;
33+
use tedge_api::workflow::GenericCommandState;
34+
use tedge_api::Jsonify;
3035
use tedge_api::SoftwareType;
3136
use tedge_config::TEdgeConfigError;
3237
use tracing::error;
@@ -35,6 +40,33 @@ use tracing::warn;
3540

3641
fan_in_message_type!(SoftwareCommand[SoftwareUpdateCommand, SoftwareListCommand, SoftwareCommandMetadata] : Debug, Eq, PartialEq, Deserialize, Serialize);
3742

43+
impl SoftwareCommand {
44+
pub fn into_generic_commands(self) -> Vec<GenericCommandData> {
45+
match self {
46+
SoftwareCommand::SoftwareUpdateCommand(cmd) => {
47+
vec![GenericCommandState::from(cmd).into()]
48+
}
49+
SoftwareCommand::SoftwareListCommand(cmd) => {
50+
vec![GenericCommandState::from(cmd).into()]
51+
}
52+
SoftwareCommand::SoftwareCommandMetadata(metadata) => {
53+
vec![
54+
GenericCommandMetadata {
55+
operation: OperationType::SoftwareList.to_string(),
56+
payload: metadata.to_value(),
57+
}
58+
.into(),
59+
GenericCommandMetadata {
60+
operation: OperationType::SoftwareUpdate.to_string(),
61+
payload: metadata.to_value(),
62+
}
63+
.into(),
64+
]
65+
}
66+
}
67+
}
68+
}
69+
3870
/// Actor which performs software operations.
3971
///
4072
/// This actor takes as input [`SoftwareRequest`]s, and responds with

crates/core/tedge_agent/src/software_manager/builder.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,19 @@ use crate::software_manager::config::SoftwareManagerConfig;
44
use tedge_actors::Builder;
55
use tedge_actors::DynSender;
66
use tedge_actors::LinkError;
7+
use tedge_actors::MappingSender;
78
use tedge_actors::MessageSink;
89
use tedge_actors::MessageSource;
910
use tedge_actors::NoConfig;
1011
use tedge_actors::RuntimeRequest;
1112
use tedge_actors::RuntimeRequestSink;
1213
use tedge_actors::SimpleMessageBoxBuilder;
14+
use tedge_api::mqtt_topics::OperationType;
15+
use tedge_api::workflow::GenericCommandData;
16+
use tedge_api::workflow::GenericCommandState;
17+
use tedge_api::workflow::OperationName;
18+
use tedge_api::SoftwareListCommand;
19+
use tedge_api::SoftwareUpdateCommand;
1320

1421
pub struct SoftwareManagerBuilder {
1522
config: SoftwareManagerConfig,
@@ -45,6 +52,47 @@ impl RuntimeRequestSink for SoftwareManagerBuilder {
4552
}
4653
}
4754

55+
impl MessageSource<GenericCommandData, NoConfig> for SoftwareManagerBuilder {
56+
fn connect_sink(&mut self, config: NoConfig, peer: &impl MessageSink<GenericCommandData>) {
57+
self.message_box
58+
.connect_mapped_sink(config, &peer.get_sender(), |msg: SoftwareCommand| {
59+
msg.into_generic_commands()
60+
})
61+
}
62+
}
63+
64+
impl IntoIterator for &SoftwareManagerBuilder {
65+
type Item = (OperationName, DynSender<GenericCommandState>);
66+
type IntoIter = std::vec::IntoIter<Self::Item>;
67+
68+
fn into_iter(self) -> Self::IntoIter {
69+
let software_list_sender =
70+
MappingSender::new(self.message_box.get_sender(), |msg: GenericCommandState| {
71+
SoftwareListCommand::try_from(msg)
72+
.map(SoftwareCommand::SoftwareListCommand)
73+
.ok()
74+
});
75+
let software_update_sender =
76+
MappingSender::new(self.message_box.get_sender(), |msg: GenericCommandState| {
77+
SoftwareUpdateCommand::try_from(msg)
78+
.map(SoftwareCommand::SoftwareUpdateCommand)
79+
.ok()
80+
})
81+
.into();
82+
vec![
83+
(
84+
OperationType::SoftwareList.to_string(),
85+
software_list_sender.into(),
86+
),
87+
(
88+
OperationType::SoftwareUpdate.to_string(),
89+
software_update_sender,
90+
),
91+
]
92+
.into_iter()
93+
}
94+
}
95+
4896
impl Builder<SoftwareManagerActor> for SoftwareManagerBuilder {
4997
type Error = LinkError;
5098

0 commit comments

Comments
 (0)