Skip to content

Commit 894bd0b

Browse files
authored
[core] Integrate scoped dup2 (#51179)
Signed-off-by: dentiny <dentinyhao@gmail.com>
1 parent 39ba86b commit 894bd0b

File tree

2 files changed

+13
-43
lines changed

2 files changed

+13
-43
lines changed

src/ray/util/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ ray_cc_library(
299299
hdrs = ["stream_redirection_utils.h"],
300300
deps = [
301301
":pipe_logger",
302+
":scoped_dup2_wrapper",
302303
":stream_redirection_options",
303304
":util",
304305
],

src/ray/util/stream_redirection_utils.cc

Lines changed: 12 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616

1717
#include <cstring>
1818
#include <functional>
19+
#include <memory>
1920
#include <mutex>
2021
#include <utility>
2122
#include <vector>
2223

2324
#include "ray/util/compat.h"
2425
#include "ray/util/pipe_logger.h"
26+
#include "ray/util/scoped_dup2_wrapper.h"
2527
#include "ray/util/util.h"
2628

2729
#if defined(_WIN32)
@@ -36,72 +38,39 @@ namespace {
3638
struct RedirectionHandleWrapper {
3739
RedirectionFileHandle redirection_file_handle;
3840
// Used for restoration.
39-
MEMFD_TYPE_NON_UNIQUE saved_stream_handle;
41+
std::unique_ptr<ScopedDup2Wrapper> scoped_dup2_wrapper;
4042
};
4143

4244
// TODO(hjiang): Revisit later, should be able to save some heap allocation with
4345
// absl::InlinedVector.
4446
//
4547
// Maps from original stream file handle (i.e. stdout/stderr) to its stream redirector.
46-
absl::flat_hash_map<int, RedirectionHandleWrapper> redirection_file_handles;
48+
absl::flat_hash_map<MEMFD_TYPE_NON_UNIQUE, RedirectionHandleWrapper>
49+
redirection_file_handles;
4750

4851
// Block synchronize on stream redirection related completion, should be call **EXACTLY
4952
// ONCE** at program termination.
5053
std::once_flag stream_exit_once_flag;
5154
void SyncOnStreamRedirection() {
52-
for (auto &[stream_fd, handle] : redirection_file_handles) {
53-
// Restore old stream fd.
54-
#if defined(__APPLE__) || defined(__linux__)
55-
RAY_CHECK_NE(dup2(handle.saved_stream_handle, stream_fd), -1)
56-
<< "Fails to restore file descriptor " << strerror(errno);
57-
#elif defined(_WIN32)
58-
int duped_fd = _open_osfhandle(reinterpret_cast<intptr_t>(handle.saved_stream_handle),
59-
_O_WRONLY);
60-
RAY_CHECK_NE(_dup2(duped_fd, stream_fd), -1) << "Fails to duplicate file descriptor.";
61-
#endif
62-
55+
for (auto &[_, handle] : redirection_file_handles) {
56+
handle.scoped_dup2_wrapper = nullptr;
6357
handle.redirection_file_handle.Close();
6458
}
6559
}
6660

6761
// Redirect the given [stream_fd] based on the specified option.
68-
void RedirectStream(int stream_fd, const StreamRedirectionOption &opt) {
62+
void RedirectStream(MEMFD_TYPE_NON_UNIQUE stream_fd, const StreamRedirectionOption &opt) {
6963
std::call_once(stream_exit_once_flag, []() {
7064
RAY_CHECK_EQ(std::atexit(SyncOnStreamRedirection), 0)
7165
<< "Fails to register stream redirection termination hook.";
7266
});
7367

7468
RedirectionFileHandle handle = CreateRedirectionFileHandle(opt);
75-
76-
#if defined(__APPLE__) || defined(__linux__)
77-
// Duplicate stream fd for later restoration.
78-
MEMFD_TYPE_NON_UNIQUE duped_stream_fd = dup(stream_fd);
79-
RAY_CHECK_NE(duped_stream_fd, -1)
80-
<< "Fails to duplicate stream fd " << stream_fd << " because " << strerror(errno);
81-
82-
RAY_CHECK_NE(dup2(handle.GetWriteHandle(), stream_fd), -1)
83-
<< "Fails to duplicate file descriptor " << strerror(errno);
84-
#elif defined(_WIN32)
85-
// Duplicate stream fd for later restoration.
86-
MEMFD_TYPE_NON_UNIQUE duped_stream_fd;
87-
BOOL result = DuplicateHandle(GetCurrentProcess(),
88-
(HANDLE)_get_osfhandle(stream_fd),
89-
GetCurrentProcess(),
90-
&duped_stream_fd,
91-
0,
92-
FALSE,
93-
DUPLICATE_SAME_ACCESS);
94-
RAY_CHECK(result);
95-
96-
int pipe_write_fd =
97-
_open_osfhandle(reinterpret_cast<intptr_t>(handle.GetWriteHandle()), _O_WRONLY);
98-
RAY_CHECK_NE(_dup2(pipe_write_fd, stream_fd), -1)
99-
<< "Fails to duplicate file descriptor.";
100-
#endif
69+
auto scoped_dup2_wrapper = ScopedDup2Wrapper::New(handle.GetWriteHandle(), stream_fd);
10170

10271
RedirectionHandleWrapper handle_wrapper;
10372
handle_wrapper.redirection_file_handle = std::move(handle);
104-
handle_wrapper.saved_stream_handle = duped_stream_fd;
73+
handle_wrapper.scoped_dup2_wrapper = std::move(scoped_dup2_wrapper);
10574

10675
const bool is_new =
10776
redirection_file_handles.emplace(stream_fd, std::move(handle_wrapper)).second;
@@ -118,10 +87,10 @@ void FlushOnRedirectedStream(int stream_fd) {
11887
} // namespace
11988

12089
void RedirectStdout(const StreamRedirectionOption &opt) {
121-
RedirectStream(GetStdoutFd(), opt);
90+
RedirectStream(GetStdoutHandle(), opt);
12291
}
12392
void RedirectStderr(const StreamRedirectionOption &opt) {
124-
RedirectStream(GetStderrFd(), opt);
93+
RedirectStream(GetStderrHandle(), opt);
12594
}
12695
void FlushOnRedirectedStdout() { FlushOnRedirectedStream(GetStdoutFd()); }
12796
void FlushOnRedirectedStderr() { FlushOnRedirectedStream(GetStderrFd()); }

0 commit comments

Comments
 (0)