Skip to content

Commit 9c8c951

Browse files
logging(ydb): add data integrity logs to grpc and session actor (#6049)
1 parent 3f33b88 commit 9c8c951

19 files changed

+630
-19
lines changed
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#pragma once
2+
3+
namespace NKikimr {
4+
namespace NDataIntegrity {
5+
6+
inline void LogKeyValue(const TString& key, const TString& value, TStringStream& ss, bool last = false) {
7+
ss << key << ": " << (value.Empty() ? "Empty" : value) << (last ? "" : ",");
8+
}
9+
10+
template <class TransactionSettings>
11+
inline void LogTxSettings(const TransactionSettings& txSettings, TStringStream& ss) {
12+
switch (txSettings.tx_mode_case()) {
13+
case TransactionSettings::kSerializableReadWrite:
14+
LogKeyValue("TxMode", "SerializableReadWrite", ss);
15+
break;
16+
case TransactionSettings::kOnlineReadOnly:
17+
LogKeyValue("TxMode", "OnlineReadOnly", ss);
18+
LogKeyValue("AllowInconsistentReads", txSettings.online_read_only().allow_inconsistent_reads() ? "true" : "false", ss);
19+
break;
20+
case TransactionSettings::kStaleReadOnly:
21+
LogKeyValue("TxMode", "StaleReadOnly", ss);
22+
break;
23+
case TransactionSettings::kSnapshotReadOnly:
24+
LogKeyValue("TxMode", "SnapshotReadOnly", ss);
25+
break;
26+
case TransactionSettings::TX_MODE_NOT_SET:
27+
LogKeyValue("TxMode", "Undefined", ss);
28+
break;
29+
}
30+
}
31+
32+
template <class TxControl>
33+
inline void LogTxControl(const TxControl& txControl, TStringStream& ss)
34+
{
35+
switch (txControl.tx_selector_case()) {
36+
case TxControl::kTxId:
37+
LogKeyValue("TxId", txControl.tx_id(), ss);
38+
break;
39+
case TxControl::kBeginTx:
40+
LogKeyValue("BeginTx", "true", ss);
41+
LogTxSettings(txControl.begin_tx(), ss);
42+
break;
43+
case TxControl::TX_SELECTOR_NOT_SET:
44+
break;
45+
}
46+
47+
LogKeyValue("NeedCommitTx", txControl.commit_tx() ? "true" : "false", ss);
48+
}
49+
50+
}
51+
}
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
#pragma once
2+
3+
#include <ydb/public/api/protos/ydb_table.pb.h>
4+
#include <ydb/public/api/protos/ydb_scripting.pb.h>
5+
#include <ydb/public/api/protos/ydb_query.pb.h>
6+
#include <ydb/core/data_integrity_trails/data_integrity_trails.h>
7+
#include <ydb/core/kqp/common/events/events.h>
8+
9+
namespace NKikimr {
10+
namespace NDataIntegrity {
11+
12+
// ExecuteDataQuery
13+
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::ExecuteDataQueryRequest& request, const TActorContext& ctx) {
14+
auto log = [](const auto& traceId, const auto& request) {
15+
TStringStream ss;
16+
LogKeyValue("Component", "Grpc", ss);
17+
LogKeyValue("SessionId", request.session_id(), ss);
18+
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
19+
LogTxControl(request.tx_control(), ss);
20+
LogKeyValue("Type", "ExecuteDataQueryRequest", ss, /*last*/ true);
21+
return ss.Str();
22+
};
23+
24+
LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request));
25+
}
26+
27+
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::ExecuteDataQueryRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
28+
auto log = [](const auto& traceId, const auto& request, const auto& response) {
29+
auto& record = response->Get()->Record.GetRef();
30+
31+
TStringStream ss;
32+
LogKeyValue("Component", "Grpc", ss);
33+
LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
34+
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
35+
LogKeyValue("Type", "ExecuteDataQueryResponse", ss);
36+
37+
if (request.tx_control().tx_selector_case() == Ydb::Table::TransactionControl::kBeginTx) {
38+
LogKeyValue("TxId", record.GetResponse().HasTxMeta() ? record.GetResponse().GetTxMeta().id() : "Empty", ss);
39+
}
40+
41+
LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
42+
LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
43+
return ss.Str();
44+
};
45+
46+
LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response));
47+
}
48+
49+
// BeginTransaction
50+
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::BeginTransactionRequest& request, const TActorContext& ctx) {
51+
auto log = [](const auto& traceId, const auto& request) {
52+
TStringStream ss;
53+
LogKeyValue("Component", "Grpc", ss);
54+
LogKeyValue("SessionId", request.session_id(), ss);
55+
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
56+
LogTxSettings(request.tx_settings(), ss);
57+
LogKeyValue("Type", "BeginTransactionRequest", ss, /*last*/ true);
58+
return ss.Str();
59+
};
60+
61+
LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request));
62+
}
63+
64+
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::BeginTransactionRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
65+
Y_UNUSED(request);
66+
67+
auto log = [](const auto& traceId, const auto& response) {
68+
auto& record = response->Get()->Record.GetRef();
69+
70+
TStringStream ss;
71+
LogKeyValue("Component", "Grpc", ss);
72+
LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
73+
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
74+
LogKeyValue("Type", "BeginTransactionResponse", ss);
75+
LogKeyValue("TxId", record.GetResponse().HasTxMeta() ? record.GetResponse().GetTxMeta().id() : "Empty", ss);
76+
LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
77+
LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
78+
return ss.Str();
79+
};
80+
81+
LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, response));
82+
}
83+
84+
// CommitTransaction
85+
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::CommitTransactionRequest& request, const TActorContext& ctx) {
86+
auto log = [](const auto& traceId, const auto& request) {
87+
TStringStream ss;
88+
LogKeyValue("Component", "Grpc", ss);
89+
LogKeyValue("SessionId", request.session_id(), ss);
90+
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
91+
LogKeyValue("Type", "CommitTransactionRequest", ss);
92+
LogKeyValue("TxId", request.tx_id(), ss, /*last*/ true);
93+
return ss.Str();
94+
};
95+
96+
LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request));
97+
}
98+
99+
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::CommitTransactionRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
100+
auto log = [](const auto& traceId, const auto& request, const auto& response) {
101+
const auto& record = response->Get()->Record.GetRef();
102+
103+
TStringStream ss;
104+
LogKeyValue("Component", "Grpc", ss);
105+
LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
106+
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
107+
LogKeyValue("Type", "CommitTransactionResponse", ss);
108+
LogKeyValue("TxId", request.tx_id(), ss);
109+
LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
110+
LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
111+
return ss.Str();
112+
};
113+
114+
LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response));
115+
}
116+
117+
// RollbackTransaction
118+
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::RollbackTransactionRequest& request, const TActorContext& ctx) {
119+
auto log = [](const auto& traceId, const auto& request) {
120+
TStringStream ss;
121+
LogKeyValue("Component", "Grpc", ss);
122+
LogKeyValue("SessionId", request.session_id(), ss);
123+
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
124+
LogKeyValue("Type", "RollbackTransactionRequest", ss);
125+
LogKeyValue("TxId", request.tx_id(), ss, /*last*/ true);
126+
return ss.Str();
127+
};
128+
129+
LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request));
130+
}
131+
132+
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::RollbackTransactionRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
133+
auto log = [](const auto& traceId, const auto& request, const auto& response) {
134+
const auto& record = response->Get()->Record.GetRef();
135+
136+
TStringStream ss;
137+
LogKeyValue("Component", "Grpc", ss);
138+
LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
139+
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
140+
LogKeyValue("Type", "RollbackTransactionResponse", ss);
141+
LogKeyValue("TxId", request.tx_id(), ss);
142+
LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
143+
LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
144+
return ss.Str();
145+
};
146+
147+
LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response));
148+
}
149+
150+
// ExecuteYqlScript/StreamExecuteYqlScript
151+
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Scripting::ExecuteYqlRequest& request, const TActorContext& ctx) {
152+
Y_UNUSED(request);
153+
154+
auto log = [](const auto& traceId) {
155+
TStringStream ss;
156+
LogKeyValue("Component", "Grpc", ss);
157+
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
158+
LogKeyValue("Type", "[Stream]ExecuteYqlScriptRequest", ss, /*last*/ true);
159+
return ss.Str();
160+
};
161+
162+
LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId));
163+
}
164+
165+
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Scripting::ExecuteYqlRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
166+
Y_UNUSED(request);
167+
168+
auto log = [](const auto& traceId, const auto& response) {
169+
const auto& record = response->Get()->Record.GetRef();
170+
171+
TStringStream ss;
172+
LogKeyValue("Component", "Grpc", ss);
173+
LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
174+
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
175+
LogKeyValue("Type", "[Stream]ExecuteYqlScriptResponse", ss);
176+
LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
177+
LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
178+
return ss.Str();
179+
};
180+
181+
LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, response));
182+
}
183+
184+
// ExecuteQuery
185+
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Query::ExecuteQueryRequest& request, const TActorContext& ctx) {
186+
if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) {
187+
return;
188+
}
189+
190+
auto log = [](const auto& traceId, const auto& request) {
191+
TStringStream ss;
192+
LogKeyValue("Component", "Grpc", ss);
193+
LogKeyValue("SessionId", request.session_id(), ss);
194+
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
195+
LogTxControl(request.tx_control(), ss);
196+
LogKeyValue("Type", "ExecuteQueryRequest", ss, /*last*/ true);
197+
return ss.Str();
198+
};
199+
200+
LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request));
201+
}
202+
203+
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Query::ExecuteQueryRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
204+
if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) {
205+
return;
206+
}
207+
208+
auto log = [](const auto& traceId, const auto& request, const auto& response) {
209+
const auto& record = response->Get()->Record.GetRef();
210+
211+
TStringStream ss;
212+
LogKeyValue("Component", "Grpc", ss);
213+
LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
214+
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
215+
LogKeyValue("Type", "ExecuteQueryResponse", ss);
216+
217+
if (request.tx_control().tx_selector_case() == Ydb::Query::TransactionControl::kBeginTx) {
218+
LogKeyValue("TxId", record.GetResponse().HasTxMeta() ? record.GetResponse().GetTxMeta().id() : "Empty", ss);
219+
}
220+
221+
LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
222+
LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
223+
return ss.Str();
224+
};
225+
226+
LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response));
227+
}
228+
229+
// ExecuteSrcipt
230+
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Query::ExecuteScriptRequest& request, const TActorContext& ctx) {
231+
if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) {
232+
return;
233+
}
234+
235+
auto log = [](const auto& traceId) {
236+
TStringStream ss;
237+
LogKeyValue("Component", "Grpc", ss);
238+
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
239+
LogKeyValue("Type", "ExecuteSrciptRequest", ss, /*last*/ true);
240+
return ss.Str();
241+
};
242+
243+
LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId));
244+
}
245+
246+
inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Query::ExecuteScriptRequest& request, const NKqp::TEvKqp::TEvScriptResponse::TPtr& response, const TActorContext& ctx) {
247+
if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) {
248+
return;
249+
}
250+
251+
auto log = [](const auto& traceId, const auto& response) {
252+
TStringStream ss;
253+
LogKeyValue("Component", "Grpc", ss);
254+
LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
255+
LogKeyValue("Type", "ExecuteSrciptResponse", ss);
256+
LogKeyValue("Status", ToString(response->Get()->Status), ss);
257+
LogKeyValue("Issues", ToString(response->Get()->Issues), ss, /*last*/ true);
258+
return ss.Str();
259+
};
260+
261+
LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, response));
262+
}
263+
264+
}
265+
}

