Skip to content

Commit fbcde02

Browse files
committed
Implement event / command framework.
The events feature makes it possible to periodically send (proprietary) events from the Relay gateways. The commands feature makes it possible to send commands to the Relay gateways, in which case the output is returned as an event.
1 parent b003a34 commit fbcde02

20 files changed

+956
-133
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
"time",
3333
"fs",
3434
"sync",
35+
"process",
36+
"io-util",
3537
] }
3638
hex = "0.4"
3739
rand = "0.9"

src/cache.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,16 @@ impl From<&packets::MeshPacket> for PayloadCache {
6969
.unwrap_or_default()
7070
.as_secs() as u32,
7171
},
72+
packets::Payload::Command(v) => PayloadCache {
73+
p_type,
74+
uplink_id: 0,
75+
relay_id: v.relay_id,
76+
timestamp: v
77+
.timestamp
78+
.duration_since(UNIX_EPOCH)
79+
.unwrap_or_default()
80+
.as_secs() as u32,
81+
},
7282
}
7383
}
7484
}

src/cmd/configfile.rs

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,6 @@ pub fn run() {
3737
# uplinks and forward these to the proxy API, rather than relaying these.
3838
border_gateway={{ mesh.border_gateway }}
3939
40-
# Heartbeat interval (Relay Gateway only).
41-
#
42-
# This defines the interval in which a Relay Gateway (border_gateway=false)
43-
# will emit heartbeat messages.
44-
heartbeat_interval="{{ mesh.heartbeat_interval }}"
45-
4640
# Max hop count.
4741
#
4842
# This defines the maximum number of hops a relayed payload will pass.
@@ -142,6 +136,63 @@ pub fn run() {
142136
143137
# Command API URL.
144138
command_url="{{ backend.mesh_concentratord.command_url }}"
139+
140+
141+
# Events configuration (Relay only).
142+
[events]
143+
144+
# Heartbeat interval (Relay Gateway only).
145+
#
146+
# This defines the interval in which a Relay Gateway (border_gateway=false)
147+
# will emit heartbeat messages.
148+
heartbeat_interval="{{ events.heartbeat_interval }}"
149+
150+
# Commands.
151+
#
152+
# This configures for each event type the command that must be executed. The
153+
# stdout of the command will be used as event payload. Example:
154+
#
155+
# 128 = ["/path/to/command", "arg1", "arg2"]
156+
#
157+
[events.commands]
158+
159+
{{#each events.commands}}
160+
{{@key}} = [{{#each this}}"{{this}}", {{/each}}]
161+
{{/each}}
162+
163+
# Event sets (can be repeated).
164+
#
165+
# This configures sets of events that will be periodically sent by the
166+
# relay. Example:
167+
#
168+
# [[events.sets]]
169+
# interval = "5min"
170+
# events = [128, 129, 130]
171+
#
172+
{{#each events.sets}}
173+
[[events.sets]]
174+
interval = "{{this.interval}}"
175+
events = [{{#each this.events}}{{this}}, {{/each}}]
176+
{{/each}}
177+
178+
179+
# Commands configuration (Relay only).
180+
[commands]
181+
182+
# Commands.
183+
#
184+
# On receiving a command, the Gateway Mesh will execute the command matching
185+
# the command type (128 - 255 is for proprietary commands). The payload will
186+
# be provided to the command using stdin. The returned stdout will be sent
187+
# back as event (using the same type). Example:
188+
#
189+
# "129" = ["/path/to/command", "arg1", "arg2"]
190+
#
191+
[events.commands]
192+
193+
{{#each events.commands}}
194+
{{@key}} = [{{#each this}}"{{this}}", {{/each}}]
195+
{{/each}}
145196
"#;
146197

147198
let conf = config::get();

src/cmd/root.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ use signal_hook::consts::signal::*;
44
use signal_hook_tokio::Signals;
55

66
use crate::config::Configuration;
7-
use crate::{backend, heartbeat, proxy};
7+
use crate::{backend, commands, events, proxy};
88

99
pub async fn run(conf: &Configuration) -> Result<()> {
1010
proxy::setup(conf).await?;
1111
backend::setup(conf).await?;
12-
heartbeat::setup(conf).await?;
12+
events::setup(conf).await?;
13+
commands::setup(conf).await?;
1314

1415
let mut signals = Signals::new([SIGINT, SIGTERM])?;
1516
let handle = signals.handle();

src/commands.rs

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
use std::collections::HashMap;
2+
use std::process::Stdio;
3+
use std::time::SystemTime;
4+
5+
use anyhow::Result;
6+
use log::error;
7+
use tokio::io::AsyncWriteExt;
8+
use tokio::process::Command;
9+
use tokio::sync::{Mutex, OnceCell};
10+
11+
use crate::{config::Configuration, packets};
12+
13+
static COMMANDS: OnceCell<HashMap<u8, Vec<String>>> = OnceCell::const_new();
14+
static LAST_TIMESTAMP: OnceCell<Mutex<Option<SystemTime>>> = OnceCell::const_new();
15+
16+
pub async fn setup(conf: &Configuration) -> Result<()> {
17+
// Only Relay Gateways process commands.
18+
if conf.mesh.border_gateway {
19+
return Ok(());
20+
}
21+
22+
// Set commands.
23+
COMMANDS
24+
.set(
25+
conf.commands
26+
.commands
27+
.iter()
28+
.map(|(k, v)| (k.parse().unwrap(), v.clone()))
29+
.collect(),
30+
)
31+
.map_err(|_| anyhow!("OnceCell set error"))?;
32+
33+
Ok(())
34+
}
35+
36+
pub async fn execute_commands(pl: &packets::CommandPayload) -> Result<Vec<packets::Event>> {
37+
// Validate that the command timestamp did increment, compared to previous
38+
// command payload.
39+
if let Some(ts) = get_last_timestamp().await {
40+
if ts >= pl.timestamp {
41+
return Err(anyhow!(
42+
"Command timestamp did not increment compared to previous command payload"
43+
));
44+
}
45+
}
46+
47+
// Store the command timestamp.
48+
set_last_timestamp(pl.timestamp).await;
49+
50+
// Execute the commands and capture the response events.
51+
let mut out = vec![];
52+
for cmd in &pl.commands {
53+
let resp = match cmd {
54+
packets::Command::Proprietary((t, v)) => execute_proprietary(*t, v).await,
55+
};
56+
57+
match resp {
58+
Ok(v) => out.push(v),
59+
Err(e) => error!("Execute command error: {}", e),
60+
}
61+
}
62+
63+
Ok(out)
64+
}
65+
66+
async fn execute_proprietary(typ: u8, value: &[u8]) -> Result<packets::Event> {
67+
let args = COMMANDS
68+
.get()
69+
.ok_or_else(|| anyhow!("COMMANDS is not set"))?
70+
.get(&typ)
71+
.ok_or_else(|| anyhow!("Command type {} is not configured", typ))?;
72+
73+
if args.is_empty() {
74+
return Err(anyhow!("Command for command type {} is empty", typ,));
75+
}
76+
77+
let mut cmd = Command::new(&args[0]);
78+
cmd.stdin(Stdio::piped());
79+
cmd.stdout(Stdio::piped());
80+
cmd.stderr(Stdio::piped());
81+
82+
// Add addition args.
83+
if args.len() > 1 {
84+
cmd.args(&args[1..]);
85+
}
86+
87+
// Spawn process
88+
let mut child = cmd.spawn()?;
89+
90+
// Write stdin
91+
let mut stdin = child.stdin.take().unwrap();
92+
tokio::spawn({
93+
let b = value.to_vec();
94+
async move { stdin.write(&b).await }
95+
});
96+
97+
// Wait for output
98+
let out = child.wait_with_output().await?;
99+
Ok(packets::Event::Proprietary((typ, out.stdout)))
100+
}
101+
102+
async fn get_last_timestamp() -> Option<SystemTime> {
103+
LAST_TIMESTAMP
104+
.get_or_init(|| async { Mutex::new(None) })
105+
.await
106+
.lock()
107+
.await
108+
.clone()
109+
}
110+
111+
async fn set_last_timestamp(ts: SystemTime) {
112+
let mut last_ts = LAST_TIMESTAMP
113+
.get_or_init(|| async { Mutex::new(None) })
114+
.await
115+
.lock()
116+
.await;
117+
118+
*last_ts = Some(ts);
119+
}

src/config.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::HashMap;
12
use std::sync::{Arc, Mutex, OnceLock};
23
use std::time::Duration;
34
use std::{env, fs};
@@ -18,6 +19,7 @@ pub struct Configuration {
1819
pub backend: Backend,
1920
pub mappings: Mappings,
2021
pub events: Events,
22+
pub commands: Commands,
2123
}
2224

2325
impl Configuration {
@@ -158,16 +160,34 @@ pub struct DataRate {
158160
pub struct Events {
159161
#[serde(with = "humantime_serde")]
160162
pub heartbeat_interval: Duration,
163+
pub commands: HashMap<String, Vec<String>>,
164+
pub sets: Vec<EventsSet>,
161165
}
162166

163167
impl Default for Events {
164168
fn default() -> Self {
165169
Events {
166170
heartbeat_interval: Duration::from_secs(300),
171+
commands: Default::default(),
172+
sets: Vec::new(),
167173
}
168174
}
169175
}
170176

177+
#[derive(Default, Serialize, Deserialize, PartialEq, Eq)]
178+
#[serde(default)]
179+
pub struct EventsSet {
180+
#[serde(with = "humantime_serde")]
181+
pub interval: Duration,
182+
pub events: Vec<u8>,
183+
}
184+
185+
#[derive(Default, Serialize, Deserialize, PartialEq, Eq)]
186+
#[serde(default)]
187+
pub struct Commands {
188+
pub commands: HashMap<String, Vec<String>>,
189+
}
190+
171191
#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
172192
#[allow(non_camel_case_types)]
173193
#[allow(clippy::upper_case_acronyms)]

0 commit comments

Comments
 (0)