Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
718 changes: 485 additions & 233 deletions Cargo.lock

Large diffs are not rendered by default.

39 changes: 28 additions & 11 deletions nt/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,16 +1,33 @@
[package]
name = "nt"
version = "0.1.0"
name = "nt-rs"
version = "0.0.1"
edition = "2021"

license = "MIT"
description = "An NT 4.0 (and parially 4.1) client"
homepage = "https://github.com/BlueZeeKing/robotrs"
repository = "https://github.com/BlueZeeKing/robotrs"
readme = "README.md"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[build-dependencies]
anyhow = "1.0.75"
bindgen = "0.66.1"
bytes = "1.4.0"
reqwest = "0.11.20"
tempfile = "3.8.0"
tokio = { version = "1.32.0", features = ["full"] }
zip = "0.6.6"
build-utils = { path = "../build-utils" }
[features]
default = ["tokio"]
tokio = ["dep:tokio", "dep:http", "dep:tokio-tungstenite", "dep:tungstenite"]
wasm = ["dep:web-sys", "dep:wasm-bindgen-futures", "dep:fluvio-wasm-timer"]

[dependencies]
flume = "0.11.0"
futures = "0.3.30"
rmp = "0.8.12"
serde = { version = "1.0.193", features = ["derive"] }
serde_json = "1.0.108"
thiserror = "1.0.53"
http = { version = "1.0.0", optional = true }
tokio = { version = "1.35.1", features = ["full"], optional = true }
tokio-tungstenite = { version = "0.21.0", optional = true }
tungstenite = { version = "0.21.0", optional = true }
web-sys = { version = "0.3.66", features = ["WebSocket", "BinaryType", "Blob", "MessageEvent", "Window", "Performance", "console"], optional = true }
wasm-bindgen-futures = { version = "0.4.39", optional = true }
fluvio-wasm-timer = { version = "0.2.5", optional = true }
event-listener = "5.0.0"
1 change: 1 addition & 0 deletions nt/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# NT-rs
49 changes: 0 additions & 49 deletions nt/build.rs

This file was deleted.

4 changes: 4 additions & 0 deletions nt/src/backends.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#[cfg(feature = "tokio")]
pub mod tokio;
#[cfg(feature = "wasm")]
pub mod wasm;
162 changes: 162 additions & 0 deletions nt/src/backends/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
use core::panic;
use flume::RecvError;
use futures::{sink::SinkExt, stream::StreamExt, FutureExt};
use http::{header::SEC_WEBSOCKET_PROTOCOL, uri::InvalidUri, Request};
use std::{io::Cursor, str::FromStr, time::Duration};
use tokio::{select, task::JoinHandle};
use tokio_tungstenite::connect_async;
use tungstenite::{handshake::client::generate_key, Message};

use http::Uri;

use crate::{
types::{BinaryMessage, BinaryMessageError, TextMessage},
Backend, Error, Result, Timer,
};

#[derive(Debug, Error)]
pub enum TokioError {
#[error("Encountered an http error: {0}")]
Http(#[from] http::Error),
#[error("Encountered a websocket error: {0}")]
Websocket(#[from] tungstenite::Error),
#[error("Invalid uri error: {0}")]
Uri(#[from] InvalidUri),
#[error("Server does not support the nt v4.0 protocol")]
UnsupportedServer,
#[error("Error while encoding or decoding a binary message: {0}")]
BinaryMessage(#[from] BinaryMessageError),
#[error("Error while encoding or decoding a text message: {0}")]
TextMessage(#[from] serde_json::Error),
#[error("Error while sending a message")]
Send,
#[error("Error while receiving a message: {0}")]
Receive(#[from] RecvError),
#[error("Other error occured: {0}")]
Other(Box<dyn std::error::Error + 'static + Send>),
#[error("Encountered an unknown frame")]
UnknownFrame,
#[error("Encountered an incorrect type")]
Type,
}

pub struct TokioBackend {}

impl Backend for TokioBackend {
type Output = JoinHandle<()>;
type Error = TokioError;

fn create(
host: &str,
name: &str,
send: flume::Sender<Result<crate::NtMessage>>,
receive: flume::Receiver<crate::NtMessage>,
) -> Result<Self::Output, TokioError> {
let uri = Uri::from_str(&format!("ws://{host}:5810/nt/{name}"))?;

let send2 = send.clone();

Ok(tokio::spawn(async move {
let req = Request::builder()
.method("GET")
.header("Host", uri.host().unwrap())
.header("Connection", "Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Key", generate_key())
.header("Sec-WebSocket-Protocol", "networktables.first.wpi.edu")
.uri(uri)
.body(())?;

let (mut connection, res) = connect_async(req.clone()).await?;

if res
.headers()
.get(SEC_WEBSOCKET_PROTOCOL)
.ok_or(TokioError::UnsupportedServer)?
!= "networktables.first.wpi.edu"
{
return Err(TokioError::UnsupportedServer);
}


loop {
select! {
message = receive.recv_async() => {
let message = message?;

match message {
crate::NtMessage::Text(msg) => connection.send(Message::Text(serde_json::to_string(&[msg])?)).await?,
crate::NtMessage::Binary(msg) => {
let mut buf = Vec::new();
msg.to_writer(&mut buf)?;
connection.send(Message::Binary(buf)).await?
}
crate::NtMessage::Reconnect(_) => {
loop {
if let Ok((mut new_con, _)) = connect_async(req.clone()).await {
std::mem::swap(&mut new_con, &mut connection);

send.send(Ok(crate::NtMessage::Reconnect(Some(receive.drain().collect::<Vec<_>>())))).map_err(|_| TokioError::Send)?;

break;
} else {
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
},
}
}
message = connection.next() => {
if matches!(message, Some(Err(_))) || message.is_none() {
loop {
if let Ok((mut new_con, _)) = connect_async(req.clone()).await {
std::mem::swap(&mut new_con, &mut connection);

send.send(Ok(crate::NtMessage::Reconnect(Some(receive.drain().collect::<Vec<_>>())))).map_err(|_| TokioError::Send)?;

receive.drain().for_each(|_| {});

break;
} else {
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

continue;
}

let message = message.unwrap()?;

match message {
Message::Text(msg) => {
let msgs = serde_json::from_str::<Vec<TextMessage>>(&msg)?;
for msg in msgs {
send.send(Ok(crate::NtMessage::Text(msg))).map_err(|_| TokioError::Send)?;
}
}
Message::Binary(msg) => {
let mut cursor = Cursor::new(msg);

while (cursor.position() as usize) < cursor.get_ref().len() {
send.send(Ok(crate::NtMessage::Binary(BinaryMessage::from_reader(&mut cursor)?))).map_err(|_| TokioError::Send)?;
}
}
_ => return <Result<(), TokioError>>::Err(TokioError::UnknownFrame),
}
}
}
}
}.map(move |out| {
if let Err(err) = out {
let _res = send2.send(Err(Error::Other(Box::new(err))));
}
})))
}
}

impl Timer for TokioBackend {
async fn time(duration: std::time::Duration) {
tokio::time::sleep(duration).await;
}
}
Loading