Skip to content

Commit 9c8e625

Browse files
authored
Implement a new tablet administration grpc service (#8400)
1 parent 3caa09f commit 9c8e625

36 files changed

+1280
-36
lines changed

ydb/core/driver_lib/run/run.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122
#include <ydb/services/ydb/ydb_scripting.h>
123123
#include <ydb/services/ydb/ydb_table.h>
124124
#include <ydb/services/ydb/ydb_object_storage.h>
125+
#include <ydb/services/tablet/ydb_tablet.h>
125126

126127
#include <ydb/core/fq/libs/init/init.h>
127128

@@ -598,6 +599,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
598599
names["keyvalue"] = &hasKeyValue;
599600
TServiceCfg hasReplication = services.empty();
600601
names["replication"] = &hasReplication;
602+
TServiceCfg hasTabletService = services.empty();
603+
names["tablet_service"] = &hasTabletService;
601604

602605
std::unordered_set<TString> enabled;
603606
for (const auto& name : services) {
@@ -873,6 +876,11 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
873876
grpcRequestProxies[0], hasReplication.IsRlAllowed()));
874877
}
875878

879+
if (hasTabletService) {
880+
server.AddService(new NGRpcService::TGRpcYdbTabletService(ActorSystem.Get(), Counters, grpcRequestProxies,
881+
hasTabletService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue()));
882+
}
883+
876884
if (ModuleFactories) {
877885
for (const auto& service : ModuleFactories->GrpcServiceFactory.Create(enabled, disabled, ActorSystem.Get(), Counters, grpcRequestProxies[0])) {
878886
server.AddService(service);

ydb/core/driver_lib/run/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ PEERDIR(
165165
ydb/services/persqueue_v1
166166
ydb/services/rate_limiter
167167
ydb/services/replication
168+
ydb/services/tablet
168169
ydb/services/ydb
169170
)
170171

ydb/core/engine/minikql/flat_local_tx_minikql.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class TFlatLocalMiniKQL : public NTabletFlatExecutor::ITransaction {
106106
bool PrepareParams(TTransactionContext &txc, const TAppData *appData) {
107107
Y_UNUSED(txc);
108108
if (SourceProgram.Params.Binary) {
109-
SerializedMiniKQLParams = SourceProgram.Program.Binary;
109+
SerializedMiniKQLParams = SourceProgram.Params.Binary;
110110
return true;
111111
}
112112

ydb/core/grpc_services/audit_dml_operations.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/public/api/protos/ydb_table.pb.h>
44
#include <ydb/public/api/protos/ydb_scripting.pb.h>
55
#include <ydb/public/api/protos/ydb_query.pb.h>
6+
#include <ydb/public/api/protos/draft/ydb_tablet.pb.h>
67

78
#include "base/base.h"
89

@@ -196,4 +197,24 @@ void AuditContextAppend(IAuditCtx* ctx, const Ydb::Query::ExecuteScriptRequest&
196197
}
197198
// log updated_row_count collected from ExecuteScriptMetadata.exec_stats?
198199

200+
// TabletService, ExecuteTabletMiniKQL
201+
template <>
202+
void AuditContextAppend(IAuditCtx* ctx, const Ydb::Tablet::ExecuteTabletMiniKQLRequest& request) {
203+
if (request.dry_run()) {
204+
return;
205+
}
206+
ctx->AddAuditLogPart("tablet_id", TStringBuilder() << request.tablet_id());
207+
ctx->AddAuditLogPart("program_text", PrepareText(request.program()));
208+
}
209+
210+
// TabletService, ChangeTabletSchema
211+
template <>
212+
void AuditContextAppend(IAuditCtx* ctx, const Ydb::Tablet::ChangeTabletSchemaRequest& request) {
213+
if (request.dry_run()) {
214+
return;
215+
}
216+
ctx->AddAuditLogPart("tablet_id", TStringBuilder() << request.tablet_id());
217+
ctx->AddAuditLogPart("schema_changes", PrepareText(request.schema_changes()));
218+
}
219+
199220
} // namespace NKikimr::NGRpcService

ydb/core/grpc_services/audit_dml_operations.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ class ExecuteScriptRequest;
2828

2929
}
3030

31+
namespace Ydb::Tablet {
32+
33+
class ExecuteTabletMiniKQLRequest;
34+
class ChangeTabletSchemaRequest;
35+
36+
}
37+
3138
namespace NKikimr::NGRpcService {
3239

3340
class IAuditCtx;
@@ -80,4 +87,8 @@ template <> void AuditContextAppend(IAuditCtx* ctx, const Ydb::Query::ExecuteQue
8087
// ExecuteSrcipt
8188
template <> void AuditContextAppend(IAuditCtx* ctx, const Ydb::Query::ExecuteScriptRequest& request);
8289

90+
// TabletService
91+
template <> void AuditContextAppend(IAuditCtx* ctx, const Ydb::Tablet::ExecuteTabletMiniKQLRequest& request);
92+
template <> void AuditContextAppend(IAuditCtx* ctx, const Ydb::Tablet::ChangeTabletSchemaRequest& request);
93+
8394
} // namespace NKikimr::NGRpcService

ydb/core/grpc_services/query/rpc_fetch_script_results.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, T
7171
return;
7272
}
7373

74-
Register(NKqp::CreateGetScriptExecutionResultActor(SelfId(), DatabaseName, ExecutionId, req->result_set_index(), RowsOffset, req->rows_limit(), req->rows_limit() ? 0 : MAX_SIZE_LIMIT, Request->GetDeadline()));
74+
Register(NKqp::CreateGetScriptExecutionResultActor(SelfId(), GetDatabaseName(), ExecutionId, req->result_set_index(), RowsOffset, req->rows_limit(), req->rows_limit() ? 0 : MAX_SIZE_LIMIT, Request->GetDeadline()));
7575

7676
Become(&TFetchScriptResultsRPC::StateFunc);
7777
}

ydb/core/grpc_services/rpc_backup.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class TBackupCollectionsRPC
4242

4343
auto ev = MakeHolder<typename NSchemeShard::TEvBackup::TEvApiMapping<TIn>::TEv>();
4444
ev->Record.SetTxId(this->TxId);
45-
ev->Record.SetDatabaseName(this->DatabaseName);
45+
ev->Record.SetDatabaseName(this->GetDatabaseName());
4646
if (this->UserToken) {
4747
ev->Record.SetUserSID(this->UserToken->GetUserSID());
4848
}

ydb/core/grpc_services/rpc_cancel_operation.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,11 @@ class TCancelOperationRPC: public TRpcOperationRequestActor<TCancelOperationRPC,
4343
IEventBase* MakeRequest() override {
4444
switch (OperationId.GetKind()) {
4545
case TOperationId::EXPORT:
46-
return new TEvExport::TEvCancelExportRequest(TxId, DatabaseName, RawOperationId);
46+
return new TEvExport::TEvCancelExportRequest(TxId, GetDatabaseName(), RawOperationId);
4747
case TOperationId::IMPORT:
48-
return new TEvImport::TEvCancelImportRequest(TxId, DatabaseName, RawOperationId);
48+
return new TEvImport::TEvCancelImportRequest(TxId, GetDatabaseName(), RawOperationId);
4949
case TOperationId::BUILD_INDEX:
50-
return new TEvIndexBuilder::TEvCancelRequest(TxId, DatabaseName, RawOperationId);
50+
return new TEvIndexBuilder::TEvCancelRequest(TxId, GetDatabaseName(), RawOperationId);
5151
default:
5252
Y_ABORT("unreachable");
5353
}
@@ -141,7 +141,7 @@ class TCancelOperationRPC: public TRpcOperationRequestActor<TCancelOperationRPC,
141141
}
142142

143143
void SendCancelScriptExecutionOperation() {
144-
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvCancelScriptExecutionOperation(DatabaseName, OperationId));
144+
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvCancelScriptExecutionOperation(GetDatabaseName(), OperationId));
145145
}
146146

