Skip to content

Commit 247e41c

Browse files
FloatingCrowbarnshestakov
authored andcommitted
Fix nullable value in TKafkaRecord (#17609)
1 parent 607beb3 commit 247e41c

File tree

2 files changed

+19
-1
lines changed

2 files changed

+19
-1
lines changed

ydb/core/kafka_proxy/kafka_records.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ class TKafkaRecord: public TMessage {
165165

166166
static constexpr TKafkaVersions PresentVersions = VersionsAlways;
167167
static constexpr TKafkaVersions TaggedVersions = VersionsNever;
168-
static constexpr TKafkaVersions NullableVersions = VersionsNever;
168+
static constexpr TKafkaVersions NullableVersions = VersionsAlways;
169169
static constexpr TKafkaVersions FlexibleVersions = VersionsAlways;
170170
};
171171
ValueMeta::Type Value;

ydb/core/kafka_proxy/ut/ut_protocol.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,24 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
568568
AssertMessageMeta(readMessage, headerKey, headerValue);
569569
}
570570

571+
// send empty produce message
572+
{
573+
TKafkaRecordBatch batch;
574+
batch.BaseOffset = 3;
575+
batch.BaseSequence = 5;
576+
batch.Magic = 2; // Current supported
577+
batch.Records.resize(1);
578+
batch.Records[0].Key = TKafkaBytes{};
579+
batch.Records[0].Value = TKafkaBytes{};
580+
581+
auto msg = client.Produce(topicName, 0, batch);
582+
583+
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, topicName);
584+
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0);
585+
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode,
586+
static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
587+
}
588+
571589
{
572590
// Check short topic name
573591

0 commit comments

Comments
 (0)