diff --git a/trpc/client/service_proxy.cc b/trpc/client/service_proxy.cc index b624adc4..32d309db 100644 --- a/trpc/client/service_proxy.cc +++ b/trpc/client/service_proxy.cc @@ -104,7 +104,7 @@ bool ServiceProxy::CheckTimeout(const ClientContextPtr& context) { } result.SetErrorMessage(error); - TRPC_LOG_ERROR(error); + TRPC_FMT_ERROR_EVERY_N(1000, "{}", error); context->SetStatus(std::move(result)); return true; diff --git a/trpc/runtime/common/heartbeat/heartbeat_info.cc b/trpc/runtime/common/heartbeat/heartbeat_info.cc index 13878fcb..ba0ee44d 100644 --- a/trpc/runtime/common/heartbeat/heartbeat_info.cc +++ b/trpc/runtime/common/heartbeat/heartbeat_info.cc @@ -16,6 +16,7 @@ #include #include +#include #include #include "trpc/runtime/threadmodel/common/worker_thread.h" @@ -24,13 +25,16 @@ namespace trpc { +std::shared_mutex thread_heartbeat_function_mutex_; ThreadHeartBeatFunction thread_heartbeat_function{nullptr}; void RegisterThreadHeartBeatFunction(ThreadHeartBeatFunction&& heartbeat_function) { + std::unique_lock lock(thread_heartbeat_function_mutex_); thread_heartbeat_function = std::move(heartbeat_function); } void HeartBeat(uint32_t task_queue_size) { + std::shared_lock lock(thread_heartbeat_function_mutex_); if (TRPC_UNLIKELY(!thread_heartbeat_function)) { return; } diff --git a/trpc/runtime/iomodel/reactor/fiber/BUILD b/trpc/runtime/iomodel/reactor/fiber/BUILD index f3f1afc9..489475f8 100644 --- a/trpc/runtime/iomodel/reactor/fiber/BUILD +++ b/trpc/runtime/iomodel/reactor/fiber/BUILD @@ -28,6 +28,7 @@ cc_library( "//trpc/coroutine:fiber_timer", "//trpc/runtime/iomodel/reactor", "//trpc/runtime/iomodel/reactor/common:connection", + "//trpc/runtime/iomodel/reactor/fiber:fiber_reactor", "//trpc/util:align", "//trpc/util:likely", "//trpc/util:ref_ptr", diff --git a/trpc/runtime/iomodel/reactor/fiber/fiber_connection.cc b/trpc/runtime/iomodel/reactor/fiber/fiber_connection.cc index d076139c..7699949e 100644 --- a/trpc/runtime/iomodel/reactor/fiber/fiber_connection.cc +++ b/trpc/runtime/iomodel/reactor/fiber/fiber_connection.cc @@ -16,6 +16,7 @@ #include "trpc/coroutine/fiber.h" #include "trpc/coroutine/fiber_condition_variable.h" #include "trpc/coroutine/fiber_timer.h" +#include "trpc/runtime/iomodel/reactor/fiber/fiber_reactor.h" #include "trpc/util/log/logging.h" #include "trpc/util/likely.h" #include "trpc/util/thread/internal/memory_barrier.h" @@ -359,7 +360,7 @@ void FiberConnection::QueueCleanupCallbackCheck() { conn_unavailable_ = true; } // No need to take a reference to us, `OnCleanup()` has not been called. - GetReactor()->SubmitTask([this] { + Function cleanup_task = [this] { // The load below acts as a fence (paired with `exchange` above). (But // does it make sense?) (void)read_mostly_.seldomly_used->cleanup_queued.load(std::memory_order_acquire); @@ -386,7 +387,15 @@ void FiberConnection::QueueCleanupCallbackCheck() { std::scoped_lock _(read_mostly_.seldomly_used->cleanup_lk); read_mostly_.seldomly_used->cleanup_completed = true; read_mostly_.seldomly_used->cleanup_cv.notify_one(); - }); + }; + + if (!fiber::IsReactorKeepRunning()) { + GetReactor()->SubmitTask(std::move(cleanup_task)); + } else { + // In fiber reactor keep running mode, since `FiberMutex` is used in cleanup_task, it may cause coroutine switching, so it is not put into reactor to run + bool res = StartFiberDetached(std::move(cleanup_task)); + TRPC_ASSERT(res && "StartFiber for cleanup failed"); + } } } } diff --git a/trpc/runtime/iomodel/reactor/fiber/fiber_reactor.cc b/trpc/runtime/iomodel/reactor/fiber/fiber_reactor.cc index b884b9c2..90c38184 100644 --- a/trpc/runtime/iomodel/reactor/fiber/fiber_reactor.cc +++ b/trpc/runtime/iomodel/reactor/fiber/fiber_reactor.cc @@ -155,6 +155,10 @@ void SetReactorKeepRunning() { reactor_keep_running = true; } +bool IsReactorKeepRunning() { + return reactor_keep_running; +} + void SetReactorTaskQueueSize(uint32_t size) { reactor_task_queue_size = size; } diff --git a/trpc/runtime/iomodel/reactor/fiber/fiber_reactor.h b/trpc/runtime/iomodel/reactor/fiber/fiber_reactor.h index 2567c568..fcf86101 100644 --- a/trpc/runtime/iomodel/reactor/fiber/fiber_reactor.h +++ b/trpc/runtime/iomodel/reactor/fiber/fiber_reactor.h @@ -110,6 +110,9 @@ void SetReactorNumPerSchedulingGroup(size_t size); /// @brief Set reactor keep running void SetReactorKeepRunning(); +/// @brief Is reactor keep running +bool IsReactorKeepRunning(); + /// @brief Set fiber reactor task queue size void SetReactorTaskQueueSize(uint32_t size); diff --git a/trpc/stream/stream_var.cc b/trpc/stream/stream_var.cc index 89620ab1..d33c0d55 100644 --- a/trpc/stream/stream_var.cc +++ b/trpc/stream/stream_var.cc @@ -20,6 +20,12 @@ namespace trpc::stream { +namespace { + +constexpr std::string_view kStreamRuntimeInfo = "trpc_stream_runtime_info"; + +} // namespace + StreamVarPtr CreateStreamVar(const std::string& var_path) { return MakeRefCounted(var_path); } StreamVar::StreamVar(const std::string& var_path) @@ -115,10 +121,15 @@ bool StreamVarHelper::ReportMetrics(const std::string& metrics_name, for (const auto& [var_path, var_value] : metrics) { SingleAttrMetricsInfo single_attr_info; + single_attr_info.name = kStreamRuntimeInfo; single_attr_info.dimension = var_path; single_attr_info.value = static_cast(var_value); single_attr_info.policy = MetricsPolicy::SUM; - metrics_ptr->SingleAttrReport(std::move(single_attr_info)); + if (metrics_ptr->SingleAttrReport(single_attr_info) != 0) { + TRPC_FMT_TRACE("trpc stream var metrics report failed, dimension:{} value:{}", single_attr_info.dimension, + single_attr_info.value); + return false; + } TRPC_FMT_TRACE("trpc stream var metrics report, dimension:{} value:{}", single_attr_info.dimension, single_attr_info.value); } diff --git a/trpc/stream/stream_var_test.cc b/trpc/stream/stream_var_test.cc index 46573df5..6aee33f2 100644 --- a/trpc/stream/stream_var_test.cc +++ b/trpc/stream/stream_var_test.cc @@ -53,7 +53,12 @@ class TestStreamVarMetrics : public trpc::Metrics { int ModuleReport(const ModuleMetricsInfo& info) override { return 0; } - int SingleAttrReport(const SingleAttrMetricsInfo& info) override { return 0; } + int SingleAttrReport(const SingleAttrMetricsInfo& info) override { + if (info.name.empty() || info.dimension.empty()) { + return -1; + } + return 0; + } int MultiAttrReport(const MultiAttrMetricsInfo& info) override { return 0; }