Skip to content

Commit 6072bfc

Browse files
authored
Kafka protocol: idempotent producer (#19678)
1 parent 29f8f7b commit 6072bfc

15 files changed

+726
-87
lines changed

ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp

Lines changed: 63 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "kafka_produce_actor.h"
2+
#include <library/cpp/string_utils/base64/base64.h>
23
#include <ydb/core/kafka_proxy/kafka_metrics.h>
34

45
#include <contrib/libs/protobuf/src/google/protobuf/util/time_util.h>
@@ -253,16 +254,26 @@ size_t TKafkaProduceActor::EnqueueInitialization() {
253254
return canProcess;
254255
}
255256

256-
THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::TTopicProduceData::TPartitionProduceData& data,
257-
const TString& topicName,
258-
ui64 cookie,
259-
const TString& clientDC,
260-
bool ruPerRequest) {
257+
std::pair<EKafkaErrors, THolder<TEvPartitionWriter::TEvWriteRequest>> Convert(
258+
const TString& transactionalId,
259+
const TProduceRequestData::TTopicProduceData::TPartitionProduceData& data,
260+
const TString& topicName,
261+
ui64 cookie,
262+
const TString& clientDC,
263+
bool ruPerRequest
264+
) {
261265
auto ev = MakeHolder<TEvPartitionWriter::TEvWriteRequest>();
262266
auto& request = ev->Record;
263267

264268
const auto& batch = data.Records;
265-
const TString sourceId = TStringBuilder() << batch->ProducerId;
269+
270+
TString sourceId;
271+
TBuffer buf;
272+
buf.Reserve(transactionalId.size() + sizeof(batch->ProducerId));
273+
buf.Append(transactionalId.data(), transactionalId.size());
274+
buf.Append(static_cast<const char*>(static_cast<const void*>(&batch->ProducerId)), sizeof(batch->ProducerId));
275+
buf.AsString(sourceId);
276+
sourceId = Base64Encode(sourceId);
266277

267278
auto* partitionRequest = request.MutablePartitionRequest();
268279
partitionRequest->SetTopic(topicName);
@@ -274,7 +285,9 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
274285

275286
ui64 totalSize = 0;
276287

277-
for (const auto& record : batch->Records) {
288+
for (ui64 batchIndex = 0; batchIndex < batch->Records.size(); ++batchIndex) {
289+
const auto& record = batch->Records[batchIndex];
290+
278291
NKikimrPQClient::TDataChunk proto;
279292
proto.set_codec(NPersQueueCommon::RAW);
280293
for(auto& h : record.Headers) {
@@ -283,7 +296,7 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
283296
res->set_key(static_cast<const char*>(h.Key->data()), h.Key->size());
284297
}
285298
if (h.Value) {
286-
res->set_value(static_cast<const char*>(h.Value->data()), h.Value->size());
299+
res->set_value(static_cast<const char*>(h.Value->data()), h.Value->size());
287300
}
288301
}
289302

@@ -302,9 +315,25 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
302315
Y_ABORT_UNLESS(res);
303316

304317
auto w = partitionRequest->AddCmdWrite();
305-
306318
w->SetSourceId(sourceId);
307-
w->SetSeqNo(batch->BaseOffset + record.OffsetDelta);
319+
320+
w->SetEnableKafkaDeduplication(batch->ProducerId >= 0);
321+
if (batch->ProducerEpoch >= 0) {
322+
w->SetProducerEpoch(batch->ProducerEpoch);
323+
} else if (batch->ProducerEpoch == -1) {
324+
// Kafka accepts messages with producer epoch == -1, as long as it's the first "epoch" for this producer ID,
325+
// and ignores sequence numbers. I.e. you can send seqnos in any order with epoch == -1.
326+
} else if (batch->ProducerEpoch < -1) {
327+
return {EKafkaErrors::INVALID_PRODUCER_EPOCH, nullptr};
328+
}
329+
330+
if (batch->BaseSequence >= 0) {
331+
// Handle int32 overflow.
332+
w->SetSeqNo((static_cast<ui64>(batch->BaseSequence) + batchIndex) % (static_cast<ui64>(std::numeric_limits<i32>::max()) + 1));
333+
} else {
334+
return {EKafkaErrors::INVALID_RECORD, nullptr};
335+
}
336+
308337
w->SetData(str);
309338
ui64 createTime = batch->BaseTimestamp + record.TimestampDelta;
310339
w->SetCreateTimeMS(createTime ? createTime : TInstant::Now().MilliSeconds());
@@ -319,7 +348,7 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::
319348

320349
partitionRequest->SetPutUnitsSize(NPQ::PutUnitsSize(totalSize));
321350

322-
return ev;
351+
return {EKafkaErrors::NONE_ERROR, std::move(ev)};
323352
}
324353

325354
size_t PartsCount(const TMessagePtr<TProduceRequestData>& r) {
@@ -350,6 +379,7 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
350379
}
351380

352381
auto writer = PartitionWriter({topicPath, static_cast<ui32>(partitionId)}, producerInstanceId, transactionalId, ctx);
382+
auto& result = pendingRequest->Results[position];
353383
if (OK == writer.first) {
354384
auto ownCookie = ++Cookie;
355385
auto& cookieInfo = Cookies[ownCookie];
@@ -361,12 +391,14 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
361391
pendingRequest->WaitAcceptingCookies.insert(ownCookie);
362392
pendingRequest->WaitResultCookies.insert(ownCookie);
363393

364-
auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC, ruPerRequest);
365-
ruPerRequest = false;
366-
367-
Send(writer.second, std::move(ev));
394+
auto [error, ev] = Convert(transactionalId.GetOrElse(""), partitionData, *topicData.Name, ownCookie, ClientDC, ruPerRequest);
395+
if (error == EKafkaErrors::NONE_ERROR) {
396+
ruPerRequest = false;
397+
Send(writer.second, std::move(ev));
398+
} else {
399+
result.ErrorCode = error;
400+
}
368401
} else {
369-
auto& result = pendingRequest->Results[position];
370402
switch (writer.first) {
371403
case NOT_FOUND:
372404
result.ErrorCode = EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION;
@@ -522,14 +554,25 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
522554
SendMetrics(TStringBuilder() << topicData.Name, writeResults.size(), "successful_messages", ctx);
523555
auto& lastResult = writeResults.at(writeResults.size() - 1);
524556
partitionResponse.LogAppendTimeMs = lastResult.GetWriteTimestampMS();
525-
partitionResponse.BaseOffset = lastResult.GetSeqNo();
557+
partitionResponse.BaseOffset = writeResults.at(0).GetOffset();
526558
}
527559
} else {
528560
KAFKA_LOG_ERROR("Produce actor: Partition result with error: ErrorCode=" << static_cast<int>(Convert(msg->GetError().Code)) << ", ErrorMessage=" << msg->GetError().Reason << ", #02");
529561
SendMetrics(TStringBuilder() << topicData.Name, recordsCount, "failed_messages", ctx);
530-
partitionResponse.ErrorCode = Convert(msg->GetError().Code);
531-
metricsErrorCode = Convert(msg->GetError().Code);
532-
partitionResponse.ErrorMessage = msg->GetError().Reason;
562+
563+
if (msg->Record.GetErrorCode() == NPersQueue::NErrorCode::KAFKA_INVALID_PRODUCER_EPOCH) {
564+
partitionResponse.ErrorCode = EKafkaErrors::INVALID_PRODUCER_EPOCH;
565+
metricsErrorCode = EKafkaErrors::INVALID_PRODUCER_EPOCH;
566+
partitionResponse.ErrorMessage = msg->Record.GetErrorReason();
567+
} else if (msg->Record.GetErrorCode() == NPersQueue::NErrorCode::KAFKA_OUT_OF_ORDER_SEQUENCE_NUMBER) {
568+
partitionResponse.ErrorCode = EKafkaErrors::OUT_OF_ORDER_SEQUENCE_NUMBER;
569+
metricsErrorCode = EKafkaErrors::OUT_OF_ORDER_SEQUENCE_NUMBER;
570+
partitionResponse.ErrorMessage = msg->Record.GetErrorReason();
571+
} else {
572+
partitionResponse.ErrorCode = Convert(msg->GetError().Code);
573+
metricsErrorCode = Convert(msg->GetError().Code);
574+
partitionResponse.ErrorMessage = msg->GetError().Reason;
575+
}
533576
}
534577
}
535578
}

