Skip to content

Commit e114110

Browse files
feat(data_integrity_trails): generate traceId in grpc (#9105)
1 parent c10bc68 commit e114110

File tree

6 files changed

+93
-17
lines changed

6 files changed

+93
-17
lines changed

ydb/core/grpc_services/base/base.h

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <ydb/core/grpc_streaming/grpc_streaming.h>
2727
#include <ydb/core/tx/scheme_board/events.h>
2828
#include <ydb/core/base/events.h>
29+
#include <ydb/core/util/ulid.h>
2930

3031
#include <ydb/library/actors/wilson/wilson_span.h>
3132

@@ -747,7 +748,12 @@ class TGRpcRequestBiStreamWrapper
747748
TGRpcRequestBiStreamWrapper(TIntrusivePtr<IStreamCtx> ctx, bool rlAllowed = true)
748749
: Ctx_(ctx)
749750
, RlAllowed_(rlAllowed)
750-
{ }
751+
, TraceId(GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER))
752+
{
753+
if (!TraceId) {
754+
TraceId = UlidGen.Next().ToString();
755+
}
756+
}
751757

752758
bool IsClientLost() const override {
753759
// TODO: Implement for BiDirectional streaming
@@ -848,7 +854,7 @@ class TGRpcRequestBiStreamWrapper
848854
}
849855

850856
TMaybe<TString> GetTraceId() const override {
851-
return GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER);
857+
return TraceId;
852858
}
853859

854860
NWilson::TTraceId GetWilsonTraceId() const override {
@@ -926,6 +932,8 @@ class TGRpcRequestBiStreamWrapper
926932
IGRpcProxyCounters::TPtr Counters_;
927933
NWilson::TSpan Span_;
928934
bool IsTracingDecided_ = false;
935+
TULIDGenerator UlidGen;
936+
TMaybe<TString> TraceId;
929937
};
930938

931939
template <typename TDerived>
@@ -1033,7 +1041,12 @@ class TGRpcRequestWrapperImpl
10331041

10341042
TGRpcRequestWrapperImpl(NYdbGrpc::IRequestContextBase* ctx)
10351043
: Ctx_(ctx)
1036-
{ }
1044+
, TraceId(GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER))
1045+
{
1046+
if (!TraceId) {
1047+
TraceId = UlidGen.Next().ToString();
1048+
}
1049+
}
10371050

10381051
const TMaybe<TString> GetYdbToken() const override {
10391052
return ExtractYdbToken(Ctx_->GetPeerMetaValues(NYdb::YDB_AUTH_TICKET_HEADER));
@@ -1165,7 +1178,7 @@ class TGRpcRequestWrapperImpl
11651178
}
11661179

11671180
TMaybe<TString> GetTraceId() const override {
1168-
return GetPeerMetaValues(NYdb::YDB_TRACE_ID_HEADER);
1181+
return TraceId;
11691182
}
11701183

11711184
NWilson::TTraceId GetWilsonTraceId() const override {
@@ -1368,6 +1381,8 @@ class TGRpcRequestWrapperImpl
13681381
TAuditLogHook AuditLogHook;
13691382
bool RequestFinished = false;
13701383
bool IsTracingDecided_ = false;
1384+
TULIDGenerator UlidGen;
1385+
TMaybe<TString> TraceId;
13711386
};
13721387

13731388
template <ui32 TRpcId, typename TReq, typename TResp, bool IsOperation, typename TDerived>

ydb/core/kqp/gateway/kqp_gateway.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ class IKqpGateway : public NYql::IKikimrGateway {
206206

207207
virtual NThreading::TFuture<TQueryResult> ExecDataQueryAst(const TString& cluster, const TString& query,
208208
TQueryData::TPtr params, const TAstQuerySettings& settings,
209-
const Ydb::Table::TransactionSettings& txSettings) = 0;
209+
const Ydb::Table::TransactionSettings& txSettings, const TMaybe<TString>& traceId) = 0;
210210

211211
virtual NThreading::TFuture<TQueryResult> ExplainScanQueryAst(const TString& cluster, const TString& query) = 0;
212212

@@ -215,21 +215,21 @@ class IKqpGateway : public NYql::IKikimrGateway {
215215

216216
virtual NThreading::TFuture<TQueryResult> StreamExecDataQueryAst(const TString& cluster, const TString& query,
217217
TQueryData::TPtr, const TAstQuerySettings& settings,
218-
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target) = 0;
218+
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target, const TMaybe<TString>& traceId) = 0;
219219

