Skip to content

Commit 2fcf844

Browse files
committed
refactoring and ut
1 parent 8ac0676 commit 2fcf844

File tree

8 files changed

+297
-150
lines changed

8 files changed

+297
-150
lines changed

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,16 @@ static void UpdateSupportivePartition(TMaybe<ui32>& lhs, const TMaybe<ui32>& rhs
2727
//
2828
bool TConsumerOperations::IsValid() const
2929
{
30-
return Offsets_.GetNumIntervals() == 1;
30+
return Offsets_.GetNumIntervals() <= 1;
3131
}
3232

3333
std::pair<ui64, ui64> TConsumerOperations::GetOffsetsCommitRange() const
3434
{
3535
Y_ABORT_UNLESS(IsValid());
3636

37-
return {Offsets_.Min(), Offsets_.Max()};
37+
if (Offsets_.Empty()) {
38+
return {0,0};
39+
} else return {Offsets_.Min(), Offsets_.Max()};
3840
}
3941

4042
bool TConsumerOperations::GetForceCommit() const
@@ -88,7 +90,9 @@ void TConsumerOperations::AddOperationImpl(const TString& consumer,
8890
Consumer_ = consumer;
8991
}
9092

91-
Offsets_.InsertInterval(begin, end);
93+
if (end != 0) {
94+
Offsets_.InsertInterval(begin, end);
95+
}
9296

9397
ForceCommit_ = forceCommit;
9498
KillReadSession_ = killReadSession;

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,87 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
622622
readSession2->Close();
623623
}
624624

625+
Y_UNIT_TEST(PartitionSplit_OffsetCommit) {
626+
TTopicSdkTestSetup setup = CreateSetup();
627+
TTopicClient client = setup.MakeClient();
628+
629+
TCreateTopicSettings createSettings;
630+
createSettings
631+
.BeginConfigurePartitioningSettings()
632+
.MinActivePartitions(1)
633+
.MaxActivePartitions(100)
634+
.BeginConfigureAutoPartitioningSettings()
635+
.UpUtilizationPercent(2)
636+
.DownUtilizationPercent(1)
637+
.StabilizationWindow(TDuration::Seconds(2))
638+
.Strategy(EAutoPartitioningStrategy::ScaleUp)
639+
.EndConfigureAutoPartitioningSettings()
640+
.EndConfigurePartitioningSettings();
641+
642+
TConsumerSettings<TCreateTopicSettings> consumers(createSettings, TEST_CONSUMER);
643+
createSettings.AppendConsumers(consumers);
644+
645+
client.CreateTopic(TEST_TOPIC, createSettings).Wait();
646+
647+
auto msg = TString(1_MB, 'a');
648+
649+
auto writeSession_1 = CreateWriteSession(client, "producer-1", 0, TEST_TOPIC, false);
650+
auto writeSession_2 = CreateWriteSession(client, "producer-2", 0, TEST_TOPIC, false);
651+
652+
{
653+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 1)));
654+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 2)));
655+
Sleep(TDuration::Seconds(5));
656+
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
657+
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
658+
}
659+
660+
{
661+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 3)));
662+
UNIT_ASSERT(writeSession_2->Write(Msg(msg, 4)));
663+
UNIT_ASSERT(writeSession_1->Write(Msg(msg, 5)));
664+
UNIT_ASSERT(writeSession_2->Write(Msg(msg, 6)));
665+
Sleep(TDuration::Seconds(5));
666+
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
667+
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
668+
}
669+
670+
auto writeSession2_1 = CreateWriteSession(client, "producer-1", 1, TEST_TOPIC, false);
671+
auto writeSession2_2 = CreateWriteSession(client, "producer-2", 1, TEST_TOPIC, false);
672+
673+
{
674+
UNIT_ASSERT(writeSession2_1->Write(Msg(msg, 7)));
675+
UNIT_ASSERT(writeSession2_2->Write(Msg(msg, 8)));
676+
UNIT_ASSERT(writeSession2_1->Write(Msg(msg, 9)));
677+
UNIT_ASSERT(writeSession2_2->Write(Msg(msg, 10)));
678+
Sleep(TDuration::Seconds(5));
679+
auto describe2 = client.DescribeTopic(TEST_TOPIC).GetValueSync();
680+
UNIT_ASSERT_EQUAL(describe2.GetTopicDescription().GetPartitions().size(), 5);
681+
}
682+
683+
auto status = client.CommitOffset(TEST_TOPIC, 1, TEST_CONSUMER, 2).GetValueSync();
684+
UNIT_ASSERT(status.IsSuccess());
685+
686+
auto describeConsumerSettings = TDescribeConsumerSettings().IncludeStats(true);
687+
auto result = client.DescribeConsumer(TEST_TOPIC, TEST_CONSUMER, describeConsumerSettings).GetValueSync();
688+
UNIT_ASSERT(result.IsSuccess());
689+
690+
auto description = result.GetConsumerDescription();
691+
UNIT_ASSERT(description.GetPartitions().size() == 5);
692+
693+
auto stats0 = description.GetPartitions().at(0).GetPartitionConsumerStats();
694+
UNIT_ASSERT(stats0.Defined());
695+
UNIT_ASSERT(stats0->GetCommittedOffset() == 6);
696+
697+
auto stats1 = description.GetPartitions().at(1).GetPartitionConsumerStats();
698+
UNIT_ASSERT(stats1.Defined());
699+
UNIT_ASSERT(stats1->GetCommittedOffset() == 2);
700+
701+
auto stats3 = description.GetPartitions().at(3).GetPartitionConsumerStats();
702+
UNIT_ASSERT(stats3.Defined());
703+
UNIT_ASSERT(stats3->GetCommittedOffset() == 0);
704+
}
705+
625706
Y_UNIT_TEST(CommitTopPast_BeforeAutoscaleAwareSDK) {
626707
TTopicSdkTestSetup setup = CreateSetup();
627708
setup.CreateTopicWithAutoscale(TEST_TOPIC, TEST_CONSUMER, 1, 100);

ydb/services/lib/actors/type_definitions.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ struct TTopicHolder {
6262
.MeteringMode = info.MeteringMode,
6363
.FullConverter = info.TopicNameConverter,
6464
.Partitions = info.Partitions,
65-
.PartitionGraph = info.PartitionGraph // savnik тут будет пусто, т.к. в TTopicInitInfo это поле не сетится. Найти где оно инициализируется и исправить
65+
.PartitionGraph = info.PartitionGraph
6666
};
6767
}
6868
};

