Skip to content

Commit ff786dd

Browse files
authored
add hazmat.FdStream (#1129)
Closes #829
1 parent a48bdd2 commit ff786dd

File tree

7 files changed

+101
-51
lines changed

7 files changed

+101
-51
lines changed

docs/source/reference-hazmat.rst

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,26 @@ All environments provide the following functions:
174174
yourself afterwards.
175175

176176

177+
Unix-specific API
178+
-----------------
179+
180+
`FdStream` supports wrapping Unix files (such as a pipe or TTY) as
181+
a stream.
182+
183+
If you have two different file descriptors for sending and receiving,
184+
and want to bundle them together into a single bidirectional
185+
`~trio.abc.Stream`, then use `trio.StapledStream`::
186+
187+
bidirectional_stream = trio.StapledStream(
188+
trio.hazmat.FdStream(write_fd),
189+
trio.hazmat.FdStream(read_fd)
190+
)
191+
192+
.. autoclass:: FdStream
193+
:show-inheritance:
194+
:members:
195+
196+
177197
Kqueue-specific API
178198
-------------------
179199

newsfragments/829.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add `trio.hazmat.FdStream` for wrapping a Unix file descriptor as a `~trio.abc.Stream`.

trio/_subprocess.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import os
2-
import select
32
import subprocess
4-
from functools import partial
3+
from typing import Optional
54

6-
from ._abc import AsyncResource
5+
from ._abc import AsyncResource, SendStream, ReceiveStream
76
from ._highlevel_generic import StapledStream
87
from ._sync import Lock
98
from ._subprocess_platform import (
@@ -101,9 +100,10 @@ def _init(
101100
.format(key)
102101
)
103102

104-
self.stdin = None
105-
self.stdout = None
106-
self.stderr = None
103+
self.stdin = None # type: Optional[SendStream]
104+
self.stdout = None # type: Optional[ReceiveStream]
105+
self.stderr = None # type: Optional[ReceiveStream]
106+
self.stdio = None # type: Optional[StapledStream]
107107

108108
if os.name == "posix":
109109
if isinstance(command, str) and not options.get("shell"):
@@ -153,8 +153,6 @@ def _init(
153153

154154
if self.stdin is not None and self.stdout is not None:
155155
self.stdio = StapledStream(self.stdin, self.stdout)
156-
else:
157-
self.stdio = None
158156

159157
self.args = self._proc.args
160158
self.pid = self._proc.pid

trio/_subprocess_platform/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,15 @@ def create_pipe_from_child_output() -> Tuple[ReceiveStream, int]:
6767

6868
try:
6969
if os.name == "posix":
70-
from .._unix_pipes import PipeSendStream, PipeReceiveStream
70+
from ..hazmat import FdStream
7171

7272
def create_pipe_to_child_stdin(): # noqa: F811
7373
rfd, wfd = os.pipe()
74-
return PipeSendStream(wfd), rfd
74+
return FdStream(wfd), rfd
7575

7676
def create_pipe_from_child_output(): # noqa: F811
7777
rfd, wfd = os.pipe()
78-
return PipeReceiveStream(rfd), wfd
78+
return FdStream(rfd), wfd
7979

8080
elif os.name == "nt":
8181
from .._windows_pipes import PipeSendStream, PipeReceiveStream

trio/_unix_pipes.py

Lines changed: 53 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
1-
import fcntl
21
import os
32
import errno
43

5-
from ._abc import SendStream, ReceiveStream
4+
from ._abc import Stream
65
from ._util import ConflictDetector
76

87
import trio
98

9+
if os.name != "posix":
10+
# We raise an error here rather than gating the import in hazmat.py
11+
# in order to keep jedi static analysis happy.
12+
raise ImportError
13+
1014

1115
class _FdHolder:
1216
# This class holds onto a raw file descriptor, in non-blocking mode, and
@@ -33,9 +37,9 @@ def __init__(self, fd: int):
3337
if not isinstance(fd, int):
3438
raise TypeError("file descriptor must be an int")
3539
self.fd = fd
36-
# Flip the fd to non-blocking mode
37-
flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
38-
fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
40+
# Store original state, and ensure non-blocking mode is enabled
41+
self._original_is_blocking = os.get_blocking(fd)
42+
os.set_blocking(fd, False)
3943

4044
@property
4145
def closed(self):
@@ -53,6 +57,7 @@ def _raw_close(self):
5357
return
5458
fd = self.fd
5559
self.fd = -1
60+
os.set_blocking(fd, self._original_is_blocking)
5661
os.close(fd)
5762

5863
def __del__(self):
@@ -65,21 +70,53 @@ async def aclose(self):
6570
await trio.hazmat.checkpoint()
6671

6772

68-
class PipeSendStream(SendStream):
69-
"""Represents a send stream over an os.pipe object."""
73+
class FdStream(Stream):
74+
"""
75+
Represents a stream given the file descriptor to a pipe, TTY, etc.
76+
77+
*fd* must refer to a file that is open for reading and/or writing and
78+
supports non-blocking I/O (pipes and TTYs will work, on-disk files probably
79+
not). The returned stream takes ownership of the fd, so closing the stream
80+
will close the fd too. As with `os.fdopen`, you should not directly use
81+
an fd after you have wrapped it in a stream using this function.
82+
83+
To be used as a Trio stream, an open file must be placed in non-blocking
84+
mode. Unfortunately, this impacts all I/O that goes through the
85+
underlying open file, including I/O that uses a different
86+
file descriptor than the one that was passed to Trio. If other threads
87+
or processes are using file descriptors that are related through `os.dup`
88+
or inheritance across `os.fork` to the one that Trio is using, they are
89+
unlikely to be prepared to have non-blocking I/O semantics suddenly
90+
thrust upon them. For example, you can use ``FdStream(os.dup(0))`` to
91+
obtain a stream for reading from standard input, but it is only safe to
92+
do so with heavy caveats: your stdin must not be shared by any other
93+
processes and you must not make any calls to synchronous methods of
94+
`sys.stdin` until the stream returned by `FdStream` is closed. See
95+
`issue #174 <https://github.com/python-trio/trio/issues/174>`__ for a
96+
discussion of the challenges involved in relaxing this restriction.
97+
98+
Args:
99+
fd (int): The fd to be wrapped.
100+
101+
Returns:
102+
A new `FdStream` object.
103+
"""
70104

71105
def __init__(self, fd: int):
72106
self._fd_holder = _FdHolder(fd)
73-
self._conflict_detector = ConflictDetector(
74-
"another task is using this pipe"
107+
self._send_conflict_detector = ConflictDetector(
108+
"another task is using this stream for send"
109+
)
110+
self._receive_conflict_detector = ConflictDetector(
111+
"another task is using this stream for receive"
75112
)
76113

77114
async def send_all(self, data: bytes):
78-
with self._conflict_detector:
115+
with self._send_conflict_detector:
79116
# have to check up front, because send_all(b"") on a closed pipe
80117
# should raise
81118
if self._fd_holder.closed:
82-
raise trio.ClosedResourceError("this pipe was already closed")
119+
raise trio.ClosedResourceError("file was already closed")
83120
await trio.hazmat.checkpoint()
84121
length = len(data)
85122
# adapted from the SocketStream code
@@ -94,40 +131,24 @@ async def send_all(self, data: bytes):
94131
except OSError as e:
95132
if e.errno == errno.EBADF:
96133
raise trio.ClosedResourceError(
97-
"this pipe was closed"
134+
"file was already closed"
98135
) from None
99136
else:
100137
raise trio.BrokenResourceError from e
101138

102139
async def wait_send_all_might_not_block(self) -> None:
103-
with self._conflict_detector:
140+
with self._send_conflict_detector:
104141
if self._fd_holder.closed:
105-
raise trio.ClosedResourceError("this pipe was already closed")
142+
raise trio.ClosedResourceError("file was already closed")
106143
try:
107144
await trio.hazmat.wait_writable(self._fd_holder.fd)
108145
except BrokenPipeError as e:
109146
# kqueue: raises EPIPE on wait_writable instead
110147
# of sending, which is annoying
111148
raise trio.BrokenResourceError from e
112149

113-
async def aclose(self):
114-
await self._fd_holder.aclose()
115-
116-
def fileno(self):
117-
return self._fd_holder.fd
118-
119-
120-
class PipeReceiveStream(ReceiveStream):
121-
"""Represents a receive stream over an os.pipe object."""
122-
123-
def __init__(self, fd: int):
124-
self._fd_holder = _FdHolder(fd)
125-
self._conflict_detector = ConflictDetector(
126-
"another task is using this pipe"
127-
)
128-
129150
async def receive_some(self, max_bytes: int) -> bytes:
130-
with self._conflict_detector:
151+
with self._receive_conflict_detector:
131152
if not isinstance(max_bytes, int):
132153
raise TypeError("max_bytes must be integer >= 1")
133154

@@ -143,7 +164,7 @@ async def receive_some(self, max_bytes: int) -> bytes:
143164
except OSError as e:
144165
if e.errno == errno.EBADF:
145166
raise trio.ClosedResourceError(
146-
"this pipe was closed"
167+
"file was already closed"
147168
) from None
148169
else:
149170
raise trio.BrokenResourceError from e

trio/hazmat.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
but useful for extending Trio's functionality.
44
"""
55

6+
import os
67
import sys
78

89
# This is the union of a subset of trio/_core/ and some things from trio/*.py.
@@ -22,6 +23,12 @@
2223
spawn_system_task, wait_readable, wait_writable, notify_closing
2324
)
2425

26+
# Unix-specific symbols
27+
try:
28+
from ._unix_pipes import FdStream
29+
except ImportError:
30+
pass
31+
2532
# Kqueue-specific symbols
2633
try:
2734
from ._core import (

trio/tests/test_unix_pipes.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,17 @@
1111
posix = os.name == "posix"
1212
pytestmark = pytest.mark.skipif(not posix, reason="posix only")
1313
if posix:
14-
from .._unix_pipes import PipeSendStream, PipeReceiveStream
14+
from .._unix_pipes import FdStream
15+
else:
16+
with pytest.raises(ImportError):
17+
from .._unix_pipes import FdStream
1518

1619

1720
# Have to use quoted types so import doesn't crash on windows
18-
async def make_pipe() -> "Tuple[PipeSendStream, PipeReceiveStream]":
21+
async def make_pipe() -> "Tuple[FdStream, FdStream]":
1922
"""Makes a new pair of pipes."""
2023
(r, w) = os.pipe()
21-
return PipeSendStream(w), PipeReceiveStream(r)
24+
return FdStream(w), FdStream(r)
2225

2326

2427
async def make_clogged_pipe():
@@ -49,7 +52,7 @@ async def make_clogged_pipe():
4952

5053
async def test_send_pipe():
5154
r, w = os.pipe()
52-
async with PipeSendStream(w) as send:
55+
async with FdStream(w) as send:
5356
assert send.fileno() == w
5457
await send.send_all(b"123")
5558
assert (os.read(r, 8)) == b"123"
@@ -59,7 +62,7 @@ async def test_send_pipe():
5962

6063
async def test_receive_pipe():
6164
r, w = os.pipe()
62-
async with PipeReceiveStream(r) as recv:
65+
async with FdStream(r) as recv:
6366
assert (recv.fileno()) == r
6467
os.write(w, b"123")
6568
assert (await recv.receive_some(8)) == b"123"
@@ -93,10 +96,10 @@ async def reader():
9396

9497
async def test_pipe_errors():
9598
with pytest.raises(TypeError):
96-
PipeReceiveStream(None)
99+
FdStream(None)
97100

98101
with pytest.raises(ValueError):
99-
await PipeReceiveStream(0).receive_some(0)
102+
await FdStream(0).receive_some(0)
100103

101104

102105
async def test_del():
@@ -146,7 +149,7 @@ async def test_misdirected_aclose_regression():
146149
if r2_fd != old_r_fd: # pragma: no cover
147150
os.dup2(r2_fd, old_r_fd)
148151
os.close(r2_fd)
149-
async with PipeReceiveStream(old_r_fd) as r2:
152+
async with FdStream(old_r_fd) as r2:
150153
assert r2.fileno() == old_r_fd
151154

152155
# And now set up a background task that's working on the new receive

0 commit comments

Comments
 (0)