Skip to content

Commit ed8c78c

Browse files
committed
dbus: rauc: forward poller status to broker
1 parent 68547c9 commit ed8c78c

File tree

2 files changed

+148
-3
lines changed

2 files changed

+148
-3
lines changed

src/dbus/rauc.rs

Lines changed: 82 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use anyhow::Result;
2121
use async_std::stream::StreamExt;
2222
use async_std::sync::Arc;
2323
use futures_util::FutureExt;
24-
use log::{info, warn};
24+
use log::{error, info, warn};
2525
use serde::{Deserialize, Serialize};
2626

2727
use super::systemd::{Service, ServiceAction};
@@ -41,21 +41,37 @@ mod demo_mode;
4141
#[cfg(not(feature = "demo_mode"))]
4242
mod installer;
4343

44+
#[cfg(not(feature = "demo_mode"))]
45+
use installer::InstallerProxy;
46+
4447
#[cfg(not(feature = "demo_mode"))]
4548
mod poller;
4649

4750
#[cfg(not(feature = "demo_mode"))]
48-
use installer::InstallerProxy;
51+
use poller::PollerProxy;
4952

5053
#[cfg(feature = "demo_mode")]
5154
mod imports {
5255
pub(super) const CHANNELS_DIR: &str = "demo_files/usr/share/tacd/update_channels";
56+
57+
pub(super) struct PollerProxy<'a> {
58+
_dummy: &'a (),
59+
}
60+
61+
impl PollerProxy<'_> {
62+
pub async fn new<C>(_conn: C) -> Option<Self> {
63+
Some(Self { _dummy: &() })
64+
}
65+
66+
pub async fn poll(&self) -> zbus::Result<()> {
67+
Ok(())
68+
}
69+
}
5370
}
5471

5572
#[cfg(not(feature = "demo_mode"))]
5673
mod imports {
5774
pub(super) use anyhow::bail;
58-
pub(super) use log::error;
5975

6076
pub(super) const CHANNELS_DIR: &str = "/usr/share/tacd/update_channels";
6177
}
@@ -186,11 +202,14 @@ fn would_reboot_into_other_slot(slot_status: &SlotStatus, primary: Option<String
186202
}
187203

188204
async fn channel_list_update_task(
205+
conn: Arc<Connection>,
189206
reload: Arc<Topic<bool>>,
190207
enable_polling: Arc<Topic<bool>>,
191208
channels: Arc<Topic<Channels>>,
192209
rauc_service: Service,
193210
) -> Result<()> {
211+
let poller = PollerProxy::new(&conn).await.unwrap();
212+
194213
let (reload_stream, _) = reload.subscribe_unbounded();
195214
let (mut enable_polling_stream, _) = enable_polling.subscribe_unbounded();
196215

@@ -252,6 +271,14 @@ async fn channel_list_update_task(
252271
}
253272

254273
status_subscription.unsubscribe();
274+
275+
if enable_polling {
276+
info!("Trigger a poll");
277+
278+
if let Err(err) = poller.poll().await {
279+
error!("Failed to poll for updates: {err}");
280+
}
281+
}
255282
}
256283
}
257284