ydb/services/persqueue_v1/actors/commit_offset_actor.cpp

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -108,36 +108,37 @@ void TCommitOffsetActor::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TAc
108108
if (partitionNode->HierarhicalParents.size() == 0 && partitionNode->Children.size() == 0) {
109109
SendCommit(topicInitInfo, commitRequest, ctx);
110110
} else {
111-
Kqp.Consumer = ClientId;
112-
Kqp.DataBase = Request().GetDatabaseName().GetOrElse(TString()); // savnik if empty?
113-
Kqp.Path = topic;
111+
std::vector<TKqpHelper::TCommitInfo> commits;
114112

115113
for (auto& parent: partitionNode->HierarhicalParents) {
116114
TKqpHelper::TCommitInfo commit {.PartitionId = parent->Id, .Offset = Max<i64>()};
117-
Commits.push_back(commit);
115+
commits.push_back(commit);
118116
}
119117

120118
for (auto& child: partitionNode->Children) {
121119
TKqpHelper::TCommitInfo commit {.PartitionId = child->Id, .Offset = 0};
122-
Commits.push_back(commit);
120+
commits.push_back(commit);
123121
}
124122

125123
TKqpHelper::TCommitInfo commit {.PartitionId = partitionNode->Id, .Offset = commitRequest->Getoffset()};
126-
Commits.push_back(commit);
124+
commits.push_back(commit);
125+
126+
// savnik if empty database?
127+
Kqp = std::make_unique<TKqpHelper>(Request().GetDatabaseName().GetOrElse(TString()), ClientId, topic, commits);
127128

128129
SendDistributedTxOffsets(ctx);
129130
}
130131
}
131132

