Skip to content

Rework of PR# 183 feat: Add support for deterministic listener ports (based on broker ID) #191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/kafka-proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func initFlags() {
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).")
Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.")
Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")

Expand Down
4 changes: 3 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ var (
Version = "unknown"
)

type NetAddressMappingFunc func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error)
type NetAddressMappingFunc func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error)

type ListenerConfig struct {
BrokerAddress string
ListenerAddress string
AdvertisedAddress string
}

type DialAddressMapping struct {
SourceAddress string
DestinationAddress string
Expand Down Expand Up @@ -74,6 +75,7 @@ type Config struct {
DefaultListenerIP string
BootstrapServers []ListenerConfig
ExternalServers []ListenerConfig
DeterministicListeners bool
DialAddressMappings []DialAddressMapping
DisableDynamicListeners bool
DynamicAdvertisedListener string
Expand Down
7 changes: 4 additions & 3 deletions proxy/processor_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package proxy
import (
"bytes"
"encoding/hex"
"testing"
"time"

"github.com/grepplabs/kafka-proxy/proxy/protocol"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestHandleRequest(t *testing.T) {
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestHandleRequest(t *testing.T) {
}

func TestHandleResponse(t *testing.T) {
netAddressMappingFunc := func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
netAddressMappingFunc := func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" {
switch brokerPort {
case 19092:
Expand Down
25 changes: 17 additions & 8 deletions proxy/protocol/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
brokersKeyName = "brokers"
hostKeyName = "host"
portKeyName = "port"
nodeKeyName = "node_id"

coordinatorKeyName = "coordinator"
coordinatorsKeyName = "coordinators"
Expand All @@ -26,7 +27,7 @@ var (

func createMetadataResponseSchemaVersions() []Schema {
metadataBrokerV0 := NewSchema("metadata_broker_v0",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
)
Expand All @@ -51,14 +52,14 @@ func createMetadataResponseSchemaVersions() []Schema {
)

metadataBrokerV1 := NewSchema("metadata_broker_v1",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
&Mfield{Name: "rack", Ty: TypeNullableStr},
)

metadataBrokerSchema9 := NewSchema("metadata_broker_schema9",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
&Mfield{Name: "rack", Ty: TypeCompactNullableStr},
Expand Down Expand Up @@ -248,13 +249,13 @@ func createMetadataResponseSchemaVersions() []Schema {

func createFindCoordinatorResponseSchemaVersions() []Schema {
findCoordinatorBrokerV0 := NewSchema("find_coordinator_broker_v0",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
)

findCoordinatorBrokerSchema9 := NewSchema("find_coordinator_broker_schema9",
&Mfield{Name: "node_id", Ty: TypeInt32},
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
&Mfield{Name: portKeyName, Ty: TypeInt32},
)
Expand Down Expand Up @@ -320,12 +321,16 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
if !ok {
return errors.New("broker.port not found")
}
nodeId, ok := broker.Get(nodeKeyName).(int32)
if !ok {
return errors.New("broker.node_id not found")
}

if host == "" && port <= 0 {
continue
}

newHost, newPort, err := fn(host, port)
newHost, newPort, err := fn(host, port, nodeId)
if err != nil {
return err
}
Expand All @@ -336,7 +341,7 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
}
}
if port != newPort {
err = broker.Replace(portKeyName, int32(newPort))
err = broker.Replace(portKeyName, newPort)
if err != nil {
return err
}
Expand Down Expand Up @@ -383,12 +388,16 @@ func modifyCoordinator(decodedStruct *Struct, fn config.NetAddressMappingFunc) e
if !ok {
return errors.New("coordinator.port not found")
}
nodeId, ok := coordinator.Get(nodeKeyName).(int32)
if !ok {
return errors.New("coordinator.node_id not found")
}

if host == "" && port <= 0 {
return nil
}

newHost, newPort, err := fn(host, port)
newHost, newPort, err := fn(host, port, nodeId)
if err != nil {
return err
}
Expand Down
9 changes: 5 additions & 4 deletions proxy/protocol/responses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package protocol
import (
"encoding/hex"
"fmt"
"github.com/google/uuid"
"reflect"
"strings"
"testing"

"github.com/google/uuid"

"github.com/grepplabs/kafka-proxy/config"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
Expand All @@ -20,7 +21,7 @@ var (
// topic_metadata
0x00, 0x00, 0x00, 0x00}

testResponseModifier = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
testResponseModifier = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" && brokerPort == 51 {
return "myhost1", 34001, nil
} else if brokerHost == "google.com" && brokerPort == 273 {
Expand All @@ -31,7 +32,7 @@ var (
return "", 0, errors.New("unexpected data")
}

testResponseModifier2 = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
testResponseModifier2 = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" && brokerPort == 19092 {
return "myhost1", 34001, nil
} else if brokerHost == "localhost" && brokerPort == 29092 {
Expand Down Expand Up @@ -374,7 +375,7 @@ func TestMetadataResponseV0(t *testing.T) {
a.Nil(err)
a.Equal(bytes, resp)

modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
if brokerHost == "localhost" && brokerPort == 51 {
return "azure.microsoft.com", 34001, nil
} else if brokerHost == "google.com" && brokerPort == 273 {
Expand Down
Loading
Loading