Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ serde_bytes = "0.11"

[dev-dependencies]
tracing-test = "0.2.4"
tracing-subscriber = "0.3.19"
udp-stream = "0.0.12"

[build-dependencies]
Expand Down
9 changes: 9 additions & 0 deletions examples/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
str::FromStr,
};
use tokio_serial::{SerialPort, SerialPortBuilderExt};
use tracing_subscriber;
use udp_stream::UdpStream;

#[derive(Parser, Debug)]
Expand All @@ -25,6 +26,14 @@ pub enum Port {
Udp(udp_stream::UdpStream),
}

pub fn configure_tracing() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_line_number(true)
.with_file(true)
.init();
}

pub async fn create_port() -> Port {
let args = Args::parse();

Expand Down
4 changes: 3 additions & 1 deletion examples/ping_1d.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod common;
use common::{create_port, Port};
use common::{configure_tracing, create_port, Port};
use std::convert::TryFrom;

use bluerobotics_ping::{
Expand All @@ -12,6 +12,8 @@ use bluerobotics_ping::{

#[tokio::main]
async fn main() -> Result<(), PingError> {
configure_tracing();

println!("Parsing user provided values and creating port...");
let port = create_port().await;

Expand Down
6 changes: 4 additions & 2 deletions examples/ping_360.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod common;
use common::{create_port, Port};
use common::{configure_tracing, create_port, Port};

use bluerobotics_ping::{
device::{Ping360, PingDevice},
Expand All @@ -8,6 +8,8 @@ use bluerobotics_ping::{

#[tokio::main]
async fn main() -> Result<(), PingError> {
configure_tracing();

println!("Parsing user provided values and creating port...");
let port = create_port().await;

Expand Down Expand Up @@ -44,7 +46,7 @@ async fn main() -> Result<(), PingError> {
);

println!("Protocol version is: {version}");
println!("Device information: {device_information:?}");
println!("Device information: {device_information:#?}");

Ok(())
}
6 changes: 4 additions & 2 deletions examples/ping_common.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
mod common;
use common::{create_port, Port};
use common::{configure_tracing, create_port, Port};

use bluerobotics_ping::{
common::Device, device::PingDevice, error::PingError, message::ProtocolMessage,
};

#[tokio::main]
async fn main() -> Result<(), PingError> {
configure_tracing();

println!("Parsing user provided values and creating port...");
let port = create_port().await;

Expand Down Expand Up @@ -54,7 +56,7 @@ async fn main() -> Result<(), PingError> {
);

println!("Protocol version is: {version}");
println!("Device information: \n {device_information_struct:?}");
println!("Device information: \n {device_information_struct:#?}");

// Read the same 2 packages from previous requests, but from subscriber task, all above tasks have success, we did it!
println!("Checking if subscriber returns with 2 same packages...");
Expand Down
7 changes: 5 additions & 2 deletions src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{decoder::Decoder as PingDecoder, error::PingError, message::ProtocolMessage};
use bytes::{Buf, BytesMut};
use tokio_util::codec::{Decoder, Encoder};
use tracing::debug;

pub struct PingCodec {
decoder: PingDecoder,
Expand All @@ -27,8 +28,10 @@ impl Decoder for PingCodec {
return Ok(None);
};

match decoder.parse_byte(*byte) {
crate::decoder::DecoderResult::InProgress => {
let state = decoder.parse_byte(*byte);
debug!("Decoder state: {:?}", state);
match state {
crate::decoder::DecoderResult::InProgress(_) => {
consumed += 1;
if consumed == src.len() {
src.advance(consumed)
Expand Down
36 changes: 19 additions & 17 deletions src/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tracing::info;
use tracing::debug;

use crate::message::{ProtocolMessage, HEADER};
#[cfg(feature = "serde")]
Expand All @@ -12,14 +12,7 @@ pub enum ParseError {
ChecksumError(ProtocolMessage),
}

#[derive(Debug)]
pub enum DecoderResult {
Success(ProtocolMessage),
InProgress,
Error(ParseError),
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum DecoderState {
AwaitingStart1,
AwaitingStart2,
Expand All @@ -28,6 +21,13 @@ pub enum DecoderState {
ReadingChecksum,
}

#[derive(Debug)]
pub enum DecoderResult {
Success(ProtocolMessage),
InProgress(DecoderState),
Error(ParseError),
}

pub struct Decoder {
pub state: DecoderState,
buffer: Vec<u8>,
Expand All @@ -44,19 +44,21 @@ impl Decoder {
}

pub fn parse_byte(&mut self, byte: u8) -> DecoderResult {
match self.state {
debug!("Parsing byte: 0x{byte:02x} ({byte})");
let state = &self.state;
match state {
DecoderState::AwaitingStart1 => {
if byte == HEADER[0] {
self.state = DecoderState::AwaitingStart2;
return DecoderResult::InProgress;
return DecoderResult::InProgress(self.state.clone());
}
return DecoderResult::Error(ParseError::InvalidStartByte);
}
DecoderState::AwaitingStart2 => {
if byte == HEADER[1] {
self.state = DecoderState::ReadingHeader;
self.buffer.clear();
return DecoderResult::InProgress;
return DecoderResult::InProgress(self.state.clone());
}
self.state = DecoderState::AwaitingStart1;
return DecoderResult::Error(ParseError::InvalidStartByte);
Expand All @@ -78,12 +80,12 @@ impl Decoder {
}
self.buffer.clear();
}
return DecoderResult::InProgress;
return DecoderResult::InProgress(self.state.clone());
}
DecoderState::ReadingPayload => {
self.buffer.push(byte);
info!(
"DecoderState : ReadingPayload {:?} {:?}",
debug!(
"DecoderState : ReadingPayload {:?} of {:?}",
self.buffer.len(),
self.message.payload_length
);
Expand All @@ -92,7 +94,7 @@ impl Decoder {
self.state = DecoderState::ReadingChecksum;
self.buffer.clear();
}
return DecoderResult::InProgress;
return DecoderResult::InProgress(self.state.clone());
}
DecoderState::ReadingChecksum => {
self.buffer.push(byte);
Expand All @@ -106,7 +108,7 @@ impl Decoder {
}
return DecoderResult::Success(message);
}
return DecoderResult::InProgress;
return DecoderResult::InProgress(self.state.clone());
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::{
task::JoinHandle,
};
use tokio_util::codec::{Decoder, Framed};
use tracing::{error, info};
use tracing::{error, info, trace};

use crate::{
codec::PingCodec,
Expand Down Expand Up @@ -90,7 +90,7 @@ impl Common {
};
}
Err(e) => {
error!("{e:?}");
trace!("{e:?}");
}
}
}
Expand Down
15 changes: 7 additions & 8 deletions tests/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::convert::TryFrom;
use bluerobotics_ping::common::Messages as common_messages;
use bluerobotics_ping::decoder::*;
use bluerobotics_ping::{common, Messages};
use tracing::info;
use tracing::debug;
use tracing_test::traced_test;

#[traced_test]
Expand All @@ -27,32 +27,31 @@ fn test_simple_deserialization() {
assert_eq!(general_request, parsed);

for byte in &buffer[0..buffer.len() - 2] {
info!("byte : {byte}, {:?}", &decoder.state);
debug!("byte : {byte}, {:?}", &decoder.state);
assert!(matches!(
decoder.parse_byte(byte.clone()),
DecoderResult::InProgress
DecoderResult::InProgress(_)
));
}
assert!(matches!(
decoder.parse_byte(buffer[buffer.len() - 2]),
DecoderResult::InProgress
DecoderResult::InProgress(_)
));
let DecoderResult::Success(_message) = decoder.parse_byte(buffer[buffer.len() - 1]) else {
info!("Decoder state: {:?}", decoder.state);
debug!("Decoder state: {:?}", decoder.state);
panic!("Failed to use decoder with valid message");
};

// Retry with a wrong receipt CRC
for byte in &buffer[0..buffer.len() - 2] {
dbg!(byte, &decoder.state);
assert!(matches!(
decoder.parse_byte(byte.clone()),
DecoderResult::InProgress
DecoderResult::InProgress(_)
));
}
assert!(matches!(
decoder.parse_byte(buffer[buffer.len() - 2]),
DecoderResult::InProgress
DecoderResult::InProgress(_)
));
assert!(matches!(
decoder.parse_byte(0x01), // force a crc error
Expand Down