From d850634984a1d6f5e5ebec00c37159b47a259915 Mon Sep 17 00:00:00 2001 From: Tnze Date: Fri, 24 Jan 2025 21:42:27 +0800 Subject: [PATCH 1/2] Implement embedded_io_async::Read for SerialPort --- Cargo.toml | 1 + src/io.rs | 64 +++++++++++++++++++++++++++++++++++++++++++++- src/serial_port.rs | 7 +++++ 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 1ce1eba..f403f86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,4 @@ embedded-hal = "0.2.4" nb = "1" usb-device = "0.3" embedded-io = "0.6" +embedded-io-async = "0.6.1" diff --git a/src/io.rs b/src/io.rs index 0b965e4..84592d0 100644 --- a/src/io.rs +++ b/src/io.rs @@ -1,5 +1,10 @@ use super::SerialPort; -use core::borrow::BorrowMut; +use core::{ + borrow::BorrowMut, + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use usb_device::bus::UsbBus; #[derive(Debug)] @@ -85,3 +90,60 @@ impl, WS: BorrowMut<[u8]>> embedded_io::WriteRe Ok(self.write_buf.available_write() != 0) } } + +impl embedded_io_async::Read for SerialPort<'_, B, RS, WS> +where + B: UsbBus, + RS: BorrowMut<[u8]>, + WS: BorrowMut<[u8]>, +{ + async fn read(&mut self, buffer: &mut [u8]) -> Result { + AsyncRead { + serial_port: self, + buffer, + } + .await + } +} + +struct AsyncRead<'a, 'b, 'c, B, RS, WS> +where + B: UsbBus, + RS: BorrowMut<[u8]>, + WS: BorrowMut<[u8]>, +{ + serial_port: &'a mut SerialPort<'b, B, RS, WS>, + buffer: &'c mut [u8], +} + +impl<'a, 'b, 'c, B, RS, WS> Future for AsyncRead<'a, 'b, 'c, B, RS, WS> +where + B: UsbBus, + RS: BorrowMut<[u8]>, + WS: BorrowMut<[u8]>, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let s = self.get_mut(); + match s.serial_port.read(&mut s.buffer) { + Ok(len) => Poll::Ready(Ok(len)), + Err(usb_device::UsbError::WouldBlock) => { + if s.buffer.len() == 0 { + Poll::Ready(Ok(0)) + } else { + let prev = s.serial_port.read_waker.replace(cx.waker().clone()); + assert!( + prev.is_none(), + concat!( + "The ownership is borrowed though the mutable reference.", + "Impossable to run twice at the same time" + ) + ); + Poll::Pending + } + } + Err(err) => Poll::Ready(Err(Error(err))), + } + } +} diff --git a/src/serial_port.rs b/src/serial_port.rs index 1e05e89..86e5e40 100644 --- a/src/serial_port.rs +++ b/src/serial_port.rs @@ -2,6 +2,7 @@ use crate::buffer::{Buffer, DefaultBufferStore}; use crate::cdc_acm::*; use core::borrow::BorrowMut; use core::slice; +use core::task::Waker; use usb_device::class_prelude::*; use usb_device::descriptor::lang_id::LangID; use usb_device::Result; @@ -20,6 +21,8 @@ where pub(crate) read_buf: Buffer, pub(crate) write_buf: Buffer, write_state: WriteState, + + pub(crate) read_waker: Option, } /// If this many full size packets have been sent in a row, a short packet will be sent so that the @@ -95,6 +98,7 @@ where read_buf: Buffer::new(read_store), write_buf: Buffer::new(write_store), write_state: WriteState::Idle, + read_waker: None, } } @@ -151,6 +155,9 @@ where Err(err) => Err(err), } })?; + if let Some(read_waker) = self.read_waker.take() { + read_waker.wake(); + } Ok(()) } From 3e31f1d6b7f66189ba0d8214ca673cf35bb1f009 Mon Sep 17 00:00:00 2001 From: Tnze Date: Fri, 24 Jan 2025 22:13:15 +0800 Subject: [PATCH 2/2] Implement embedded_io_async::Write for SerialPort --- src/io.rs | 69 ++++++++++++++++++++++++++++++++++++++++------ src/serial_port.rs | 5 ++++ 2 files changed, 65 insertions(+), 9 deletions(-) diff --git a/src/io.rs b/src/io.rs index 84592d0..dc083dd 100644 --- a/src/io.rs +++ b/src/io.rs @@ -91,6 +91,61 @@ impl, WS: BorrowMut<[u8]>> embedded_io::WriteRe } } +impl embedded_io_async::Write for SerialPort<'_, B, RS, WS> +where + B: UsbBus, + RS: BorrowMut<[u8]>, + WS: BorrowMut<[u8]>, +{ + async fn write(&mut self, buffer: &[u8]) -> core::result::Result { + if buffer.is_empty() { + return Ok(0); + } + AsyncWrite { + serial_port: self, + buffer, + } + .await + } + + // async fn flush(&mut self) -> core::result::Result<(), Self::Error> { + // todo!() + // } +} +struct AsyncWrite<'a, 'b, 'c, B, RS, WS> +where + B: UsbBus, + RS: BorrowMut<[u8]>, + WS: BorrowMut<[u8]>, +{ + serial_port: &'a mut SerialPort<'b, B, RS, WS>, + buffer: &'c [u8], +} + +impl<'a, 'b, 'c, B, RS, WS> Future for AsyncWrite<'a, 'b, 'c, B, RS, WS> +where + B: UsbBus, + RS: BorrowMut<[u8]>, + WS: BorrowMut<[u8]>, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let s = self.get_mut(); + match s.serial_port.write(&s.buffer) { + Ok(n) => Poll::Ready(Ok(n)), + Err(usb_device::UsbError::WouldBlock) => { + // No need to worry about overriding. + // The ownership is borrowed though the mutable reference, + // so it's impossable to run twice at the same time. + s.serial_port.write_waker = Some(cx.waker().clone()); + Poll::Pending + } + Err(err) => Poll::Ready(Err(Error(err))), + } + } +} + impl embedded_io_async::Read for SerialPort<'_, B, RS, WS> where B: UsbBus, @@ -127,19 +182,15 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let s = self.get_mut(); match s.serial_port.read(&mut s.buffer) { - Ok(len) => Poll::Ready(Ok(len)), + Ok(n) => Poll::Ready(Ok(n)), Err(usb_device::UsbError::WouldBlock) => { if s.buffer.len() == 0 { Poll::Ready(Ok(0)) } else { - let prev = s.serial_port.read_waker.replace(cx.waker().clone()); - assert!( - prev.is_none(), - concat!( - "The ownership is borrowed though the mutable reference.", - "Impossable to run twice at the same time" - ) - ); + // No need to worry about overriding. + // The ownership is borrowed though the mutable reference, + // so it's impossable to run twice at the same time. + s.serial_port.read_waker = Some(cx.waker().clone()); Poll::Pending } } diff --git a/src/serial_port.rs b/src/serial_port.rs index 86e5e40..57e8d18 100644 --- a/src/serial_port.rs +++ b/src/serial_port.rs @@ -23,6 +23,7 @@ where write_state: WriteState, pub(crate) read_waker: Option, + pub(crate) write_waker: Option, } /// If this many full size packets have been sent in a row, a short packet will be sent so that the @@ -99,6 +100,7 @@ where write_buf: Buffer::new(write_store), write_state: WriteState::Idle, read_waker: None, + write_waker: None, } } @@ -265,6 +267,9 @@ where fn endpoint_in_complete(&mut self, addr: EndpointAddress) { if addr == self.inner.write_ep().address() { self.flush().ok(); + if let Some(write_waker) = self.write_waker.take() { + write_waker.wake(); + } } }