Skip to content

Commit 407df52

Browse files
authored
Tracing in topic write session (#12649)
1 parent 121c8b3 commit 407df52

36 files changed

+491
-211
lines changed

ydb/core/client/server/msgbus_server_pq_metacache.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/public/lib/deprecated/kicli/kicli.h>
66

77
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
8+
#include <ydb/library/wilson_ids/wilson.h>
89
#include <ydb/core/tx/tx_proxy/proxy.h>
910
#include <ydb/core/tx/schemeshard/schemeshard.h>
1011
#include <ydb/core/tx/scheme_board/cache.h>
@@ -357,16 +358,18 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
357358
bool FirstRequestDone = false;
358359
bool SyncVersion;
359360
bool ShowPrivate;
361+
NWilson::TSpan Span;
360362

361363
TWaiter(const TActorId& waiterId, const TString& dbRoot, bool syncVersion, bool showPrivate,
362-
const TVector<NPersQueue::TDiscoveryConverterPtr>& topics, EWaiterType type)
364+
const TVector<NPersQueue::TDiscoveryConverterPtr>& topics, EWaiterType type, NWilson::TSpan span = {})
363365

364366
: WaiterId(waiterId)
365367
, DbRoot(dbRoot)
366368
, Topics(topics)
367369
, Type(type)
368370
, SyncVersion(syncVersion)
369371
, ShowPrivate(showPrivate)
372+
, Span(std::move(span))
370373
{}
371374

372375
bool ApplyResult(std::shared_ptr<TSchemeCacheNavigate>& result) {
@@ -457,7 +460,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
457460
}
458461
SendSchemeCacheRequest(
459462
std::make_shared<TWaiter>(ev->Sender, DbRoot, msg.SyncVersion, msg.ShowPrivate, ev->Get()->Topics,
460-
EWaiterType::DescribeCustomTopics),
463+
EWaiterType::DescribeCustomTopics, NWilson::TSpan(TWilsonTopic::TopicDetailed, NWilson::TTraceId(ev->TraceId), "Topic.SchemeCacheRequest", NWilson::EFlags::AUTO_END)),
461464
ctx
462465
);
463466
}
@@ -532,7 +535,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
532535

533536
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "send request for " << (waiter->Type == EWaiterType::DescribeAllTopics ? " all " : "") << waiter->GetTopics().size() << " topics, got " << DescribeTopicsWaiters.size() << " requests infly");
534537

535-
ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.release()));
538+
ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.release()), 0, 0, waiter->Span.GetTraceId());
536539
}
537540

