Skip to content

Commit 00f8ed7

Browse files
author
babenko
committed
Refactor and simplify inotify watches
commit_hash:df3b57704dbfda17915e4aa68619491e48d96937
1 parent fe9c0a0 commit 00f8ed7

File tree

3 files changed

+76
-112
lines changed

3 files changed

+76
-112
lines changed

yt/yt/core/logging/log_manager.cpp

Lines changed: 66 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -609,27 +609,61 @@ class TLogManager::TImpl
609609
return EmplaceOrCrash(KeyToCachedWriter_, cacheKey, writers)->second;
610610
}
611611

612-
std::unique_ptr<TInotifyWatch> CreateNotificationWatch(
612+
TInotifyHandle* TryGetNotificationHandle()
613+
{
614+
if (!NotificationHandle_ && !NotificationHandleCreationFailed_) {
615+
try {
616+
NotificationHandle_ = std::make_unique<TInotifyHandle>();
617+
} catch (const std::exception& ex) {
618+
YT_LOG_ERROR(ex, "Error creating inotify handle, watching disabled");
619+
NotificationHandleCreationFailed_ = true;
620+
}
621+
}
622+
return NotificationHandle_.get();
623+
}
624+
625+
std::unique_ptr<TInotifyWatch> TryCreateNotificationWatch(
613626
const TLogManagerConfigPtr& config,
614627
const IFileLogWriterPtr& writer)
615628
{
616629
#ifdef _linux_
617630
if (config->WatchPeriod) {
618-
if (!NotificationHandle_) {
619-
NotificationHandle_ = std::make_unique<TInotifyHandle>();
631+
auto* notifcationHandle = TryGetNotificationHandle();
632+
if (!notifcationHandle) {
633+
return nullptr;
634+
}
635+
636+
try {
637+
return std::make_unique<TInotifyWatch>(
638+
notifcationHandle,
639+
writer->GetFileName(),
640+
EInotifyWatchEvents::DeleteSelf | EInotifyWatchEvents::MoveSelf);
641+
} catch (const std::exception& ex) {
642+
// Watch can fail to initialize if the writer is disabled
643+
// e.g. due to the lack of space.
644+
YT_LOG_ERROR(ex, "Error creating inotify watch (Path: %v)",
645+
writer->GetFileName());
646+
return nullptr;
620647
}
621-
return std::make_unique<TInotifyWatch>(
622-
NotificationHandle_.get(),
623-
writer->GetFileName(),
624-
EInotifyWatchEvents::DeleteSelf | EInotifyWatchEvents::MoveSelf,
625-
BIND(&ILogWriter::Reload, writer));
626648
}
627649
#else
628650
Y_UNUSED(config, writer);
629651
#endif
630652
return nullptr;
631653
}
632654

655+
void CreateNotificationWatchForWriter(
656+
const TLogManagerConfigPtr& config,
657+
const IFileLogWriterPtr& writer)
658+
{
659+
if (auto watch = TryCreateNotificationWatch(config, writer)) {
660+
EmplaceOrCrash(NotificationWatchWDToWriter_, watch->GetWD(), writer);
661+
EmplaceOrCrash(WriterToNotificationWatch_, writer, std::move(watch));
662+
} else {
663+
InsertOrCrash(WritersWithFailedNotificationWatches_, writer);
664+
}
665+
}
666+
633667
void UpdateConfig(const TConfigEvent& event)
634668
{
635669
YT_ASSERT_THREAD_AFFINITY(LoggingThread);
@@ -709,9 +743,10 @@ class TLogManager::TImpl
709743

710744
NameToWriter_.clear();
711745
KeyToCachedWriter_.clear();
712-
WDToNotificationWatch_.clear();
713-
NotificationWatches_.clear();
714-
InvalidNotificationWatches_.clear();
746+
747+
WriterToNotificationWatch_.clear();
748+
NotificationWatchWDToWriter_.clear();
749+
WritersWithFailedNotificationWatches_.clear();
715750

716751
for (const auto& [name, writerConfig] : config->Writers) {
717752
auto typedWriterConfig = ConvertTo<TLogWriterConfigPtr>(writerConfig);
@@ -729,11 +764,7 @@ class TLogManager::TImpl
729764
EmplaceOrCrash(NameToWriter_, name, writer);
730765

731766
if (auto fileWriter = DynamicPointerCast<IFileLogWriter>(writer)) {
732-
auto watch = CreateNotificationWatch(config, fileWriter);
733-
if (watch) {
734-
RegisterNotificatonWatch(watch.get());
735-
NotificationWatches_.push_back(std::move(watch));
736-
}
767+
CreateNotificationWatchForWriter(config, fileWriter);
737768
}
738769
}
739770

@@ -827,56 +858,31 @@ class TLogManager::TImpl
827858
}
828859
}
829860

830-
void RegisterNotificatonWatch(TInotifyWatch* watch)
831-
{
832-
YT_ASSERT_THREAD_AFFINITY(LoggingThread);
833-
834-
if (watch->IsValid()) {
835-
// Watch can fail to initialize if the writer is disabled
836-
// e.g. due to the lack of space.
837-
EmplaceOrCrash(WDToNotificationWatch_, watch->GetWD(), watch);
838-
} else {
839-
InvalidNotificationWatches_.push_back(watch);
840-
}
841-
}
842-
843861
void WatchWriters()
844862
{
845863
YT_ASSERT_THREAD_AFFINITY(LoggingThread);
846864

847-
if (!NotificationHandle_) {
865+
auto* notificationHandle = TryGetNotificationHandle();
866+
if (!notificationHandle) {
848867
return;
849868
}
850869

851-
int previousWD = -1;
852-
while (auto pollResult = NotificationHandle_->Poll()) {
853-
if (pollResult->WD == previousWD) {
854-
continue;
855-
}
856-
auto it = WDToNotificationWatch_.find(pollResult->WD);
857-
auto jt = WDToNotificationWatch_.end();
858-
if (it == jt) {
859-
continue;
860-
}
870+
auto config = Config_.Acquire();
861871

862-
auto* watch = it->second;
863-
watch->Run();
872+
// Always reload writers and retry registration for invalid watches.
873+
auto writersToReconsider = std::exchange(WritersWithFailedNotificationWatches_, {});
864874

865-
if (watch->GetWD() != pollResult->WD) {
866-
WDToNotificationWatch_.erase(it);
867-
RegisterNotificatonWatch(watch);
875+
while (auto pollResult = notificationHandle->Poll()) {
876+
if (auto writer = GetOrDefault(NotificationWatchWDToWriter_, pollResult->WD)) {
877+
EraseOrCrash(NotificationWatchWDToWriter_, pollResult->WD);
878+
EraseOrCrash(WriterToNotificationWatch_, writer);
879+
InsertOrCrash(writersToReconsider, writer);
868880
}
869-
870-
previousWD = pollResult->WD;
871881
}
872-
// Handle invalid watches, try to register they again.
873-
{
874-
std::vector<TInotifyWatch*> invalidNotificationWatches;
875-
invalidNotificationWatches.swap(InvalidNotificationWatches_);
876-
for (auto* watch : invalidNotificationWatches) {
877-
watch->Run();
878-
RegisterNotificatonWatch(watch);
879-
}
882+
883+
for (const auto& writer : writersToReconsider) {
884+
writer->Reload();
885+
CreateNotificationWatchForWriter(config, writer);
880886
}
881887
}
882888

@@ -1358,9 +1364,11 @@ class TLogManager::TImpl
13581364
const IThreadPoolPtr CompressionThreadPool_;
13591365

13601366
std::unique_ptr<TInotifyHandle> NotificationHandle_;
1361-
std::vector<std::unique_ptr<TInotifyWatch>> NotificationWatches_;
1362-
THashMap<int, TInotifyWatch*> WDToNotificationWatch_;
1363-
std::vector<TInotifyWatch*> InvalidNotificationWatches_;
1367+
bool NotificationHandleCreationFailed_ = false;
1368+
1369+
THashMap<IFileLogWriterPtr, std::unique_ptr<TInotifyWatch>> WriterToNotificationWatch_;
1370+
THashMap<int, IFileLogWriterPtr> NotificationWatchWDToWriter_;
1371+
THashSet<IFileLogWriterPtr> WritersWithFailedNotificationWatches_;
13641372

13651373
THashMap<TString, TLoggingAnchor*> AnchorMap_;
13661374
std::atomic<TLoggingAnchor*> FirstAnchor_ = nullptr;

yt/yt/core/misc/inotify.cpp

Lines changed: 9 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -77,64 +77,30 @@ std::optional<TInotifyHandle::TPollResult> TInotifyHandle::Poll()
7777
TInotifyWatch::TInotifyWatch(
7878
TInotifyHandle* handle,
7979
std::string path,
80-
EInotifyWatchEvents mask,
81-
TClosure callback)
80+
EInotifyWatchEvents mask)
8281
: FD_(handle->GetFD())
8382
, Path_(std::move(path))
8483
, Mask_(mask)
85-
, Callback_(std::move(callback))
8684
{
87-
YT_VERIFY(FD_ >= 0);
88-
CreateWatch();
89-
}
90-
91-
TInotifyWatch::~TInotifyWatch()
92-
{
93-
DropWatch();
94-
}
95-
96-
bool TInotifyWatch::IsValid() const
97-
{
98-
return WD_ >= 0;
99-
}
100-
101-
void TInotifyWatch::Run()
102-
{
103-
// Unregister before create a new file.
104-
DropWatch();
105-
Callback_();
106-
// Register the newly created file.
107-
CreateWatch();
108-
}
109-
110-
void TInotifyWatch::CreateWatch()
111-
{
112-
YT_VERIFY(WD_ < 0);
11385
#ifdef _linux_
86+
YT_VERIFY(FD_ >= 0);
11487
WD_ = inotify_add_watch(
11588
FD_,
11689
Path_.c_str(),
11790
ToUnderlying(Mask_));
118-
11991
if (WD_ < 0) {
120-
YT_LOG_ERROR(
121-
TError::FromSystem(errno),
122-
"Error registering watch (Path: %v)",
123-
Path_);
12492
WD_ = -1;
125-
} else if (WD_ > 0) {
126-
YT_LOG_DEBUG("Watch registered (WD: %v, Path: %v)",
127-
WD_,
128-
Path_);
129-
} else {
130-
YT_ABORT();
93+
THROW_ERROR_EXCEPTION("Error registering watch for %v",
94+
Path_)
95+
<< TError::FromSystem();
13196
}
132-
#else
133-
WD_ = -1;
97+
YT_LOG_DEBUG("Watch registered (WD: %v, Path: %v)",
98+
WD_,
99+
Path_);
134100
#endif
135101
}
136102

137-
void TInotifyWatch::DropWatch()
103+
TInotifyWatch::~TInotifyWatch()
138104
{
139105
#ifdef _linux_
140106
if (WD_ > 0) {
@@ -144,7 +110,6 @@ void TInotifyWatch::DropWatch()
144110
inotify_rm_watch(FD_, WD_);
145111
}
146112
#endif
147-
WD_ = -1;
148113
}
149114

150115
////////////////////////////////////////////////////////////////////////////////

yt/yt/core/misc/inotify.h

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,24 +58,15 @@ class TInotifyWatch
5858
TInotifyWatch(
5959
TInotifyHandle* handle,
6060
std::string path,
61-
EInotifyWatchEvents mask,
62-
TClosure callback);
61+
EInotifyWatchEvents mask);
6362
~TInotifyWatch();
6463

65-
bool IsValid() const;
66-
67-
void Run();
68-
6964
DEFINE_BYVAL_RO_PROPERTY(int, FD, -1);
7065
DEFINE_BYVAL_RO_PROPERTY(int, WD, -1);
7166

7267
private:
7368
const std::string Path_;
7469
const EInotifyWatchEvents Mask_;
75-
const TClosure Callback_;
76-
77-
void CreateWatch();
78-
void DropWatch();
7970
};
8071

8172
////////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)