Skip to content

Commit 85ef15c

Browse files
Get topic data handler (#15038)
1 parent e3fc279 commit 85ef15c

17 files changed

+859
-76
lines changed

ydb/core/http_proxy/ut/internal_counters.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@
611611
"folder_id": "folder4",
612612
"database": "/Root"
613613
},
614-
"value":238,
614+
"value":243,
615615
"kind":"RATE"
616616
},
617617
{

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1414,7 +1414,7 @@ void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& c
14141414
}
14151415

14161416
TReadAnswer answer(info.FormAnswer(
1417-
ctx, *ev->Get(), EndOffset, Partition, userInfo,
1417+
ctx, *ev->Get(), StartOffset, EndOffset, Partition, userInfo,
14181418
info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode(), IsActive()
14191419
));
14201420
const auto& resp = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get())->Response;

ydb/core/persqueue/partition_read.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,7 @@ ui64 GetFirstHeaderOffset(const TKey& key, const TString& blob)
372372
TReadAnswer TReadInfo::FormAnswer(
373373
const TActorContext& ctx,
374374
const TEvPQ::TEvBlobResponse& blobResponse,
375+
const ui64 startOffset,
375376
const ui64 endOffset,
376377
const TPartitionId& partition,
377378
TUserInfo* userInfo,
@@ -398,7 +399,9 @@ TReadAnswer TReadInfo::FormAnswer(
398399
res.SetErrorCode(NPersQueue::NErrorCode::OK);
399400
auto readResult = res.MutablePartitionResponse()->MutableCmdReadResult();
400401
readResult->SetWaitQuotaTimeMs(WaitQuotaTime.MilliSeconds());
402+
readResult->SetStartOffset(startOffset);
401403
readResult->SetMaxOffset(endOffset);
404+
readResult->SetEndOffset(endOffset);
402405
readResult->SetRealReadOffset(Offset);
403406
ui64 realReadOffset = Offset;
404407
readResult->SetReadFromTimestampMs(ReadTimestampMs);
@@ -462,6 +465,7 @@ TReadAnswer TReadInfo::FormAnswer(
462465
SizeEstimate = answerSize;
463466
readResult->SetSizeEstimate(SizeEstimate);
464467
readResult->SetLastOffset(LastOffset);
468+
readResult->SetStartOffset(startOffset);
465469
readResult->SetEndOffset(endOffset);
466470
return {answerSize, std::move(answer)};
467471
}
@@ -495,7 +499,7 @@ TReadAnswer TReadInfo::FormAnswer(
495499
continue;
496500

497501

498-
PQ_LOG_D("FormAnswer processing batch offset " << (offset - header.GetCount()) << " totakecount " << count << " count " << header.GetCount()
502+
PQ_LOG_D("FormAnswer processing batch offset " << (offset - header.GetCount()) << " totakecount " << count << " count " << header.GetCount()
499503
<< " size " << header.GetPayloadSize() << " from pos " << pos << " cbcount " << batch.Blobs.size());
500504

501505
for (size_t i = pos; i < batch.Blobs.size(); ++i) {
@@ -568,6 +572,7 @@ TReadAnswer TReadInfo::FormAnswer(
568572
SizeEstimate = answerSize;
569573
readResult->SetSizeEstimate(SizeEstimate);
570574
readResult->SetLastOffset(LastOffset);
575+
readResult->SetStartOffset(startOffset);
571576
readResult->SetEndOffset(endOffset);
572577

573578
return {answerSize, std::move(answer)};
@@ -578,7 +583,7 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct
578583
if (!res)
579584
return;
580585
TReadAnswer answer(res->FormAnswer(
581-
ctx, res->Offset, Partition, nullptr, res->Destination, 0, Tablet, Config.GetMeteringMode(), IsActive()
586+
ctx, StartOffset, res->Offset, Partition, nullptr, res->Destination, 0, Tablet, Config.GetMeteringMode(), IsActive()
582587
));
583588
ctx.Send(Tablet, answer.Event.Release());
584589
PQ_LOG_D(" waiting read cookie " << ev->Get()->Cookie
@@ -1028,7 +1033,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
10281033
PQ_LOG_D("Reading cookie " << cookie << ". All data is from uncompacted head.");
10291034

10301035
TReadAnswer answer = info.FormAnswer(
1031-
ctx, EndOffset, Partition, &UsersInfoStorage->GetOrCreate(info.User, ctx),
1036+
ctx, StartOffset, EndOffset, Partition, &UsersInfoStorage->GetOrCreate(info.User, ctx),
10321037
info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode(), IsActive()
10331038
);
10341039
const auto* ev = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get());

ydb/core/persqueue/pq_impl.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
183183
auto partResp = responseRecord.MutablePartitionResponse()->MutableCmdReadResult();
184184

185185
partResp->SetMaxOffset(readResult.GetMaxOffset());
186+
partResp->SetStartOffset(readResult.GetStartOffset());
187+
partResp->SetEndOffset(readResult.GetEndOffset());
186188
partResp->SetSizeLag(readResult.GetSizeLag());
187189
partResp->SetWaitQuotaTimeMs(partResp->GetWaitQuotaTimeMs() + readResult.GetWaitQuotaTimeMs());
188190

ydb/core/persqueue/subscriber.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ struct TReadInfo {
8181
TReadAnswer FormAnswer(
8282
const TActorContext& ctx,
8383
const TEvPQ::TEvBlobResponse& response,
84+
const ui64 startOffset,
8485
const ui64 endOffset,
8586
const TPartitionId& partition,
8687
TUserInfo* ui,
@@ -93,6 +94,7 @@ struct TReadInfo {
9394

9495
TReadAnswer FormAnswer(
9596
const TActorContext& ctx,
97+
const ui64 startOffset,
9698
const ui64 endOffset,
9799
const TPartitionId& partition,
98100
TUserInfo* ui,
@@ -103,7 +105,7 @@ struct TReadInfo {
103105
const bool isActive
104106
) {
105107
TEvPQ::TEvBlobResponse response(0, TVector<TRequestedBlob>());
106-
return FormAnswer(ctx, response, endOffset, partition, ui, dst, sizeLag, tablet, meteringMode, isActive);
108+
return FormAnswer(ctx, response, startOffset, endOffset, partition, ui, dst, sizeLag, tablet, meteringMode, isActive);
107109
}
108110
};
109111

ydb/core/protos/msgbus_pq.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ message TCmdReadResult {
417417
optional uint64 ReadFromTimestampMs = 12;
418418
optional uint64 SizeEstimate = 13;
419419
optional int64 LastOffset = 14;
420+
optional uint64 StartOffset = 17;
420421
optional uint64 EndOffset = 15;
421422
optional bool ReadingFinished = 16;
422423
}

ydb/core/viewer/json_handlers_viewer.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "viewer_describe.h"
1313
#include "viewer_describe_topic.h"
1414
#include "viewer_feature_flags.h"
15+
#include "viewer_topic_data.h"
1516
#include "viewer_graph.h"
1617
#include "viewer_healthcheck.h"
1718
#include "viewer_hiveinfo.h"
@@ -189,6 +190,10 @@ void InitViewerTopicInfoJsonHandler(TJsonHandlers& handlers) {
189190
handlers.AddHandler("/viewer/topicinfo", new TJsonHandler<TJsonTopicInfo>(TJsonTopicInfo::GetSwagger()));
190191
}
191192

193+
void InitViewerTopicDataJsonHandler(TJsonHandlers& handlers) {
194+
handlers.AddHandler("/viewer/topic_data", new TJsonHandler<TTopicData>(TTopicData::GetSwagger()));
195+
}
196+
192197
void InitViewerPQConsumerInfoJsonHandler(TJsonHandlers& handlers) {
193198
handlers.AddHandler("/viewer/pqconsumerinfo", new TJsonHandler<TJsonPQConsumerInfo>(TJsonPQConsumerInfo::GetSwagger()));
194199
}
@@ -303,6 +308,7 @@ void InitViewerJsonHandlers(TJsonHandlers& jsonHandlers) {
303308
InitViewerConfigJsonHandler(jsonHandlers);
304309
InitViewerCountersJsonHandler(jsonHandlers);
305310
InitViewerTopicInfoJsonHandler(jsonHandlers);
311+
InitViewerTopicDataJsonHandler(jsonHandlers);
306312
InitViewerPQConsumerInfoJsonHandler(jsonHandlers);
307313
InitViewerTabletCountersJsonHandler(jsonHandlers);
308314
InitViewerStorageJsonHandler(jsonHandlers);

ydb/core/viewer/json_pipe_req.cpp

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -544,24 +544,48 @@ void TViewerPipeClient::RequestBSControllerPDiskUpdateStatus(const NKikimrBlobSt
544544
SendRequestToPipe(pipeClient, request.Release());
545545
}
546546

547-
void TViewerPipeClient::RequestSchemeCacheNavigate(const TString& path) {
547+
THolder<NSchemeCache::TSchemeCacheNavigate> TViewerPipeClient::SchemeCacheNavigateRequestBuilder (
548+
std::function<void (NSchemeCache::TSchemeCacheNavigate::TEntry&)> fillRequestEntryFunc
549+
) {
548550
THolder<NSchemeCache::TSchemeCacheNavigate> request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
549551
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
550-
entry.Path = SplitPath(path);
551552
entry.RedirectRequired = false;
552553
entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpPath;
554+
fillRequestEntryFunc(entry);
553555
request->ResultSet.emplace_back(entry);
556+
return request;
557+
}
558+
559+
void TViewerPipeClient::RequestSchemeCacheNavigateWithParams(const TString& path, ui32 access, bool showPrivate) {
560+
auto request = SchemeCacheNavigateRequestBuilder(
561+
[&](auto& entry) {
562+
entry.Path = SplitPath(path);
563+
entry.Access = access;
564+
entry.ShowPrivatePath = showPrivate;
565+
}
566+
);
567+
if (!Event->Get()->UserToken.empty())
568+
request->UserToken = new NACLib::TUserToken(Event->Get()->UserToken);
569+
570+
SendRequest(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
571+
}
572+
573+
void TViewerPipeClient::RequestSchemeCacheNavigate(const TString& path) {
574+
auto request = SchemeCacheNavigateRequestBuilder(
575+
[&](auto& entry) {
576+
entry.Path = SplitPath(path);
577+
}
578+
);
554579
SendRequest(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
555580
}
556581

557582
void TViewerPipeClient::RequestSchemeCacheNavigate(const TPathId& pathId) {
558-
THolder<NSchemeCache::TSchemeCacheNavigate> request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
559-
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
560-
entry.TableId.PathId = pathId;
561-
entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
562-
entry.RedirectRequired = false;
563-
entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpPath;
564-
request->ResultSet.emplace_back(entry);
583+
auto request = SchemeCacheNavigateRequestBuilder(
584+
[&](auto& entry) {
585+
entry.TableId.PathId = pathId;
586+
entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
587+
}
588+
);
565589
SendRequest(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
566590
}
567591

@@ -776,6 +800,10 @@ TString TViewerPipeClient::GetHTTPBADREQUEST(TString contentType, TString respon
776800
return Viewer->GetHTTPBADREQUEST(GetRequest(), std::move(contentType), std::move(response));
777801
}
778802

803+
TString TViewerPipeClient::GetHTTPNOTFOUND(TString, TString) {
804+
return Viewer->GetHTTPNOTFOUND(GetRequest());
805+
}
806+
779807
TString TViewerPipeClient::GetHTTPINTERNALERROR(TString contentType, TString response) {
780808
return Viewer->GetHTTPINTERNALERROR(GetRequest(), std::move(contentType), std::move(response));
781809
}

ydb/core/viewer/json_pipe_req.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,8 +266,14 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
266266
TRequestResponse<NSysView::TEvSysView::TEvGetPDisksResponse> RequestBSControllerPDisks();
267267
TRequestResponse<NSysView::TEvSysView::TEvGetStorageStatsResponse> RequestBSControllerStorageStats();
268268
void RequestBSControllerPDiskUpdateStatus(const NKikimrBlobStorage::TUpdateDriveStatus& driveStatus, bool force = false);
269+
270+
THolder<NSchemeCache::TSchemeCacheNavigate> SchemeCacheNavigateRequestBuilder(
271+
std::function<void (NSchemeCache::TSchemeCacheNavigate::TEntry&)> fillRequestEntryFunc);
272+
269273
void RequestSchemeCacheNavigate(const TString& path);
270274
void RequestSchemeCacheNavigate(const TPathId& pathId);
275+
void RequestSchemeCacheNavigateWithParams(const TString& path, ui32 access = NACLib::DescribeSchema, bool showPrivate = false);
276+
271277
TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult> MakeRequestSchemeCacheNavigate(const TString& path, ui64 cookie = 0);
272278
TRequestResponse<TEvTxProxySchemeCache::TEvNavigateKeySetResult> MakeRequestSchemeCacheNavigate(TPathId pathId, ui64 cookie = 0);
273279
TRequestResponse<TEvViewer::TEvViewerResponse> MakeRequestViewer(TNodeId nodeId, TEvViewer::TEvViewerRequest* request, ui32 flags = 0);
@@ -316,6 +322,7 @@ class TViewerPipeClient : public TActorBootstrapped<TViewerPipeClient> {
316322
TString GetHTTPOKJSON(const google::protobuf::Message& response, TInstant lastModified = {});
317323
TString GetHTTPGATEWAYTIMEOUT(TString contentType = {}, TString response = {});
318324
TString GetHTTPBADREQUEST(TString contentType = {}, TString response = {});
325+
TString GetHTTPNOTFOUND(TString contentType = {}, TString response = {});
319326
TString GetHTTPINTERNALERROR(TString contentType = {}, TString response = {});
320327
TString GetHTTPFORBIDDEN(TString contentType = {}, TString response = {});
321328
TString MakeForward(const std::vector<ui32>& nodes);

0 commit comments

Comments
 (0)