Skip to content

Bugfix: fixed the following bugs #202

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion trpc/client/service_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ bool ServiceProxy::CheckTimeout(const ClientContextPtr& context) {
}
result.SetErrorMessage(error);

TRPC_LOG_ERROR(error);
TRPC_FMT_ERROR_EVERY_N(1000, "{}", error);
context->SetStatus(std::move(result));

return true;
Expand Down
4 changes: 4 additions & 0 deletions trpc/runtime/common/heartbeat/heartbeat_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <sys/syscall.h>
#include <unistd.h>

#include <shared_mutex>
#include <utility>

#include "trpc/runtime/threadmodel/common/worker_thread.h"
Expand All @@ -24,13 +25,16 @@

namespace trpc {

std::shared_mutex thread_heartbeat_function_mutex_;
ThreadHeartBeatFunction thread_heartbeat_function{nullptr};

void RegisterThreadHeartBeatFunction(ThreadHeartBeatFunction&& heartbeat_function) {
std::unique_lock<std::shared_mutex> lock(thread_heartbeat_function_mutex_);
thread_heartbeat_function = std::move(heartbeat_function);
}

void HeartBeat(uint32_t task_queue_size) {
std::shared_lock<std::shared_mutex> lock(thread_heartbeat_function_mutex_);
if (TRPC_UNLIKELY(!thread_heartbeat_function)) {
return;
}
Expand Down
1 change: 1 addition & 0 deletions trpc/runtime/iomodel/reactor/fiber/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ cc_library(
"//trpc/coroutine:fiber_timer",
"//trpc/runtime/iomodel/reactor",
"//trpc/runtime/iomodel/reactor/common:connection",
"//trpc/runtime/iomodel/reactor/fiber:fiber_reactor",
"//trpc/util:align",
"//trpc/util:likely",
"//trpc/util:ref_ptr",
Expand Down
13 changes: 11 additions & 2 deletions trpc/runtime/iomodel/reactor/fiber/fiber_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "trpc/coroutine/fiber.h"
#include "trpc/coroutine/fiber_condition_variable.h"
#include "trpc/coroutine/fiber_timer.h"
#include "trpc/runtime/iomodel/reactor/fiber/fiber_reactor.h"
#include "trpc/util/log/logging.h"
#include "trpc/util/likely.h"
#include "trpc/util/thread/internal/memory_barrier.h"
Expand Down Expand Up @@ -359,7 +360,7 @@ void FiberConnection::QueueCleanupCallbackCheck() {
conn_unavailable_ = true;
}
// No need to take a reference to us, `OnCleanup()` has not been called.
GetReactor()->SubmitTask([this] {
Function<void()> cleanup_task = [this] {
// The load below acts as a fence (paired with `exchange` above). (But
// does it make sense?)
(void)read_mostly_.seldomly_used->cleanup_queued.load(std::memory_order_acquire);
Expand All @@ -386,7 +387,15 @@ void FiberConnection::QueueCleanupCallbackCheck() {
std::scoped_lock _(read_mostly_.seldomly_used->cleanup_lk);
read_mostly_.seldomly_used->cleanup_completed = true;
read_mostly_.seldomly_used->cleanup_cv.notify_one();
});
};

if (!fiber::IsReactorKeepRunning()) {
GetReactor()->SubmitTask(std::move(cleanup_task));
} else {
// In fiber reactor keep running mode, since `FiberMutex` is used in cleanup_task, it may cause coroutine switching, so it is not put into reactor to run
bool res = StartFiberDetached(std::move(cleanup_task));
TRPC_ASSERT(res && "StartFiber for cleanup failed");
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions trpc/runtime/iomodel/reactor/fiber/fiber_reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ void SetReactorKeepRunning() {
reactor_keep_running = true;
}

bool IsReactorKeepRunning() {
return reactor_keep_running;
}

void SetReactorTaskQueueSize(uint32_t size) {
reactor_task_queue_size = size;
}
Expand Down
3 changes: 3 additions & 0 deletions trpc/runtime/iomodel/reactor/fiber/fiber_reactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ void SetReactorNumPerSchedulingGroup(size_t size);
/// @brief Set reactor keep running
void SetReactorKeepRunning();

/// @brief Is reactor keep running
bool IsReactorKeepRunning();

/// @brief Set fiber reactor task queue size
void SetReactorTaskQueueSize(uint32_t size);

Expand Down
13 changes: 12 additions & 1 deletion trpc/stream/stream_var.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@

namespace trpc::stream {

namespace {

constexpr std::string_view kStreamRuntimeInfo = "trpc_stream_runtime_info";

} // namespace

StreamVarPtr CreateStreamVar(const std::string& var_path) { return MakeRefCounted<StreamVar>(var_path); }

StreamVar::StreamVar(const std::string& var_path)
Expand Down Expand Up @@ -115,10 +121,15 @@ bool StreamVarHelper::ReportMetrics(const std::string& metrics_name,

for (const auto& [var_path, var_value] : metrics) {
SingleAttrMetricsInfo single_attr_info;
single_attr_info.name = kStreamRuntimeInfo;
single_attr_info.dimension = var_path;
single_attr_info.value = static_cast<double>(var_value);
single_attr_info.policy = MetricsPolicy::SUM;
metrics_ptr->SingleAttrReport(std::move(single_attr_info));
if (metrics_ptr->SingleAttrReport(single_attr_info) != 0) {
TRPC_FMT_TRACE("trpc stream var metrics report failed, dimension:{} value:{}", single_attr_info.dimension,
single_attr_info.value);
return false;
}
TRPC_FMT_TRACE("trpc stream var metrics report, dimension:{} value:{}", single_attr_info.dimension,
single_attr_info.value);
}
Expand Down
7 changes: 6 additions & 1 deletion trpc/stream/stream_var_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ class TestStreamVarMetrics : public trpc::Metrics {

int ModuleReport(const ModuleMetricsInfo& info) override { return 0; }

int SingleAttrReport(const SingleAttrMetricsInfo& info) override { return 0; }
int SingleAttrReport(const SingleAttrMetricsInfo& info) override {
if (info.name.empty() || info.dimension.empty()) {
return -1;
}
return 0;
}

int MultiAttrReport(const MultiAttrMetricsInfo& info) override { return 0; }

Expand Down