Skip to content

Commit ad30ab5

Browse files
Get topic data handler for UI (#17691)
Co-authored-by: Alexey Efimov <xeno@prnwatch.com>
1 parent b3e6533 commit ad30ab5

24 files changed

+1195
-76
lines changed

ydb/core/http_proxy/ut/internal_counters.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@
611611
"folder_id": "folder4",
612612
"database": "/Root"
613613
},
614-
"value":238,
614+
"value":243,
615615
"kind":"RATE"
616616
},
617617
{

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1428,7 +1428,7 @@ void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& c
14281428
}
14291429

14301430
TReadAnswer answer(info.FormAnswer(
1431-
ctx, *ev->Get(), EndOffset, Partition, userInfo,
1431+
ctx, *ev->Get(), StartOffset, EndOffset, Partition, userInfo,
14321432
info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode(), IsActive()
14331433
));
14341434
const auto& resp = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get())->Response;

ydb/core/persqueue/partition_read.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ ui64 GetFirstHeaderOffset(const TKey& key, const TString& blob)
384384
TReadAnswer TReadInfo::FormAnswer(
385385
const TActorContext& ctx,
386386
const TEvPQ::TEvBlobResponse& blobResponse,
387+
const ui64 startOffset,
387388
const ui64 endOffset,
388389
const TPartitionId& partition,
389390
TUserInfo* userInfo,
@@ -410,7 +411,9 @@ TReadAnswer TReadInfo::FormAnswer(
410411
res.SetErrorCode(NPersQueue::NErrorCode::OK);
411412
auto readResult = res.MutablePartitionResponse()->MutableCmdReadResult();
412413
readResult->SetWaitQuotaTimeMs(WaitQuotaTime.MilliSeconds());
414+
readResult->SetStartOffset(startOffset);
413415
readResult->SetMaxOffset(endOffset);
416+
readResult->SetEndOffset(endOffset);
414417
readResult->SetRealReadOffset(Offset);
415418
ui64 realReadOffset = Offset;
416419
readResult->SetReadFromTimestampMs(ReadTimestampMs);
@@ -474,6 +477,7 @@ TReadAnswer TReadInfo::FormAnswer(
474477
SizeEstimate = answerSize;
475478
readResult->SetSizeEstimate(SizeEstimate);
476479
readResult->SetLastOffset(LastOffset);
480+
readResult->SetStartOffset(startOffset);
477481
readResult->SetEndOffset(endOffset);
478482
return {answerSize, std::move(answer)};
479483
}
@@ -507,7 +511,7 @@ TReadAnswer TReadInfo::FormAnswer(
507511
continue;
508512

509513

510-
PQ_LOG_D("FormAnswer processing batch offset " << (offset - header.GetCount()) << " totakecount " << count << " count " << header.GetCount()
514+
PQ_LOG_D("FormAnswer processing batch offset " << (offset - header.GetCount()) << " totakecount " << count << " count " << header.GetCount()
511515
<< " size " << header.GetPayloadSize() << " from pos " << pos << " cbcount " << batch.Blobs.size());
512516

513517
for (size_t i = pos; i < batch.Blobs.size(); ++i) {
@@ -580,6 +584,7 @@ TReadAnswer TReadInfo::FormAnswer(
580584
SizeEstimate = answerSize;
581585
readResult->SetSizeEstimate(SizeEstimate);
582586
readResult->SetLastOffset(LastOffset);
587+
readResult->SetStartOffset(startOffset);
583588
readResult->SetEndOffset(endOffset);
584589

585590
return {answerSize, std::move(answer)};
@@ -590,7 +595,7 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct
590595
if (!res)
591596
return;
592597
TReadAnswer answer(res->FormAnswer(
593-
ctx, res->Offset, Partition, nullptr, res->Destination, 0, Tablet, Config.GetMeteringMode(), IsActive()
598+
ctx, StartOffset, res->Offset, Partition, nullptr, res->Destination, 0, Tablet, Config.GetMeteringMode(), IsActive()
594599
));
595600
ctx.Send(Tablet, answer.Event.Release());
596601
PQ_LOG_D(" waiting read cookie " << ev->Get()->Cookie
@@ -1040,7 +1045,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
10401045
PQ_LOG_D("Reading cookie " << cookie << ". All data is from uncompacted head.");
10411046

10421047
TReadAnswer answer = info.FormAnswer(
1043-
ctx, EndOffset, Partition, &UsersInfoStorage->GetOrCreate(info.User, ctx),
1048+
ctx, StartOffset, EndOffset, Partition, &UsersInfoStorage->GetOrCreate(info.User, ctx),
10441049
info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode(), IsActive()
10451050
);
10461051
const auto* ev = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get());

ydb/core/persqueue/pq_impl.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
183183
auto partResp = responseRecord.MutablePartitionResponse()->MutableCmdReadResult();
184184

185185
partResp->SetMaxOffset(readResult.GetMaxOffset());
186+
partResp->SetStartOffset(readResult.GetStartOffset());
187+
partResp->SetEndOffset(readResult.GetEndOffset());
186188
partResp->SetSizeLag(readResult.GetSizeLag());
187189
partResp->SetWaitQuotaTimeMs(partResp->GetWaitQuotaTimeMs() + readResult.GetWaitQuotaTimeMs());
188190

ydb/core/persqueue/subscriber.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ struct TReadInfo {
8181
TReadAnswer FormAnswer(
8282
const TActorContext& ctx,
8383
const TEvPQ::TEvBlobResponse& response,
84+
const ui64 startOffset,
8485
const ui64 endOffset,
8586
const TPartitionId& partition,
8687
TUserInfo* ui,
@@ -93,6 +94,7 @@ struct TReadInfo {
9394

9495
TReadAnswer FormAnswer(
9596
const TActorContext& ctx,
97+
const ui64 startOffset,
9698
const ui64 endOffset,
9799
const TPartitionId& partition,
98100
TUserInfo* ui,
@@ -103,7 +105,7 @@ struct TReadInfo {
103105
const bool isActive
104106
) {
105107
TEvPQ::TEvBlobResponse response(0, TVector<TRequestedBlob>());
106-
return FormAnswer(ctx, response, endOffset, partition, ui, dst, sizeLag, tablet, meteringMode, isActive);
108+
return FormAnswer(ctx, response, startOffset, endOffset, partition, ui, dst, sizeLag, tablet, meteringMode, isActive);
107109
}
108110
};
109111

ydb/core/protos/msgbus_pq.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ message TCmdReadResult {
417417
optional uint64 ReadFromTimestampMs = 12;
418418
optional uint64 SizeEstimate = 13;
419419
optional int64 LastOffset = 14;
420+
optional uint64 StartOffset = 17;
420421
optional uint64 EndOffset = 15;
421422
optional bool ReadingFinished = 16;
422423
}

ydb/core/viewer/json_handlers_viewer.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "viewer_describe.h"
1313
#include "viewer_describe_topic.h"
1414
#include "viewer_feature_flags.h"
15+
#include "viewer_topic_data.h"
1516
#include "viewer_graph.h"
1617
#include "viewer_healthcheck.h"
1718
#include "viewer_hiveinfo.h"
@@ -189,6 +190,10 @@ void InitViewerTopicInfoJsonHandler(TJsonHandlers& handlers) {
189190
handlers.AddHandler("/viewer/topicinfo", new TJsonHandler<TJsonTopicInfo>(TJsonTopicInfo::GetSwagger()));
190191
}
191192

193+
void InitViewerTopicDataJsonHandler(TJsonHandlers& handlers) {
194+
handlers.AddHandler("/viewer/topic_data", new TJsonHandler<TTopicData>(TTopicData::GetSwagger()));
195+
}
196+
192197
void InitViewerPQConsumerInfoJsonHandler(TJsonHandlers& handlers) {
193198
handlers.AddHandler("/viewer/pqconsumerinfo", new TJsonHandler<TJsonPQConsumerInfo>(TJsonPQConsumerInfo::GetSwagger()));
194199
}
@@ -303,6 +308,7 @@ void InitViewerJsonHandlers(TJsonHandlers& jsonHandlers) {
303308
InitViewerConfigJsonHandler(jsonHandlers);
304309
InitViewerCountersJsonHandler(jsonHandlers);
305310
InitViewerTopicInfoJsonHandler(jsonHandlers);
311+
InitViewerTopicDataJsonHandler(jsonHandlers);
306312
InitViewerPQConsumerInfoJsonHandler(jsonHandlers);
307313
InitViewerTabletCountersJsonHandler(jsonHandlers);
308314
InitViewerStorageJsonHandler(jsonHandlers);

ydb/core/viewer/json_pipe_req.cpp

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -544,24 +544,29 @@ void TViewerPipeClient::RequestBSControllerPDiskUpdateStatus(const NKikimrBlobSt
544544
SendRequestToPipe(pipeClient, request.Release());
545545
}
546546

547-
void TViewerPipeClient::RequestSchemeCacheNavigate(const TString& path) {
547+
THolder<NSchemeCache::TSchemeCacheNavigate> TViewerPipeClient::SchemeCacheNavigateRequestBuilder (
548+
NSchemeCache::TSchemeCacheNavigate::TEntry&& entry
549+
) {
548550
THolder<NSchemeCache::TSchemeCacheNavigate> request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
549-
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
550-
entry.Path = SplitPath(path);
551551
entry.RedirectRequired = false;
552552
entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpPath;
553-
request->ResultSet.emplace_back(entry);
553+
request->ResultSet.emplace_back(std::move(entry));
554+
return request;
555+
}
556+
557+
void TViewerPipeClient::RequestSchemeCacheNavigate(const TString& path) {
558+
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
559+
entry.Path = SplitPath(path);
560+
561+
auto request = SchemeCacheNavigateRequestBuilder(std::move(entry));
554562
SendRequest(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
555563
}
556564

557565
void TViewerPipeClient::RequestSchemeCacheNavigate(const TPathId& pathId) {
558-
THolder<NSchemeCache::TSchemeCacheNavigate> request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
559566
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
560567
entry.TableId.PathId = pathId;
561568
entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
562-
entry.RedirectRequired = false;
563-
entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpPath;
564-
request->ResultSet.emplace_back(entry);
569+
auto request = SchemeCacheNavigateRequestBuilder(std::move(entry));
565570
SendRequest(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
566571
}
567572

@@ -594,6 +599,42 @@ TViewerPipeClient::TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResu
594599
return response;
595600
}
596601

602+
TViewerPipeClient::TRequestResponse<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult>
603+
TViewerPipeClient::MakeRequestSchemeShardDescribe(TTabletId schemeShardId, const TString& path, const NKikimrSchemeOp::TDescribeOptions& options, ui64 cookie) {
604+
auto request = std::make_unique<NSchemeShard::TEvSchemeShard::TEvDescribeScheme>();
605+
request->Record.SetSchemeshardId(schemeShardId);
606+
request->Record.SetPath(path);
607+
request->Record.MutableOptions()->CopyFrom(options);
608+
auto pipe = ConnectTabletPipe(schemeShardId);
609+
auto response = MakeRequestToPipe<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult>(pipe, request.release(), cookie);
610+
if (response.Span) {
611+
response.Span.Attribute("path", path);
612+
}
613+
return response;
614+
}
615+
616+
TViewerPipeClient::TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult> TViewerPipeClient::MakeRequestSchemeCacheNavigateWithToken(
617+
const TString& path, bool showPrivate, ui32 access, ui64 cookie
618+
) {
619+
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
620+
entry.Path = SplitPath(path);
621+
entry.ShowPrivatePath = showPrivate;
622+
entry.Access = access;
623+
auto request = SchemeCacheNavigateRequestBuilder(std::move(entry));
624+
625+
if (!Event->Get()->UserToken.empty())
626+
request->UserToken = new NACLib::TUserToken(Event->Get()->UserToken);
627+
628+
auto response = MakeRequest<TEvTxProxySchemeCache::TEvNavigateKeySetResult>(
629+
MakeSchemeCacheID(),
630+
new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()), 0 /*flags*/, cookie
631+
);
632+
if (response.Span) {
633+
response.Span.Attribute("path", path);
634+
}
635+
return response;
636+
}
637+
597638
void TViewerPipeClient::RequestTxProxyDescribe(const TString& path) {
598639
THolder<TEvTxUserProxy::TEvNavigate> request(new TEvTxUserProxy::TEvNavigate());
599640
request->Record.MutableDescribePath()->SetPath(path);
@@ -776,6 +817,10 @@ TString TViewerPipeClient::GetHTTPBADREQUEST(TString contentType, TString respon
776817
return Viewer->GetHTTPBADREQUEST(GetRequest(), std::move(contentType), std::move(response));
777818
}
778819

820+
TString TViewerPipeClient::GetHTTPNOTFOUND(TString, TString) {
821+
return Viewer->GetHTTPNOTFOUND(GetRequest());
822+
}
823+
779824
TString TViewerPipeClient::GetHTTPINTERNALERROR(TString contentType, TString response) {
780825
return Viewer->GetHTTPINTERNALERROR(GetRequest(), std::move(contentType), std::move(response));
781826
}

ydb/core/viewer/json_pipe_req.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,10 +266,18 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
266266
TRequestResponse<NSysView::TEvSysView::TEvGetPDisksResponse> RequestBSControllerPDisks();
267267
TRequestResponse<NSysView::TEvSysView::TEvGetStorageStatsResponse> RequestBSControllerStorageStats();
268268
void RequestBSControllerPDiskUpdateStatus(const NKikimrBlobStorage::TUpdateDriveStatus& driveStatus, bool force = false);
269+
270+
THolder<NSchemeCache::TSchemeCacheNavigate> SchemeCacheNavigateRequestBuilder(NSchemeCache::TSchemeCacheNavigate::TEntry&& entry);
271+
269272
void RequestSchemeCacheNavigate(const TString& path);
270273
void RequestSchemeCacheNavigate(const TPathId& pathId);
274+
271275
TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult> MakeRequestSchemeCacheNavigate(const TString& path, ui64 cookie = 0);
272276
TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult> MakeRequestSchemeCacheNavigate(TPathId pathId, ui64 cookie = 0);
277+
TRequestResponse<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult> MakeRequestSchemeShardDescribe(TTabletId schemeShardId, const TString& path, const NKikimrSchemeOp::TDescribeOptions& options = {}, ui64 cookie = 0);
278+
TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult> MakeRequestSchemeCacheNavigateWithToken(
279+
const TString& path, bool showPrivate, ui32 access, ui64 cookie = 0);
280+
273281
TRequestResponse<TEvViewer::TEvViewerResponse> MakeRequestViewer(TNodeId nodeId, TEvViewer::TEvViewerRequest* request, ui32 flags = 0);
274282
void RequestTxProxyDescribe(const TString& path);
275283
void RequestStateStorageEndpointsLookup(const TString& path);
@@ -316,6 +324,7 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
316324
TString GetHTTPOKJSON(const google::protobuf::Message& response, TInstant lastModified = {});
317325
TString GetHTTPGATEWAYTIMEOUT(TString contentType = {}, TString response = {});
318326
TString GetHTTPBADREQUEST(TString contentType = {}, TString response = {});
327+
TString GetHTTPNOTFOUND(TString contentType = {}, TString response = {});
319328
TString GetHTTPINTERNALERROR(TString contentType = {}, TString response = {});
320329
TString GetHTTPFORBIDDEN(TString contentType = {}, TString response = {});
321330
TString MakeForward(const std::vector<ui32>& nodes);

ydb/core/viewer/protos/viewer.proto

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -830,3 +830,29 @@ message TFeatureFlagsConfig {
830830
repeated TDatabase Databases = 2;
831831
}
832832

833+
message TTopicDataResponse {
834+
message TMessage {
835+
uint64 Offset = 1;
836+
uint64 CreateTimestamp = 2;
837+
uint64 WriteTimestamp = 3;
838+
uint64 TimestampDiff = 4;
839+
string Message = 5;
840+
uint32 StorageSize = 6;
841+
uint32 OriginalSize = 7;
842+
uint32 Codec = 8;
843+
string ProducerId = 9;
844+
uint64 SeqNo = 10;
845+
846+
message TMetadataItem {
847+
string Key = 1;
848+
string Value = 2;
849+
}
850+
repeated TMetadataItem MessageMetadata = 11;
851+
}
852+
853+
uint64 StartOffset = 1;
854+
uint64 EndOffset = 2;
855+
repeated TMessage Messages = 3;
856+
bool Truncated = 4;
857+
}
858+

0 commit comments

Comments
 (0)