Skip to content

Commit 47cccd1

Browse files
[YDB Topics] Kafka API: Fix epoch overflow in init_producer_id (#16091)
1 parent 4518916 commit 47cccd1

File tree

2 files changed

+7
-4
lines changed

2 files changed

+7
-4
lines changed

ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ namespace NKafka {
121121

122122
void TKafkaInitProducerIdActor::RequestFullRetry(const TActorContext& ctx) {
123123
CurrentTxAbortRetryNumber++;
124+
Kqp->ResetTxId();
124125
StartTxProducerInitCycle(ctx);
125126
}
126127

@@ -143,6 +144,8 @@ namespace NKafka {
143144
try {
144145
switch (LastSentToKqpRequest) {
145146
case BEGIN_TRANSACTION:
147+
// save tx id for future requests
148+
Kqp->SetTxId(ev->Get()->Record.GetResponse().GetTxMeta().id());
146149
SendSelectRequest(ctx);
147150
break;
148151
case SELECT:

ydb/core/kafka_proxy/ut/ut_protocol.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2166,9 +2166,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
21662166
}
21672167

21682168
Y_UNIT_TEST(InitProducerId_forPreviouslySeenTransactionalIdShouldReturnNewProducerIdIfEpochOverflown) {
2169-
return; // mute test till implementation
21702169
TInsecureTestServer testServer;
2171-
ui16 maxEpoch = std::numeric_limits<ui16>::max();
21722170

21732171
TKafkaTestClient kafkaClient(testServer.Port);
21742172
// use random transactional id for each request top avoid parallel execution problems
@@ -2182,20 +2180,22 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
21822180
rows.BeginList();
21832181
rows.AddListItem()
21842182
.BeginStruct()
2183+
.AddMember("database").Utf8("/Root")
21852184
.AddMember("transactional_id").Utf8(transactionalId)
21862185
.AddMember("producer_id").Int64(resp1->ProducerId)
21872186
.AddMember("producer_epoch").Int16(std::numeric_limits<i16>::max() - 1)
2187+
.AddMember("updated_at").Datetime(TInstant::Now())
21882188
.EndStruct();
21892189
rows.EndList();
2190-
auto upsertResult = tableClient.BulkUpsert(NKikimr::NGRpcProxy::V1::TTransactionalProducersInitManager::GetInstant()->GetStorageTablePath().c_str(), rows.Build()).GetValueSync();
2190+
auto upsertResult = tableClient.BulkUpsert("//Root/.metadata/kafka_transactional_producers", rows.Build()).GetValueSync();
21912191
UNIT_ASSERT_EQUAL(upsertResult.GetStatus(), EStatus::SUCCESS);
21922192

21932193
auto resp2 = kafkaClient.InitProducerId(transactionalId);
21942194

21952195
// validate first response
21962196
UNIT_ASSERT_VALUES_EQUAL(resp1->ErrorCode, EKafkaErrors::NONE_ERROR);
21972197
UNIT_ASSERT_GT(resp1->ProducerId, 0);
2198-
UNIT_ASSERT_VALUES_EQUAL(resp1->ProducerEpoch, maxEpoch);
2198+
UNIT_ASSERT_VALUES_EQUAL(resp1->ProducerEpoch, 0);
21992199
// validate second response
22002200
UNIT_ASSERT_VALUES_EQUAL(resp2->ErrorCode, EKafkaErrors::NONE_ERROR);
22012201
UNIT_ASSERT_GT(resp2->ProducerId, 0);

0 commit comments

Comments
 (0)