Skip to content

Commit cd37952

Browse files
committed
Fixed topic tests from ydb repo
1 parent e5f3a43 commit cd37952

File tree

4 files changed

+54
-45
lines changed

4 files changed

+54
-45
lines changed

src/client/topic/impl/common.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ class TBaseSessionEventsQueue : public ISignalable {
383383
while (!HasEventsImpl()) {
384384
std::unique_lock<std::mutex> lk(Mutex, std::adopt_lock);
385385
CondVar.wait(lk);
386+
lk.release();
386387
}
387388
}
388389

src/client/topic/impl/read_session.cpp

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -230,14 +230,19 @@ void TReadSession::UpdateOffsets(const NTable::TTransaction& tx)
230230

231231
Y_ABORT_UNLESS(!topics.empty());
232232

233-
auto result = Client->UpdateOffsetsInTransaction(tx,
234-
topics,
235-
Settings.ConsumerName_,
236-
{}).GetValueSync();
237-
Y_ABORT_UNLESS(!result.IsTransportError());
238-
239-
if (!result.IsSuccess()) {
240-
ythrow yexception() << "error on update offsets: " << result;
233+
while (true) {
234+
auto result = Client->UpdateOffsetsInTransaction(tx,
235+
topics,
236+
Settings.ConsumerName_,
237+
{}).GetValueSync();
238+
Y_ABORT_UNLESS(!result.IsTransportError());
239+
if (result.GetStatus() != EStatus::SESSION_BUSY) {
240+
if (!result.IsSuccess()) {
241+
ythrow yexception() << "error on update offsets: " << result;
242+
}
243+
break;
244+
}
245+
Sleep(TDuration::MilliSeconds(1));
241246
}
242247

243248
OffsetRanges.erase(std::make_pair(sessionId, txId));
@@ -309,9 +314,10 @@ bool TReadSession::Close(TDuration timeout) {
309314
issues.AddIssue(TStringBuilder() << "Session was closed after waiting " << timeout);
310315
EventsQueue->Close(TSessionClosedEvent(EStatus::TIMEOUT, std::move(issues)), deferred);
311316
}
312-
313-
std::lock_guard guard(Lock);
314-
Aborting = true; // Set abort flag for doing nothing on destructor.
317+
{
318+
std::lock_guard guard(Lock);
319+
Aborting = true; // Set abort flag for doing nothing on destructor.
320+
}
315321
return result;
316322
}
317323

src/client/topic/impl/read_session_impl.ipp

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2184,22 +2184,23 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEvents(bool block, std::option
21842184
std::vector<TReadSessionEventInfo<UseMigrationProtocol>> eventInfos;
21852185
const size_t maxCount = maxEventsCount ? *maxEventsCount : std::numeric_limits<size_t>::max();
21862186
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol> accumulator;
2187+
{
2188+
std::lock_guard<std::mutex> guard(TParent::Mutex);
2189+
eventInfos.reserve(Min(TParent::Events.size() + TParent::CloseEvent.has_value(), maxCount));
2190+
do {
2191+
if (block) {
2192+
TParent::WaitEventsImpl();
2193+
}
21872194

2188-
std::lock_guard<std::mutex> guard(TParent::Mutex);
2189-
eventInfos.reserve(Min(TParent::Events.size() + TParent::CloseEvent.has_value(), maxCount));
2190-
do {
2191-
if (block) {
2192-
TParent::WaitEventsImpl();
2193-
}
2194-
2195-
while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) {
2196-
TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(maxByteSize, accumulator);
2197-
eventInfos.emplace_back(std::move(event));
2198-
if (eventInfos.back().IsSessionClosedEvent()) {
2199-
break;
2195+
while (TParent::HasEventsImpl() && eventInfos.size() < maxCount && maxByteSize > 0) {
2196+
TReadSessionEventInfo<UseMigrationProtocol> event = GetEventImpl(maxByteSize, accumulator);
2197+
eventInfos.emplace_back(std::move(event));
2198+
if (eventInfos.back().IsSessionClosedEvent()) {
2199+
break;
2200+
}
22002201
}
2201-
}
2202-
} while (block && eventInfos.empty());
2202+
} while (block && eventInfos.empty());
2203+
}
22032204

22042205
accumulator.OnUserRetrievedEvent();
22052206

@@ -2222,18 +2223,19 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEvent(bool block, size_t maxBy
22222223

22232224
std::optional<TReadSessionEventInfo<UseMigrationProtocol>> eventInfo;
22242225
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol> accumulator;
2226+
{
2227+
std::lock_guard<std::mutex> guard(TParent::Mutex);
2228+
do {
2229+
if (block) {
2230+
TParent::WaitEventsImpl();
2231+
}
22252232

2226-
std::lock_guard<std::mutex> guard(TParent::Mutex);
2227-
do {
2228-
if (block) {
2229-
TParent::WaitEventsImpl();
2230-
}
2231-
2232-
if (TParent::HasEventsImpl()) {
2233-
eventInfo = GetEventImpl(maxByteSize, accumulator);
2234-
}
2233+
if (TParent::HasEventsImpl()) {
2234+
eventInfo = GetEventImpl(maxByteSize, accumulator);
2235+
}
22352236

2236-
} while (block && !eventInfo);
2237+
} while (block && !eventInfo);
2238+
}
22372239

22382240
accumulator.OnUserRetrievedEvent();
22392241

src/client/topic/ut/topic_to_table_ut.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2124,32 +2124,32 @@ Y_UNIT_TEST_F(ReadRuleGeneration, TFixture)
21242124
NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = false});
21252125

21262126
// Users have created their own topic on it
2127-
CreateTopic(TEST_TOPIC);
2127+
CreateTopic(TString{TEST_TOPIC});
21282128

21292129
// And they wrote their messages into it
2130-
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-1");
2131-
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-2");
2132-
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-3");
2130+
WriteToTopic(TString{TEST_TOPIC}, TEST_MESSAGE_GROUP_ID, "message-1");
2131+
WriteToTopic(TString{TEST_TOPIC}, TEST_MESSAGE_GROUP_ID, "message-2");
2132+
WriteToTopic(TString{TEST_TOPIC}, TEST_MESSAGE_GROUP_ID, "message-3");
21332133

21342134
// And he had a consumer
2135-
AddConsumer(TEST_TOPIC, {"consumer-1"});
2135+
AddConsumer(TString{TEST_TOPIC}, {"consumer-1"});
21362136

21372137
// We read messages from the topic and committed offsets
2138-
auto messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2));
2138+
auto messages = ReadFromTopic(TString{TEST_TOPIC}, "consumer-1", TDuration::Seconds(2));
21392139
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3);
2140-
CloseTopicReadSession(TEST_TOPIC, "consumer-1");
2140+
CloseTopicReadSession(TString{TEST_TOPIC}, "consumer-1");
21412141

21422142
// And then the Logbroker team turned on the feature flag
21432143
NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = true});
21442144

21452145
// Users continued to write to the topic
2146-
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-4");
2146+
WriteToTopic(TString{TEST_TOPIC}, TEST_MESSAGE_GROUP_ID, "message-4");
21472147

21482148
// Users have added new consumers
2149-
AddConsumer(TEST_TOPIC, {"consumer-2"});
2149+
AddConsumer(TString{TEST_TOPIC}, {"consumer-2"});
21502150

21512151
// And they wanted to continue reading their messages
2152-
messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2));
2152+
messages = ReadFromTopic(TString{TEST_TOPIC}, "consumer-1", TDuration::Seconds(2));
21532153
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
21542154
}
21552155

0 commit comments

Comments
 (0)