Skip to content

Commit 00bcf36

Browse files
committed
Update chirpstack_api dependency.
1 parent a7ee0fd commit 00bcf36

File tree

5 files changed

+61
-83
lines changed

5 files changed

+61
-83
lines changed

Cargo.lock

Lines changed: 23 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
publish = false
1010

1111
[dependencies]
12-
chirpstack_api = { version = "4.12", default-features = false }
12+
chirpstack_api = { version = "4.13.0-test.1", default-features = false }
1313
serde_json = "1.0"
1414
zmq = "0.10"
1515
clap = { version = "4.5", default-features = false, features = [
@@ -28,10 +28,9 @@
2828
rand = "0.9"
2929
chrono = "0.4"
3030
base64 = "0.22"
31-
prost = "0.13"
32-
prost-types = "0.13"
3331
prometheus-client = "0.23"
3432
anyhow = "1.0"
33+
thiserror = "1.0"
3534

3635
[package.metadata.deb]
3736
assets = [

src/events.rs

Lines changed: 17 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
11
use std::time::Duration;
22

33
use anyhow::Result;
4-
use prost::Message;
4+
use chirpstack_api::{gw, prost::Message};
55

66
use super::socket::ZMQ_CONTEXT;
77

8+
#[derive(thiserror::Error, Debug)]
9+
pub enum Error {
10+
#[error("Timeout")]
11+
Timeout,
12+
13+
#[error(transparent)]
14+
Anyhow(#[from] anyhow::Error),
15+
}
16+
817
pub fn get_socket(endpoint: &str) -> Result<zmq::Socket, zmq::Error> {
918
info!(
1019
"Creating new socket for receiving events, endpoint: {}",
@@ -19,23 +28,6 @@ pub fn get_socket(endpoint: &str) -> Result<zmq::Socket, zmq::Error> {
1928
Ok(sock)
2029
}
2130

22-
pub enum Event {
23-
// Reading event timed out.
24-
Timeout,
25-
26-
// Error reading event.
27-
Error(String),
28-
29-
// Unknown event.
30-
Unknown(String),
31-
32-
// Uplink event.
33-
Uplink(Box<chirpstack_api::gw::UplinkFrame>),
34-
35-
// Stats event.
36-
Stats(Box<chirpstack_api::gw::GatewayStats>),
37-
}
38-
3931
pub struct Reader<'a> {
4032
sub_sock: &'a zmq::Socket,
4133
timeout: Duration,
@@ -48,48 +40,20 @@ impl<'a> Reader<'a> {
4840
}
4941

5042
impl Iterator for Reader<'_> {
51-
type Item = Event;
43+
type Item = Result<gw::Event, Error>;
5244

53-
fn next(&mut self) -> Option<Event> {
45+
fn next(&mut self) -> Option<Result<gw::Event, Error>> {
5446
// set poller so that we can timeout
5547
let mut items = [self.sub_sock.as_poll_item(zmq::POLLIN)];
5648
zmq::poll(&mut items, self.timeout.as_millis() as i64).unwrap();
5749
if !items[0].is_readable() {
58-
return Some(Event::Timeout);
50+
return Some(Err(Error::Timeout));
5951
}
6052

61-
let msg = self.sub_sock.recv_multipart(0).unwrap();
62-
match handle_message(msg) {
63-
Ok(v) => Some(v),
64-
Err(err) => Some(Event::Error(err.to_string())),
53+
let b = self.sub_sock.recv_bytes(0).unwrap();
54+
match gw::Event::decode(b.as_slice()).map_err(|e| Error::Anyhow(anyhow::Error::new(e))) {
55+
Ok(v) => Some(Ok(v)),
56+
Err(e) => Some(Err(e)),
6557
}
6658
}
6759
}
68-
69-
fn handle_message(msg: Vec<Vec<u8>>) -> Result<Event> {
70-
if msg.len() != 2 {
71-
return Err(anyhow!("Event must have two frames"));
72-
}
73-
74-
let event = String::from_utf8(msg[0].clone())?;
75-
76-
Ok(match event.as_str() {
77-
"up" => match parse_up(&msg[1]) {
78-
Ok(v) => Event::Uplink(Box::new(v)),
79-
Err(err) => Event::Error(err.to_string()),
80-
},
81-
"stats" => match parse_stats(&msg[1]) {
82-
Ok(v) => Event::Stats(Box::new(v)),
83-
Err(err) => Event::Error(err.to_string()),
84-
},
85-
_ => Event::Unknown(event),
86-
})
87-
}
88-
89-
fn parse_up(msg: &[u8]) -> Result<chirpstack_api::gw::UplinkFrame> {
90-
Ok(chirpstack_api::gw::UplinkFrame::decode(msg)?)
91-
}
92-
93-
fn parse_stats(msg: &[u8]) -> Result<chirpstack_api::gw::GatewayStats> {
94-
Ok(chirpstack_api::gw::GatewayStats::decode(msg)?)
95-
}

src/forwarder.rs

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ use std::sync::{Arc, Mutex};
44
use std::{thread, time};
55

66
use anyhow::Result;
7-
use chirpstack_api::gw;
8-
use prost::Message;
7+
use chirpstack_api::{gw, prost::Message};
98
use rand::Rng;
109

1110
use super::commands;
@@ -318,21 +317,17 @@ fn events_loop(state: Arc<State>, stop_receive: Receiver<signals::Signal>) {
318317
}
319318

320319
match cmd {
321-
events::Event::Uplink(up) => {
322-
events_up(&state, *up);
323-
}
324-
events::Event::Stats(stats) => {
325-
events_stats(&state, *stats);
326-
}
327-
events::Event::Timeout => {
328-
continue;
329-
}
330-
events::Event::Error(err) => {
331-
error!("Read event error, error: {}", err);
332-
}
333-
events::Event::Unknown(event) => {
334-
warn!("Unknown event received, event: {}", event);
335-
}
320+
Ok(v) => match v.event {
321+
Some(gw::event::Event::UplinkFrame(pl)) => events_up(&state, pl),
322+
Some(gw::event::Event::GatewayStats(pl)) => events_stats(&state, pl),
323+
_ => continue,
324+
},
325+
Err(e) => match e {
326+
events::Error::Timeout => continue,
327+
_ => {
328+
warn!("Read event error, error: {}", e);
329+
}
330+
},
336331
}
337332
}
338333
}
@@ -472,12 +467,13 @@ fn handle_pull_resp(state: &Arc<State>, data: &[u8]) -> Result<()> {
472467
}
473468
};
474469

475-
let mut buf = Vec::new();
476-
pl.encode(&mut buf).unwrap();
470+
let pl = gw::Command {
471+
command: Some(gw::command::Command::SendDownlinkFrame(pl)),
472+
};
473+
let b = pl.encode_to_vec();
477474

478475
// send 'down' command with payload
479-
sock.send("down", zmq::SNDMORE).unwrap();
480-
sock.send(buf, 0).unwrap();
476+
sock.send(b, 0).unwrap();
481477

482478
// set poller so that we can timeout after 100ms
483479
let mut items = [sock.as_poll_item(zmq::POLLIN)];

src/structs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ use std::time::Duration;
33
use std::time::SystemTime;
44

55
use anyhow::Result;
6-
use base64::{engine::general_purpose, Engine as _};
6+
use base64::{Engine as _, engine::general_purpose};
77
use chrono::{DateTime, Utc};
88
use serde::de::Error;
99
use serde::{Deserialize, Deserializer, Serialize, Serializer};
1010
use serde_json::Value;
1111

12-
use chirpstack_api::gw;
12+
use chirpstack_api::{gw, prost_types};
1313

1414
const PROTOCOL_VERSION: u8 = 0x02;
1515

0 commit comments

Comments
 (0)