@@ -295,6 +322,7 @@ impl Rauc {
295322
wtb.spawn_task(
296323
"rauc-channel-list-update",
297324
channel_list_update_task(
325+
Arc::new(Connection),
298326
inst.reload.clone(),
299327
inst.enable_polling.clone(),
300328
inst.channels.clone(),
@@ -448,6 +476,56 @@ impl Rauc {
448476
Ok(())
449477
})?;
450478

479+
let conn_task = conn.clone();
480+
let channels = inst.channels.clone();
481+
482+
// Forward the "Poller::status" property to the broker framework
483+
wtb.spawn_task("rauc-forward-poller-status", async move {
484+
let proxy = PollerProxy::new(&conn_task).await.unwrap();
485+
486+
let mut stream = proxy.receive_status_changed().await;
487+
488+
if let Ok(status) = proxy.status().await {
489+
channels.modify(|chs| {
490+
let mut chs = chs?;
491+
492+
match chs.update_from_poll_status(status.into()) {
493+
Ok(true) => Some(chs),
494+
Ok(false) => None,
495+
Err(e) => {
496+
warn!("Could not update channel list from poll status: {e}");
497+
None
498+
}
499+
}
500+
});
501+
}
502+
503+
while let Some(status) = stream.next().await {
504+
let status = match status.get().await {
505+
Ok(status) => status,
506+
Err(e) => {
507+
warn!("Could not get poll status: {e}");
508+
continue;
509+
}
510+
};
511+
512+
channels.modify(|chs| {
513+
let mut chs = chs?;
514+
515+
match chs.update_from_poll_status(status.into()) {
516+
Ok(true) => Some(chs),
517+
Ok(false) => None,
518+
Err(e) => {
519+
warn!("Could not update channel list from poll status: {e}");
520+
None
521+
}
522+
}
523+
});
524+
}
525+
526+
Ok(())
527+
})?;
528+
451529
let conn_task = conn.clone();
452530
let channels = inst.channels.clone();
453531
let (mut install_stream, _) = inst.install.clone().subscribe_unbounded();
@@ -503,6 +581,7 @@ impl Rauc {
503581
wtb.spawn_task(
504582
"rauc-channel-list-update",
505583
channel_list_update_task(
584+
conn.clone(),
506585
inst.reload.clone(),
507586
inst.enable_polling.clone(),
508587
inst.channels.clone(),

src/dbus/rauc/update_channels.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// with this program; if not, write to the Free Software Foundation, Inc.,
1616
// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
1717

18+
#[cfg(not(feature = "demo_mode"))]
19+
use std::convert::TryFrom;
1820
use std::fs::{read_dir, read_to_string, DirEntry};
1921
use std::os::unix::ffi::OsStrExt;
2022
use std::path::Path;
@@ -64,6 +66,34 @@ pub struct ChannelFile {
6466
pub polling_interval: Option<String>,
6567
}
6668

69+
#[cfg(not(feature = "demo_mode"))]
70+
fn zvariant_walk_nested_dicts<'a, T>(map: &'a zvariant::Dict, path: &'a [&'a str]) -> Result<&'a T>
71+
where
72+
&'a T: TryFrom<&'a zvariant::Value<'a>>,
73+
<&'a T as TryFrom<&'a zvariant::Value<'a>>>::Error: Into<zvariant::Error>,
74+
{
75+
let (key, rem) = path
76+
.split_first()
77+
.ok_or_else(|| anyhow!("Got an empty path to walk"))?;
78+
79+
let value: &zvariant::Value = map
80+
.get(key)?
81+
.ok_or_else(|| anyhow!("Could not find key \"{key}\" in dict"))?;
82+
83+
if rem.is_empty() {
84+
value.downcast_ref().map_err(|e| {
85+
let type_name = std::any::type_name::<T>();
86+
anyhow!("Failed to convert value in dictionary for key \"{key}\" to {type_name}: {e}")
87+
})
88+
} else {
89+
let value = value.downcast_ref().map_err(|e| {
90+
anyhow!("Failed to convert value in dictionary for key \"{key}\" to a dict: {e}")
91+
})?;
92+
93+
zvariant_walk_nested_dicts(value, rem)
94+
}
95+
}
96+
6797
impl Channel {
6898
fn from_file(path: &Path) -> Result<Self> {
6999
let file_name = || {
@@ -177,4 +207,40 @@ impl Channels {
177207
pub(super) fn primary(&self) -> Option<&Channel> {
178208
self.0.iter().find(|ch| ch.primary)
179209
}
210+
211+
#[cfg(not(feature = "demo_mode"))]
212+
fn primary_mut(&mut self) -> Option<&mut Channel> {
213+
self.0.iter_mut().find(|ch| ch.primary)
214+
}
215+
216+
#[cfg(not(feature = "demo_mode"))]
217+
pub(super) fn update_from_poll_status(&mut self, poll_status: zvariant::Dict) -> Result<bool> {
218+
let compatible: &zvariant::Str =
219+
zvariant_walk_nested_dicts(&poll_status, &["manifest", "update", "compatible"])?;
220+
let version: &zvariant::Str =
221+
zvariant_walk_nested_dicts(&poll_status, &["manifest", "update", "version"])?;
222+
let newer_than_installed: &bool =
223+
zvariant_walk_nested_dicts(&poll_status, &["update-available"])?;
224+
225+
if let Some(pb) = self.0.iter().find_map(|ch| ch.bundle.as_ref()) {
226+
if compatible == pb.compatible.as_str()
227+
&& version == pb.version.as_str()
228+
&& *newer_than_installed == pb.newer_than_installed
229+
{
230+
return Ok(false);
231+
}
232+
}
233+
234+
self.0.iter_mut().for_each(|ch| ch.bundle = None);
235+
236+
if let Some(primary) = self.primary_mut() {
237+
primary.bundle = Some(UpstreamBundle {
238+
compatible: compatible.as_str().into(),
239+
version: version.as_str().into(),
240+
newer_than_installed: *newer_than_installed,
241+
});
242+
}
243+
244+
Ok(true)
245+
}
180246
}

0 commit comments

Comments
 (0)