538541
void HandleSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {

ydb/core/grpc_services/base/base.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -906,7 +906,7 @@ class TGRpcRequestBiStreamWrapper
906906
void SetDiskQuotaExceeded(bool) override {
907907
}
908908

909-
void RefreshToken(const TString& token, const TActorContext& ctx, TActorId id);
909+
void RefreshToken(const TString& token, const TActorContext& ctx, TActorId id, NWilson::TTraceId traceId = {});
910910

911911
void SetRespHook(TRespHook&&) override {
912912
/* cannot add hook to bidirectional streaming */

ydb/core/grpc_services/grpc_request_proxy.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ class TGRpcRequestProxyImpl
7777
void HandleSchemeBoard(TSchemeBoardEvents::TEvNotifyDelete::TPtr& ev);
7878
void ReplayEvents(const TString& databaseName, const TActorContext& ctx);
7979

80-
void MaybeStartTracing(IRequestProxyCtx& ctx);
80+
template<class TEvent>
81+
void MaybeStartTracing(TAutoPtr<TEventHandle<TEvent>>& event);
8182

8283
static bool IsAuthStateOK(const IRequestProxyCtx& ctx);
8384

@@ -153,7 +154,7 @@ class TGRpcRequestProxyImpl
153154
return;
154155
}
155156

156-
MaybeStartTracing(*requestBaseCtx);
157+
MaybeStartTracing(event);
157158

158159
if (IsAuthStateOK(*requestBaseCtx)) {
159160
Handle(event, ctx);
@@ -436,7 +437,9 @@ bool TGRpcRequestProxyImpl::IsAuthStateOK(const IRequestProxyCtx& ctx) {
436437
return false;
437438
}
438439

439-
void TGRpcRequestProxyImpl::MaybeStartTracing(IRequestProxyCtx& ctx) {
440+
template<class TEvent>
441+
void TGRpcRequestProxyImpl::MaybeStartTracing(TAutoPtr<TEventHandle<TEvent>>& event) {
442+
IRequestProxyCtx& ctx = *event->Get();
440443
auto isTracingDecided = ctx.IsTracingDecided();
441444
if (!isTracingDecided) {
442445
return;
@@ -445,8 +448,12 @@ void TGRpcRequestProxyImpl::MaybeStartTracing(IRequestProxyCtx& ctx) {
445448
return;
446449
}
447450

448-
TMaybe<TString> traceparentHeader = ctx.GetPeerMetaValues(NYdb::OTEL_TRACE_HEADER);
449-
NWilson::TTraceId traceId = NJaegerTracing::HandleTracing(ctx.GetRequestDiscriminator(), traceparentHeader);
451+
NWilson::TTraceId traceId = NWilson::TTraceId(event->TraceId); // May be non-empty for internal subrequests; in that case this request is part of a larger traced request
452+
if (!traceId) {
453+
TMaybe<TString> traceparentHeader = ctx.GetPeerMetaValues(NYdb::OTEL_TRACE_HEADER);
454+
traceId = NJaegerTracing::HandleTracing(ctx.GetRequestDiscriminator(), traceparentHeader);
455+
}
456+
450457
if (traceId) {
451458
NWilson::TSpan grpcRequestProxySpan(TWilsonGrpc::RequestProxy, std::move(traceId), "GrpcRequestProxy");
452459
if (auto database = ctx.GetDatabaseName()) {

ydb/core/grpc_services/local_rate_limiter.cpp

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ TActorId RateLimiterAcquireUseSameMailbox(
1313
const TDuration& duration,
1414
std::function<void()>&& onSuccess,
1515
std::function<void()>&& onTimeout,
16-
const TActorContext& ctx)
16+
const TActorContext& ctx,
17+
NWilson::TTraceId traceId)
1718
{
1819
auto cb = [onSuccess{std::move(onSuccess)}, onTimeout{std::move(onTimeout)}]
1920
(const Ydb::RateLimiter::AcquireResourceResponse& resp)
@@ -43,18 +44,20 @@ TActorId RateLimiterAcquireUseSameMailbox(
4344
fullPath.DatabaseName,
4445
fullPath.Token,
4546
std::move(cb),
46-
ctx);
47+
ctx,
48+
std::move(traceId));
4749
}
4850

4951
TActorId RateLimiterAcquireUseSameMailbox(
5052
Ydb::RateLimiter::AcquireResourceRequest&& request,
5153
const TString& database,
5254
const TString& token,
5355
std::function<void(Ydb::RateLimiter::AcquireResourceResponse resp)>&& cb,
54-
const TActorContext& ctx)
56+
const TActorContext& ctx,
57+
NWilson::TTraceId traceId)
5558
{
5659
return DoLocalRpcSameMailbox<NKikimr::NGRpcService::TEvAcquireRateLimiterResource>(
57-
std::move(request), std::move(cb), database, token, ctx);
60+
std::move(request), std::move(cb), database, token, ctx, false, std::move(traceId));
5861
}
5962

6063
TActorId RateLimiterAcquireUseSameMailbox(
@@ -63,7 +66,8 @@ TActorId RateLimiterAcquireUseSameMailbox(
6366
const TDuration& duration,
6467
std::function<void()>&& onSuccess,
6568
std::function<void()>&& onTimeout,
66-
const TActorContext& ctx)
69+
const TActorContext& ctx,
70+
NWilson::TTraceId traceId)
6771
{
6872
if (const auto maybeRlPath = reqCtx.GetRlPath()) {
6973
auto cb = [onSuccess{std::move(onSuccess)}, onTimeout{std::move(onTimeout)}]
@@ -95,7 +99,8 @@ TActorId RateLimiterAcquireUseSameMailbox(
9599
reqCtx.GetDatabaseName().GetOrElse(""),
96100
reqCtx.GetSerializedToken(),
97101
std::move(cb),
98-
ctx);
102+
ctx,
103+
std::move(traceId));
99104
}
100105
return {};
101106
}

ydb/core/grpc_services/local_rate_limiter.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,20 +41,23 @@ TActorId RateLimiterAcquireUseSameMailbox(
4141
const TDuration& duration,
4242
std::function<void()>&& onSuccess,
4343
std::function<void()>&& onTimeout,
44-
const TActorContext& ctx);
44+
const TActorContext& ctx,
45+
NWilson::TTraceId traceId = {});
4546

4647
TActorId RateLimiterAcquireUseSameMailbox(
4748
Ydb::RateLimiter::AcquireResourceRequest&& request,
4849
const TString& database,
4950
const TString& token,
50-
std::function<void(Ydb::RateLimiter::AcquireResourceResponse resp)>&& cb, const TActorContext &ctx);
51+
std::function<void(Ydb::RateLimiter::AcquireResourceResponse resp)>&& cb, const TActorContext& ctx,
52+
NWilson::TTraceId traceId = {});
5153

