From 55178c43ff4a168ee8c1524daedd4244eb2c707b Mon Sep 17 00:00:00 2001 From: Justin Lee Date: Tue, 4 Feb 2025 00:12:00 +0000 Subject: [PATCH 1/2] Support Metadata Response v13 (added in AK 4.0 as part of KIP-1102) --- proxy/protocol/responses.go | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/proxy/protocol/responses.go b/proxy/protocol/responses.go index e062176a..774ff90d 100644 --- a/proxy/protocol/responses.go +++ b/proxy/protocol/responses.go @@ -244,7 +244,32 @@ func createMetadataResponseSchemaVersions() []Schema { &SchemaTaggedFields{Name: "response_tagged_fields"}, ) - return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6, metadataResponseV7, metadataResponseV8, metadataResponseV9, metadataResponseV10, metadataResponseV11, metadataResponseV12} + metadataResponseV13 := NewSchema("metadata_response_v13", + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, + &CompactArray{Name: brokersKeyName, Ty: metadataBrokerSchema9}, + &Mfield{Name: "cluster_id", Ty: TypeCompactNullableStr}, + &Mfield{Name: "controller_id", Ty: TypeInt32}, + &CompactArray{Name: "topic_metadata", Ty: topicMetadataSchema12}, + &Mfield{Name: "error_code", Ty: TypeInt16}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, + ) + + return []Schema{ + metadataResponseV0, + metadataResponseV1, + metadataResponseV2, + metadataResponseV3, + metadataResponseV4, + metadataResponseV5, + metadataResponseV6, + metadataResponseV7, + metadataResponseV8, + metadataResponseV9, + metadataResponseV10, + metadataResponseV11, + metadataResponseV12, + metadataResponseV13, + } } func createFindCoordinatorResponseSchemaVersions() []Schema { From 6d3994bcc99dcc5d794d8752c06cc9a9222c2afe Mon Sep 17 00:00:00 2001 From: Michal Budzyn Date: Mon, 24 Mar 2025 01:05:41 +0100 Subject: [PATCH 2/2] support produce version 12 --- proxy/processor_default.go | 6 +- proxy/protocol/responses_test.go | 120 +++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 4 deletions(-) diff --git a/proxy/processor_default.go b/proxy/processor_default.go index fc243514..67bd12b8 100644 --- a/proxy/processor_default.go +++ b/proxy/processor_default.go @@ -158,8 +158,8 @@ func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.Requ if err != nil { return false, nil, err } - - case 3, 4, 5, 6, 7, 8, 9, 10, 11: + default: + // case 3, 4, 5, 6, 7, 8, 9, 10, 11, 12: // CorrelationID + ClientID if err = acksReader.ReadAndDiscardHeaderV1Part(reader); err != nil { return false, nil, err @@ -169,8 +169,6 @@ func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.Requ if err != nil { return false, nil, err } - default: - return false, nil, fmt.Errorf("produce version %d is not supported", requestKeyVersion.ApiVersion) } return acks != 0, bufferRead.Bytes(), nil } diff --git a/proxy/protocol/responses_test.go b/proxy/protocol/responses_test.go index 1ba49d41..48e16474 100644 --- a/proxy/protocol/responses_test.go +++ b/proxy/protocol/responses_test.go @@ -41,6 +41,8 @@ var ( return "myhost3", 34003, nil } else if brokerHost == "localhost" && brokerPort == 9999 { return "myhost", 34000, nil + } else if brokerHost == "localhost" && brokerPort == 9092 { + return "myhost5", 34005, nil } return "", 0, errors.New("unexpected data") } @@ -2436,6 +2438,124 @@ func TestMetadataResponseV11(t *testing.T) { testMetadataResponse(t, apiVersion, payload, expectedInput, expectedModified) } +func TestMetadataResponseV13(t *testing.T) { + apiVersion := int16(13) + payload := "0000000002000000010a6c6f63616c686f737400002384000017354c3667336e5368542d654d43744b2d2d58383673770000000102000010746573742d6e6f2d686561646572737d064a8944b14f078805ea02998647bf00040000000000010000000100000000020000000102000000010100000000000002000000010000000002000000010200000001010000000000000000000001000000000200000001020000000101008000000000000000" + expectedInput := []string{ + "throttle_time_ms int32 0", + "[brokers]", + "brokers struct", + "node_id int32 1", + "host string localhost", + "port int32 9092", + "rack *string ", + "[broker_tagged_fields]", + "cluster_id *string 5L6g3nShT-eMCtK--X86sw", + "controller_id int32 1", + "[topic_metadata]", + "topic_metadata struct", + "error_code int16 0", + "name *string test-no-headers", + "topic_id uuid 7d064a89-44b1-4f07-8805-ea02998647bf", + "is_internal bool false", + "[partition_metadata]", + "partition_metadata struct", + "error_code int16 0", + "partition int32 1", + "leader int32 1", + "leader_epoch int32 0", + "[replicas]", + "replicas int32 1", + "[isr]", + "isr int32 1", + "[offline_replicas]", + "[partition_metadata_tagged_fields]", + "partition_metadata struct", + "error_code int16 0", + "partition int32 2", + "leader int32 1", + "leader_epoch int32 0", + "[replicas]", + "replicas int32 1", + "[isr]", + "isr int32 1", + "[offline_replicas]", + "[partition_metadata_tagged_fields]", + "partition_metadata struct", + "error_code int16 0", + "partition int32 0", + "leader int32 1", + "leader_epoch int32 0", + "[replicas]", + "replicas int32 1", + "[isr]", + "isr int32 1", + "[offline_replicas]", + "[partition_metadata_tagged_fields]", + "topic_authorized_operations int32 -2147483648", + "[topic_metadata_tagged_fields]", + "error_code int16 0", + "[response_tagged_fields]", + } + expectedModified := []string{ + "throttle_time_ms int32 0", + "[brokers]", + "brokers struct", + "node_id int32 1", + "host string myhost5", + "port int32 34005", + "rack *string ", + "[broker_tagged_fields]", + "cluster_id *string 5L6g3nShT-eMCtK--X86sw", + "controller_id int32 1", + "[topic_metadata]", + "topic_metadata struct", + "error_code int16 0", + "name *string test-no-headers", + "topic_id uuid 7d064a89-44b1-4f07-8805-ea02998647bf", + "is_internal bool false", + "[partition_metadata]", + "partition_metadata struct", + "error_code int16 0", + "partition int32 1", + "leader int32 1", + "leader_epoch int32 0", + "[replicas]", + "replicas int32 1", + "[isr]", + "isr int32 1", + "[offline_replicas]", + "[partition_metadata_tagged_fields]", + "partition_metadata struct", + "error_code int16 0", + "partition int32 2", + "leader int32 1", + "leader_epoch int32 0", + "[replicas]", + "replicas int32 1", + "[isr]", + "isr int32 1", + "[offline_replicas]", + "[partition_metadata_tagged_fields]", + "partition_metadata struct", + "error_code int16 0", + "partition int32 0", + "leader int32 1", + "leader_epoch int32 0", + "[replicas]", + "replicas int32 1", + "[isr]", + "isr int32 1", + "[offline_replicas]", + "[partition_metadata_tagged_fields]", + "topic_authorized_operations int32 -2147483648", + "[topic_metadata_tagged_fields]", + "error_code int16 0", + "[response_tagged_fields]", + } + testMetadataResponse(t, apiVersion, payload, expectedInput, expectedModified) +} + func testMetadataResponse(t *testing.T, apiVersion int16, payload string, expectedInput, expectedModified []string) { bytes, err := hex.DecodeString(payload) if err != nil {