132133
void TCommitOffsetActor::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) {
133-
if (!Kqp.Handle(ev, ctx)) {
134+
if (!Kqp->Handle(ev, ctx)) {
134135
AnswerError("empty list of topics", PersQueue::ErrorCode::UNKNOWN_TOPIC, ctx); // savnik
135136
}
136-
Kqp.BeginTransaction(ctx);
137+
Kqp->BeginTransaction(ctx);
137138
}
138139

139140
void TCommitOffsetActor::SendDistributedTxOffsets(const TActorContext& ctx) {
140-
Kqp.SendCreateSessionRequest(ctx);
141+
Kqp->SendCreateSessionRequest(ctx);
141142
}
142143

143144
void TCommitOffsetActor::SendCommit(const TTopicInitInfo& topic, const Ydb::Topic::CommitOffsetRequest* commitRequest, const TActorContext& ctx) {
@@ -177,16 +178,15 @@ void TCommitOffsetActor::SendCommit(const TTopicInitInfo& topic, const Ydb::Topi
177178

178179
void TCommitOffsetActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
179180
auto& record = ev->Get()->Record;
181+
if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { // savnik finish this part!
182+
Ydb::Topic::CommitOffsetResult result;
183+
Request().SendResult(result, record.GetYdbStatus());
184+
Die(ctx);
185+
}
180186

181-
// savnik set error record.GetYdbStatus()
182-
if (Kqp.Step == 0) {
183-
Kqp.Step++;
184-
Kqp.SendCommits(ev,Commits, ctx);
185-
} else if (Kqp.Step == 1) {
186-
Kqp.Step++;
187-
Kqp.CommitTx(ctx);
188-
} else {
189-
Kqp.CloseKqpSession(ctx);
187+
auto step = Kqp->Handle(ev, ctx);
188+
189+
if (step == TKqpHelper::ECurrentStep::DONE) {
190190
Ydb::Topic::CommitOffsetResult result;
191191
Request().SendResult(result, Ydb::StatusIds::SUCCESS);
192192
Die(ctx);
@@ -213,16 +213,15 @@ void TCommitOffsetActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActo
213213

214214

215215
void TCommitOffsetActor::AnswerError(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode, const NActors::TActorContext& ctx) {
216-
217216
Ydb::Topic::CommitOffsetResponse response;
218217
response.mutable_operation()->set_ready(true);
219218
auto issue = response.mutable_operation()->add_issues();
220219
FillIssue(issue, errorCode, errorReason);
221-
response.mutable_operation()->set_status(ConvertPersQueueInternalCodeToStatus(errorCode));
222-
Reply(ConvertPersQueueInternalCodeToStatus(errorCode), response.operation().issues(), ctx);
220+
auto status = ConvertPersQueueInternalCodeToStatus(errorCode);
221+
response.mutable_operation()->set_status(status);
222+
Reply(status, response.operation().issues(), ctx);
223223
}
224224

225-
226225
void TCommitOffsetActor::Handle(TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx) {
227226
AnswerError(ev->Get()->Reason, ev->Get()->ErrorCode, ctx);
228227
}
Lines changed: 2 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
#pragma once
22

33
#include "events.h"
4-
#include "ydb/core/kqp/common/simple/services.h"
5-
#include "ydb/services/metadata/service.h"
4+
#include "kqp_commit_offset_helper.h"
65

76

87
#include <ydb/core/kqp/common/events/events.h>
@@ -16,126 +15,6 @@ namespace NKikimr::NGRpcProxy::V1 {
1615

1716
using namespace NKikimr::NGRpcService;
1817

19-
class TKqpHelper {
20-
21-
public: // savnik
22-
23-
TString DataBase;
24-
TString KqpSessionId;
25-
TString Consumer;
26-
TString Path;
27-
TString TxId;
28-
29-
int Step = 0;
30-
31-
struct TCommitInfo {
32-
ui64 PartitionId;
33-
i64 Offset;
34-
};
35-
36-
void SendCreateSessionRequest(const TActorContext& ctx) {
37-
auto ev = MakeCreateSessionRequest();
38-
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
39-
}
40-
41-
THolder<NKqp::TEvKqp::TEvCreateSessionRequest> MakeCreateSessionRequest() {
42-
auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
43-
ev->Record.MutableRequest()->SetDatabase(DataBase);
44-
return ev;
45-
}
46-
47-
bool Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& /*ctx*/) {
48-
const auto& record = ev->Get()->Record;
49-
50-
if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
51-
return false;
52-
}
53-
54-
KqpSessionId = record.GetResponse().GetSessionId();
55-
Y_ABORT_UNLESS(!KqpSessionId.empty());
56-
57-
return true;
58-
}
59-
60-
void CloseKqpSession(const TActorContext& ctx) {
61-
if (KqpSessionId) {
62-
auto ev = MakeCloseSessionRequest();
63-
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
64-
65-
KqpSessionId = "";
66-
}
67-
}
68-
69-
THolder<NKqp::TEvKqp::TEvCloseSessionRequest> MakeCloseSessionRequest() {
70-
auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
71-
ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
72-
return ev;
73-
}
74-
75-
void SendCommits(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, std::vector<TCommitInfo> commits, const NActors::TActorContext& ctx)
76-
{
77-
// if (!AppData(ctx)->FeatureFlags.GetEnableTopicServiceTx()) { // savnik need this check?
78-
// return Reply(Ydb::StatusIds::UNSUPPORTED,
79-
// "Disabled transaction support for TopicService.",
80-
// NKikimrIssues::TIssuesIds::DEFAULT_ERROR,
81-
// ctx);
82-
// }
83-
84-
auto& record = ev->Get()->Record;
85-
TxId = record.GetResponse().GetTxMeta().id();
86-
Y_ABORT_UNLESS(!TxId.empty());
87-
88-
89-
auto offsets = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
90-
offsets->Record.MutableRequest()->SetDatabase(DataBase);
91-
offsets->Record.MutableRequest()->SetSessionId(KqpSessionId);
92-
offsets->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_UNDEFINED);
93-
offsets->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_TOPIC);
94-
offsets->Record.MutableRequest()->MutableTxControl()->set_tx_id(TxId);
95-
offsets->Record.MutableRequest()->MutableTopicOperations()->SetConsumer(Consumer);
96-
97-
// savnik need set something else?
98-
99-
auto* topic = offsets->Record.MutableRequest()->MutableTopicOperations()->AddTopics();
100-
101-
topic->set_path(Path);
102-
103-
for(auto &c: commits) {
104-
auto* partition = topic->add_partitions();
105-
partition->set_partition_id(c.PartitionId);
106-
partition->set_force_commit(true);
107-
partition->set_kill_read_session(true);
108-
auto* offset = partition->add_partition_offsets();
109-
offset->set_end(c.Offset);
110-
}
111-
112-
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), offsets.Release());
113-
}
114-
115-
void BeginTransaction(const NActors::TActorContext& ctx) {
116-
auto begin = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
117-
118-
begin->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_BEGIN_TX);
119-
begin->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
120-
begin->Record.MutableRequest()->SetSessionId(KqpSessionId);
121-
begin->Record.MutableRequest()->SetDatabase(DataBase);
122-
123-
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), begin.Release());
124-
}
125-
126-
void CommitTx(const NActors::TActorContext& ctx) {
127-
auto commit = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
128-
129-
commit->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_COMMIT_TX);
130-
commit->Record.MutableRequest()->MutableTxControl()->set_tx_id(TxId);
131-
commit->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
132-
commit->Record.MutableRequest()->SetSessionId(KqpSessionId);
133-
commit->Record.MutableRequest()->SetDatabase(DataBase);
134-
135-
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), commit.Release());
136-
}
137-
};
138-
13918
class TCommitOffsetActor : public TRpcOperationRequestActor<TCommitOffsetActor, TEvCommitOffsetRequest> {
14019

14120
using TBase = TRpcOperationRequestActor<TCommitOffsetActor, TEvCommitOffsetRequest>;
@@ -219,8 +98,7 @@ class TCommitOffsetActor : public TRpcOperationRequestActor<TCommitOffsetActor,
21998

22099
NPersQueue::TTopicsListController TopicsHandler;
221100

222-
TKqpHelper Kqp;
223-
std::vector<TKqpHelper::TCommitInfo> Commits;
101+
std::unique_ptr<TKqpHelper> Kqp;
224102
};
225103

226104
}

0 commit comments

Comments
 (0)