Skip to content

Implement highlevel unix socket listeners #3187

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions newsfragments/3187.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ``trio.open_unix_listener``, ``trio.serve_unix``, and ``trio.UnixSocketListener`` to support ``SOCK_STREAM`` `Unix domain sockets <https://en.wikipedia.org/wiki/Unix_domain_socket>`__
5 changes: 5 additions & 0 deletions src/trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,16 @@
serve_tcp as serve_tcp,
)
from ._highlevel_open_tcp_stream import open_tcp_stream as open_tcp_stream
from ._highlevel_open_unix_listeners import (
open_unix_listener as open_unix_listener,
serve_unix as serve_unix,
)
from ._highlevel_open_unix_stream import open_unix_socket as open_unix_socket
from ._highlevel_serve_listeners import serve_listeners as serve_listeners
from ._highlevel_socket import (
SocketListener as SocketListener,
SocketStream as SocketStream,
UnixSocketListener as UnixSocketListener,
)
from ._highlevel_ssl_helpers import (
open_ssl_over_tcp_listeners as open_ssl_over_tcp_listeners,
Expand Down
173 changes: 173 additions & 0 deletions src/trio/_highlevel_open_unix_listeners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
from __future__ import annotations

import os
import sys
from typing import TYPE_CHECKING

import trio
import trio.socket as tsocket
from trio import TaskStatus

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable


try:
from trio.socket import AF_UNIX

HAS_UNIX = True
except ImportError:
HAS_UNIX = False


# Default backlog size:
#
# Having the backlog too low can cause practical problems (a perfectly healthy
# service that starts failing to accept connections if they arrive in a
# burst).
#
# Having it too high doesn't really cause any problems. Like any buffer, you
# want backlog queue to be zero usually, and it won't save you if you're
# getting connection attempts faster than you can call accept() on an ongoing
# basis. But unlike other buffers, this one doesn't really provide any
# backpressure. If a connection gets stuck waiting in the backlog queue, then
# from the peer's point of view the connection succeeded but then their
# send/recv will stall until we get to it, possibly for a long time. OTOH if
# there isn't room in the backlog queue, then their connect stalls, possibly
# for a long time, which is pretty much the same thing.
#
# A large backlog can also use a bit more kernel memory, but this seems fairly
# negligible these days.
#
# So this suggests we should make the backlog as large as possible. This also
# matches what Golang does. However, they do it in a weird way, where they
# have a bunch of code to sniff out the configured upper limit for backlog on
# different operating systems. But on every system, passing in a too-large
# backlog just causes it to be silently truncated to the configured maximum,
# so this is unnecessary -- we can just pass in "infinity" and get the maximum
# that way. (Verified on Windows, Linux, macOS using
# https://github.com/python-trio/trio/wiki/notes-to-self#measure-listen-backlogpy
def _compute_backlog(backlog: int | None) -> int:
# Many systems (Linux, BSDs, ...) store the backlog in a uint16 and are
# missing overflow protection, so we apply our own overflow protection.
# https://github.com/golang/go/issues/5030
if not isinstance(backlog, int) and backlog is not None:
raise TypeError(f"backlog must be an int or None, not {backlog!r}")
if backlog is None:
return 0xFFFF
return min(backlog, 0xFFFF)


async def open_unix_listener(
path: str | bytes | os.PathLike[str] | os.PathLike[bytes],
*,
mode: int | None = None, # 0o666,
backlog: int | None = None,
) -> trio.UnixSocketListener:
"""Create :class:`SocketListener` objects to listen for connections.
Opens a connection to the specified
`Unix domain socket <https://en.wikipedia.org/wiki/Unix_domain_socket>`__.

You must have read/write permission on the specified file to connect.

Args:

path (str): Filename of UNIX socket to create and listen on.
Absolute or relative paths may be used.

mode (int or None): The socket file permissions.
UNIX permissions are usually specified in octal numbers.
If you leave this as ``None``, Trio will not change the mode from
the operating system's default.

backlog (int or None): The listen backlog to use. If you leave this as
``None`` then Trio will pick a good default. (Currently: whatever
your system has configured as the maximum backlog.)

Returns:
:class:`UnixSocketListener`

Raises:
:class:`TypeError` if invalid arguments.
:class:`RuntimeError`: If AF_UNIX sockets are not supported.
"""
if not HAS_UNIX:
raise RuntimeError("Unix sockets are not supported on this platform")

computed_backlog = _compute_backlog(backlog)

fspath = await trio.Path(os.fsdecode(path)).absolute()

folder = fspath.parent
if not await folder.exists():
raise FileNotFoundError(f"Socket folder does not exist: {folder!r}")

# much more simplified logic vs tcp sockets - one socket type and only one
# possible location to connect to
sock = tsocket.socket(AF_UNIX, tsocket.SOCK_STREAM)
try:
# See https://github.com/python-trio/trio/issues/39
if sys.platform != "win32":
sock.setsockopt(tsocket.SOL_SOCKET, tsocket.SO_REUSEADDR, 1)

await sock.bind(str(fspath))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned about this because the original PR does some fun stuff here. Specifically, what if there are two calls to this trying to make a file, does this work? Could we add some tests for this? (along with other tests -- just noticed there are none.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the original PR seems to handle this path being e.g. a directory. Does this PR? (yet another test!)


sock.listen(computed_backlog)

if mode is not None:
await fspath.chmod(mode)

return trio.UnixSocketListener(sock)
except BaseException as exc:
sock.close()
try:
os.unlink(str(fspath))
except BaseException as exc_2:
raise exc_2 from exc
raise


async def serve_unix(
handler: Callable[[trio.SocketStream], Awaitable[object]],
path: str | bytes | os.PathLike[str] | os.PathLike[bytes],
*,
backlog: int | None = None,
handler_nursery: trio.Nursery | None = None,
task_status: TaskStatus[list[trio.UnixSocketListener]] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Listen for incoming UNIX connections, and for each one start a task
running ``handler(stream)``.
This is a thin convenience wrapper around :func:`open_unix_listener` and
:func:`serve_listeners` – see them for full details.
.. warning::
If ``handler`` raises an exception, then this function doesn't do
anything special to catch it – so by default the exception will
propagate out and crash your server. If you don't want this, then catch
exceptions inside your ``handler``, or use a ``handler_nursery`` object
that responds to exceptions in some other way.
When used with ``nursery.start`` you get back the newly opened listeners.
Args:
handler: The handler to start for each incoming connection. Passed to
:func:`serve_listeners`.
path: The socket file name.
Passed to :func:`open_unix_listener`.
backlog: The listen backlog, or None to have a good default picked.
Passed to :func:`open_tcp_listener`.
handler_nursery: The nursery to start handlers in, or None to use an
internal nursery. Passed to :func:`serve_listeners`.
task_status: This function can be used with ``nursery.start``.
Returns:
This function only returns when cancelled.
Raises:
RuntimeError: If AF_UNIX sockets are not supported.
"""
if not HAS_UNIX:
raise RuntimeError("Unix sockets are not supported on this platform")

listener = await open_unix_listener(path, backlog=backlog)
await trio.serve_listeners(
handler,
[listener],
handler_nursery=handler_nursery,
task_status=task_status,
)
78 changes: 77 additions & 1 deletion src/trio/_highlevel_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

import errno
from contextlib import contextmanager, suppress
from typing import TYPE_CHECKING, overload
from os import unlink
from typing import TYPE_CHECKING, Final, overload

import trio

Expand Down Expand Up @@ -31,6 +32,8 @@
errno.ENOTSOCK,
}

HAS_UNIX: Final = hasattr(tsocket, "AF_UNIX")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this is cleaner than the definition in _highlevel_open_unix_listeners.py



@contextmanager
def _translate_socket_errors_to_stream_errors() -> Generator[None, None, None]:
Expand Down Expand Up @@ -412,3 +415,76 @@ async def aclose(self) -> None:
"""Close this listener and its underlying socket."""
self.socket.close()
await trio.lowlevel.checkpoint()


@final
class UnixSocketListener(Listener[SocketStream]):
"""A :class:`~trio.abc.Listener` that uses a listening socket to accept
incoming connections as :class:`SocketStream` objects.

Args:
socket: The Trio socket object to wrap. Must have type ``SOCK_STREAM``,
and be listening.

Note that the :class:`UnixSocketListener` "takes ownership" of the given
socket; closing the :class:`UnixSocketListener` will also close the socket
and unlink its associated file.

.. attribute:: socket

The Trio socket object that this stream wraps.

"""

def __init__(self, socket: SocketType) -> None:
if not HAS_UNIX:
raise RuntimeError("Unix sockets are not supported on this platform")
if not isinstance(socket, tsocket.SocketType):
raise TypeError("SocketListener requires a Trio socket object")
if socket.type != tsocket.SOCK_STREAM:
raise ValueError("SocketListener requires a SOCK_STREAM socket")
try:
listening = socket.getsockopt(tsocket.SOL_SOCKET, tsocket.SO_ACCEPTCONN)
except OSError:
# SO_ACCEPTCONN fails on macOS; we just have to trust the user.
pass
else:
if not listening:
raise ValueError("SocketListener requires a listening socket")

self.socket = socket

async def accept(self) -> SocketStream:
"""Accept an incoming connection.

Returns:
:class:`SocketStream`

Raises:
OSError: if the underlying call to ``accept`` raises an unexpected
error.
ClosedResourceError: if you already closed the socket.

This method handles routine errors like ``ECONNABORTED``, but passes
other errors on to its caller. In particular, it does *not* make any
special effort to handle resource exhaustion errors like ``EMFILE``,
``ENFILE``, ``ENOBUFS``, ``ENOMEM``.

"""
while True:
try:
sock, _ = await self.socket.accept()
except OSError as exc:
if exc.errno in _closed_stream_errnos:
raise trio.ClosedResourceError from None
if exc.errno not in _ignorable_accept_errnos:
raise
else:
return SocketStream(sock)

async def aclose(self) -> None:
"""Close this listener, its underlying socket, and unlink its associated file."""
path = self.socket.getsockname()
self.socket.close()
unlink(path)
await trio.lowlevel.checkpoint()
Loading