ydb/core/kafka_proxy/ut/kafka_test_client.cpp

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,33 @@ TKafkaTestClient::TKafkaTestClient(ui16 port, const TString clientName)
1717
, ClientName(clientName) {
1818
}
1919

20-
TMessagePtr<TApiVersionsResponseData> TKafkaTestClient::ApiVersions() {
21-
Cerr << ">>>>> ApiVersionsRequest\n";
20+
TMessagePtr<TApiVersionsResponseData> TKafkaTestClient::ApiVersions(bool silent) {
21+
if (!silent) {
22+
Cerr << ">>>>> ApiVersionsRequest\n";
23+
}
2224

2325
TRequestHeaderData header = Header(NKafka::EApiKey::API_VERSIONS, 2);
2426

2527
TApiVersionsRequestData request;
2628
request.ClientSoftwareName = "SuperTest";
2729
request.ClientSoftwareVersion = "3100.7.13";
2830

29-
return WriteAndRead<TApiVersionsResponseData>(header, request);
31+
return WriteAndRead<TApiVersionsResponseData>(header, request, silent);
3032
}
3133

32-
TMessagePtr<TMetadataResponseData> TKafkaTestClient::Metadata(const TVector<TString>& topics) {
34+
// YDB ignores AllowAutoTopicCreation, i.e. it never creates a new topic implicitly.
35+
// But in Apache Kafka the default behavior is to create a new topic, if there is no one at the moment of the request.
36+
// With this flag, allowAutoTopicCreation, you can stop this behavior in Apache Kafka.
37+
TMessagePtr<TMetadataResponseData> TKafkaTestClient::Metadata(const TVector<TString>& topics, std::optional<bool> allowAutoTopicCreation) {
3338
Cerr << ">>>>> MetadataRequest\n";
3439

35-
TRequestHeaderData header = Header(NKafka::EApiKey::METADATA, 9);
40+
TRequestHeaderData header = Header(NKafka::EApiKey::METADATA, 12);
3641

3742
TMetadataRequestData request;
43+
if (allowAutoTopicCreation.has_value()) {
44+
// If allowAutoTopicCreation does not have a value, use the default value (= true).
45+
request.AllowAutoTopicCreation = allowAutoTopicCreation.value() ? 1 : 0;
46+
}
3847
request.Topics.reserve(topics.size());
3948
for (auto topicName : topics) {
4049
NKafka::TMetadataRequestData::TMetadataRequestTopic topic;
@@ -70,7 +79,7 @@ TMessagePtr<TSaslAuthenticateResponseData> TKafkaTestClient::SaslAuthenticate(co
7079
return WriteAndRead<TSaslAuthenticateResponseData>(header, request);
7180
}
7281

73-
TMessagePtr<TInitProducerIdResponseData> TKafkaTestClient::InitProducerId(const TString& transactionalId) {
82+
TMessagePtr<TInitProducerIdResponseData> TKafkaTestClient::InitProducerId(const std::optional<TString>& transactionalId) {
7483
Cerr << ">>>>> TInitProducerIdRequestData\n";
7584

7685
TRequestHeaderData header = Header(NKafka::EApiKey::INIT_PRODUCER_ID, 4);
@@ -120,6 +129,7 @@ TMessagePtr<TProduceResponseData> TKafkaTestClient::Produce(const TString& topic
120129
TRequestHeaderData header = Header(NKafka::EApiKey::PRODUCE, 9);
121130

122131
TProduceRequestData request;
132+
request.Acks = -1;
123133
request.TopicData.resize(1);
124134
request.TopicData[0].Name = topicName;
125135
request.TopicData[0].PartitionData.resize(msgs.size());
@@ -427,8 +437,9 @@ TMessagePtr<TFetchResponseData> TKafkaTestClient::Fetch(const std::vector<std::p
427437
TRequestHeaderData header = Header(NKafka::EApiKey::FETCH, 3);
428438

429439
TFetchRequestData request;
430-
request.MaxBytes = 1024;
440+
request.MaxWaitMs = 1000;
431441
request.MinBytes = 1;
442+
request.ReplicaId = -1;
432443

433444
for (auto& topic: topics) {
434445
NKafka::TFetchRequestData::TFetchTopic topicReq {};
@@ -437,7 +448,33 @@ TMessagePtr<TFetchResponseData> TKafkaTestClient::Fetch(const std::vector<std::p
437448
NKafka::TFetchRequestData::TFetchTopic::TFetchPartition partitionReq {};
438449
partitionReq.FetchOffset = offset;
439450
partitionReq.Partition = partition;
440-
partitionReq.PartitionMaxBytes = 1024;
451+
partitionReq.PartitionMaxBytes = 1_MB;
452+
topicReq.Partitions.push_back(partitionReq);
453+
}
454+
request.Topics.push_back(topicReq);
455+
}
456+
457+
return WriteAndRead<TFetchResponseData>(header, request);
458+
}
459+
460+
TMessagePtr<TFetchResponseData> TKafkaTestClient::Fetch(const std::vector<std::pair<TKafkaUuid, std::vector<i32>>>& topics, i64 offset) {
461+
Cerr << ">>>>> TFetchRequestData\n";
462+
463+
TRequestHeaderData header = Header(NKafka::EApiKey::FETCH, 13);
464+
465+
TFetchRequestData request;
466+
request.MaxWaitMs = 1000;
467+
request.MinBytes = 1;
468+
request.ReplicaId = -1;
469+
470+
for (auto& topic: topics) {
471+
NKafka::TFetchRequestData::TFetchTopic topicReq {};
472+
topicReq.TopicId = topic.first;
473+
for (auto& partition: topic.second) {
474+
NKafka::TFetchRequestData::TFetchTopic::TFetchPartition partitionReq {};
475+
partitionReq.FetchOffset = offset;
476+
partitionReq.Partition = partition;
477+
partitionReq.PartitionMaxBytes = 1_MB;
441478
topicReq.Partitions.push_back(partitionReq);
442479
}
443480
request.Topics.push_back(topicReq);
@@ -457,6 +494,7 @@ TMessagePtr<TCreateTopicsResponseData> TKafkaTestClient::CreateTopics(std::vecto
457494
NKafka::TCreateTopicsRequestData::TCreatableTopic topic;
458495
topic.Name = topicToCreate.Name;
459496
topic.NumPartitions = topicToCreate.PartitionsNumber;
497+
topic.ReplicationFactor = topicToCreate.ReplicationFactor;
460498

461499
auto addConfig = [&topic](std::optional<TString> configValue, TString configName) {
462500
if (configValue.has_value()) {
@@ -672,31 +710,35 @@ ui32 TKafkaTestClient::NextCorrelation() {
672710
}
673711

674712
template <std::derived_from<TApiMessage> T>
675-
TMessagePtr<T> TKafkaTestClient::WriteAndRead(TRequestHeaderData& header, TApiMessage& request) {
676-
Write(So, &header, &request);
713+
TMessagePtr<T> TKafkaTestClient::WriteAndRead(TRequestHeaderData& header, TApiMessage& request, bool silent) {
714+
Write(So, &header, &request, silent);
677715
return Read<T>(Si, &header);
678716
}
679717

680-
void TKafkaTestClient::Write(TSocketOutput& so, TApiMessage* request, TKafkaVersion version) {
718+
void TKafkaTestClient::Write(TSocketOutput& so, TApiMessage* request, TKafkaVersion version, bool silent) {
681719
TWritableBuf sb(nullptr, request->Size(version) + 1000);
682720
TKafkaWritable writable(sb);
683721
request->Write(writable, version);
684722
so.Write(sb.Data(), sb.Size());
685723

686-
Print(sb.GetBuffer());
724+
if (!silent) {
725+
Print(sb.GetBuffer());
726+
}
687727
}
688728

689-
void TKafkaTestClient::Write(TSocketOutput& so, TRequestHeaderData* header, TApiMessage* request) {
729+
void TKafkaTestClient::Write(TSocketOutput& so, TRequestHeaderData* header, TApiMessage* request, bool silent) {
690730
TKafkaVersion version = header->RequestApiVersion;
691731
TKafkaVersion headerVersion = RequestHeaderVersion(request->ApiKey(), version);
692732

693733
TKafkaInt32 size = header->Size(headerVersion) + request->Size(version);
694-
Cerr << ">>>>> Size=" << size << Endl;
734+
if (!silent) {
735+
Cerr << ">>>>> Size=" << size << Endl;
736+
}
695737
NKafka::NormalizeNumber(size);
696738
so.Write(&size, sizeof(size));
697739

698-
Write(so, header, headerVersion);
699-
Write(so, request, version);
740+
Write(so, header, headerVersion, silent);
741+
Write(so, request, version, silent);
700742

701743
so.Flush();
702744
}

ydb/core/kafka_proxy/ut/kafka_test_client.h

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ struct TTopicConfig {
1717
ui32 partionsNumber,
1818
std::optional<TString> retentionMs = std::nullopt,
1919
std::optional<TString> retentionBytes = std::nullopt,
20-
const std::map<TString, TString>& configs = DummyMap)
20+
const std::map<TString, TString>& configs = DummyMap,
21+
TKafkaInt16 replicationFactor = 1)
2122
: Name(name)
2223
, PartitionsNumber(partionsNumber)
2324
, RetentionMs(retentionMs)
2425
, RetentionBytes(retentionBytes)
2526
, Configs(configs)
27+
, ReplicationFactor(replicationFactor)
2628
{
2729
}
2830

@@ -31,6 +33,7 @@ struct TTopicConfig {
3133
std::optional<TString> RetentionMs;
3234
std::optional<TString> RetentionBytes;
3335
std::map<TString, TString> Configs;
36+
TKafkaInt16 ReplicationFactor;
3437
};
3538

3639
struct TReadInfo {
@@ -53,15 +56,15 @@ class TKafkaTestClient {
5356
return Read<T>(Si, &header);
5457
}
5558

56-
TMessagePtr<TApiVersionsResponseData> ApiVersions();
59+
TMessagePtr<TApiVersionsResponseData> ApiVersions(bool silent = false);
5760

58-
TMessagePtr<TMetadataResponseData> Metadata(const TVector<TString>& topics = {});
61+
TMessagePtr<TMetadataResponseData> Metadata(const TVector<TString>& topics = {}, std::optional<bool> allowAutoTopicCreation = std::nullopt);
5962

6063
TMessagePtr<TSaslHandshakeResponseData> SaslHandshake(const TString& mechanism = "PLAIN");
6164

6265
TMessagePtr<TSaslAuthenticateResponseData> SaslAuthenticate(const TString& user, const TString& password);
6366

64-
TMessagePtr<TInitProducerIdResponseData> InitProducerId(const TString& transactionalId = "");
67+
TMessagePtr<TInitProducerIdResponseData> InitProducerId(const std::optional<TString>& transactionalId = std::nullopt);
6568

6669
TMessagePtr<TOffsetCommitResponseData> OffsetCommit(TString groupId, std::unordered_map<TString, std::vector<NKafka::TEvKafka::PartitionConsumerOffset>> topicToConsumerOffsets);
6770

@@ -101,6 +104,7 @@ class TKafkaTestClient {
101104

102105
TMessagePtr<TOffsetFetchResponseData> OffsetFetch(TOffsetFetchRequestData request);
103106

107+
TMessagePtr<TFetchResponseData> Fetch(const std::vector<std::pair<TKafkaUuid, std::vector<i32>>>& topics, i64 offset = 0);
104108
TMessagePtr<TFetchResponseData> Fetch(const std::vector<std::pair<TString, std::vector<i32>>>& topics, i64 offset = 0);
105109

106110
TMessagePtr<TCreateTopicsResponseData> CreateTopics(std::vector<TTopicConfig> topicsToCreate, bool validateOnly = false);
@@ -128,9 +132,9 @@ class TKafkaTestClient {
128132
protected:
129133
ui32 NextCorrelation();
130134
template <std::derived_from<TApiMessage> T>
131-
TMessagePtr<T> WriteAndRead(TRequestHeaderData& header, TApiMessage& request);
132-
void Write(TSocketOutput& so, TApiMessage* request, TKafkaVersion version);
133-
void Write(TSocketOutput& so, TRequestHeaderData* header, TApiMessage* request);
135+
TMessagePtr<T> WriteAndRead(TRequestHeaderData& header, TApiMessage& request, bool silent = false);
136+
void Write(TSocketOutput& so, TApiMessage* request, TKafkaVersion version, bool silent = false);
137+
void Write(TSocketOutput& so, TRequestHeaderData* header, TApiMessage* request, bool silent = false);
134138
template <std::derived_from<TApiMessage> T>
135139
TMessagePtr<T> Read(TSocketInput& si, TRequestHeaderData* requestHeader);
136140
void Print(const TBuffer& buffer);

0 commit comments

Comments
 (0)