Skip to content

Commit 49a25e3

Browse files
committed
Refactoring tests of commit offset (#17299)
1 parent 8ecef72 commit 49a25e3

File tree

2 files changed

+81
-16
lines changed

2 files changed

+81
-16
lines changed

src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ TTopicSdkTestSetup::TTopicSdkTestSetup(const TString& testCaseName, const NKikim
2222
}
2323
}
2424

25-
void TTopicSdkTestSetup::CreateTopicWithAutoscale(const TString& path, const TString& consumer, size_t partitionCount, size_t maxPartitionCount) {
25+
void TTopicSdkTestSetup::CreateTopicWithAutoscale(const std::string& path, const std::string& consumer, size_t partitionCount, size_t maxPartitionCount) {
2626
CreateTopic(path, consumer, partitionCount, maxPartitionCount);
2727
}
2828

29-
void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consumer, size_t partitionCount, std::optional<size_t> maxPartitionCount)
29+
void TTopicSdkTestSetup::CreateTopic(const std::string& path, const std::string& consumer, size_t partitionCount, std::optional<size_t> maxPartitionCount)
3030
{
3131
TTopicClient client(MakeDriver());
3232

@@ -49,10 +49,10 @@ void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consume
4949
auto status = client.CreateTopic(path, topics).GetValueSync();
5050
UNIT_ASSERT(status.IsSuccess());
5151

52-
Server.WaitInit(path);
52+
Server.WaitInit(TString{path});
5353
}
5454

55-
TTopicDescription TTopicSdkTestSetup::DescribeTopic(const TString& path)
55+
TTopicDescription TTopicSdkTestSetup::DescribeTopic(const std::string& path)
5656
{
5757
TTopicClient client(MakeDriver());
5858

@@ -66,7 +66,7 @@ TTopicDescription TTopicSdkTestSetup::DescribeTopic(const TString& path)
6666
return status.GetTopicDescription();
6767
}
6868