5254
TActorId RateLimiterAcquireUseSameMailbox(const NGRpcService::IRequestCtxBase& reqCtx,
5355
ui64 required,
5456
const TDuration& duration,
5557
std::function<void()>&& onSuccess,
5658
std::function<void()>&& onFail,
57-
const NActors::TActorContext &ctx);
59+
const NActors::TActorContext& ctx,
60+
NWilson::TTraceId traceId = {});
5861

5962
struct TRlConfig {
6063
struct TOnReqAction {

ydb/core/grpc_services/local_rpc/local_rpc.h

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55

66
#include <ydb/core/base/appdata.h>
77

8+
#include <ydb/library/actors/wilson/wilson_span.h>
9+
#include <ydb/library/wilson_ids/wilson.h>
10+
811
#include <library/cpp/threading/future/future.h>
912

1013
namespace NKikimr {
@@ -48,9 +51,12 @@ class TLocalRpcCtxImpl<TRpc, TCbWrapper, false> : public NGRpcService::IRequestN
4851
using TBase = TLocalRpcCtxImplData<TRpc, TCbWrapper>;
4952

5053
template<typename TCb>
51-
TLocalRpcCtxImpl(TCb&& cb)
54+
TLocalRpcCtxImpl(TCb&& cb, NWilson::TTraceId = {})
5255
: TBase(std::forward<TCb>(cb))
5356
{}
57+
58+
protected:
59+
NWilson::TSpan Span;
5460
};
5561

5662
template<typename TRpc, typename TCbWrapper>
@@ -59,8 +65,9 @@ class TLocalRpcCtxImpl<TRpc, TCbWrapper, true> : public NGRpcService::IRequestOp
5965
using TBase = TLocalRpcCtxImplData<TRpc, TCbWrapper>;
6066

6167
template<typename TCb>
62-
TLocalRpcCtxImpl(TCb&& cb)
68+
TLocalRpcCtxImpl(TCb&& cb, NWilson::TTraceId traceId = {})
6369
: TBase(std::forward<TCb>(cb))
70+
, Span(TWilsonGrpc::RequestProxy, std::move(traceId), "LocalRpc")
6471
{}
6572

6673
public:
@@ -77,6 +84,7 @@ class TLocalRpcCtxImpl<TRpc, TCbWrapper, true> : public NGRpcService::IRequestOp
7784
NYql::IssuesToMessage(TBase::IssueManager.GetIssues(), deferred->mutable_issues());
7885
auto data = deferred->mutable_result();
7986
data->PackFrom(result);
87+
EndSpan(status);
8088
TBase::CbWrapper(resp);
8189
}
8290

@@ -93,14 +101,25 @@ class TLocalRpcCtxImpl<TRpc, TCbWrapper, true> : public NGRpcService::IRequestOp
93101
}
94102
auto data = deferred->mutable_result();
95103
data->PackFrom(result);
104+
EndSpan(status);
96105
TBase::CbWrapper(resp);
97106
}
98107

99108
void SendOperation(const Ydb::Operations::Operation& operation) override {
100109
TResp resp;
101110
resp.mutable_operation()->CopyFrom(operation);
111+
EndSpan(operation.status());
102112
TBase::CbWrapper(resp);
103113
}
114+
115+
protected:
116+
void EndSpan(Ydb::StatusIds::StatusCode status) {
117+
Span.Attribute("status", static_cast<int>(status));
118+
Span.End();
119+
}
120+
121+
protected:
122+
NWilson::TSpan Span;
104123
};
105124

