Skip to content

Commit 4b92cdb

Browse files
authored
write_fixed_all (#184)
Define write_all functions that take fixed buffers. Adds write_fixed_all for TCP and Unix streams. Adds write_fixed_all_at for File. Also adds an example: tcp_listener_fixed_buffers that uses read_fixed and write_fixed_all.
1 parent bb5a00c commit 4b92cdb

File tree

5 files changed

+277
-28
lines changed

5 files changed

+277
-28
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// An example of an echo server using fixed buffers for reading and writing TCP streams.
2+
// A buffer registry size of two is created, to allow a maximum of two simultaneous connections.
3+
4+
use std::{env, iter, net::SocketAddr};
5+
6+
use tokio_uring::{
7+
buf::{fixed::FixedBufRegistry, BoundedBuf},
8+
net::{TcpListener, TcpStream},
9+
}; // BoundedBuf for slice method
10+
11+
// A contrived example, where just two fixed buffers are created.
12+
const POOL_SIZE: usize = 2;
13+
14+
fn main() {
15+
let args: Vec<_> = env::args().collect();
16+
17+
let socket_addr = if args.len() <= 1 {
18+
"127.0.0.1:0"
19+
} else {
20+
args[1].as_ref()
21+
};
22+
let socket_addr: SocketAddr = socket_addr.parse().unwrap();
23+
24+
tokio_uring::start(accept_loop(socket_addr));
25+
}
26+
27+
// Bind to address and accept connections, spawning an echo handler for each connection.
28+
async fn accept_loop(listen_addr: SocketAddr) {
29+
let listener = TcpListener::bind(listen_addr).unwrap();
30+
31+
println!(
32+
"Listening on {}, fixed buffer pool size only {POOL_SIZE}",
33+
listener.local_addr().unwrap()
34+
);
35+
36+
// Other iterators may be passed to FixedBufRegistry::new also.
37+
let registry = FixedBufRegistry::new(iter::repeat(vec![0; 4096]).take(POOL_SIZE));
38+
39+
// Register the buffers with the kernel, asserting the syscall passed.
40+
41+
registry.register().unwrap();
42+
43+
loop {
44+
let (stream, peer) = listener.accept().await.unwrap();
45+
46+
tokio_uring::spawn(echo_handler(stream, peer, registry.clone()));
47+
}
48+
}
49+
50+
// A loop that echoes input to output. Use one fixed buffer for receiving and sending the response
51+
// back. Once the connection is closed, the function returns and the fixed buffer is dropped,
52+
// getting the fixed buffer index returned to the available pool kept by the registry.
53+
async fn echo_handler(stream: TcpStream, peer: SocketAddr, registry: FixedBufRegistry) {
54+
println!("peer {} connected", peer);
55+
56+
// Get one of the two fixed buffers.
57+
// If neither is unavailable, print reason and return immediately, dropping this connection;
58+
// be nice and shutdown the connection before dropping it so the client sees the connection is
59+
// closed immediately.
60+
61+
let mut fbuf = registry.check_out(0);
62+
if fbuf.is_none() {
63+
fbuf = registry.check_out(1);
64+
};
65+
if fbuf.is_none() {
66+
let _ = stream.shutdown(std::net::Shutdown::Write);
67+
println!("peer {} closed, no fixed buffers available", peer);
68+
return;
69+
};
70+
71+
let mut fbuf = fbuf.unwrap();
72+
73+
let mut n = 0;
74+
loop {
75+
// Each time through the loop, use fbuf and then get it back for the next
76+
// iteration.
77+
78+
let (result, fbuf1) = stream.read_fixed(fbuf).await;
79+
fbuf = {
80+
let read = result.unwrap();
81+
if read == 0 {
82+
break;
83+
}
84+
assert_eq!(4096, fbuf1.len()); // To prove a point.
85+
86+
let (res, nslice) = stream.write_fixed_all(fbuf1.slice(..read)).await;
87+
88+
let _ = res.unwrap();
89+
println!("peer {} all {} bytes ping-ponged", peer, read);
90+
n += read;
91+
92+
// Important. One of the points of this example.
93+
nslice.into_inner() // Return the buffer we started with.
94+
};
95+
}
96+
let _ = stream.shutdown(std::net::Shutdown::Write);
97+
println!("peer {} closed, {} total ping-ponged", peer, n);
98+
}

src/fs/file.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,77 @@ impl File {
632632
op.await
633633
}
634634

635+
/// Attempts to write an entire buffer into this file at the specified offset.
636+
///
637+
/// This method will continuously call [`write_fixed_at`] until there is no more data
638+
/// to be written or an error is returned.
639+
/// This method will not return until the entire buffer has been successfully
640+
/// written or an error occurs.
641+
///
642+
/// If the buffer contains no data, this will never call [`write_fixed_at`].
643+
///
644+
/// # Return
645+
///
646+
/// The method returns the operation result and the same buffer value passed
647+
/// in as an argument.
648+
///
649+
/// # Errors
650+
///
651+
/// This function will return the first error that [`write_fixed_at`] returns.
652+
///
653+
/// [`write_fixed_at`]: Self::write_fixed_at
654+
pub async fn write_fixed_all_at<T>(&self, buf: T, pos: u64) -> crate::BufResult<(), T>
655+
where
656+
T: BoundedBuf<Buf = FixedBuf>,
657+
{
658+
let orig_bounds = buf.bounds();
659+
let (res, buf) = self.write_fixed_all_at_slice(buf.slice_full(), pos).await;
660+
(res, T::from_buf_bounds(buf, orig_bounds))
661+
}
662+
663+
async fn write_fixed_all_at_slice(
664+
&self,
665+
mut buf: Slice<FixedBuf>,
666+
mut pos: u64,
667+
) -> crate::BufResult<(), FixedBuf> {
668+
if pos.checked_add(buf.bytes_init() as u64).is_none() {
669+
return (
670+
Err(io::Error::new(
671+
io::ErrorKind::InvalidInput,
672+
"buffer too large for file",
673+
)),
674+
buf.into_inner(),
675+
);
676+
}
677+
678+
while buf.bytes_init() != 0 {
679+
let (res, slice) = self.write_fixed_at(buf, pos).await;
680+
match res {
681+
Ok(0) => {
682+
return (
683+
Err(io::Error::new(
684+
io::ErrorKind::WriteZero,
685+
"failed to write whole buffer",
686+
)),
687+
slice.into_inner(),
688+
)
689+
}
690+
Ok(n) => {
691+
pos += n as u64;
692+
buf = slice.slice(n..);
693+
}
694+
695+
// No match on an EINTR error is performed because this
696+
// crate's design ensures we are not calling the 'wait' option
697+
// in the ENTER syscall. Only an Enter with 'wait' can generate
698+
// an EINTR according to the io_uring man pages.
699+
Err(e) => return (Err(e), slice.into_inner()),
700+
};
701+
}
702+
703+
(Ok(()), buf.into_inner())
704+
}
705+
635706
/// Attempts to sync all OS-internal metadata to disk.
636707
///
637708
/// This function will attempt to ensure that all in-memory data reaches the

src/io/socket.rs

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,40 @@ impl Socket {
4646
op.await
4747
}
4848

49+
pub async fn write_all<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<(), T> {
50+
let orig_bounds = buf.bounds();
51+
let (res, buf) = self.write_all_slice(buf.slice_full()).await;
52+
(res, T::from_buf_bounds(buf, orig_bounds))
53+
}
54+
55+
async fn write_all_slice<T: IoBuf>(&self, mut buf: Slice<T>) -> crate::BufResult<(), T> {
56+
while buf.bytes_init() != 0 {
57+
let res = self.write(buf).await;
58+
match res {
59+
(Ok(0), slice) => {
60+
return (
61+
Err(std::io::Error::new(
62+
std::io::ErrorKind::WriteZero,
63+
"failed to write whole buffer",
64+
)),
65+
slice.into_inner(),
66+
)
67+
}
68+
(Ok(n), slice) => {
69+
buf = slice.slice(n..);
70+
}
71+
72+
// No match on an EINTR error is performed because this
73+
// crate's design ensures we are not calling the 'wait' option
74+
// in the ENTER syscall. Only an Enter with 'wait' can generate
75+
// an EINTR according to the io_uring man pages.
76+
(Err(e), slice) => return (Err(e), slice.into_inner()),
77+
}
78+
}
79+
80+
(Ok(()), buf.into_inner())
81+
}
82+
4983
pub(crate) async fn write_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
5084
where
5185
T: BoundedBuf<Buf = FixedBuf>,
@@ -54,15 +88,21 @@ impl Socket {
5488
op.await
5589
}
5690

57-
pub async fn write_all<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<(), T> {
91+
pub(crate) async fn write_fixed_all<T>(&self, buf: T) -> crate::BufResult<(), T>
92+
where
93+
T: BoundedBuf<Buf = FixedBuf>,
94+
{
5895
let orig_bounds = buf.bounds();
59-
let (res, buf) = self.write_all_slice(buf.slice_full()).await;
96+
let (res, buf) = self.write_fixed_all_slice(buf.slice_full()).await;
6097
(res, T::from_buf_bounds(buf, orig_bounds))
6198
}
6299

63-
async fn write_all_slice<T: IoBuf>(&self, mut buf: Slice<T>) -> crate::BufResult<(), T> {
100+
async fn write_fixed_all_slice(
101+
&self,
102+
mut buf: Slice<FixedBuf>,
103+
) -> crate::BufResult<(), FixedBuf> {
64104
while buf.bytes_init() != 0 {
65-
let res = self.write(buf).await;
105+
let res = self.write_fixed(buf).await;
66106
match res {
67107
(Ok(0), slice) => {
68108
return (

src/net/tcp/stream.rs

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -100,24 +100,6 @@ impl TcpStream {
100100
self.inner.write(buf).await
101101
}
102102

103-
/// Like [`write`], but using a pre-mapped buffer
104-
/// registered with [`FixedBufRegistry`].
105-
///
106-
/// [`write`]: Self::write
107-
/// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
108-
///
109-
/// # Errors
110-
///
111-
/// In addition to errors that can be reported by `write`,
112-
/// this operation fails if the buffer is not registered in the
113-
/// current `tokio-uring` runtime.
114-
pub async fn write_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
115-
where
116-
T: BoundedBuf<Buf = FixedBuf>,
117-
{
118-
self.inner.write_fixed(buf).await
119-
}
120-
121103
/// Attempts to write an entire buffer to the stream.
122104
///
123105
/// This method will continuously call [`write`] until there is no more data to be
@@ -172,6 +154,44 @@ impl TcpStream {
172154
self.inner.write_all(buf).await
173155
}
174156

157+
/// Like [`write`], but using a pre-mapped buffer
158+
/// registered with [`FixedBufRegistry`].
159+
///
160+
/// [`write`]: Self::write
161+
/// [`FixedBufRegistry`]: crate::buf::fixed::FixedBufRegistry
162+
///
163+
/// # Errors
164+
///
165+
/// In addition to errors that can be reported by `write`,
166+
/// this operation fails if the buffer is not registered in the
167+
/// current `tokio-uring` runtime.
168+
pub async fn write_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
169+
where
170+
T: BoundedBuf<Buf = FixedBuf>,
171+
{
172+
self.inner.write_fixed(buf).await
173+
}
174+
175+
/// Attempts to write an entire buffer to the stream.
176+
///
177+
/// This method will continuously call [`write_fixed`] until there is no more data to be
178+
/// written or an error is returned. This method will not return until the entire
179+
/// buffer has been successfully written or an error has occurred.
180+
///
181+
/// If the buffer contains no data, this will never call [`write_fixed`].
182+
///
183+
/// # Errors
184+
///
185+
/// This function will return the first error that [`write_fixed`] returns.
186+
///
187+
/// [`write_fixed`]: Self::write
188+
pub async fn write_fixed_all<T>(&self, buf: T) -> crate::BufResult<(), T>
189+
where
190+
T: BoundedBuf<Buf = FixedBuf>,
191+
{
192+
self.inner.write_fixed_all(buf).await
193+
}
194+
175195
/// Write data from buffers into this socket returning how many bytes were
176196
/// written.
177197
///

src/net/unix/stream.rs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,23 @@ impl UnixStream {
102102
self.inner.write(buf).await
103103
}
104104

105+
/// Attempts to write an entire buffer to the stream.
106+
///
107+
/// This method will continuously call [`write`] until there is no more data to be
108+
/// written or an error is returned. This method will not return until the entire
109+
/// buffer has been successfully written or an error has occurred.
110+
///
111+
/// If the buffer contains no data, this will never call [`write`].
112+
///
113+
/// # Errors
114+
///
115+
/// This function will return the first error that [`write`] returns.
116+
///
117+
/// [`write`]: Self::write
118+
pub async fn write_all<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<(), T> {
119+
self.inner.write_all(buf).await
120+
}
121+
105122
/// Like [`write`], but using a pre-mapped buffer
106123
/// registered with [`FixedBufRegistry`].
107124
///
@@ -122,19 +139,22 @@ impl UnixStream {
122139

123140
/// Attempts to write an entire buffer to the stream.
124141
///
125-
/// This method will continuously call [`write`] until there is no more data to be
142+
/// This method will continuously call [`write_fixed`] until there is no more data to be
126143
/// written or an error is returned. This method will not return until the entire
127144
/// buffer has been successfully written or an error has occurred.
128145
///
129-
/// If the buffer contains no data, this will never call [`write`].
146+
/// If the buffer contains no data, this will never call [`write_fixed`].
130147
///
131148
/// # Errors
132149
///
133-
/// This function will return the first error that [`write`] returns.
150+
/// This function will return the first error that [`write_fixed`] returns.
134151
///
135-
/// [`write`]: Self::write
136-
pub async fn write_all<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<(), T> {
137-
self.inner.write_all(buf).await
152+
/// [`write_fixed`]: Self::write
153+
pub async fn write_fixed_all<T>(&self, buf: T) -> crate::BufResult<(), T>
154+
where
155+
T: BoundedBuf<Buf = FixedBuf>,
156+
{
157+
self.inner.write_fixed_all(buf).await
138158
}
139159

140160
/// Write data from buffers into this socket returning how many bytes were

0 commit comments

Comments
 (0)