Skip to content

Commit 97e7830

Browse files
udoprogDarksonn
andauthored
net: provide NamedPipe{Client, Server} types and builders (#3760)
This builds on tokio-rs/mio#1351 and introduces the tokio::net::windows::named_pipe module which provides low level types for building and communicating asynchronously over windows named pipes. Named pipes require the `net` feature flag to be enabled on Windows. Co-authored-by: Alice Ryhl <alice@ryhl.io>
1 parent 606206e commit 97e7830

File tree

13 files changed

+1740
-0
lines changed

13 files changed

+1740
-0
lines changed

examples/Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ serde_json = "1.0"
2222
httparse = "1.0"
2323
time = "0.1"
2424
once_cell = "1.5.2"
25+
rand = "0.8.3"
2526

27+
[target.'cfg(windows)'.dev-dependencies.winapi]
28+
version = "0.3.8"
2629

2730
[[example]]
2831
name = "chat"
@@ -76,3 +79,11 @@ path = "custom-executor.rs"
7679
[[example]]
7780
name = "custom-executor-tokio-context"
7881
path = "custom-executor-tokio-context.rs"
82+
83+
[[example]]
84+
name = "named-pipe"
85+
path = "named-pipe.rs"
86+
87+
[[example]]
88+
name = "named-pipe-multi-client"
89+
path = "named-pipe-multi-client.rs"

examples/named-pipe-multi-client.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
use std::io;
2+
3+
#[cfg(windows)]
4+
async fn windows_main() -> io::Result<()> {
5+
use std::time::Duration;
6+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
7+
use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
8+
use tokio::time;
9+
use winapi::shared::winerror;
10+
11+
const PIPE_NAME: &str = r"\\.\pipe\named-pipe-multi-client";
12+
const N: usize = 10;
13+
14+
// The first server needs to be constructed early so that clients can
15+
// be correctly connected. Otherwise a waiting client will error.
16+
//
17+
// Here we also make use of `first_pipe_instance`, which will ensure
18+
// that there are no other servers up and running already.
19+
let mut server = ServerOptions::new()
20+
.first_pipe_instance(true)
21+
.create(PIPE_NAME)?;
22+
23+
let server = tokio::spawn(async move {
24+
// Artificial workload.
25+
time::sleep(Duration::from_secs(1)).await;
26+
27+
for _ in 0..N {
28+
// Wait for client to connect.
29+
server.connect().await?;
30+
let mut inner = server;
31+
32+
// Construct the next server to be connected before sending the one
33+
// we already have of onto a task. This ensures that the server
34+
// isn't closed (after it's done in the task) before a new one is
35+
// available. Otherwise the client might error with
36+
// `io::ErrorKind::NotFound`.
37+
server = ServerOptions::new().create(PIPE_NAME)?;
38+
39+
let _ = tokio::spawn(async move {
40+
let mut buf = vec![0u8; 4];
41+
inner.read_exact(&mut buf).await?;
42+
inner.write_all(b"pong").await?;
43+
Ok::<_, io::Error>(())
44+
});
45+
}
46+
47+
Ok::<_, io::Error>(())
48+
});
49+
50+
let mut clients = Vec::new();
51+
52+
for _ in 0..N {
53+
clients.push(tokio::spawn(async move {
54+
// This showcases a generic connect loop.
55+
//
56+
// We immediately try to create a client, if it's not found or
57+
// the pipe is busy we use the specialized wait function on the
58+
// client builder.
59+
let mut client = loop {
60+
match ClientOptions::new().open(PIPE_NAME) {
61+
Ok(client) => break client,
62+
Err(e) if e.raw_os_error() == Some(winerror::ERROR_PIPE_BUSY as i32) => (),
63+
Err(e) => return Err(e),
64+
}
65+
66+
time::sleep(Duration::from_millis(5)).await;
67+
};
68+
69+
let mut buf = [0u8; 4];
70+
client.write_all(b"ping").await?;
71+
client.read_exact(&mut buf).await?;
72+
Ok::<_, io::Error>(buf)
73+
}));
74+
}
75+
76+
for client in clients {
77+
let result = client.await?;
78+
assert_eq!(&result?[..], b"pong");
79+
}
80+
81+
server.await??;
82+
Ok(())
83+
}
84+
85+
#[tokio::main]
86+
async fn main() -> io::Result<()> {
87+
#[cfg(windows)]
88+
{
89+
windows_main().await?;
90+
}
91+
92+
#[cfg(not(windows))]
93+
{
94+
println!("Named pipes are only supported on Windows!");
95+
}
96+
97+
Ok(())
98+
}

examples/named-pipe.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
use std::io;
2+
3+
#[cfg(windows)]
4+
async fn windows_main() -> io::Result<()> {
5+
use tokio::io::AsyncWriteExt;
6+
use tokio::io::{AsyncBufReadExt, BufReader};
7+
use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
8+
9+
const PIPE_NAME: &str = r"\\.\pipe\named-pipe-single-client";
10+
11+
let server = ServerOptions::new().create(PIPE_NAME)?;
12+
13+
let server = tokio::spawn(async move {
14+
// Note: we wait for a client to connect.
15+
server.connect().await?;
16+
17+
let mut server = BufReader::new(server);
18+
19+
let mut buf = String::new();
20+
server.read_line(&mut buf).await?;
21+
server.write_all(b"pong\n").await?;
22+
Ok::<_, io::Error>(buf)
23+
});
24+
25+
let client = tokio::spawn(async move {
26+
// There's no need to use a connect loop here, since we know that the
27+
// server is already up - `open` was called before spawning any of the
28+
// tasks.
29+
let client = ClientOptions::new().open(PIPE_NAME)?;
30+
31+
let mut client = BufReader::new(client);
32+
33+
let mut buf = String::new();
34+
client.write_all(b"ping\n").await?;
35+
client.read_line(&mut buf).await?;
36+
Ok::<_, io::Error>(buf)
37+
});
38+
39+
let (server, client) = tokio::try_join!(server, client)?;
40+
41+
assert_eq!(server?, "ping\n");
42+
assert_eq!(client?, "pong\n");
43+
44+
Ok(())
45+
}
46+
47+
#[tokio::main]
48+
async fn main() -> io::Result<()> {
49+
#[cfg(windows)]
50+
{
51+
windows_main().await?;
52+
}
53+
54+
#[cfg(not(windows))]
55+
{
56+
println!("Named pipes are only supported on Windows!");
57+
}
58+
59+
Ok(())
60+
}

tokio/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ net = [
5454
"mio/tcp",
5555
"mio/udp",
5656
"mio/uds",
57+
"winapi/namedpipeapi",
5758
]
5859
process = [
5960
"bytes",
@@ -115,6 +116,9 @@ version = "0.3.8"
115116
default-features = false
116117
optional = true
117118

119+
[target.'cfg(windows)'.dev-dependencies.ntapi]
120+
version = "0.3.6"
121+
118122
[dev-dependencies]
119123
tokio-test = { version = "0.4.0", path = "../tokio-test" }
120124
tokio-stream = { version = "0.1", path = "../tokio-stream" }

tokio/src/doc/mod.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
//! Types which are documented locally in the Tokio crate, but does not actually
2+
//! live here.
3+
//!
4+
//! **Note** this module is only visible on docs.rs, you cannot use it directly
5+
//! in your own code.
6+
7+
/// The name of a type which is not defined here.
8+
///
9+
/// This is typically used as an alias for another type, like so:
10+
///
11+
/// ```rust,ignore
12+
/// /// See [some::other::location](https://example.com).
13+
/// type DEFINED_ELSEWHERE = crate::doc::NotDefinedHere;
14+
/// ```
15+
///
16+
/// This type is uninhabitable like the [`never` type] to ensure that no one
17+
/// will ever accidentally use it.
18+
///
19+
/// [`never` type]: https://doc.rust-lang.org/std/primitive.never.html
20+
pub enum NotDefinedHere {}
21+
22+
pub mod os;
23+
pub mod winapi;

tokio/src/doc/os.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
//! See [std::os](https://doc.rust-lang.org/std/os/index.html).
2+
3+
/// Platform-specific extensions to `std` for Windows.
4+
///
5+
/// See [std::os::windows](https://doc.rust-lang.org/std/os/windows/index.html).
6+
pub mod windows {
7+
/// Windows-specific extensions to general I/O primitives.
8+
///
9+
/// See [std::os::windows::io](https://doc.rust-lang.org/std/os/windows/io/index.html).
10+
pub mod io {
11+
/// See [std::os::windows::io::RawHandle](https://doc.rust-lang.org/std/os/windows/io/type.RawHandle.html)
12+
pub type RawHandle = crate::doc::NotDefinedHere;
13+
14+
/// See [std::os::windows::io::AsRawHandle](https://doc.rust-lang.org/std/os/windows/io/trait.AsRawHandle.html)
15+
pub trait AsRawHandle {
16+
/// See [std::os::windows::io::FromRawHandle::from_raw_handle](https://doc.rust-lang.org/std/os/windows/io/trait.AsRawHandle.html#tymethod.as_raw_handle)
17+
fn as_raw_handle(&self) -> RawHandle;
18+
}
19+
20+
/// See [std::os::windows::io::FromRawHandle](https://doc.rust-lang.org/std/os/windows/io/trait.FromRawHandle.html)
21+
pub trait FromRawHandle {
22+
/// See [std::os::windows::io::FromRawHandle::from_raw_handle](https://doc.rust-lang.org/std/os/windows/io/trait.FromRawHandle.html#tymethod.from_raw_handle)
23+
unsafe fn from_raw_handle(handle: RawHandle) -> Self;
24+
}
25+
}
26+
}

tokio/src/doc/winapi.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
//! See [winapi].
2+
//!
3+
//! [winapi]: https://docs.rs/winapi
4+
5+
/// See [winapi::shared](https://docs.rs/winapi/*/winapi/shared/index.html).
6+
pub mod shared {
7+
/// See [winapi::shared::winerror](https://docs.rs/winapi/*/winapi/shared/winerror/index.html).
8+
#[allow(non_camel_case_types)]
9+
pub mod winerror {
10+
/// See [winapi::shared::winerror::ERROR_ACCESS_DENIED][winapi]
11+
///
12+
/// [winapi]: https://docs.rs/winapi/*/winapi/shared/winerror/constant.ERROR_ACCESS_DENIED.html
13+
pub type ERROR_ACCESS_DENIED = crate::doc::NotDefinedHere;
14+
15+
/// See [winapi::shared::winerror::ERROR_PIPE_BUSY][winapi]
16+
///
17+
/// [winapi]: https://docs.rs/winapi/*/winapi/shared/winerror/constant.ERROR_PIPE_BUSY.html
18+
pub type ERROR_PIPE_BUSY = crate::doc::NotDefinedHere;
19+
20+
/// See [winapi::shared::winerror::ERROR_MORE_DATA][winapi]
21+
///
22+
/// [winapi]: https://docs.rs/winapi/*/winapi/shared/winerror/constant.ERROR_MORE_DATA.html
23+
pub type ERROR_MORE_DATA = crate::doc::NotDefinedHere;
24+
}
25+
}
26+
27+
/// See [winapi::um](https://docs.rs/winapi/*/winapi/um/index.html).
28+
pub mod um {
29+
/// See [winapi::um::winbase](https://docs.rs/winapi/*/winapi/um/winbase/index.html).
30+
#[allow(non_camel_case_types)]
31+
pub mod winbase {
32+
/// See [winapi::um::winbase::PIPE_TYPE_MESSAGE][winapi]
33+
///
34+
/// [winapi]: https://docs.rs/winapi/*/winapi/um/winbase/constant.PIPE_TYPE_MESSAGE.html
35+
pub type PIPE_TYPE_MESSAGE = crate::doc::NotDefinedHere;
36+
37+
/// See [winapi::um::winbase::PIPE_TYPE_BYTE][winapi]
38+
///
39+
/// [winapi]: https://docs.rs/winapi/*/winapi/um/winbase/constant.PIPE_TYPE_BYTE.html
40+
pub type PIPE_TYPE_BYTE = crate::doc::NotDefinedHere;
41+
42+
/// See [winapi::um::winbase::PIPE_CLIENT_END][winapi]
43+
///
44+
/// [winapi]: https://docs.rs/winapi/*/winapi/um/winbase/constant.PIPE_CLIENT_END.html
45+
pub type PIPE_CLIENT_END = crate::doc::NotDefinedHere;
46+
47+
/// See [winapi::um::winbase::PIPE_SERVER_END][winapi]
48+
///
49+
/// [winapi]: https://docs.rs/winapi/*/winapi/um/winbase/constant.PIPE_SERVER_END.html
50+
pub type PIPE_SERVER_END = crate::doc::NotDefinedHere;
51+
52+
/// See [winapi::um::winbase::SECURITY_IDENTIFICATION][winapi]
53+
///
54+
/// [winapi]: https://docs.rs/winapi/*/winapi/um/winbase/constant.SECURITY_IDENTIFICATION.html
55+
pub type SECURITY_IDENTIFICATION = crate::doc::NotDefinedHere;
56+
}
57+
58+
/// See [winapi::um::minwinbase](https://docs.rs/winapi/*/winapi/um/minwinbase/index.html).
59+
#[allow(non_camel_case_types)]
60+
pub mod minwinbase {
61+
/// See [winapi::um::minwinbase::SECURITY_ATTRIBUTES][winapi]
62+
///
63+
/// [winapi]: https://docs.rs/winapi/*/winapi/um/minwinbase/constant.SECURITY_ATTRIBUTES.html
64+
pub type SECURITY_ATTRIBUTES = crate::doc::NotDefinedHere;
65+
}
66+
}

tokio/src/lib.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,28 @@ mod util;
442442
/// ```
443443
pub mod stream {}
444444

445+
// local re-exports of platform specific things, allowing for decent
446+
// documentation to be shimmed in on docs.rs
447+
448+
#[cfg(docsrs)]
449+
pub mod doc;
450+
451+
#[cfg(docsrs)]
452+
#[allow(unused)]
453+
pub(crate) use self::doc::os;
454+
455+
#[cfg(not(docsrs))]
456+
#[allow(unused)]
457+
pub(crate) use std::os;
458+
459+
#[cfg(docsrs)]
460+
#[allow(unused)]
461+
pub(crate) use self::doc::winapi;
462+
463+
#[cfg(all(not(docsrs), windows, feature = "net"))]
464+
#[allow(unused)]
465+
pub(crate) use ::winapi;
466+
445467
cfg_macros! {
446468
/// Implementation detail of the `select!` macro. This macro is **not**
447469
/// intended to be used as part of the public API and is permitted to

tokio/src/macros/cfg.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,16 @@ macro_rules! cfg_net_unix {
182182
}
183183
}
184184

185+
macro_rules! cfg_net_windows {
186+
($($item:item)*) => {
187+
$(
188+
#[cfg(all(any(docsrs, windows), feature = "net"))]
189+
#[cfg_attr(docsrs, doc(cfg(all(windows, feature = "net"))))]
190+
$item
191+
)*
192+
}
193+
}
194+
185195
macro_rules! cfg_process {
186196
($($item:item)*) => {
187197
$(

tokio/src/net/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,7 @@ cfg_net_unix! {
4646
pub use unix::listener::UnixListener;
4747
pub use unix::stream::UnixStream;
4848
}
49+
50+
cfg_net_windows! {
51+
pub mod windows;
52+
}

0 commit comments

Comments
 (0)