Skip to content

Commit 04d3517

Browse files
committed
Unify client reconnection thread in CollectorOutput
1 parent 75b7613 commit 04d3517

File tree

8 files changed

+132
-144
lines changed

8 files changed

+132
-144
lines changed

collector/lib/CollectorOutput.cpp

Lines changed: 81 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@
33
#include "internalapi/sensor/collector.pb.h"
44
#include "internalapi/sensor/collector_iservice.pb.h"
55

6-
#include "HostInfo.h"
6+
#include "GRPCUtil.h"
77

88
namespace collector {
99

1010
CollectorOutput::CollectorOutput(const CollectorConfig& config) {
1111
if (config.grpc_channel != nullptr) {
12-
auto sensor_client = std::make_unique<SensorClient>(config.grpc_channel);
13-
auto signal_client = std::make_unique<SignalServiceClient>(config.grpc_channel);
12+
channel_ = config.grpc_channel;
13+
14+
auto sensor_client = std::make_unique<SensorClient>(channel_);
15+
auto signal_client = std::make_unique<SignalServiceClient>(channel_);
1416
sensor_clients_.emplace_back(std::move(sensor_client));
1517
signal_clients_.emplace_back(std::move(signal_client));
1618
}
@@ -26,39 +28,62 @@ CollectorOutput::CollectorOutput(const CollectorConfig& config) {
2628
CLOG(FATAL) << "No available output configured";
2729
}
2830

29-
StartClients(sensor_clients_);
30-
StartClients(signal_clients_);
31+
thread_.Start([this] { EstablishGrpcStream(); });
32+
}
33+
34+
void CollectorOutput::HandleOutputError() {
35+
CLOG(ERROR) << "GRPC stream interrupted";
36+
stream_active_.store(false, std::memory_order_release);
37+
stream_interrupted_.notify_one();
3138
}
3239

33-
namespace {
34-
SignalHandler::Result SensorOutput(std::vector<std::unique_ptr<ISensorClient>>& clients, const sensor::MsgFromCollector& msg) {
35-
for (auto& client : clients) {
40+
SignalHandler::Result CollectorOutput::SensorOutput(const sensor::MsgFromCollector& msg) {
41+
for (auto& client : sensor_clients_) {
3642
auto res = client->SendMsg(msg);
37-
if (res != SignalHandler::PROCESSED) {
38-
return res;
43+
switch (res) {
44+
case SignalHandler::PROCESSED:
45+
break;
46+
47+
case SignalHandler::ERROR:
48+
HandleOutputError();
49+
return res;
50+
51+
case SignalHandler::IGNORED:
52+
case SignalHandler::NEEDS_REFRESH:
53+
case SignalHandler::FINISHED:
54+
return res;
3955
}
4056
}
4157
return SignalHandler::PROCESSED;
4258
}
4359

44-
SignalHandler::Result SignalOutput(std::vector<std::unique_ptr<ISignalServiceClient>>& clients, const sensor::SignalStreamMessage& msg) {
45-
for (auto& client : clients) {
60+
SignalHandler::Result CollectorOutput::SignalOutput(const sensor::SignalStreamMessage& msg) {
61+
for (auto& client : signal_clients_) {
4662
auto res = client->PushSignals(msg);
47-
if (res != SignalHandler::PROCESSED) {
48-
return res;
63+
switch (res) {
64+
case SignalHandler::PROCESSED:
65+
break;
66+
67+
case SignalHandler::ERROR:
68+
HandleOutputError();
69+
return res;
70+
71+
case SignalHandler::IGNORED:
72+
case SignalHandler::NEEDS_REFRESH:
73+
case SignalHandler::FINISHED:
74+
return res;
4975
}
5076
}
5177
return SignalHandler::PROCESSED;
5278
}
53-
} // namespace
5479

5580
SignalHandler::Result CollectorOutput::SendMsg(const MessageType& msg) {
5681
auto visitor = [this](auto&& m) {
5782
using T = std::decay_t<decltype(m)>;
5883
if constexpr (std::is_same_v<T, sensor::MsgFromCollector>) {
59-
return SensorOutput(sensor_clients_, m);
84+
return SensorOutput(m);
6085
} else if constexpr (std::is_same_v<T, sensor::SignalStreamMessage>) {
61-
return SignalOutput(signal_clients_, m);
86+
return SignalOutput(m);
6287
}
6388

6489
// Unknown type
@@ -82,4 +107,43 @@ void CollectorOutput::Register() {
82107
}
83108
}
84109
}
110+
111+
void CollectorOutput::EstablishGrpcStream() {
112+
while (EstablishGrpcStreamSingle()) {
113+
}
114+
CLOG(INFO) << "Signal service client terminating.";
115+
}
116+
117+
bool CollectorOutput::EstablishGrpcStreamSingle() {
118+
std::mutex mtx;
119+
std::unique_lock<std::mutex> lock(mtx);
120+
stream_interrupted_.wait(lock, [this]() { return !stream_active_.load(std::memory_order_acquire) || thread_.should_stop(); });
121+
if (thread_.should_stop()) {
122+
return false;
123+
}
124+
125+
CLOG(INFO) << "Trying to establish GRPC stream...";
126+
127+
if (!WaitForChannelReady(channel_, [this]() { return thread_.should_stop(); })) {
128+
return false;
129+
}
130+
if (thread_.should_stop()) {
131+
return false;
132+
}
133+
134+
// Refresh all clients
135+
auto success = true;
136+
for (const auto& client : signal_clients_) {
137+
success &= client->Refresh();
138+
}
139+
140+
for (const auto& client : sensor_clients_) {
141+
success &= client->Refresh();
142+
}
143+
144+
if (success) {
145+
stream_active_.store(true, std::memory_order_release);
146+
}
147+
return true;
148+
}
85149
} // namespace collector

collector/lib/CollectorOutput.h

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,42 +17,40 @@ using MessageType = std::variant<sensor::MsgFromCollector, sensor::SignalStreamM
1717

1818
class CollectorOutput {
1919
public:
20-
CollectorOutput(const CollectorOutput&) = default;
20+
CollectorOutput(const CollectorOutput&) = delete;
2121
CollectorOutput(CollectorOutput&&) = delete;
2222
CollectorOutput& operator=(const CollectorOutput&) = delete;
2323
CollectorOutput& operator=(CollectorOutput&&) = delete;
2424

2525
CollectorOutput(const CollectorConfig& config);
2626

2727
~CollectorOutput() {
28-
StopClients(sensor_clients_);
29-
StopClients(signal_clients_);
28+
stream_interrupted_.notify_one();
29+
thread_.Stop();
3030
}
3131

3232
SignalHandler::Result SendMsg(const MessageType& msg);
3333
void Register();
3434

35-
bool UseSensorClient() { return use_sensor_client_; }
35+
bool UseSensorClient() const { return use_sensor_client_; }
3636

3737
private:
38-
template <typename T>
39-
void StartClients(std::vector<T>& clients) {
40-
for (auto& client : clients) {
41-
client->Start();
42-
}
43-
}
38+
void EstablishGrpcStream();
39+
bool EstablishGrpcStreamSingle();
4440

45-
template <typename T>
46-
void StopClients(std::vector<T>& clients) {
47-
for (auto& client : clients) {
48-
client->Stop();
49-
}
50-
}
41+
void HandleOutputError();
42+
SignalHandler::Result SensorOutput(const sensor::MsgFromCollector& msg);
43+
SignalHandler::Result SignalOutput(const sensor::SignalStreamMessage& msg);
5144

5245
std::vector<std::unique_ptr<ISensorClient>> sensor_clients_;
5346
std::vector<std::unique_ptr<ISignalServiceClient>> signal_clients_;
5447

5548
bool use_sensor_client_{true};
49+
50+
StoppableThread thread_;
51+
std::atomic<bool> stream_active_{false};
52+
std::condition_variable stream_interrupted_;
53+
std::shared_ptr<grpc::Channel> channel_;
5654
};
5755

5856
} // namespace collector

collector/lib/CollectorService.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ void CollectorService::RunForever() {
9696
// Start monitoring services.
9797
config_loader_.Start();
9898

99+
output_.Register();
100+
99101
CLOG(INFO) << "Network scrape interval set to " << config_.ScrapeInterval() << " seconds";
100102

101103
if (net_status_notifier_) {
@@ -106,8 +108,6 @@ void CollectorService::RunForever() {
106108
CLOG(FATAL) << "Unable to start collector stats exporter";
107109
}
108110

109-
output_.Register();
110-
111111
system_inspector_.Start();
112112

113113
ControlValue cv;

collector/lib/SensorClient.cpp

Lines changed: 4 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,23 @@
11
#include "SensorClient.h"
22

3-
#include "GRPCUtil.h"
43
#include "Logging.h"
54

65
namespace collector {
7-
bool SensorClient::EstablishGRPCStreamSingle() {
8-
std::mutex mtx;
9-
std::unique_lock<std::mutex> lock(mtx);
10-
stream_interrupted_.wait(lock, [this]() { return !stream_active_.load(std::memory_order_acquire) || thread_.should_stop(); });
11-
if (thread_.should_stop()) {
12-
return false;
13-
}
14-
15-
CLOG(INFO) << "Trying to establish GRPC stream for signals ...";
16-
17-
if (!WaitForChannelReady(channel_, [this]() { return thread_.should_stop(); })) {
18-
return false;
19-
}
20-
if (thread_.should_stop()) {
21-
return false;
22-
}
23-
24-
// stream writer
6+
bool SensorClient::Refresh() {
257
context_ = std::make_unique<grpc::ClientContext>();
268
writer_ = DuplexClient::CreateWithReadsIgnored(&sensor::CollectorService::Stub::AsyncCommunicate, channel_, context_.get());
279
if (!writer_->WaitUntilStarted(std::chrono::seconds(30))) {
2810
CLOG(ERROR) << "Signal stream not ready after 30 seconds. Retrying ...";
2911
CLOG(ERROR) << "Error message: " << writer_->FinishNow().error_message();
3012
writer_.reset();
31-
return true;
13+
return false;
3214
}
33-
CLOG(INFO) << "Successfully established GRPC stream for signals.";
3415

3516
first_write_ = true;
3617
stream_active_.store(true, std::memory_order_release);
3718
return true;
3819
}
3920

40-
void SensorClient::EstablishGRPCStream() {
41-
while (EstablishGRPCStreamSingle());
42-
CLOG(INFO) << "Signal service client terminating.";
43-
}
44-
45-
void SensorClient::Start() {
46-
thread_.Start([this] { EstablishGRPCStream(); });
47-
}
48-
49-
void SensorClient::Stop() {
50-
stream_interrupted_.notify_one();
51-
thread_.Stop();
52-
context_->TryCancel();
53-
context_.reset();
54-
}
55-
5621
SignalHandler::Result SensorClient::SendMsg(const sensor::MsgFromCollector& msg) {
5722
if (!stream_active_.load(std::memory_order_acquire)) {
5823
CLOG_THROTTLED(ERROR, std::chrono::seconds(10))
@@ -65,16 +30,15 @@ SignalHandler::Result SensorClient::SendMsg(const sensor::MsgFromCollector& msg)
6530
return SignalHandler::NEEDS_REFRESH;
6631
}
6732

68-
if (!writer_->Write(msg)) {
33+
auto res = writer_->Write(msg);
34+
if (!res) {
6935
auto status = writer_->FinishNow();
7036
if (!status.ok()) {
7137
CLOG(ERROR) << "GRPC writes failed: (" << status.error_code() << ") " << status.error_message();
7238
}
7339
writer_.reset();
7440

7541
stream_active_.store(false, std::memory_order_release);
76-
CLOG(ERROR) << "GRPC stream interrupted";
77-
stream_interrupted_.notify_one();
7842
return SignalHandler::ERROR;
7943
}
8044

collector/lib/SensorClient.h

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ class ISensorClient {
2626
ISensorClient& operator=(ISensorClient&&) = delete;
2727
virtual ~ISensorClient() = default;
2828

29-
virtual void Start() = 0;
30-
virtual void Stop() = 0;
29+
virtual bool Refresh() = 0;
3130

3231
virtual SignalHandler::Result SendMsg(const sensor::MsgFromCollector& msg) = 0;
3332
};
@@ -36,24 +35,26 @@ class SensorClient : public ISensorClient {
3635
public:
3736
using Service = sensor::CollectorService;
3837

38+
SensorClient(const SensorClient&) = delete;
39+
SensorClient(SensorClient&&) = delete;
40+
SensorClient& operator=(const SensorClient&) = delete;
41+
SensorClient& operator=(SensorClient&&) = delete;
42+
~SensorClient() override {
43+
context_->TryCancel();
44+
}
45+
3946
explicit SensorClient(std::shared_ptr<grpc::Channel> channel)
4047
: channel_(std::move(channel)) {
4148
}
4249

43-
void Start() override;
44-
void Stop() override;
50+
bool Refresh() override;
4551

4652
SignalHandler::Result SendMsg(const sensor::MsgFromCollector& msg) override;
4753

4854
private:
49-
void EstablishGRPCStream();
50-
bool EstablishGRPCStreamSingle();
51-
5255
std::shared_ptr<grpc::Channel> channel_;
5356

54-
StoppableThread thread_;
5557
std::atomic<bool> stream_active_ = false;
56-
std::condition_variable stream_interrupted_;
5758

5859
// This needs to have the same lifetime as the class.
5960
std::unique_ptr<grpc::ClientContext> context_;
@@ -63,8 +64,7 @@ class SensorClient : public ISensorClient {
6364
};
6465

6566
class SensorClientStdout : public ISensorClient {
66-
void Start() override {}
67-
void Stop() override {}
67+
bool Refresh() override { return true; }
6868

6969
SignalHandler::Result SendMsg(const sensor::MsgFromCollector& msg) override {
7070
LogProtobufMessage(msg);

0 commit comments

Comments
 (0)