ydb/core/grpc_services/query/rpc_execute_query.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/core/grpc_services/audit_dml_operations.h>
66
#include <ydb/core/grpc_services/base/base.h>
77
#include <ydb/core/grpc_services/cancelation/cancelation_event.h>
8+
#include <ydb/core/grpc_services/grpc_integrity_trails.h>
89
#include <ydb/core/grpc_services/rpc_kqp_base.h>
910
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
1011
#include <ydb/library/ydb_issue/issue_helpers.h>
@@ -264,6 +265,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
264265
}
265266

266267
AuditContextAppend(Request_.get(), *req);
268+
NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx);
267269

268270
auto queryType = req->concurrent_result_sets()
269271
? NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY
@@ -384,7 +386,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
384386
ctx.Send(channel.ActorId, resp.Release());
385387
}
386388

387-
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
389+
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
390+
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx);
391+
388392
auto& record = ev->Get()->Record.GetRef();
389393

390394
const auto& issueMessage = record.GetResponse().GetQueryIssues();

ydb/core/grpc_services/query/rpc_execute_script.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/core/grpc_services/base/base.h>
66
#include <ydb/core/grpc_services/rpc_kqp_base.h>
77
#include <ydb/core/grpc_services/audit_dml_operations.h>
8+
#include <ydb/core/grpc_services/grpc_integrity_trails.h>
89
#include <ydb/core/kqp/common/kqp.h>
910
#include <ydb/public/api/protos/ydb_query.pb.h>
1011
#include <ydb/public/lib/operation_id/operation_id.h>
@@ -91,6 +92,7 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
9192
}
9293

