Skip to content

Commit d573ec3

Browse files
Duplication in the code of the partition actor (#17595)
1 parent 198b18c commit d573ec3

File tree

4 files changed

+54
-45
lines changed

4 files changed

+54
-45
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1446,6 +1446,44 @@ void TPartition::Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorCont
14461446
ctx.Send(Tablet, response.Release());
14471447
}
14481448

1449+
void TPartition::OnReadComplete(TReadInfo& info,
1450+
TUserInfo* userInfo,
1451+
const TEvPQ::TEvBlobResponse* blobResponse,
1452+
const TActorContext& ctx)
1453+
{
1454+
TReadAnswer answer = info.FormAnswer(
1455+
ctx, blobResponse, StartOffset, EndOffset, Partition, userInfo,
1456+
info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode(), IsActive()
1457+
);
1458+
const auto& resp = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get())->Response;
1459+
1460+
if (blobResponse && HasError(*blobResponse)) {
1461+
if (info.IsSubscription) {
1462+
TabletCounters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_ERROR].Increment(1);
1463+
}
1464+
TabletCounters.Cumulative()[COUNTER_PQ_READ_ERROR].Increment(1);
1465+
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds());
1466+
} else {
1467+
if (info.IsSubscription) {
1468+
TabletCounters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_OK].Increment(1);
1469+
}
1470+
1471+
if (blobResponse) {
1472+
TabletCounters.Cumulative()[COUNTER_PQ_READ_OK].Increment(1);
1473+
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_OK].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds());
1474+
} else {
1475+
TabletCounters.Cumulative()[COUNTER_PQ_READ_HEAD_ONLY_OK].Increment(1);
1476+
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_HEAD_ONLY].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds());
1477+
}
1478+
1479+
TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp->ByteSize());
1480+
}
1481+
1482+
ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release());
1483+
1484+
OnReadRequestFinished(info.Destination, answer.Size, info.User, ctx);
1485+
}
1486+
14491487
void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx) {
14501488
const ui64 cookie = ev->Get()->GetCookie();
14511489
auto it = ReadInfo.find(cookie);
@@ -1465,28 +1503,7 @@ void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& c
14651503
OnReadRequestFinished(info.Destination, 0, info.User, ctx);
14661504
}
14671505

1468-
TReadAnswer answer(info.FormAnswer(
1469-
ctx, *ev->Get(), StartOffset, EndOffset, Partition, userInfo,
1470-
info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode(), IsActive()
1471-
));
1472-
const auto& resp = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get())->Response;
1473-
1474-
if (HasError(*ev->Get())) {
1475-
if (info.IsSubscription) {
1476-
TabletCounters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_ERROR].Increment(1);
1477-
}
1478-
TabletCounters.Cumulative()[COUNTER_PQ_READ_ERROR].Increment(1);
1479-
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_ERROR].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds());
1480-
} else {
1481-
if (info.IsSubscription) {
1482-
TabletCounters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_OK].Increment(1);
1483-
}
1484-
TabletCounters.Cumulative()[COUNTER_PQ_READ_OK].Increment(1);
1485-
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_OK].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds());
1486-
TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp->ByteSize());
1487-
}
1488-
ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release());
1489-
OnReadRequestFinished(info.Destination, answer.Size, info.User, ctx);
1506+
OnReadComplete(info, userInfo, ev->Get(), ctx);
14901507
}
14911508

14921509
void TPartition::Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx) {

ydb/core/persqueue/partition.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,6 +1013,11 @@ class TPartition : public TActorBootstrapped<TPartition> {
10131013
void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request);
10141014

10151015
TBlobKeyTokenPtr MakeBlobKeyToken(const TString& key);
1016+
1017+
void OnReadComplete(TReadInfo& info,
1018+
TUserInfo* userInfo,
1019+
const TEvPQ::TEvBlobResponse* blobResponse,
1020+
const TActorContext& ctx);
10161021
};
10171022