69-
TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const TString& path, const TString& consumer)
69+
TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const std::string& path, const std::string& consumer)
7070
{
7171
TTopicClient client(MakeDriver());
7272

@@ -80,22 +80,79 @@ TConsumerDescription TTopicSdkTestSetup::DescribeConsumer(const TString& path, c
8080
return status.GetConsumerDescription();
8181
}
8282

83-
void TTopicSdkTestSetup::Write(const std::string& message, ui32 partitionId) {
83+
void TTopicSdkTestSetup::Write(const std::string& message, ui32 partitionId, const std::optional<std::string> producer, std::optional<ui64> seqNo) {
8484
TTopicClient client(MakeDriver());
8585

8686
TWriteSessionSettings settings;
8787
settings.Path(TEST_TOPIC);
8888
settings.PartitionId(partitionId);
89-
settings.DeduplicationEnabled(false);
89+
settings.DeduplicationEnabled(producer.has_value());
90+
if (producer) {
91+
settings.ProducerId(producer.value())
92+
.MessageGroupId(producer.value());
93+
}
9094
auto session = client.CreateSimpleBlockingWriteSession(settings);
9195

92-
TWriteMessage msg(TStringBuilder() << message);
93-
UNIT_ASSERT(session->Write(std::move(msg)));
96+
UNIT_ASSERT(session->Write(message, seqNo));
9497

9598
session->Close(TDuration::Seconds(5));
9699
}
97100

98-
TStatus TTopicSdkTestSetup::Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset, std::optional<std::string> sessionId) {
101+
TTopicSdkTestSetup::ReadResult TTopicSdkTestSetup::Read(const std::string& topic, const std::string& consumer, std::function<bool (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent&)> handler, const TDuration timeout) {
102+
TTopicClient client(MakeDriver());
103+
104+
auto reader = client.CreateReadSession(
105+
TReadSessionSettings()
106+
.AutoPartitioningSupport(true)
107+
.AppendTopics(TTopicReadSettings(topic))
108+
.ConsumerName(consumer));
109+
110+
TInstant deadlineTime = TInstant::Now() + timeout;
111+
112+
ReadResult result;
113+
result.Reader = reader;
114+
115+
bool continueFlag = true;
116+
while(continueFlag && deadlineTime > TInstant::Now()) {
117+
for (auto event : reader->GetEvents(false)) {
118+
if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
119+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
120+
if (!handler(*x)) {
121+
continueFlag = false;
122+
break;
123+
}
124+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
125+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
126+
x->Confirm();
127+
result.StartPartitionSessionEvents.push_back(*x);
128+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) {
129+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
130+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) {
131+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
132+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) {
133+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
134+
x->Confirm();
135+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) {
136+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
137+
} else if (auto* x = std::get_if<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent>(&event)) {
138+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
139+
x->Confirm();
140+
} else if (auto* x = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&event)) {
141+
Cerr << "SESSION EVENT " << x->DebugString() << Endl << Flush;
142+
} else {
143+
Cerr << "SESSION EVENT unhandled \n";
144+
}
145+
}
146+
147+
Sleep(TDuration::MilliSeconds(250));
148+
}
149+
150+
result.Timeout = continueFlag;
151+
152+
return result;
153+
}
154+
155+
TStatus TTopicSdkTestSetup::Commit(const std::string& path, const std::string& consumerName, size_t partitionId, size_t offset, std::optional<std::string> sessionId) {
99156
TTopicClient client(MakeDriver());
100157

101158
TCommitOffsetSettings commitSettings {.ReadSessionId_ = sessionId};

src/client/topic/ut/ut_utils/topic_sdk_test_setup.h

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,24 @@ class TTopicSdkTestSetup {
1616
public:
1717
TTopicSdkTestSetup(const TString& testCaseName, const NKikimr::Tests::TServerSettings& settings = MakeServerSettings(), bool createTopic = true);
1818

19-
void CreateTopic(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1,
19+
void CreateTopic(const std::string& path = TEST_TOPIC, const std::string& consumer = TEST_CONSUMER, size_t partitionCount = 1,
2020
std::optional<size_t> maxPartitionCount = std::nullopt);
21-
void CreateTopicWithAutoscale(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1,
21+
void CreateTopicWithAutoscale(const std::string& path = TEST_TOPIC, const std::string& consumer = TEST_CONSUMER, size_t partitionCount = 1,
2222
size_t maxPartitionCount = 100);
2323

24-
TTopicDescription DescribeTopic(const TString& path = TString{TEST_TOPIC});
25-
TConsumerDescription DescribeConsumer(const TString& path = TString{TEST_TOPIC}, const TString& consumer = TString{TEST_CONSUMER});
24+
TTopicDescription DescribeTopic(const std::string& path = TEST_TOPIC);
25+
TConsumerDescription DescribeConsumer(const std::string& path = TEST_TOPIC, const std::string& consumer = TEST_CONSUMER);
2626

27-
void Write(const std::string& message, ui32 partitionId = 0);
28-
TStatus Commit(const TString& path, const TString& consumerName, size_t partitionId, size_t offset, std::optional<std::string> sessionId = std::nullopt);
27+
void Write(const std::string& message, ui32 partitionId = 0, const std::optional<std::string> producer = std::nullopt, std::optional<ui64> seqNo = std::nullopt);
28+
29+
struct ReadResult {
30+
std::shared_ptr<IReadSession> Reader;
31+
bool Timeout;
32+
33+
std::vector<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent> StartPartitionSessionEvents;
34+
};
35+
ReadResult Read(const std::string& topic, const std::string& consumer, std::function<bool (NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent&)> handler, const TDuration timeout = TDuration::Seconds(5));
36+
TStatus Commit(const std::string& path, const std::string& consumerName, size_t partitionId, size_t offset, std::optional<std::string> sessionId = std::nullopt);
2937

3038
TString GetEndpoint() const;
3139
TString GetTopicPath(const TString& name = TString{TEST_TOPIC}) const;

0 commit comments

Comments
 (0)