147147
private:

ydb/core/grpc_services/rpc_export.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
3737

3838
auto ev = MakeHolder<TEvExport::TEvCreateExportRequest>();
3939
ev->Record.SetTxId(this->TxId);
40-
ev->Record.SetDatabaseName(this->DatabaseName);
40+
ev->Record.SetDatabaseName(this->GetDatabaseName());
4141
if (this->UserToken) {
4242
ev->Record.SetUserSID(this->UserToken->GetUserSID());
4343
}
@@ -64,7 +64,7 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
6464
TVector<TString> ExtractPaths() {
6565
TVector<TString> paths;
6666

67-
paths.emplace_back(this->DatabaseName); // first entry is database
67+
paths.emplace_back(this->GetDatabaseName()); // first entry is database
6868
ExtractPaths(paths, this->GetProtoRequest()->settings());
6969

7070
return paths;
@@ -74,7 +74,7 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
7474
Y_ABORT_UNLESS(!paths.empty());
7575

7676
auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
77-
request->DatabaseName = this->DatabaseName;
77+
request->DatabaseName = this->GetDatabaseName();
7878

7979
for (const auto& path : paths) {
8080
auto& entry = request->ResultSet.emplace_back();

ydb/core/grpc_services/rpc_forget_operation.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
4444
IEventBase* MakeRequest() override {
4545
switch (OperationId.GetKind()) {
4646
case TOperationId::EXPORT:
47-
return new TEvExport::TEvForgetExportRequest(TxId, DatabaseName, RawOperationId);
47+
return new TEvExport::TEvForgetExportRequest(TxId, GetDatabaseName(), RawOperationId);
4848
case TOperationId::IMPORT:
49-
return new TEvImport::TEvForgetImportRequest(TxId, DatabaseName, RawOperationId);
49+
return new TEvImport::TEvForgetImportRequest(TxId, GetDatabaseName(), RawOperationId);
5050
case TOperationId::BUILD_INDEX:
51-
return new TEvIndexBuilder::TEvForgetRequest(TxId, DatabaseName, RawOperationId);
51+
return new TEvIndexBuilder::TEvForgetRequest(TxId, GetDatabaseName(), RawOperationId);
5252
default:
5353
Y_ABORT("unreachable");
5454
}
@@ -97,7 +97,7 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
9797
}
9898

9999
void SendForgetScriptExecutionOperation() {
100-
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId));
100+
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(GetDatabaseName(), OperationId));
101101
}
102102

103103
public:

0 commit comments

Comments
 (0)