Skip to content

Commit 2af168b

Browse files
committed
add real cores and system threads stats (#16309)
1 parent 4b130d6 commit 2af168b

File tree

8 files changed

+317
-9
lines changed

8 files changed

+317
-9
lines changed

ydb/core/protos/node_whiteboard.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,16 @@ enum EConfigState {
281281
Outdated = 1;
282282
}
283283

284+
message TSystemThreadInfo {
285+
optional string Name = 1;
286+
optional uint32 Threads = 2;
287+
optional double SystemUsage = 3;
288+
optional double UserUsage = 4;
289+
optional uint32 MinorPageFaults = 5;
290+
optional uint32 MajorPageFaults = 6;
291+
map<string, uint32> States = 7;
292+
}
293+
284294
message TSystemStateInfo {
285295
message TPoolStats {
286296
optional string Name = 1;
@@ -346,6 +356,8 @@ message TSystemStateInfo {
346356
optional uint32 CoresTotal = 40;
347357
optional float NetworkUtilization = 41;
348358
optional uint64 NetworkWriteThroughput = 42;
359+
optional uint32 RealNumberOfCpus = 43; // number of cpus without cgroups limitations
360+
repeated TSystemThreadInfo Threads = 44;
349361
}
350362

351363
message TEvSystemStateRequest {

ydb/core/tablet/node_whiteboard.cpp

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <ydb/core/node_whiteboard/node_whiteboard.h>
1010
#include <ydb/core/base/nameservice.h>
1111
#include <ydb/core/base/counters.h>
12+
#include <ydb/core/util/cpuinfo.h>
1213
#include <ydb/core/util/tuples.h>
1314

1415
#include <util/string/split.h>
@@ -46,6 +47,7 @@ class TNodeWhiteboardService : public TActorBootstrapped<TNodeWhiteboardService>
4647
SystemStateInfo.SetNodeName(nodeName);
4748
}
4849
SystemStateInfo.SetNumberOfCpus(NSystemInfo::NumberOfCpus());
50+
SystemStateInfo.SetRealNumberOfCpus(NKikimr::RealNumberOfCpus());
4951
auto version = GetProgramRevision();
5052
if (!version.empty()) {
5153
SystemStateInfo.SetVersion(version);
@@ -56,8 +58,13 @@ class TNodeWhiteboardService : public TActorBootstrapped<TNodeWhiteboardService>
5658
SystemStateInfo.SetStartTime(ctx.Now().MilliSeconds());
5759
ctx.Send(ctx.SelfID, new TEvPrivate::TEvUpdateRuntimeStats());
5860

59-
auto group = NKikimr::GetServiceCounters(NKikimr::AppData()->Counters, "utils")
60-
->GetSubgroup("subsystem", "whiteboard");
61+
auto utils = NKikimr::GetServiceCounters(NKikimr::AppData()->Counters, "utils");
62+
UserTime = utils->GetCounter("Process/UserTime", true);
63+
SysTime = utils->GetCounter("Process/SystemTime", true);
64+
MinorPageFaults = utils->GetCounter("Process/MinorPageFaults", true);
65+
MajorPageFaults = utils->GetCounter("Process/MajorPageFaults", true);
66+
NumThreads = utils->GetCounter("Process/NumThreads", false);
67+
auto group = utils->GetSubgroup("subsystem", "whiteboard");
6168
MaxClockSkewWithPeerUsCounter = group->GetCounter("MaxClockSkewWithPeerUs");
6269
MaxClockSkewPeerIdCounter = group->GetCounter("MaxClockSkewPeerId");
6370

@@ -78,8 +85,19 @@ class TNodeWhiteboardService : public TActorBootstrapped<TNodeWhiteboardService>
7885
NKikimrWhiteboard::TSystemStateInfo SystemStateInfo;
7986
THolder<NTracing::ITraceCollection> TabletIntrospectionData;
8087

81-
::NMonitoring::TDynamicCounters::TCounterPtr MaxClockSkewWithPeerUsCounter;
82-
::NMonitoring::TDynamicCounters::TCounterPtr MaxClockSkewPeerIdCounter;
88+
NMonitoring::TDynamicCounters::TCounterPtr MaxClockSkewWithPeerUsCounter;
89+
NMonitoring::TDynamicCounters::TCounterPtr MaxClockSkewPeerIdCounter;
90+
NMonitoring::TDynamicCounters::TCounterPtr UserTime;
91+
ui64 SavedUserTime = 0;
92+
NMonitoring::TDynamicCounters::TCounterPtr SysTime;
93+
ui64 SavedSysTime = 0;
94+
NMonitoring::TDynamicCounters::TCounterPtr MinorPageFaults;
95+
ui64 SavedMinorPageFaults = 0;
96+
NMonitoring::TDynamicCounters::TCounterPtr MajorPageFaults;
97+
ui64 SavedMajorPageFaults = 0;
98+
NMonitoring::TDynamicCounters::TCounterPtr NumThreads;
99+
100+
TSystemThreadsMonitor ThreadsMonitor;
83101

84102
template <typename PropertyType>
85103
static ui64 GetDifference(PropertyType a, PropertyType b) {
@@ -1099,15 +1117,18 @@ class TNodeWhiteboardService : public TActorBootstrapped<TNodeWhiteboardService>
10991117
}
11001118

11011119
void Handle(TEvPrivate::TEvUpdateRuntimeStats::TPtr &, const TActorContext &ctx) {
1102-
static constexpr TDuration UPDATE_PERIOD = TDuration::Seconds(15);
1120+
static constexpr int UPDATE_PERIOD_SECONDS = 15;
1121+
static constexpr TDuration UPDATE_PERIOD = TDuration::Seconds(UPDATE_PERIOD_SECONDS);
1122+
auto now = TActivationContext::Now();
1123+
11031124
{
11041125
NKikimrWhiteboard::TSystemStateInfo systemStatsUpdate;
11051126
TVector<double> loadAverage = GetLoadAverage();
11061127
for (double d : loadAverage) {
11071128
systemStatsUpdate.AddLoadAverage(d);
11081129
}
11091130
if (CheckedMerge(SystemStateInfo, systemStatsUpdate)) {
1110-
SystemStateInfo.SetChangeTime(ctx.Now().MilliSeconds());
1131+
SystemStateInfo.SetChangeTime(now.MilliSeconds());
11111132
}
11121133
}
11131134

@@ -1124,12 +1145,24 @@ class TNodeWhiteboardService : public TActorBootstrapped<TNodeWhiteboardService>
11241145
SystemStateInfo.SetNetworkUtilization(MaxNetworkUtilization);
11251146
MaxNetworkUtilization = 0;
11261147
}
1127-
11281148
{
1129-
SystemStateInfo.SetNetworkWriteThroughput(SumNetworkWriteThroughput / UPDATE_PERIOD.Seconds());
1149+
SystemStateInfo.SetNetworkWriteThroughput(SumNetworkWriteThroughput / UPDATE_PERIOD_SECONDS);
11301150
SumNetworkWriteThroughput = 0;
11311151
}
1132-
1152+
auto threadPools = ThreadsMonitor.GetThreadPools(now);
1153+
SystemStateInfo.ClearThreads();
1154+
for (const auto& threadPool : threadPools) {
1155+
auto* threadInfo = SystemStateInfo.AddThreads();
1156+
threadInfo->SetName(threadPool.Name);
1157+
threadInfo->SetThreads(threadPool.Threads);
1158+
threadInfo->SetSystemUsage(threadPool.SystemUsage);
1159+
threadInfo->SetUserUsage(threadPool.UserUsage);
1160+
threadInfo->SetMajorPageFaults(threadPool.MajorPageFaults);
1161+
threadInfo->SetMinorPageFaults(threadPool.MinorPageFaults);
1162+
for (const auto& state : threadPool.States) {
1163+
threadInfo->MutableStates()->emplace(state.first, state.second);
1164+
}
1165+
}
11331166
UpdateSystemState(ctx);
11341167
ctx.Schedule(UPDATE_PERIOD, new TEvPrivate::TEvUpdateRuntimeStats());
11351168
}

ydb/core/util/cpuinfo.cpp

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
#include "cpuinfo.h"
2+
#include <cstdlib>
3+
#include <dirent.h>
4+
#include <fcntl.h>
5+
#include <fstream>
6+
#include <unistd.h>
7+
#include <unordered_set>
8+
#include <util/string/ascii.h>
9+
#include <util/string/builder.h>
10+
#include <util/system/info.h>
11+
12+
size_t NKikimr::RealNumberOfCpus() {
13+
// copy-pasted from util/system/info.cpp
14+
#if defined(_linux_)
15+
unsigned ret;
16+
int fd, nread, column;
17+
char buf[512];
18+
static const char matchstr[] = "processor\t:";
19+
20+
fd = open("/proc/cpuinfo", O_RDONLY);
21+
22+
if (fd == -1) {
23+
abort();
24+
}
25+
26+
column = 0;
27+
ret = 0;
28+
29+
while (true) {
30+
nread = read(fd, buf, sizeof(buf));
31+
32+
if (nread <= 0) {
33+
break;
34+
}
35+
36+
for (int i = 0; i < nread; ++i) {
37+
const char ch = buf[i];
38+
39+
if (ch == '\n') {
40+
column = 0;
41+
} else if (column != -1) {
42+
if (AsciiToLower(ch) == matchstr[column]) {
43+
++column;
44+
45+
if (column == sizeof(matchstr) - 1) {
46+
column = -1;
47+
++ret;
48+
}
49+
} else {
50+
column = -1;
51+
}
52+
}
53+
}
54+
}
55+
56+
if (ret == 0) {
57+
abort();
58+
}
59+
60+
close(fd);
61+
62+
return ret;
63+
#else
64+
return NSystemInfo::NumberOfCpus();
65+
#endif
66+
}
67+
68+
double TicksPerSec() {
69+
#ifdef _SC_CLK_TCK
70+
return sysconf(_SC_CLK_TCK);
71+
#else
72+
return 1;
73+
#endif
74+
}
75+
76+
std::vector<NKikimr::TSystemThreadsMonitor::TSystemThreadPoolInfo> NKikimr::TSystemThreadsMonitor::GetThreadPools(TInstant now) {
77+
UpdateSystemThreads(getpid());
78+
std::vector<TSystemThreadPoolInfo> result;
79+
result.reserve(NameToThreadIndex.size());
80+
double passedSeconds = (now - UpdateTime).SecondsFloat();
81+
double ticks = TicksPerSec() * passedSeconds;
82+
for (const auto& [name, tids] : NameToThreadIndex) {
83+
TSystemThreadPoolInfo& info = result.emplace_back();
84+
info.Name = name;
85+
info.Threads = tids.size();
86+
ui64 majorPageFaults = 0;
87+
ui64 minorPageFaults = 0;
88+
ui64 systemTime = 0;
89+
ui64 userTime = 0;
90+
std::array<ui16, 256> states{};
91+
for (pid_t tid : tids) {
92+
const TSystemThreadPoolState& state = SystemThreads[tid];
93+
majorPageFaults += state.DeltaMajorPageFaults;
94+
minorPageFaults += state.DeltaMinorPageFaults;
95+
systemTime += state.DeltaSystemTime;
96+
userTime += state.DeltaUserTime;
97+
if (state.State >= 'A' && state.State <= 'z') {
98+
states[size_t(state.State)]++;
99+
}
100+
}
101+
for (char c = 'A'; c <= 'z'; ++c) {
102+
if (states[size_t(c)] > 0) {
103+
info.States.emplace_back(c, states[c]);
104+
}
105+
}
106+
info.MajorPageFaults = double(majorPageFaults) / passedSeconds;
107+
info.MinorPageFaults = double(minorPageFaults) / passedSeconds;
108+
info.SystemUsage = double(systemTime) / ticks / info.Threads;
109+
info.UserUsage = double(userTime) / ticks / info.Threads;
110+
}
111+
UpdateTime = now;
112+
return result;
113+
}
114+
115+
void NKikimr::TSystemThreadsMonitor::UpdateSystemThreads(pid_t pid) {
116+
#if defined(_linux_)
117+
NameToThreadIndex.clear();
118+
std::string taskDir = "/proc/" + std::to_string(pid) + "/task";
119+
DIR* dir = opendir(taskDir.c_str());
120+
if (!dir) {
121+
SystemThreads.clear();
122+
return;
123+
}
124+
std::unordered_set<pid_t> currentThreads;
125+
currentThreads.reserve(SystemThreads.size());
126+
for (const auto& [tid, state] : SystemThreads) {
127+
currentThreads.insert(tid);
128+
}
129+
struct dirent* entry;
130+
while ((entry = readdir(dir)) != nullptr) {
131+
if (entry->d_name[0] == '.') {
132+
continue; // skip '.' and '..'
133+
}
134+
pid_t tid = atoi(entry->d_name);
135+
if (tid > 0) {
136+
UpdateSystemThread(pid, tid);
137+
currentThreads.erase(tid);
138+
}
139+
}
140+
for (pid_t tid : currentThreads) {
141+
SystemThreads.erase(tid);
142+
}
143+
closedir(dir);
144+
#else
145+
Y_UNUSED(pid);
146+
#endif
147+
}
148+
149+
void NKikimr::TSystemThreadsMonitor::UpdateSystemThread(pid_t pid, pid_t tid) {
150+
#if defined(_linux_)
151+
TSystemThreadPoolState& state = SystemThreads[tid];
152+
std::ifstream file("/proc/" + std::to_string(pid) + "/task/" + std::to_string(tid) + "/stat");
153+
if (file) {
154+
std::string line;
155+
std::getline(file, line);
156+
size_t name_start = line.find('(');
157+
size_t name_end = line.find(')', name_start);
158+
if (name_start == std::string::npos || name_end == std::string::npos) {
159+
return;
160+
}
161+
std::string threadName = line.substr(name_start + 1, name_end - name_start - 1);
162+
std::istringstream iss(line.substr(name_end + 2));
163+
std::string ignore;
164+
ui64 majorPageFaults;
165+
ui64 minorPageFaults;
166+
ui64 systemTime;
167+
ui64 userTime;
168+
iss >> state.State >> ignore >> ignore >> ignore >> ignore >> ignore >> ignore
169+
>> minorPageFaults >> ignore >> majorPageFaults >> ignore >> userTime >> systemTime;
170+
state.DeltaMajorPageFaults = majorPageFaults - state.MajorPageFaults;
171+
state.DeltaMinorPageFaults = minorPageFaults - state.MinorPageFaults;
172+
state.DeltaSystemTime = systemTime - state.SystemTime;
173+
state.DeltaUserTime = userTime - state.UserTime;
174+
state.MajorPageFaults = majorPageFaults;
175+
state.MinorPageFaults = minorPageFaults;
176+
state.SystemTime = systemTime;
177+
state.UserTime = userTime;
178+
//Cerr << threadName << " " << state.SystemTime << " " << state.UserTime << Endl;
179+
std::string name = GetNormalizedThreadName(threadName);
180+
NameToThreadIndex[name].push_back(tid);
181+
}
182+
#else
183+
Y_UNUSED(tid);
184+
#endif
185+
}
186+
187+
std::string NKikimr::TSystemThreadsMonitor::GetNormalizedThreadName(const std::string& name) {
188+
std::string result = name;
189+
while (!result.empty() && (isdigit(result.back()) || result.back() == '-' || result.back() == ' ')) {
190+
result.resize(result.size() - 1);
191+
}
192+
if (result.empty()) {
193+
return name;
194+
}
195+
// some well-known hacks
196+
if (result == "default-executo") {
197+
result = "default-executor";
198+
}
199+
if (result == "resolver-execut") {
200+
result = "resolver-executor";
201+
}
202+
if (result == "grpc_global_tim") {
203+
result = "grpc_global_timer";
204+
}
205+
if (result == "kikimr.Schedule") {
206+
result = "kikimr.Scheduler";
207+
}
208+
return result;
209+
}

ydb/core/util/cpuinfo.h

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#pragma once
2+
#include <cstddef>
3+
#include <sys/types.h>
4+
#include <map>
5+
#include <unordered_map>
6+
#include <util/datetime/base.h>
7+
#include <util/generic/string.h>
8+
9+
namespace NKikimr {
10+
11+
std::size_t RealNumberOfCpus();
12+
13+
class TSystemThreadsMonitor {
14+
public:
15+
struct TSystemThreadPoolInfo { // returned data per refresh per second
16+
TString Name;
17+
ui32 Threads;
18+
float SystemUsage;
19+
float UserUsage;
20+
ui32 MajorPageFaults;
21+
ui32 MinorPageFaults;
22+
std::vector<std::pair<char, ui32>> States;
23+
};
24+
25+
std::vector<TSystemThreadPoolInfo> GetThreadPools(TInstant now);
26+
27+
private:
28+
struct TSystemThreadPoolState { // stored state of a thread
29+
char State;
30+
ui64 SystemTime;
31+
ui64 UserTime;
32+
ui64 MajorPageFaults;
33+
ui64 MinorPageFaults;
34+
ui64 DeltaSystemTime;
35+
ui64 DeltaUserTime;
36+
ui64 DeltaMajorPageFaults;
37+
ui64 DeltaMinorPageFaults;
38+
};
39+
40+
std::unordered_map<pid_t, TSystemThreadPoolState> SystemThreads;
41+
std::map<std::string, std::vector<pid_t>> NameToThreadIndex;
42+
TInstant UpdateTime;
43+
44+
void UpdateSystemThreads(pid_t pid);
45+
void UpdateSystemThread(pid_t pid, pid_t tid);
46+
static std::string GetNormalizedThreadName(const std::string& name);
47+
};
48+
49+
} // namespace NKikimr

ydb/core/util/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ SRCS(
1414
console.cpp
1515
console.h
1616
counted_leaky_bucket.h
17+
cpuinfo.cpp
18+
cpuinfo.h
1719
defs.h
1820
event_priority_queue.h
1921
failure_injection.cpp

0 commit comments

Comments
 (0)