diff --git a/src/common/classes/Spinlock.h b/src/common/classes/Spinlock.h new file mode 100644 index 00000000000..132214c00d9 --- /dev/null +++ b/src/common/classes/Spinlock.h @@ -0,0 +1,73 @@ +/* + * The contents of this file are subject to the Initial + * Developer's Public License Version 1.0 (the "License"); + * you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl. + * + * Software distributed under the License is distributed AS IS, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. + * See the License for the specific language governing rights + * and limitations under the License. + * + * The Original Code was created by Adriano dos Santos Fernandes + * for the Firebird Open Source RDBMS project. + * + * Copyright (c) 2025 Adriano dos Santos Fernandes + * and all contributors signed below. + * + * All Rights Reserved. + * Contributor(s): ______________________________________. + */ + +#ifndef CLASSES_SPINLOCK_H +#define CLASSES_SPINLOCK_H + +#include +#include + +namespace Firebird { + + +// Spinlock implementation that can be used in shared memory. +// Based in example found in https://en.cppreference.com/w/cpp/atomic/atomic_flag.html +// Compatible with std::lock_guard, std::scoped_lock and std::unique_lock. +class SpinLock +{ + std::atomic_flag atomicFlag = ATOMIC_FLAG_INIT; + +public: + void lock() noexcept + { + while (atomicFlag.test_and_set(std::memory_order_acquire)) + { +#if defined(__cpp_lib_atomic_wait) && __cpp_lib_atomic_wait >= 201907L + // Since C++20, locks can be acquired only after notification in the unlock, + // avoiding any unnecessary spinning. + // Note that even though wait guarantees it returns only after the value has + // changed, the lock is acquired after the next condition check. + atomicFlag.wait(true, std::memory_order_relaxed); +#else + std::this_thread::yield(); +#endif + } + } + + bool try_lock() noexcept + { + return !atomicFlag.test_and_set(std::memory_order_acquire); + } + + void unlock() noexcept + { + atomicFlag.clear(std::memory_order_release); +#if defined(__cpp_lib_atomic_wait) && __cpp_lib_atomic_wait >= 201907L + atomicFlag.notify_one(); +#endif + } +}; + + +} // namespace Firebird + +#endif // CLASSES_SPINLOCK_H diff --git a/src/common/classes/locks.cpp b/src/common/classes/locks.cpp index 1f4f482d0d6..a66a6ca621c 100644 --- a/src/common/classes/locks.cpp +++ b/src/common/classes/locks.cpp @@ -38,14 +38,7 @@ namespace Firebird { -#if defined(WIN_NT) - -void Spinlock::init() -{ - SetCriticalSectionSpinCount(&spinlock, 4000); -} - -#else //posix mutex +#if !defined(WIN_NT) pthread_mutexattr_t Mutex::attr; diff --git a/src/common/classes/locks.h b/src/common/classes/locks.h index cda84ae1e4a..2c44ca01df3 100644 --- a/src/common/classes/locks.h +++ b/src/common/classes/locks.h @@ -50,8 +50,7 @@ class Exception; // Needed for catch #ifdef WIN_NT -// Generic process-local mutex and spinlock. The latter -// is used to manage memory heaps in a threaded environment. +// Generic process-local mutex. // Windows version of the class @@ -151,23 +150,6 @@ class Mutex : public Reasons Mutex& operator=(const Mutex&); }; -class Spinlock : public Mutex -{ -private: - void init(); - -public: - Spinlock() - { - init(); - } - - explicit Spinlock(MemoryPool&) - { - init(); - } -}; - #else //WIN_NT // Pthreads version of the class @@ -272,51 +254,6 @@ friend class Condition; Mutex& operator=(const Mutex&); }; -#ifdef NOT_USED_OR_REPLACED // we do not use spinlocks currently -class Spinlock -{ -private: - pthread_spinlock_t spinlock; -public: - Spinlock() - { - if (pthread_spin_init(&spinlock, false)) - system_call_failed::raise("pthread_spin_init"); - } - - explicit Spinlock(MemoryPool&) - { - if (pthread_spin_init(&spinlock, false)) - system_call_failed::raise("pthread_spin_init"); - } - - ~Spinlock() - { - if (pthread_spin_destroy(&spinlock)) - system_call_failed::raise("pthread_spin_destroy"); - } - - void enter() - { - if (pthread_spin_lock(&spinlock)) - system_call_failed::raise("pthread_spin_lock"); - } - - void leave() - { - if (pthread_spin_unlock(&spinlock)) - system_call_failed::raise("pthread_spin_unlock"); - } - -private: - // Forbid copying - Spinlock(const Spinlock&); - Spinlock& operator=(const Spinlock&); -}; -#else -typedef Mutex Spinlock; -#endif - #endif //WIN_NT diff --git a/src/jrd/ProfilerManager.cpp b/src/jrd/ProfilerManager.cpp index 0897ad355e3..7b5c6eb1b72 100644 --- a/src/jrd/ProfilerManager.cpp +++ b/src/jrd/ProfilerManager.cpp @@ -33,8 +33,10 @@ #include "../jrd/met_proto.h" #include "../jrd/pag_proto.h" #include "../jrd/tra_proto.h" +#include "../common/classes/Spinlock.h" #include +#include #ifdef WIN_NT #include @@ -100,11 +102,13 @@ namespace event_t clientEvent; USHORT bufferSize; std::atomic tag; + std::atomic_uint seq; + SpinLock bufferMutex; char userName[USERNAME_LENGTH + 1]; // \0 if has PROFILE_ANY_ATTACHMENT alignas(FB_ALIGNMENT) UCHAR buffer[4096]; }; - static const USHORT VERSION = 2; + static const USHORT VERSION = 3; public: ProfilerIpc(thread_db* tdbb, MemoryPool& pool, AttNumber aAttachmentId, bool server = false); @@ -179,7 +183,7 @@ class Jrd::ProfilerListener final listener->watcherThread(); } - void processCommand(thread_db* tdbb); + void processCommand(thread_db* tdbb, ProfilerIpc::Tag tag, UCharBuffer& buffer); private: Attachment* const attachment; @@ -736,6 +740,8 @@ ProfilerIpc::ProfilerIpc(thread_db* tdbb, MemoryPool& pool, AttNumber aAttachmen { Guard guard(this); + header->seq = 0; + if (sharedMemory->eventInit(&header->serverEvent) != FB_SUCCESS) (Arg::Gds(isc_random) << "ProfilerIpc eventInit(serverEvent) failed").raise(); } @@ -817,18 +823,17 @@ void ProfilerIpc::internalSendAndReceive(thread_db* tdbb, Tag tag, } }); - const SLONG value = sharedMemory->eventClear(&header->clientEvent); + const SLONG clientEventCounter = sharedMemory->eventClear(&header->clientEvent); + + std::unique_lock bufferMutexLock(header->bufferMutex); - const Tag oldTag = header->tag.exchange(tag); - switch (oldTag) + switch (header->tag) { case Tag::NOP: - header->tag = oldTag; (Arg::Gds(isc_random) << "Remote attachment failed to start listener thread").raise(); break; case Tag::SERVER_EXITED: - header->tag = oldTag; (Arg::Gds(isc_random) << "Cannot start remote profile session - attachment exited").raise(); break; @@ -846,41 +851,49 @@ void ProfilerIpc::internalSendAndReceive(thread_db* tdbb, Tag tag, fb_assert(inSize <= sizeof(header->buffer)); memcpy(header->buffer, in, inSize); + header->tag = tag; + const auto seq = ++header->seq; + + bufferMutexLock.unlock(); + if (sharedMemory->eventPost(&header->serverEvent) != FB_SUCCESS) (Arg::Gds(isc_random) << "Cannot start remote profile session - attachment exited").raise(); + const SLONG TIMEOUT = 500 * 1000; // 0.5 sec + const int serverPID = header->serverEvent.event_pid; + + while (true) { - const SLONG TIMEOUT = 500 * 1000; // 0.5 sec + { // scope + EngineCheckout cout(tdbb, FB_FUNCTION); - const int serverPID = header->serverEvent.event_pid; - while (true) - { + if (sharedMemory->eventWait(&header->clientEvent, clientEventCounter, TIMEOUT) == FB_SUCCESS) + break; + + if (serverPID != getpid() && !ISC_check_process_existence(serverPID)) { - EngineCheckout cout(tdbb, FB_FUNCTION); - if (sharedMemory->eventWait(&header->clientEvent, value, TIMEOUT) == FB_SUCCESS) - break; + // Server process was died or exited + fb_assert((header->tag == tag) || header->tag == Tag::SERVER_EXITED); - if (serverPID != getpid() && !ISC_check_process_existence(serverPID)) + if (header->tag == tag) { - // Server process was died or exited - fb_assert((header->tag == tag) || header->tag == Tag::SERVER_EXITED); - - if (header->tag == tag) + header->tag = Tag::SERVER_EXITED; + if (header->serverEvent.event_pid) { - header->tag = Tag::SERVER_EXITED; - if (header->serverEvent.event_pid) - { - sharedMemory->eventFini(&header->serverEvent); - header->serverEvent.event_pid = 0; - } + sharedMemory->eventFini(&header->serverEvent); + header->serverEvent.event_pid = 0; } - break; } + + break; } - JRD_reschedule(tdbb, true); } + + JRD_reschedule(tdbb, true); } + bufferMutexLock.lock(); + switch (header->tag) { case Tag::SERVER_EXITED: @@ -977,7 +990,7 @@ void ProfilerListener::watcherThread() { while (!exiting) { - const SLONG value = sharedMemory->eventClear(&header->serverEvent); + const SLONG serverEventCounter = sharedMemory->eventClear(&header->serverEvent); if (startup) { @@ -986,6 +999,10 @@ void ProfilerListener::watcherThread() } else { + ProfilerIpc::Tag tag; + unsigned seq; + UCharBuffer buffer; + fb_assert(header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP); try @@ -993,11 +1010,27 @@ void ProfilerListener::watcherThread() FbLocalStatus statusVector; EngineContextHolder tdbb(&statusVector, attachment->getInterface(), FB_FUNCTION); - processCommand(tdbb); - header->tag = ProfilerIpc::Tag::RESPONSE; + { // scope + std::unique_lock bufferMutexLock(header->bufferMutex); + + if (header->userName[0] && attachment->getUserName() != header->userName) + status_exception::raise(Arg::Gds(isc_miss_prvlg) << "PROFILE_ANY_ATTACHMENT"); + + fb_assert(header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP); + + tag = header->tag; + seq = header->seq; + memcpy(buffer.getBuffer(header->bufferSize, false), header->buffer, header->bufferSize); + } + + processCommand(tdbb, tag, buffer); + + tag = ProfilerIpc::Tag::RESPONSE; } catch (const status_exception& e) { + tag = ProfilerIpc::Tag::EXCEPTION; + //// TODO: Serialize status vector instead of formated message. const ISC_STATUS* status = e.value(); @@ -1012,20 +1045,34 @@ void ProfilerListener::watcherThread() errorMsg += temp; } - header->bufferSize = MIN(errorMsg.length(), sizeof(header->buffer) - 1); - strncpy((char*) header->buffer, errorMsg.c_str(), sizeof(header->buffer)); - header->buffer[header->bufferSize] = '\0'; - - header->tag = ProfilerIpc::Tag::EXCEPTION; + header->bufferSize = MIN(errorMsg.length(), sizeof(header->buffer)); + memcpy(header->buffer, errorMsg.c_str(), header->bufferSize); } - sharedMemory->eventPost(&header->clientEvent); + fb_assert(buffer.getCount() <= sizeof(header->buffer)); + + { // scope + std::unique_lock bufferMutexLock(header->bufferMutex, std::try_to_lock); + + // Otherwise a client lost interest in the response. + if (bufferMutexLock.owns_lock() && header->seq == seq) + { + if (header->seq == seq) + { + header->tag = tag; + header->bufferSize = buffer.getCount(); + memcpy(header->buffer, buffer.begin(), buffer.getCount()); + + sharedMemory->eventPost(&header->clientEvent); + } + } + } } if (exiting) break; - sharedMemory->eventWait(&header->serverEvent, value, 0); + sharedMemory->eventWait(&header->serverEvent, serverEventCounter, 0); } } catch (const Exception& ex) @@ -1033,11 +1080,16 @@ void ProfilerListener::watcherThread() iscLogException("Error in profiler watcher thread\n", ex); } - const ProfilerIpc::Tag oldTag = header->tag.exchange(ProfilerIpc::Tag::SERVER_EXITED); - if (oldTag >= ProfilerIpc::Tag::FIRST_CLIENT_OP) - { - fb_assert(header->clientEvent.event_pid); - sharedMemory->eventPost(&header->clientEvent); + { // scope + std::unique_lock bufferMutexLock(header->bufferMutex); + + if (header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP) + { + fb_assert(header->clientEvent.event_pid); + sharedMemory->eventPost(&header->clientEvent); + } + + header->tag = ProfilerIpc::Tag::SERVER_EXITED; } try @@ -1051,70 +1103,75 @@ void ProfilerListener::watcherThread() } } -void ProfilerListener::processCommand(thread_db* tdbb) +void ProfilerListener::processCommand(thread_db* tdbb, ProfilerIpc::Tag tag, UCharBuffer& buffer) { - const auto header = ipc->sharedMemory->getHeader(); const auto profilerManager = attachment->getProfilerManager(tdbb); - if (header->userName[0] && attachment->getUserName() != header->userName) - status_exception::raise(Arg::Gds(isc_miss_prvlg) << "PROFILE_ANY_ATTACHMENT"); - using Tag = ProfilerIpc::Tag; - switch (header->tag) + switch (tag) { case Tag::CANCEL_SESSION: + fb_assert(buffer.isEmpty()); profilerManager->cancelSession(); - header->bufferSize = 0; + buffer.resize(0); break; case Tag::DISCARD: + fb_assert(buffer.isEmpty()); profilerManager->discard(); - header->bufferSize = 0; + buffer.resize(0); break; case Tag::FINISH_SESSION: { - const auto in = reinterpret_cast(header->buffer); - fb_assert(sizeof(*in) == header->bufferSize); + const auto in = reinterpret_cast(buffer.begin()); + fb_assert(sizeof(*in) == buffer.getCount()); + profilerManager->finishSession(tdbb, in->flush); - header->bufferSize = 0; + + buffer.resize(0); break; } case Tag::FLUSH: + fb_assert(buffer.isEmpty()); profilerManager->flush(); - header->bufferSize = 0; + buffer.resize(0); break; case Tag::PAUSE_SESSION: { - const auto in = reinterpret_cast(header->buffer); - fb_assert(sizeof(*in) == header->bufferSize); + const auto in = reinterpret_cast(buffer.begin()); + fb_assert(sizeof(*in) == buffer.getCount()); + profilerManager->pauseSession(in->flush); - header->bufferSize = 0; + + buffer.resize(0); break; } case Tag::RESUME_SESSION: + fb_assert(buffer.isEmpty()); profilerManager->resumeSession(); - header->bufferSize = 0; + buffer.resize(0); break; case Tag::SET_FLUSH_INTERVAL: { - const auto in = reinterpret_cast(header->buffer); - fb_assert(sizeof(*in) == header->bufferSize); + const auto in = reinterpret_cast(buffer.begin()); + fb_assert(sizeof(*in) == buffer.getCount()); profilerManager->setFlushInterval(in->flushInterval); - header->bufferSize = 0; + + buffer.resize(0); break; } case Tag::START_SESSION: { - const auto in = reinterpret_cast(header->buffer); - fb_assert(sizeof(*in) == header->bufferSize); + const auto in = reinterpret_cast(buffer.begin()); + fb_assert(sizeof(*in) == buffer.getCount()); const string description(in->description.str, in->descriptionNull ? 0 : in->description.length); @@ -1125,14 +1182,12 @@ void ProfilerListener::processCommand(thread_db* tdbb) const string pluginOptions(in->pluginOptions.str, in->pluginOptionsNull ? 0 : in->pluginOptions.length); - const auto out = reinterpret_cast(header->buffer); - static_assert(sizeof(*out) <= sizeof(header->buffer), "Buffer size too small"); - header->bufferSize = sizeof(*out); + const auto out = reinterpret_cast(buffer.begin()); + buffer.resize(sizeof(*out)); out->sessionIdNull = FB_FALSE; out->sessionId = profilerManager->startSession(tdbb, flushInterval, pluginName, description, pluginOptions); - break; }