106125
template<typename TRpc, typename TCbWrapper, bool IsOperation = TRpc::IsOp>
@@ -116,8 +135,9 @@ class TLocalRpcCtx : public TLocalRpcCtxImpl<TRpc, TCbWrapper, IsOperation> {
116135
const TString& databaseName,
117136
const TMaybe<TString>& token,
118137
const TMaybe<TString>& requestType,
119-
bool internalCall)
120-
: TBase(std::forward<TCb>(cb))
138+
bool internalCall,
139+
NWilson::TTraceId traceId = {})
140+
: TBase(std::forward<TCb>(cb), std::move(traceId))
121141
, Request(std::forward<TProto>(req))
122142
, DatabaseName(databaseName)
123143
, RequestType(requestType)
@@ -126,6 +146,11 @@ class TLocalRpcCtx : public TLocalRpcCtxImpl<TRpc, TCbWrapper, IsOperation> {
126146
if (token && !token->empty()) {
127147
InternalToken = new NACLib::TUserToken(*token);
128148
}
149+
150+
if (DatabaseName) {
151+
this->Span.Attribute("database", DatabaseName);
152+
}
153+
this->Span.Attribute("request_type", GetRequestName());
129154
}
130155

131156
bool HasClientCapability(const TString&) const override {
@@ -226,7 +251,7 @@ class TLocalRpcCtx : public TLocalRpcCtxImpl<TRpc, TCbWrapper, IsOperation> {
226251
}
227252

228253
NWilson::TTraceId GetWilsonTraceId() const override {
229-
return {};
254+
return this->Span.GetTraceId();
230255
}
231256

232257
TInstant GetDeadline() const override {
@@ -301,14 +326,14 @@ void SetRequestSyncOperationMode(TRequest&) {
301326
template<typename TRpc>
302327
NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(typename TRpc::TRequest&& proto, const TString& database,
303328
const TMaybe<TString>& token, const TMaybe<TString>& requestType,
304-
TActorSystem* actorSystem, bool internalCall = false)
329+
TActorSystem* actorSystem, bool internalCall = false, NWilson::TTraceId traceId = {})
305330
{
306331
auto promise = NThreading::NewPromise<typename TRpc::TResponse>();
307332

308333
SetRequestSyncOperationMode(proto);
309334

310335
using TCbWrapper = TPromiseWrapper<typename TRpc::TResponse>;
311-
auto req = new TLocalRpcCtx<TRpc, TCbWrapper>(std::move(proto), TCbWrapper(promise), database, token, requestType, internalCall);
336+
auto req = new TLocalRpcCtx<TRpc, TCbWrapper>(std::move(proto), TCbWrapper(promise), database, token, requestType, internalCall, std::move(traceId));
312337
auto actor = TRpc::CreateRpcActor(req);
313338
actorSystem->Register(actor, TMailboxType::HTSwap, actorSystem->AppData<TAppData>()->UserPoolId);
314339

@@ -328,7 +353,8 @@ NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(
328353
const TMaybe<TString>& requestType,
329354
TActorSystem* actorSystem,
330355
const TMap<TString, TString>& peerMeta,
331-
bool internalCall = false
356+
bool internalCall = false,
357+
NWilson::TTraceId traceId = {}
332358
)
333359
{
334360
auto promise = NThreading::NewPromise<typename TRpc::TResponse>();
@@ -342,7 +368,8 @@ NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(
342368
database,
343369
token,
344370
requestType,
345-
internalCall
371+
internalCall,
372+
std::move(traceId)
346373
);
347374

348375
for (const auto& [key, value] : peerMeta) {
@@ -358,18 +385,18 @@ NThreading::TFuture<typename TRpc::TResponse> DoLocalRpc(
358385
template<typename TRpc>
359386
TActorId DoLocalRpcSameMailbox(typename TRpc::TRequest&& proto, std::function<void(typename TRpc::TResponse)>&& cb,
360387
const TString& database, const TMaybe<TString>& token, const TMaybe<TString>& requestType,
361-
const TActorContext& ctx, bool internalCall = false)
388+
const TActorContext& ctx, bool internalCall = false, NWilson::TTraceId traceId = {})
362389
{
363390
SetRequestSyncOperationMode(proto);
364391

365-
auto req = new TLocalRpcCtx<TRpc, std::function<void(typename TRpc::TResponse)>>(std::move(proto), std::move(cb), database, token, requestType, internalCall);
392+
auto req = new TLocalRpcCtx<TRpc, std::function<void(typename TRpc::TResponse)>>(std::move(proto), std::move(cb), database, token, requestType, internalCall, std::move(traceId));
366393
auto actor = TRpc::CreateRpcActor(req);
367394
return ctx.RegisterWithSameMailbox(actor);
368395
}
369396

370397
template<typename TRpc>
371-
TActorId DoLocalRpcSameMailbox(typename TRpc::TRequest&& proto, std::function<void(typename TRpc::TResponse)>&& cb, const TString& database, const TMaybe<TString>& token, const TActorContext& ctx, bool internalCall = false) {
372-
return DoLocalRpcSameMailbox<TRpc>(std::move(proto), std::move(cb), database, token, Nothing(), ctx, internalCall);
398+
TActorId DoLocalRpcSameMailbox(typename TRpc::TRequest&& proto, std::function<void(typename TRpc::TResponse)>&& cb, const TString& database, const TMaybe<TString>& token, const TActorContext& ctx, bool internalCall = false, NWilson::TTraceId traceId = {}) {
399+
return DoLocalRpcSameMailbox<TRpc>(std::move(proto), std::move(cb), database, token, Nothing(), ctx, internalCall, std::move(traceId));
373400
}
374401

375402
//// Streaming part

ydb/core/grpc_services/rpc_calls.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ std::pair<TString, TString> SplitPath(const TString& path) {
7575
return {path.substr(0, splitPos), path.substr(splitPos + 1)};
7676
}
7777

78-
void RefreshTokenSendRequest(const TActorContext& ctx, IEventBase* refreshTokenRequest) {
79-
ctx.Send(CreateGRpcRequestProxyId(), refreshTokenRequest);
78+
void RefreshTokenSendRequest(const TActorContext& ctx, IEventBase* refreshTokenRequest, NWilson::TTraceId traceId) {
79+
ctx.Send(CreateGRpcRequestProxyId(), refreshTokenRequest, 0, 0, std::move(traceId));
8080
}
8181

8282
void RefreshTokenReplyUnauthenticated(TActorId recipient, TActorId sender, NYql::TIssues&& issues) {

ydb/core/grpc_services/rpc_calls.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,15 +117,15 @@ struct TRefreshTokenTypeForRequest<TEvStreamTopicWriteRequest> {
117117
// RefreshToken Send/Reply interface hides low-level details.
118118
// Used to avoid unwanted compile time dependencies.
119119
//
120-
void RefreshTokenSendRequest(const TActorContext& ctx, IEventBase* refreshTokenRequest);
120+
void RefreshTokenSendRequest(const TActorContext& ctx, IEventBase* refreshTokenRequest, NWilson::TTraceId traceId);
121121
void RefreshTokenReplyUnauthenticated(TActorId recipient, TActorId sender, NYql::TIssues&& issues);
122122
void RefreshTokenReplyUnavailable(TActorId recipient, NYql::TIssues&& issues);
123123

124124
template <ui32 TRpcId, typename TReq, typename TResp>
125-
void TGRpcRequestBiStreamWrapper<TRpcId, TReq, TResp>::RefreshToken(const TString& token, const TActorContext& ctx, TActorId id) {
125+
void TGRpcRequestBiStreamWrapper<TRpcId, TReq, TResp>::RefreshToken(const TString& token, const TActorContext& ctx, TActorId id, NWilson::TTraceId traceId) {
126126
using TSelf = typename std::remove_pointer<decltype(this)>::type;
127127
using TRefreshToken = typename TRefreshTokenTypeForRequest<TSelf>::type;
128-
RefreshTokenSendRequest(ctx, new TRefreshToken(token, GetDatabaseName().GetOrElse(""), id));
128+
RefreshTokenSendRequest(ctx, new TRefreshToken(token, GetDatabaseName().GetOrElse(""), id), std::move(traceId));
129129
}
130130

131131
template <ui32 TRpcId>

0 commit comments

Comments
 (0)