220220
virtual NThreading::TFuture<TQueryResult> StreamExecScanQueryAst(const TString& cluster, const TString& query,
221221
TQueryData::TPtr, const TAstQuerySettings& settings, const NActors::TActorId& target,
222222
std::shared_ptr<NGRpcService::IRequestCtxMtSafe> rpcCtx) = 0;
223223

224224
virtual NThreading::TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query,
225225
TQueryData::TPtr params, const TAstQuerySettings& settings,
226-
const Ydb::Table::TransactionSettings& txSettings) = 0;
226+
const Ydb::Table::TransactionSettings& txSettings, const TMaybe<TString>& traceId) = 0;
227227

228228
virtual NThreading::TFuture<TQueryResult> ExplainGenericQuery(const TString& cluster, const TString& query) = 0;
229229

230230
virtual NThreading::TFuture<TQueryResult> StreamExecGenericQuery(const TString& cluster, const TString& query,
231231
TQueryData::TPtr params, const TAstQuerySettings& settings,
232-
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target) = 0;
232+
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target, const TMaybe<TString>& traceId) = 0;
233233
};
234234

235235
TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, NKikimrKqp::EQueryType queryType, const TString& database,

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1891,7 +1891,8 @@ class TKikimrIcGateway : public IKqpGateway {
18911891

18921892
TFuture<TQueryResult> StreamExecDataQueryAst(const TString& cluster, const TString& query,
18931893
TQueryData::TPtr params, const TAstQuerySettings& settings,
1894-
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target) override
1894+
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target,
1895+
const TMaybe<TString>& traceId) override
18951896
{
18961897
YQL_ENSURE(cluster == Cluster);
18971898

@@ -1903,6 +1904,10 @@ class TKikimrIcGateway : public IKqpGateway {
19031904
ev->Record.SetUserToken(UserToken->GetSerializedToken());
19041905
}
19051906

1907+
if (traceId) {
1908+
ev->Record.SetTraceId(*traceId);
1909+
}
1910+
19061911
ev->Record.MutableRequest()->SetDatabase(Database);
19071912
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
19081913
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_AST_DML);
@@ -1990,7 +1995,8 @@ class TKikimrIcGateway : public IKqpGateway {
19901995
}
19911996

19921997
TFuture<TQueryResult> ExecDataQueryAst(const TString& cluster, const TString& query, TQueryData::TPtr params,
1993-
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings) override
1998+
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
1999+
const TMaybe<TString>& traceId) override
19942000
{
19952001
YQL_ENSURE(cluster == Cluster);
19962002

@@ -2002,6 +2008,10 @@ class TKikimrIcGateway : public IKqpGateway {
20022008
ev->Record.SetUserToken(UserToken->GetSerializedToken());
20032009
}
20042010

2011+
if (traceId) {
2012+
ev->Record.SetTraceId(*traceId);
2013+
}
2014+
20052015
ev->Record.MutableRequest()->SetDatabase(Database);
20062016
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
20072017
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_AST_DML);
@@ -2065,12 +2075,17 @@ class TKikimrIcGateway : public IKqpGateway {
20652075
}
20662076

