Skip to content

Commit 75e63d3

Browse files
committed
fix: tcp stream fd closed when there are still users of it
1 parent 11086cb commit 75e63d3

File tree

4 files changed

+78
-64
lines changed

4 files changed

+78
-64
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55
### Added
66
- `network::client::tcp::TcpStream` not supports async connection, provided with `connect_async` and `connect_timeout_async` methods
77

8+
### Fixes
9+
- `network::client::tcp::TcpStream` does not close underlying fd anymore. Now fd will be closed only when the last copy of tcp stream is dropped.
10+
11+
### Deprecated
12+
- `network::client::tcp::UnsafeSendSyncTcpStream` is now deprected. `network::client::tcp::TcpStream` should be used instead.
813

914

1015
# [6.0.0] Nov 20 2024

examples/async-h1-client/src/lib.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use http_types::{Method, Request, Url};
22
use tarantool::error::Error;
33
use tarantool::fiber;
44
use tarantool::network::client::tcp::TcpStream;
5-
use tarantool::network::client::tcp::UnsafeSendSyncTcpStream;
65
use tarantool::proc;
76

87
#[proc]
@@ -17,14 +16,12 @@ fn get(url: &str) -> Result<(), Error> {
1716
let mut res = match url.scheme() {
1817
"http" => {
1918
let stream = TcpStream::connect(host, 80).map_err(Error::other)?;
20-
let stream = UnsafeSendSyncTcpStream(stream);
2119
println!("Sending request over http...");
2220
async_h1::connect(stream, req).await.map_err(Error::other)?
2321
}
2422
#[cfg(feature = "tls")]
2523
"https" => {
2624
let stream = TcpStream::connect(host, 443).map_err(Error::other)?;
27-
let stream = UnsafeSendSyncTcpStream(stream);
2825
let stream = async_native_tls::connect(host, stream)
2926
.await
3027
.map_err(Error::other)?;

tarantool/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "tarantool"
33
description = "Tarantool rust bindings"
4-
version = "6.0.0"
4+
version = "6.1.0"
55
authors = [
66
"Dmitriy Koltsov <dkoltsov@picodata.io>",
77
"Georgy Moshkin <gmoshkin@picodata.io>",

tarantool/src/network/client/tcp.rs

Lines changed: 72 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#![allow(deprecated)]
2+
13
//! Contains an implementation of a custom async coio based [`TcpStream`].
24
//!
35
//! ## Example
@@ -16,14 +18,14 @@
1618
//! # };
1719
//! ```
1820
19-
use std::cell::{self, Cell};
21+
use std::cell::Cell;
2022
use std::ffi::{CString, NulError};
2123
use std::future::{self};
2224
use std::mem::{self, MaybeUninit};
2325
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd};
2426
use std::os::unix::io::RawFd;
2527
use std::pin::Pin;
26-
use std::rc::{self, Rc};
28+
use std::rc::Rc;
2729
use std::task::{Context, Poll};
2830
use std::time::Duration;
2931
use std::{io, marker, vec};
@@ -104,6 +106,54 @@ impl Drop for AutoCloseFd {
104106
}
105107
}
106108

109+
/// A store for raw file descriptor so we can allow cloning actual `TcpStream` properly.
110+
#[derive(Debug)]
111+
struct TcpInner {
112+
/// A raw tcp socket file descriptor. Replaced with `None` when the stream
113+
/// is closed.
114+
fd: Cell<Option<RawFd>>,
115+
}
116+
117+
impl TcpInner {
118+
#[inline(always)]
119+
#[track_caller]
120+
fn close(&self) -> io::Result<()> {
121+
let Some(fd) = self.fd.take() else {
122+
return Ok(());
123+
};
124+
// SAFETY: safe because we close the `fd` only once
125+
let rc = unsafe { ffi::coio_close(fd) };
126+
if rc != 0 {
127+
let e = io::Error::last_os_error();
128+
if e.raw_os_error() == Some(libc::EBADF) {
129+
crate::say_error!("close({fd}): Bad file descriptor");
130+
if cfg!(debug_assertions) {
131+
panic!("close({}): Bad file descriptor", fd);
132+
}
133+
}
134+
return Err(e);
135+
}
136+
Ok(())
137+
}
138+
139+
#[inline(always)]
140+
fn fd(&self) -> io::Result<RawFd> {
141+
let Some(fd) = self.fd.get() else {
142+
let e = io::Error::new(io::ErrorKind::Other, "socket closed already");
143+
return Err(e);
144+
};
145+
Ok(fd)
146+
}
147+
}
148+
149+
impl Drop for TcpInner {
150+
fn drop(&mut self) {
151+
if let Err(e) = self.close() {
152+
crate::say_error!("TcpInner::drop: closing tcp stream inner failed: {e}");
153+
}
154+
}
155+
}
156+
107157
/// Async TcpStream based on fibers and coio.
108158
///
109159
/// Use [timeout][t] on top of read or write operations on [`TcpStream`]
@@ -118,15 +168,8 @@ impl Drop for AutoCloseFd {
118168
/// [t]: crate::fiber::async::timeout::timeout
119169
#[derive(Debug, Clone)]
120170
pub struct TcpStream {
121-
/// A raw tcp socket file descriptor. Replaced with `None` when the stream
122-
/// is closed.
123-
///
124-
/// Note that it's wrapped in a `Rc`, because the outer `TcpStream` needs to
125-
/// be mutably borrowable (thanks to AsyncWrite & AsyncRead traits) and it
126-
/// doesn't make sense to wrap it in a Mutex of any sort, because it's
127-
/// perfectly safe to read & write on a tcp socket even from concurrent threads,
128-
/// but we only use it from different fibers.
129-
fd: Rc<Cell<Option<RawFd>>>,
171+
/// An actual fd which also stored it's open/close state.
172+
inner: Rc<TcpInner>,
130173
}
131174

132175
impl TcpStream {
@@ -248,32 +291,22 @@ impl TcpStream {
248291

249292
#[inline(always)]
250293
#[track_caller]
251-
pub fn close(&mut self) -> io::Result<()> {
252-
let Some(fd) = self.fd.take() else {
253-
// Already closed.
254-
return Ok(());
255-
};
256-
257-
// SAFETY: safe because we close the `fd` only once
258-
let rc = unsafe { ffi::coio_close(fd) };
259-
if rc != 0 {
260-
let e = io::Error::last_os_error();
261-
if e.raw_os_error() == Some(libc::EBADF) {
262-
crate::say_error!("close({fd}): Bad file descriptor");
263-
if cfg!(debug_assertions) {
264-
panic!("close({}): Bad file descriptor", fd);
265-
}
266-
}
267-
return Err(e);
268-
}
269-
Ok(())
294+
pub fn close(&self) -> io::Result<()> {
295+
self.inner.close()
270296
}
271297
}
272298

299+
/// SAFETY: completely unsafe, but we are allowed to do this cause sending/sharing following stream to/from another thread
300+
/// SAFETY: will take no effect due to no runtime within it
301+
unsafe impl Send for TcpStream {}
302+
unsafe impl Sync for TcpStream {}
303+
273304
impl From<RawFd> for TcpStream {
274305
fn from(value: RawFd) -> Self {
275306
Self {
276-
fd: rc::Rc::new(cell::Cell::new(Some(value))),
307+
inner: Rc::new(TcpInner {
308+
fd: Cell::new(Some(value)),
309+
}),
277310
}
278311
}
279312
}
@@ -290,10 +323,7 @@ impl AsyncWrite for TcpStream {
290323
cx: &mut Context<'_>,
291324
buf: &[u8],
292325
) -> Poll<io::Result<usize>> {
293-
let Some(fd) = self.fd.get() else {
294-
let e = io::Error::new(io::ErrorKind::Other, "socket closed already");
295-
return Poll::Ready(Err(e));
296-
};
326+
let fd = self.inner.fd()?;
297327

298328
let (result, err) = (
299329
// `self.fd` must be nonblocking for this to work correctly
@@ -325,25 +355,17 @@ impl AsyncWrite for TcpStream {
325355
}
326356

327357
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
328-
if self.fd.get().is_none() {
329-
let e = io::Error::new(io::ErrorKind::Other, "socket closed already");
330-
return Poll::Ready(Err(e));
331-
};
332-
358+
self.inner.fd()?;
333359
// [`TcpStream`] similarily to std does not buffer anything,
334360
// so there is nothing to flush.
335361
//
336362
// If buffering is needed use [`futures::io::BufWriter`] on top of this stream.
337363
Poll::Ready(Ok(()))
338364
}
339365

340-
fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
341-
if self.fd.get().is_none() {
342-
let e = io::Error::new(io::ErrorKind::Other, "socket closed already");
343-
return Poll::Ready(Err(e));
344-
};
345-
346-
let res = self.close();
366+
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
367+
self.inner.fd()?;
368+
let res = self.inner.close();
347369
Poll::Ready(res)
348370
}
349371
}
@@ -354,13 +376,10 @@ impl AsyncRead for TcpStream {
354376
cx: &mut Context<'_>,
355377
buf: &mut [u8],
356378
) -> Poll<io::Result<usize>> {
357-
let Some(fd) = self.fd.get() else {
358-
let e = io::Error::new(io::ErrorKind::Other, "socket closed already");
359-
return Poll::Ready(Err(e));
360-
};
379+
let fd = self.inner.fd()?;
361380

362381
let (result, err) = (
363-
// `self.fd` must be nonblocking for this to work correctly
382+
// `self.inner.fd` must be nonblocking for this to work correctly
364383
unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) },
365384
io::Error::last_os_error(),
366385
);
@@ -389,14 +408,6 @@ impl AsyncRead for TcpStream {
389408
}
390409
}
391410

392-
impl Drop for TcpStream {
393-
fn drop(&mut self) {
394-
if let Err(e) = self.close() {
395-
crate::say_error!("TcpStream::drop: closing tcp stream failed: {e}");
396-
}
397-
}
398-
}
399-
400411
/// Resolves provided url and port to a sequence of sock addrs.
401412
///
402413
/// # Returns
@@ -628,6 +639,7 @@ impl<'a> From<&'a SockAddr> for AddrInfo<'a> {
628639
/// necessary when working with our async runtime, which is single threaded.
629640
#[derive(Debug, Clone)]
630641
#[repr(transparent)]
642+
#[deprecated = "Use `TcpStream` instead"]
631643
pub struct UnsafeSendSyncTcpStream(pub TcpStream);
632644

633645
unsafe impl Send for UnsafeSendSyncTcpStream {}

0 commit comments

Comments
 (0)