Skip to content

Commit 6add599

Browse files
committed
Refactor heartbeat + Concentratord ZMQ interface.
This refactors the heartbeat payload into an event payload, of which a heartbeat is one of the possible options. This also refactors the Concentratord ZMQ interface, moving away from multipart messages in favor of a single message containing an Event / Command Protobuf message.
1 parent b208a8e commit 6add599

19 files changed

+523
-424
lines changed

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
"usage",
1717
"derive",
1818
] }
19-
chirpstack_api = { version = "4.12", default-features = false }
19+
chirpstack_api = { version = "4.12.0", default-features = false, path = "../chirpstack/api/rust" }
2020
lrwn_filters = { version = "4.12", features = ["serde"] }
2121
log = "0.4"
2222
simple_logger = "5.0"

src/backend.rs

Lines changed: 89 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ static RELAY_ID: OnceLock<Mutex<[u8; 4]>> = OnceLock::new();
1616
static CONCENTRATORD_CMD_CHAN: OnceLock<CommandChannel> = OnceLock::new();
1717
static MESH_CONCENTRATORD_CMD_CHAN: OnceLock<CommandChannel> = OnceLock::new();
1818

19-
type Event = (String, Vec<u8>);
20-
type Command = ((String, Vec<u8>), oneshot::Sender<Result<Vec<u8>>>);
19+
type Command = (gw::Command, oneshot::Sender<Result<Vec<u8>>>);
2120
type CommandChannel = mpsc::UnboundedSender<Command>;
2221

