Skip to content

Commit 37e325a

Browse files
committed
Intermediate changes
commit_hash:26f232484de806b5e8d9650d8690d9033c889924
1 parent 71d6503 commit 37e325a

File tree

4 files changed

+51
-46
lines changed

4 files changed

+51
-46
lines changed

yt/yt/library/tracing/example/main.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88

99
#include <util/system/env.h>
1010

11-
#include <random>
12-
1311
using namespace NYT;
1412
using namespace NYT::NTracing;
1513

@@ -99,7 +97,6 @@ NAuth::TTvmServiceConfigPtr GetTvmConfig()
9997
int main(int argc, char* argv[])
10098
{
10199
try {
102-
103100
bool test = false;
104101
auto usage = Format("usage: %v [--test] COLLECTOR_ENDPOINTS", argv[0]);
105102

yt/yt/library/tracing/jaeger/private.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,9 @@ YT_DEFINE_GLOBAL(const NProfiling::TProfiler, TracingProfiler, "/tracing");
1717

1818
////////////////////////////////////////////////////////////////////////////////
1919

20+
DECLARE_REFCOUNTED_CLASS(TJaegerTracer)
21+
DECLARE_REFCOUNTED_CLASS(TJaegerChannelManager)
22+
23+
////////////////////////////////////////////////////////////////////////////////
24+
2025
} // namespace NYT::NTracing

yt/yt/library/tracing/jaeger/tracer.cpp

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ using namespace NAuth;
3838

3939
////////////////////////////////////////////////////////////////////////////////
4040

41-
static const auto& Logger = JaegerLogger;
42-
static const auto& Profiler = TracingProfiler;
41+
static constexpr auto& Logger = JaegerLogger;
42+
static constexpr auto& Profiler = TracingProfiler;
4343

4444
////////////////////////////////////////////////////////////////////////////////
4545

@@ -50,7 +50,8 @@ static const TString TracingServiceAlias = "tracing";
5050

5151
void TJaegerTracerDynamicConfig::Register(TRegistrar registrar)
5252
{
53-
registrar.Parameter("collector_channel_config", &TThis::CollectorChannelConfig)
53+
registrar.Parameter("collector_channel", &TThis::CollectorChannel)
54+
.Alias("collector_channel_config")
5455
.Optional();
5556
registrar.Parameter("max_request_size", &TThis::MaxRequestSize)
5657
.Default();
@@ -110,8 +111,8 @@ TJaegerTracerConfigPtr TJaegerTracerConfig::ApplyDynamic(const TJaegerTracerDyna
110111
{
111112
auto config = New<TJaegerTracerConfig>();
112113
config->CollectorChannelConfig = CollectorChannelConfig;
113-
if (dynamicConfig->CollectorChannelConfig) {
114-
config->CollectorChannelConfig = dynamicConfig->CollectorChannelConfig;
114+
if (dynamicConfig->CollectorChannel) {
115+
config->CollectorChannelConfig = dynamicConfig->CollectorChannel;
115116
}
116117

117118
config->FlushPeriod = dynamicConfig->FlushPeriod.value_or(FlushPeriod);
@@ -231,9 +232,6 @@ std::vector<TK> ExtractKeys(THashMap<TK, TV> const& inputMap) {
231232

232233
////////////////////////////////////////////////////////////////////////////////
233234

234-
TBatchInfo::TBatchInfo()
235-
{ }
236-
237235
TBatchInfo::TBatchInfo(const TString& endpoint)
238236
: TracesDequeued_(Profiler().WithTag("endpoint", endpoint).Counter("/traces_dequeued"))
239237
, TracesDropped_(Profiler().WithTag("endpoint", endpoint).Counter("/traces_dropped"))
@@ -383,14 +381,14 @@ TInstant TJaegerChannelManager::GetReopenTime()
383381
}
384382

385383
TJaegerTracer::TJaegerTracer(
386-
const TJaegerTracerConfigPtr& config)
384+
TJaegerTracerConfigPtr config)
387385
: ActionQueue_(New<TActionQueue>("Jaeger"))
388386
, FlushExecutor_(New<TPeriodicExecutor>(
389387
ActionQueue_->GetInvoker(),
390-
BIND(&TJaegerTracer::Flush, MakeStrong(this)),
388+
BIND(&TJaegerTracer::DoFlush, MakeStrong(this)),
391389
config->FlushPeriod))
392-
, Config_(config)
393390
, TvmService_(config->TvmService ? CreateTvmService(config->TvmService) : nullptr)
391+
, Config_(std::move(config))
394392
{
395393
Profiler().AddFuncGauge("/enabled", MakeStrong(this), [this] {
396394
return Config_.Acquire()->IsEnabled();
@@ -553,18 +551,19 @@ void TJaegerTracer::DropFullQueue()
553551
}
554552
}
555553

556-
void TJaegerTracer::Flush()
554+
void TJaegerTracer::DoFlush()
557555
{
558556
YT_LOG_DEBUG("Started span flush");
559557

558+
560559
auto config = Config_.Acquire();
561560

562561
auto flushStartTime = TInstant::Now();
563562

564563
if (OpenChannelConfig_ != config->CollectorChannelConfig) {
565564
OpenChannelConfig_ = config->CollectorChannelConfig;
566565
for (auto& [endpoint, channel] : CollectorChannels_) {
567-
CollectorChannels_[endpoint].ForceReset(flushStartTime);
566+
CollectorChannels_[endpoint]->ForceReset(flushStartTime);
568567
}
569568
}
570569

@@ -582,20 +581,19 @@ void TJaegerTracer::Flush()
582581
return;
583582
}
584583

585-
std::stack<TString> toRemove;
586584
auto keys = ExtractKeys(BatchInfo_);
587-
588585
if (keys.empty()) {
589586
YT_LOG_DEBUG("Span batch info is empty");
590587
LastSuccessfulFlushTime_ = flushStartTime;
591588
NotifyEmptyQueue();
592589
return;
593590
}
594591

592+
std::stack<TString> toRemove;
595593
for (const auto& endpoint : keys) {
596594
auto [batches, batchCount, spanCount] = PeekQueue(config, endpoint);
597595
if (batchCount <= 0) {
598-
if (!CollectorChannels_.contains(endpoint) || flushStartTime > CollectorChannels_[endpoint].GetReopenTime() + config->EndpointChannelTimeout) {
596+
if (!CollectorChannels_.contains(endpoint) || flushStartTime > CollectorChannels_[endpoint]->GetReopenTime() + config->EndpointChannelTimeout) {
599597
toRemove.push(endpoint);
600598
}
601599
YT_LOG_DEBUG("Span queue is empty (Endpoint: %v)", endpoint);
@@ -614,16 +612,16 @@ void TJaegerTracer::Flush()
614612

615613
auto it = CollectorChannels_.find(endpoint);
616614
if (it == CollectorChannels_.end()) {
617-
it = CollectorChannels_.emplace(endpoint, TJaegerChannelManager(config, endpoint, TvmService_)).first;
615+
it = CollectorChannels_.emplace(endpoint, New<TJaegerChannelManager>(config, endpoint, TvmService_)).first;
618616
}
619617

620618
auto& channel = it->second;
621619

622-
if (channel.NeedsReopen(flushStartTime)) {
623-
channel = TJaegerChannelManager(config, endpoint, TvmService_);
620+
if (channel->NeedsReopen(flushStartTime)) {
621+
channel = New<TJaegerChannelManager>(config, endpoint, TvmService_);
624622
}
625623

626-
if (channel.Push(batches, spanCount)) {
624+
if (channel->Push(batches, spanCount)) {
627625
DropQueue(batchCount, endpoint);
628626
YT_LOG_DEBUG("Spans sent (Endpoint: %v)", endpoint);
629627
LastSuccessfulFlushTime_ = flushStartTime;

yt/yt/library/tracing/jaeger/tracer.h

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#pragma once
22

3-
#include "public.h"
3+
#include "private.h"
44

55
#include <yt/yt/library/tracing/tracer.h>
66

@@ -28,7 +28,7 @@ class TJaegerTracerDynamicConfig
2828
: public NYTree::TYsonStruct
2929
{
3030
public:
31-
NRpc::NGrpc::TChannelConfigPtr CollectorChannelConfig;
31+
NRpc::NGrpc::TChannelConfigPtr CollectorChannel;
3232

3333
std::optional<i64> MaxRequestSize;
3434

@@ -98,12 +98,10 @@ DEFINE_REFCOUNTED_TYPE(TJaegerTracerConfig)
9898

9999
////////////////////////////////////////////////////////////////////////////////
100100

101-
DECLARE_REFCOUNTED_CLASS(TJaegerTracer)
102-
103101
class TBatchInfo
104102
{
105103
public:
106-
TBatchInfo();
104+
TBatchInfo() = default;
107105
explicit TBatchInfo(const TString& endpoint);
108106

109107
void PopFront();
@@ -113,18 +111,21 @@ class TBatchInfo
113111
std::tuple<std::vector<TSharedRef>, int, int> PeekQueue(const TJaegerTracerConfigPtr& config, std::optional<TSharedRef> processInfo);
114112

115113
private:
114+
const NProfiling::TCounter TracesDequeued_;
115+
const NProfiling::TCounter TracesDropped_;
116+
const NProfiling::TGauge MemoryUsage_;
117+
const NProfiling::TGauge TraceQueueSize_;
118+
116119
std::deque<std::pair<int, TSharedRef>> BatchQueue_;
117120

118121
i64 QueueMemory_ = 0;
119122
i64 QueueSize_ = 0;
120-
121-
NProfiling::TCounter TracesDequeued_;
122-
NProfiling::TCounter TracesDropped_;
123-
NProfiling::TGauge MemoryUsage_;
124-
NProfiling::TGauge TraceQueueSize_;
125123
};
126124

125+
////////////////////////////////////////////////////////////////////////////////
126+
127127
class TJaegerChannelManager
128+
: public TRefCounted
128129
{
129130
public:
130131
TJaegerChannelManager();
@@ -140,25 +141,29 @@ class TJaegerChannelManager
140141
TInstant GetReopenTime();
141142

142143
private:
143-
NRpc::IChannelPtr Channel_;
144-
NAuth::ITvmServicePtr TvmService_;
144+
const NAuth::ITvmServicePtr TvmService_;
145+
const TString Endpoint_;
145146

146-
TString Endpoint_;
147+
const TInstant ReopenTime_;
148+
const TDuration RpcTimeout_;
147149

148-
TInstant ReopenTime_;
149-
TDuration RpcTimeout_;
150+
const NProfiling::TCounter PushedBytes_;
151+
const NProfiling::TCounter PushErrors_;
152+
const NProfiling::TSummary PayloadSize_;
153+
const NProfiling::TEventTimer PushDuration_;
150154

151-
NProfiling::TCounter PushedBytes_;
152-
NProfiling::TCounter PushErrors_;
153-
NProfiling::TSummary PayloadSize_;
154-
NProfiling::TEventTimer PushDuration_;
155+
NRpc::IChannelPtr Channel_;
155156
};
156157

158+
DEFINE_REFCOUNTED_TYPE(TJaegerChannelManager)
159+
160+
////////////////////////////////////////////////////////////////////////////////
161+
157162
class TJaegerTracer
158163
: public ITracer
159164
{
160165
public:
161-
TJaegerTracer(const TJaegerTracerConfigPtr& config);
166+
explicit TJaegerTracer(TJaegerTracerConfigPtr config);
162167

163168
TFuture<void> WaitFlush();
164169

@@ -171,6 +176,7 @@ class TJaegerTracer
171176
private:
172177
const NConcurrency::TActionQueuePtr ActionQueue_;
173178
const NConcurrency::TPeriodicExecutorPtr FlushExecutor_;
179+
const NAuth::ITvmServicePtr TvmService_;
174180

175181
TAtomicIntrusivePtr<TJaegerTracerConfig> Config_;
176182

@@ -184,12 +190,11 @@ class TJaegerTracer
184190

185191
TAtomicObject<TPromise<void>> QueueEmptyPromise_ = NewPromise<void>();
186192

187-
THashMap<TString, TJaegerChannelManager> CollectorChannels_;
193+
THashMap<TString, TJaegerChannelManagerPtr> CollectorChannels_;
188194
NRpc::NGrpc::TChannelConfigPtr OpenChannelConfig_;
189195

190-
NAuth::ITvmServicePtr TvmService_;
191196

192-
void Flush();
197+
void DoFlush();
193198
void DequeueAll(const TJaegerTracerConfigPtr& config);
194199
void NotifyEmptyQueue();
195200

0 commit comments

Comments
 (0)