20672077
TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query, TQueryData::TPtr params,
2068-
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings) override
2078+
const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
2079+
const TMaybe<TString>& traceId) override
20692080
{
20702081
YQL_ENSURE(cluster == Cluster);
20712082

20722083
auto ev = MakeHolder<TEvKqp::TEvQueryRequest>();
20732084

2085+
if (traceId) {
2086+
ev->Record.SetTraceId(*traceId);
2087+
}
2088+
20742089
auto& request = *ev->Record.MutableRequest();
20752090
request.SetCollectStats(settings.CollectStats);
20762091

@@ -2091,7 +2106,8 @@ class TKikimrIcGateway : public IKqpGateway {
20912106

20922107
TFuture<TQueryResult> StreamExecGenericQuery(const TString& cluster, const TString& query,
20932108
TQueryData::TPtr params, const TAstQuerySettings& settings,
2094-
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target) override
2109+
const Ydb::Table::TransactionSettings& txSettings, const NActors::TActorId& target,
2110+
const TMaybe<TString>& traceId) override
20952111
{
20962112
YQL_ENSURE(cluster == Cluster);
20972113

@@ -2103,6 +2119,10 @@ class TKikimrIcGateway : public IKqpGateway {
21032119
ev->Record.SetUserToken(UserToken->GetSerializedToken());
21042120
}
21052121

2122+
if (traceId) {
2123+
ev->Record.SetTraceId(*traceId);
2124+
}
2125+
21062126
ev->Record.MutableRequest()->SetDatabase(Database);
21072127
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
21082128
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY);

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,7 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor {
811811
TKiDataQueryBlocks dataQueryBlocks(query);
812812

813813
auto queryAstStr = SerializeExpr(ctx, *query);
814+
TMaybe<TString> traceId = SessionCtx->GetUserRequestContext() ? SessionCtx->GetUserRequestContext()->TraceId : TMaybe<TString>{};
814815

815816
bool useGenericQuery = ShouldUseGenericQuery(dataQueryBlocks);
816817
bool useScanQuery = ShouldUseScanQuery(dataQueryBlocks, settings);
@@ -828,7 +829,7 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor {
828829
future = Gateway->ExplainGenericQuery(Cluster, SessionCtx->Query().PreparingQuery->GetText());
829830
} else {
830831
future = Gateway->ExecGenericQuery(Cluster, SessionCtx->Query().PreparingQuery->GetText(), CollectParameters(query),
831-
querySettings, txSettings);
832+
querySettings, txSettings, traceId);
832833
}
833834
} else if (useScanQuery) {
834835
ui64 rowsLimit = 0;
@@ -850,7 +851,7 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor {
850851
future = Gateway->ExplainDataQueryAst(Cluster, queryAstStr);
851852
} else {
852853
future = Gateway->ExecDataQueryAst(Cluster, queryAstStr, CollectParameters(query),
853-
querySettings, txSettings);
854+
querySettings, txSettings, traceId);
854855
}
855856
}
856857
break;
@@ -860,7 +861,7 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor {
860861
txSettings.mutable_serializable_read_write();
861862

862863
future = Gateway->StreamExecGenericQuery(Cluster, SessionCtx->Query().PreparingQuery->GetText(), CollectParameters(query),
863-
querySettings, txSettings, SessionCtx->Query().ReplyTarget);
864+
querySettings, txSettings, SessionCtx->Query().ReplyTarget, traceId);
864865
} else if (useScanQuery) {
865866
future = Gateway->StreamExecScanQueryAst(Cluster, queryAstStr, CollectParameters(query),
866867
querySettings, SessionCtx->Query().ReplyTarget, SessionCtx->Query().RpcCtx);
@@ -869,7 +870,7 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor {
869870
txSettings.mutable_serializable_read_write();
870871

871872
future = Gateway->StreamExecDataQueryAst(Cluster, queryAstStr, CollectParameters(query),
872-
querySettings, txSettings, SessionCtx->Query().ReplyTarget);
873+
querySettings, txSettings, SessionCtx->Query().ReplyTarget, traceId);
873874
}
874875
break;
875876

ydb/core/kqp/session_actor/kqp_worker_actor.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,8 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
194194
Config->FeatureFlags = AppData(ctx)->FeatureFlags;
195195

196196
KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, FederatedQuerySetup,
197-
QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false, nullptr, nullptr, nullptr);
197+
QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry,
198+
!Settings.LongSession, false, nullptr, nullptr, nullptr, QueryState->RequestEv->GetUserRequestContext());
198199

199200
auto& queryRequest = QueryState->RequestEv;
200201
QueryState->ProxyRequestId = proxyRequestId;

ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,45 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
137137
// check datashard logs (should be empty, because DataShard only logs modification operations)
138138
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 0);
139139
}
140+
141+
Y_UNIT_TEST_TWIN(UpsertViaLegacyScripting, Streaming) {
142+
TKikimrSettings serverSettings;
143+
TStringStream ss;
144+
serverSettings.LogStream = &ss;
145+
TKikimrRunner kikimr(serverSettings);
146+
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
147+
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
148+
149+
150+
const auto query = R"(
151+
--!syntax_v1
152+
153+
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
154+
(3u, "Value3"),
155+
(101u, "Value101"),
156+
(201u, "Value201");
157+
)";
158+
159+
if (Streaming) {
160+
auto result = client.StreamExecuteYqlScript(query).GetValueSync();
161+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
162+
CollectStreamResult(result);
163+
} else {
164+
auto result = client.ExecuteYqlScript(query).GetValueSync();
165+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
166+
}
167+
168+
// check executer logs
169+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1);
170+
// check session actor logs (should contain double logs because this query was executed via worker actor)
171+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 4);
172+
// check grpc logs
173+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
174+
// check datashard logs
175+
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2);
176+
177+
Cout << ss.Str() << Endl;
178+
}
140179
}
141180

142181
} // namespace NKqp

0 commit comments

Comments
 (0)