@@ -48,6 +48,10 @@ using namespace NConcurrency;
48
48
49
49
// //////////////////////////////////////////////////////////////////////////////
50
50
51
+ static constexpr auto & Logger = NetLogger;
52
+
53
+ // //////////////////////////////////////////////////////////////////////////////
54
+
51
55
namespace {
52
56
53
57
int GetLastNetworkError ()
@@ -93,39 +97,40 @@ ssize_t WriteToFD(TFileDescriptor fd, const char* buffer, size_t length)
93
97
#endif
94
98
}
95
99
96
- enum class EPipeReadStatus
97
- {
98
- PipeEmpty,
99
- PipeNotEmpty,
100
- NotSupportedError,
101
- };
102
-
103
- EPipeReadStatus CheckPipeReadStatus (const TString& pipePath)
100
+ TErrorOr<int > CheckPipeBytesLeftToRead (const TString& pipePath) noexcept
104
101
{
105
102
#ifdef _linux_
106
103
int bytesLeft = 0 ;
107
104
105
+ auto makeSystemError = [&] (TFormatString<> message) {
106
+ return TError (message)
107
+ << TError::FromSystem ()
108
+ << TErrorAttribute (" pipe_path" , pipePath);
109
+ };
110
+
108
111
{
109
112
int flags = O_RDONLY | O_CLOEXEC | O_NONBLOCK;
110
113
int fd = HandleEintr (::open, pipePath.c_str (), flags);
111
114
115
+ if (fd == -1 ) {
116
+ return makeSystemError (" Failed to open file descriptor" );
117
+ }
118
+
112
119
int ret = ::ioctl (fd, FIONREAD, &bytesLeft);
113
- if (ret == -1 && errno == EINVAL) {
114
- // Some linux platforms do not support
115
- // FIONREAD call. In such cases we
116
- // expect EINVAL error.
117
- return EPipeReadStatus::NotSupportedError;
120
+
121
+ if (ret == -1 ) {
122
+ return makeSystemError (" ioctl failed" );
118
123
}
119
124
120
- SafeClose (fd, /* ignoreBadFD*/ false );
125
+ if (!TryClose (fd, /* ignoreBadFD*/ false )) {
126
+ return makeSystemError (" Failed to close file descriptor" );
127
+ }
121
128
}
122
129
123
- return bytesLeft == 0
124
- ? EPipeReadStatus::PipeEmpty
125
- : EPipeReadStatus::PipeNotEmpty;
130
+ return bytesLeft;
126
131
#else
127
132
Y_UNUSED (pipePath);
128
- return EPipeReadStatus::NotSupportedError ;
133
+ return TError ( " Unsupported platform " ) ;
129
134
#endif
130
135
}
131
136
@@ -340,13 +345,19 @@ class TDeliveryFencedWriteOperation
340
345
{
341
346
auto result = TWriteOperation::PerformIO (fd);
342
347
if (IsWriteComplete (result)) {
343
- auto pipeReadStatus = CheckPipeReadStatus (PipePath_);
344
- if (pipeReadStatus == EPipeReadStatus::NotSupportedError) {
345
- return TError (" Delivery fenced write failed: FIONDREAD is not supported on your platform" )
346
- << TError::FromSystem ();
348
+ auto bytesLeftOrError = CheckPipeBytesLeftToRead (PipePath_);
349
+
350
+
351
+ if (!bytesLeftOrError.IsOK ()) {
352
+ YT_LOG_ERROR (bytesLeftOrError, " Delivery fenced write failed" );
353
+ return std::move (bytesLeftOrError).Wrap ();
354
+ } else {
355
+ YT_LOG_DEBUG (" Delivery fenced write pipe check finished (BytesLeft: %v)" , bytesLeftOrError.Value ());
347
356
}
348
357
349
- result.Value ().Retry = (pipeReadStatus != EPipeReadStatus::PipeEmpty);
358
+ result.Value ().Retry = (bytesLeftOrError.Value () != 0 );
359
+ } else {
360
+ YT_LOG_DEBUG (" Delivery fenced write to pipe step finished (Result: %v)" , result);
350
361
}
351
362
352
363
return result;
0 commit comments