9394
AuditContextAppend(Request_.get(), request);
95+
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), request, TlsActivationContext->AsActorContext());
9496

9597
Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS;
9698
if (auto scriptRequest = MakeScriptRequest(issues, status)) {
@@ -107,10 +109,12 @@ class TExecuteScriptRPC : public TActorBootstrapped<TExecuteScriptRPC> {
107109

108110
private:
109111
STRICT_STFUNC(StateFunc,
110-
hFunc(NKqp::TEvKqp::TEvScriptResponse, Handle)
112+
HFunc(NKqp::TEvKqp::TEvScriptResponse, Handle)
111113
)
112114

113-
void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev) {
115+
void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev, const TActorContext& ctx) {
116+
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx);
117+
114118
Ydb::Operations::Operation operation;
115119
operation.set_id(ev->Get()->OperationId);
116120
Ydb::Query::ExecuteScriptMetadata metadata;

ydb/core/grpc_services/rpc_begin_transaction.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "service_table.h"
22
#include <ydb/core/grpc_services/base/base.h>
3+
#include <ydb/core/grpc_services/grpc_integrity_trails.h>
34

45
#include "rpc_calls.h"
56
#include "rpc_kqp_base.h"
@@ -49,6 +50,7 @@ class TBeginTransactionRPC : public TRpcKqpRequestActor<TBeginTransactionRPC, TE
4950
const auto traceId = Request_->GetTraceId();
5051

5152
AuditContextAppend(Request_.get(), *req);
53+
NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx);
5254

5355
TString sessionId;
5456
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
@@ -91,6 +93,8 @@ class TBeginTransactionRPC : public TRpcKqpRequestActor<TBeginTransactionRPC, TE
9193
}
9294

9395
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
96+
NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *GetProtoRequest(), ev, ctx);
97+
9498
const auto& record = ev->Get()->Record.GetRef();
9599
SetCost(record.GetConsumedRu());
96100
AddServerHintsIfAny(record);

0 commit comments

Comments
 (0)