10181023
} // namespace NKikimr::NPQ

ydb/core/persqueue/partition_read.cpp

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -592,9 +592,9 @@ void TPartition::Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ct
592592
auto res = Subscriber.OnTimeout(ev);
593593
if (!res)
594594
return;
595-
TReadAnswer answer(res->FormAnswer(
596-
ctx, StartOffset, res->Offset, Partition, nullptr, res->Destination, 0, Tablet, Config.GetMeteringMode(), IsActive()
597-
));
595+
TReadAnswer answer = res->FormAnswer(
596+
ctx, nullptr, StartOffset, res->Offset, Partition, nullptr, res->Destination, 0, Tablet, Config.GetMeteringMode(), IsActive()
597+
);
598598
ctx.Send(Tablet, answer.Event.Release());
599599
PQ_LOG_D(" waiting read cookie " << ev->Get()->Cookie
600600
<< " partition " << Partition << " read timeout for " << res->User << " offset " << res->Offset);
@@ -1041,24 +1041,7 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
10411041
if (info.Blobs.empty()) { //all from head, answer right now
10421042
PQ_LOG_D("Reading cookie " << cookie << ". All data is from uncompacted head.");
10431043

1044-
TReadAnswer answer = info.FormAnswer(
1045-
ctx, StartOffset, EndOffset, Partition, &UsersInfoStorage->GetOrCreate(info.User, ctx),
1046-
info.Destination, GetSizeLag(info.Offset), Tablet, Config.GetMeteringMode(), IsActive()
1047-
);
1048-
const auto* ev = dynamic_cast<TEvPQ::TEvProxyResponse*>(answer.Event.Get());
1049-
Y_ABORT_UNLESS(ev);
1050-
const auto& resp = ev->Response;
1051-
if (info.IsSubscription) {
1052-
TabletCounters.Cumulative()[COUNTER_PQ_READ_SUBSCRIPTION_OK].Increment(1);
1053-
}
1054-
TabletCounters.Cumulative()[COUNTER_PQ_READ_HEAD_ONLY_OK].Increment(1);
1055-
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_READ_HEAD_ONLY].IncrementFor((ctx.Now() - info.Timestamp).MilliSeconds());
1056-
1057-
TabletCounters.Cumulative()[COUNTER_PQ_READ_BYTES].Increment(resp->ByteSize());
1058-
1059-
ctx.Send(info.Destination != 0 ? Tablet : ctx.SelfID, answer.Event.Release());
1060-
1061-
OnReadRequestFinished(info.Destination, answer.Size, info.User, ctx);
1044+
OnReadComplete(info, &UsersInfoStorage->GetOrCreate(info.User, ctx), nullptr, ctx);
10621045
return;
10631046
}
10641047

ydb/core/persqueue/subscriber.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ struct TReadInfo {
9494

9595
TReadAnswer FormAnswer(
9696
const TActorContext& ctx,
97+
const TEvPQ::TEvBlobResponse* response,
9798
const ui64 startOffset,
9899
const ui64 endOffset,
99100
const TPartitionId& partition,
@@ -104,8 +105,11 @@ struct TReadInfo {
104105
const NKikimrPQ::TPQTabletConfig::EMeteringMode meteringMode,
105106
const bool isActive
106107
) {
107-
TEvPQ::TEvBlobResponse response(0, TVector<TRequestedBlob>());
108-
return FormAnswer(ctx, response, startOffset, endOffset, partition, ui, dst, sizeLag, tablet, meteringMode, isActive);
108+
static TEvPQ::TEvBlobResponse fakeBlobResponse(0, TVector<TRequestedBlob>());
109+
if (!response) {
110+
response = &fakeBlobResponse;
111+
}
112+
return FormAnswer(ctx, *response, startOffset, endOffset, partition, ui, dst, sizeLag, tablet, meteringMode, isActive);
109113
}
110114
};
111115

0 commit comments

Comments
 (0)