2322
pub async fn setup(conf: &Configuration) -> Result<()> {
@@ -47,7 +46,7 @@ async fn setup_concentratord(conf: &Configuration) -> Result<()> {
4746
sock.connect(&command_url).unwrap();
4847

4948
while let Some(cmd) = cmd_rx.blocking_recv() {
50-
let resp = send_zmq_command(&mut sock, &cmd);
49+
let resp = send_zmq_command(&mut sock, &cmd.0);
5150
cmd.1.send(resp).unwrap();
5251
}
5352

@@ -60,23 +59,30 @@ async fn setup_concentratord(conf: &Configuration) -> Result<()> {
6059
trace!("Reading Gateway ID");
6160
let mut gateway_id: [u8; 8] = [0; 8];
6261
let (gateway_id_tx, gateway_id_rx) = oneshot::channel::<Result<Vec<u8>>>();
63-
cmd_tx.send((("gateway_id".to_string(), vec![]), gateway_id_tx))?;
62+
cmd_tx.send((
63+
gw::Command {
64+
command: Some(gw::command::Command::GetGatewayId(
65+
gw::GetGatewayIdRequest {},
66+
)),
67+
},
68+
gateway_id_tx,
69+
))?;
6470
let resp = gateway_id_rx.await??;
65-
gateway_id.copy_from_slice(&resp);
66-
info!("Retrieved Gateway ID: {}", hex::encode(gateway_id));
71+
72+
let resp = gw::GetGatewayIdResponse::decode(resp.as_slice())?;
73+
gateway_id.copy_from_slice(&hex::decode(&resp.gateway_id)?);
74+
info!("Retrieved Gateway ID: {}", resp.gateway_id);
6775
GATEWAY_ID
6876
.set(Mutex::new(gateway_id))
6977
.map_err(|e| anyhow!("OnceLock error: {:?}", e))?;
7078

7179
// Set CMD channel.
72-
7380
CONCENTRATORD_CMD_CHAN
7481
.set(cmd_tx)
7582
.map_err(|e| anyhow!("OnceLock error: {:?}", e))?;
7683

7784
// Setup ZMQ event.
78-
79-
let (event_tx, event_rx) = mpsc::unbounded_channel::<Event>();
85+
let (event_tx, event_rx) = mpsc::unbounded_channel::<gw::Event>();
8086

8187
// Spawn the zmq event handler to a dedicated thread.
8288
thread::spawn({
@@ -143,7 +149,7 @@ async fn setup_mesh_conncentratord(conf: &Configuration) -> Result<()> {
143149
sock.connect(&command_url).unwrap();
144150

145151
while let Some(cmd) = cmd_rx.blocking_recv() {
146-
let resp = send_zmq_command(&mut sock, &cmd);
152+
let resp = send_zmq_command(&mut sock, &cmd.0);
147153
cmd.1.send(resp).unwrap();
148154
}
149155

@@ -155,12 +161,20 @@ async fn setup_mesh_conncentratord(conf: &Configuration) -> Result<()> {
155161
trace!("Reading Gateway ID");
156162

157163
let (gateway_id_tx, gateway_id_rx) = oneshot::channel::<Result<Vec<u8>>>();
158-
cmd_tx.send((("gateway_id".to_string(), vec![]), gateway_id_tx))?;
164+
cmd_tx.send((
165+
gw::Command {
166+
command: Some(gw::command::Command::GetGatewayId(
167+
gw::GetGatewayIdRequest {},
168+
)),
169+
},
170+
gateway_id_tx,
171+
))?;
159172
let resp = gateway_id_rx.await??;
160-
info!("Retrieved Gateway ID: {}", hex::encode(&resp));
173+
let resp = gw::GetGatewayIdResponse::decode(resp.as_slice())?;
174+
info!("Retrieved Gateway ID: {}", resp.gateway_id);
161175

162176
let mut relay_id: [u8; 4] = [0; 4];
163-
relay_id.copy_from_slice(&resp[4..]);
177+
relay_id.copy_from_slice(&hex::decode(&resp.gateway_id)?[4..]);
164178
RELAY_ID
165179
.set(Mutex::new(relay_id))
166180
.map_err(|e| anyhow!("OnceLock error: {:?}", e))?;
@@ -173,7 +187,7 @@ async fn setup_mesh_conncentratord(conf: &Configuration) -> Result<()> {
173187

174188
// Setup ZMQ event.
175189

176-
let (event_tx, event_rx) = mpsc::unbounded_channel::<Event>();
190+
let (event_tx, event_rx) = mpsc::unbounded_channel::<gw::Event>();
177191

178192
// Spawn the zmq event handler to a dedicated thread;
179193
thread::spawn({
@@ -211,15 +225,15 @@ async fn setup_mesh_conncentratord(conf: &Configuration) -> Result<()> {
211225
async fn event_loop(
212226
border_gateway: bool,
213227
border_gateway_ignore_direct_uplinks: bool,
214-
mut event_rx: mpsc::UnboundedReceiver<Event>,
228+
mut event_rx: mpsc::UnboundedReceiver<gw::Event>,
215229
filters: lrwn_filters::Filters,
216230
) {
217231
trace!("Starting event loop");
218232
while let Some(event) = event_rx.recv().await {
219233
if let Err(e) = handle_event_msg(
220234
border_gateway,
221235
border_gateway_ignore_direct_uplinks,
222-
&event,
236+
event,
223237
&filters,
224238
)
225239
.await
@@ -230,10 +244,10 @@ async fn event_loop(
230244
}
231245
}
232246

233-
async fn mesh_event_loop(border_gateway: bool, mut event_rx: mpsc::UnboundedReceiver<Event>) {
247+
async fn mesh_event_loop(border_gateway: bool, mut event_rx: mpsc::UnboundedReceiver<gw::Event>) {
234248
trace!("Starting mesh event loop");
235249
while let Some(event) = event_rx.recv().await {
236-
if let Err(e) = handle_mesh_event_msg(border_gateway, &event).await {
250+
if let Err(e) = handle_mesh_event_msg(border_gateway, event).await {
237251
error!("Handle mesh event error: {}", e);
238252
continue;
239253
}
@@ -243,20 +257,14 @@ async fn mesh_event_loop(border_gateway: bool, mut event_rx: mpsc::UnboundedRece
243257
async fn handle_event_msg(
244258
border_gateway: bool,
245259
border_gateway_ignore_direct_uplinks: bool,
246-
event: &Event,
260+
event: gw::Event,
247261
filters: &lrwn_filters::Filters,
248262
) -> Result<()> {
249-
trace!(
250-
"Handling event, event: {}, data: {}",
251-
event.0,
252-
hex::encode(&event.1)
253-
);
254-
255-
match event.0.as_str() {
256-
"up" => {
257-
let pl = gw::UplinkFrame::decode(event.1.as_slice())?;
263+
trace!("Handling event, event: {:?}", event,);
258264

259-
if let Some(rx_info) = &pl.rx_info {
265+
match &event.event {
266+
Some(gw::event::Event::UplinkFrame(v)) => {
267+
if let Some(rx_info) = &v.rx_info {
260268
// Filter out frames with invalid CRC.
261269
if rx_info.crc_status() != gw::CrcStatus::CrcOk {
262270
debug!(
@@ -267,7 +275,7 @@ async fn handle_event_msg(
267275
}
268276

269277
// Filter out proprietary payloads.
270-
if pl.phy_payload.first().cloned().unwrap_or_default() & 0xe0 == 0xe0 {
278+
if v.phy_payload.first().cloned().unwrap_or_default() & 0xe0 == 0xe0 {
271279
debug!(
272280
"Discarding proprietary uplink, uplink_id: {}",
273281
rx_info.uplink_id
@@ -282,44 +290,34 @@ async fn handle_event_msg(
282290
}
283291

284292
// Filter uplinks based on DevAddr and JoinEUI filters.
285-
if !lrwn_filters::matches(&pl.phy_payload, filters) {
293+
if !lrwn_filters::matches(&v.phy_payload, filters) {
286294
debug!(
287295
"Discarding uplink because of dev_addr and join_eui filters, uplink_id: {}",
288296
rx_info.uplink_id
289297
)
290298
}
291299

292-
info!("Frame received - {}", helpers::format_uplink(&pl)?);
293-
mesh::handle_uplink(border_gateway, pl).await?;
300+
info!("Frame received - {}", helpers::format_uplink(&v)?);
301+
mesh::handle_uplink(border_gateway, v).await?;
294302
}
295303
}
296-
"stats" => {
304+
Some(gw::event::Event::GatewayStats(_)) => {
297305
if border_gateway {
298-
let pl = gw::GatewayStats::decode(event.1.as_slice())?;
299-
info!("Gateway stats received, gateway_id: {}", pl.gateway_id);
300-
proxy::send_stats(&pl).await?;
306+
proxy::send_event(event).await?;
301307
}
302308
}
303-
_ => {
304-
return Ok(());
305-
}
309+
_ => {}
306310
}
307311

308312
Ok(())
309313
}
310314

311-
async fn handle_mesh_event_msg(border_gateway: bool, event: &Event) -> Result<()> {
312-
trace!(
313-
"Handling mesh event, event: {}, data: {}",
314-
event.0,
315-
hex::encode(&event.1)
316-
);
317-
318-
match event.0.as_str() {
319-
"up" => {
320-
let pl = gw::UplinkFrame::decode(event.1.as_slice())?;
315+
async fn handle_mesh_event_msg(border_gateway: bool, event: gw::Event) -> Result<()> {
316+
trace!("Handling mesh event, event: {:?}", event);
321317

322-
if let Some(rx_info) = &pl.rx_info {
318+
match &event.event {
319+
Some(gw::event::Event::UplinkFrame(v)) => {
320+
if let Some(rx_info) = &v.rx_info {
323321
// Filter out frames with invalid CRC.
324322
if rx_info.crc_status() != gw::CrcStatus::CrcOk {
325323
debug!(
@@ -331,79 +329,79 @@ async fn handle_mesh_event_msg(border_gateway: bool, event: &Event) -> Result<()
331329
}
332330

333331
// The mesh event msg must always be a proprietary payload.
334-
if pl.phy_payload.first().cloned().unwrap_or_default() & 0xe0 == 0xe0 {
335-
info!("Mesh frame received - {}", helpers::format_uplink(&pl)?);
336-
mesh::handle_mesh(border_gateway, pl).await?;
332+
if v.phy_payload.first().cloned().unwrap_or_default() & 0xe0 == 0xe0 {
333+
info!("Mesh frame received - {}", helpers::format_uplink(v)?);
334+
mesh::handle_mesh(border_gateway, v).await?;
337335
}
338336
}
339-
_ => {
340-
return Ok(());
341-
}
337+
_ => {}
342338
}
343339

344340
Ok(())
345341
}
346342

347-
async fn send_command(cmd: &str, b: &[u8]) -> Result<Vec<u8>> {
348-
trace!(
349-
"Sending command, command: {}, data: {}",
350-
cmd,
351-
hex::encode(b)
352-
);
343+
async fn send_command(cmd: gw::Command) -> Result<Vec<u8>> {
344+
trace!("Sending command, command: {:?}", cmd,);
353345

354346
let cmd_chan = CONCENTRATORD_CMD_CHAN
355347
.get()
356348
.ok_or_else(|| anyhow!("CONCENTRATORD_CMD_CHAN is not set"))?;
357349

358350
let (cmd_tx, cmd_rx) = oneshot::channel::<Result<Vec<u8>>>();
359-
cmd_chan.send(((cmd.to_string(), b.to_vec()), cmd_tx))?;
351+
cmd_chan.send((cmd, cmd_tx))?;
360352
cmd_rx.await?
361353
}
362354

363-
async fn send_mesh_command(cmd: &str, b: &[u8]) -> Result<Vec<u8>> {
364-
trace!(
365-
"Sending mesh command, command: {}, data: {}",
366-
cmd,
367-
hex::encode(b)
368-
);
355+
async fn send_mesh_command(cmd: gw::Command) -> Result<Vec<u8>> {
356+
trace!("Sending mesh command, command: {:?}", cmd);
369357

370358
let cmd_chan = MESH_CONCENTRATORD_CMD_CHAN
371359
.get()
372360
.ok_or_else(|| anyhow!("MESH_CONCENTRATORD_CMD_CHAN is not set"))?;
373361

374362
let (cmd_tx, cmd_rx) = oneshot::channel::<Result<Vec<u8>>>();
375-
cmd_chan.send(((cmd.to_string(), b.to_vec()), cmd_tx))?;
363+
cmd_chan.send((cmd, cmd_tx))?;
376364
cmd_rx.await?
377365
}
378366

379-
pub async fn mesh(pl: &gw::DownlinkFrame) -> Result<()> {
380-
info!("Sending mesh frame - {}", helpers::format_downlink(pl)?);
367+
pub async fn mesh(pl: gw::DownlinkFrame) -> Result<()> {
368+
info!("Sending mesh frame - {}", helpers::format_downlink(&pl)?);
369+
let downlink_id = pl.downlink_id;
381370

382371
let tx_ack = {
383-
let b = pl.encode_to_vec();
384-
let resp_b = send_mesh_command("down", &b).await?;
372+
let pl = gw::Command {
373+
command: Some(gw::command::Command::SendDownlinkFrame(pl)),
374+
};
375+
let resp_b = send_mesh_command(pl).await?;
385376
gw::DownlinkTxAck::decode(resp_b.as_slice())?
386377
};
387378
helpers::tx_ack_to_err(&tx_ack)?;
388-
info!("Enqueue acknowledged, downlink_id: {}", pl.downlink_id);
379+
info!("Enqueue acknowledged, downlink_id: {}", downlink_id);
389380
Ok(())
390381
}
391382

392-
pub async fn send_downlink(pl: &gw::DownlinkFrame) -> Result<gw::DownlinkTxAck> {
393-
info!("Sending downlink frame - {}", helpers::format_downlink(pl)?);
383+
pub async fn send_downlink(pl: gw::DownlinkFrame) -> Result<gw::DownlinkTxAck> {
384+
info!(
385+
"Sending downlink frame - {}",
386+
helpers::format_downlink(&pl)?
387+
);
394388

395-
let b = pl.encode_to_vec();
396-
let resp_b = send_command("down", &b).await?;
389+
let pl = gw::Command {
390+
command: Some(gw::command::Command::SendDownlinkFrame(pl)),
391+
};
392+
let resp_b = send_command(pl).await?;
397393
let tx_ack = gw::DownlinkTxAck::decode(resp_b.as_slice())?;
398394

399395
Ok(tx_ack)
400396
}
401397

402-
pub async fn send_gateway_configuration(pl: &gw::GatewayConfiguration) -> Result<()> {
398+
pub async fn send_gateway_configuration(pl: gw::GatewayConfiguration) -> Result<()> {
403399
info!("Sending gateway configuration, version: {}", pl.version);
404400

405-
let b = pl.encode_to_vec();
406-
let _ = send_command("config", &b).await?;
401+
let pl = gw::Command {
402+
command: Some(gw::command::Command::SetGatewayConfiguration(pl)),
403+
};
404+
let _ = send_command(pl).await?;
407405

408406
Ok(())
409407
}
@@ -428,15 +426,9 @@ pub async fn get_gateway_id() -> Result<[u8; 8]> {
428426
.await)
429427
}
430428

431-
fn send_zmq_command(sock: &mut zmq::Socket, cmd: &Command) -> Result<Vec<u8>> {
432-
debug!(
433-
"Sending command to socket, command: {}, payload: {}",
434-
&cmd.0 .0,
435-
hex::encode(&cmd.0 .1)
436-
);
437-
438-
sock.send(&cmd.0 .0, zmq::SNDMORE)?;
439-
sock.send(&cmd.0 .1, 0)?;
429+
fn send_zmq_command(sock: &mut zmq::Socket, cmd: &gw::Command) -> Result<Vec<u8>> {
430+
debug!("Sending command to socket, command: {:?}", &cmd,);
431+
sock.send(cmd.encode_to_vec(), 0)?;
440432

441433
// set poller so that we can timeout after 100ms
442434
let mut items = [sock.as_poll_item(zmq::POLLIN)];
@@ -445,19 +437,13 @@ fn send_zmq_command(sock: &mut zmq::Socket, cmd: &Command) -> Result<Vec<u8>> {
445437
return Err(anyhow!("Could not read down response"));
446438
}
447439

448-
// red tx ack response
440+
// read tx ack response
449441
let resp_b: &[u8] = &sock.recv_bytes(0)?;
450442
Ok(resp_b.to_vec())
451443
}
452444

453-
fn receive_zmq_event(sock: &mut zmq::Socket) -> Result<Event> {
454-
let msg = sock.recv_multipart(0)?;
455-
if msg.len() != 2 {
456-
return Err(anyhow!("Event must have 2 frames"));
457-
}
458-
459-
let event = String::from_utf8(msg[0].to_vec())?;
460-
let b = msg[1].to_vec();
461-
462-
Ok((event, b))
445+
fn receive_zmq_event(sock: &mut zmq::Socket) -> Result<gw::Event> {
446+
let b = sock.recv_bytes(0)?;
447+
let event = gw::Event::decode(b.as_slice())?;
448+
Ok(event)
463449
}

0 commit comments

Comments
 (0)