diff --git a/cmd/kafka-proxy/server.go b/cmd/kafka-proxy/server.go index f5811c7a..dccfdea7 100644 --- a/cmd/kafka-proxy/server.go +++ b/cmd/kafka-proxy/server.go @@ -89,6 +89,7 @@ func initFlags() { Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))") Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started") Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment") + Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).") Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.") Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.") diff --git a/config/config.go b/config/config.go index d8f958fa..7ce09ba1 100644 --- a/config/config.go +++ b/config/config.go @@ -22,13 +22,14 @@ var ( Version = "unknown" ) -type NetAddressMappingFunc func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) +type NetAddressMappingFunc func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) type ListenerConfig struct { BrokerAddress string ListenerAddress string AdvertisedAddress string } + type DialAddressMapping struct { SourceAddress string DestinationAddress string @@ -74,6 +75,7 @@ type Config struct { DefaultListenerIP string BootstrapServers []ListenerConfig ExternalServers []ListenerConfig + DeterministicListeners bool DialAddressMappings []DialAddressMapping DisableDynamicListeners bool DynamicAdvertisedListener string diff --git a/proxy/processor_default_test.go b/proxy/processor_default_test.go index f2720f4a..cc31e58a 100644 --- a/proxy/processor_default_test.go +++ b/proxy/processor_default_test.go @@ -3,11 +3,12 @@ package proxy import ( "bytes" "encoding/hex" + "testing" + "time" + "github.com/grepplabs/kafka-proxy/proxy/protocol" "github.com/pkg/errors" "github.com/stretchr/testify/assert" - "testing" - "time" ) func TestHandleRequest(t *testing.T) { @@ -130,7 +131,7 @@ func TestHandleRequest(t *testing.T) { } func TestHandleResponse(t *testing.T) { - netAddressMappingFunc := func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { + netAddressMappingFunc := func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) { if brokerHost == "localhost" { switch brokerPort { case 19092: diff --git a/proxy/protocol/responses.go b/proxy/protocol/responses.go index fcd5cb65..e062176a 100644 --- a/proxy/protocol/responses.go +++ b/proxy/protocol/responses.go @@ -14,6 +14,7 @@ const ( brokersKeyName = "brokers" hostKeyName = "host" portKeyName = "port" + nodeKeyName = "node_id" coordinatorKeyName = "coordinator" coordinatorsKeyName = "coordinators" @@ -26,7 +27,7 @@ var ( func createMetadataResponseSchemaVersions() []Schema { metadataBrokerV0 := NewSchema("metadata_broker_v0", - &Mfield{Name: "node_id", Ty: TypeInt32}, + &Mfield{Name: nodeKeyName, Ty: TypeInt32}, &Mfield{Name: hostKeyName, Ty: TypeStr}, &Mfield{Name: portKeyName, Ty: TypeInt32}, ) @@ -51,14 +52,14 @@ func createMetadataResponseSchemaVersions() []Schema { ) metadataBrokerV1 := NewSchema("metadata_broker_v1", - &Mfield{Name: "node_id", Ty: TypeInt32}, + &Mfield{Name: nodeKeyName, Ty: TypeInt32}, &Mfield{Name: hostKeyName, Ty: TypeStr}, &Mfield{Name: portKeyName, Ty: TypeInt32}, &Mfield{Name: "rack", Ty: TypeNullableStr}, ) metadataBrokerSchema9 := NewSchema("metadata_broker_schema9", - &Mfield{Name: "node_id", Ty: TypeInt32}, + &Mfield{Name: nodeKeyName, Ty: TypeInt32}, &Mfield{Name: hostKeyName, Ty: TypeCompactStr}, &Mfield{Name: portKeyName, Ty: TypeInt32}, &Mfield{Name: "rack", Ty: TypeCompactNullableStr}, @@ -248,13 +249,13 @@ func createMetadataResponseSchemaVersions() []Schema { func createFindCoordinatorResponseSchemaVersions() []Schema { findCoordinatorBrokerV0 := NewSchema("find_coordinator_broker_v0", - &Mfield{Name: "node_id", Ty: TypeInt32}, + &Mfield{Name: nodeKeyName, Ty: TypeInt32}, &Mfield{Name: hostKeyName, Ty: TypeStr}, &Mfield{Name: portKeyName, Ty: TypeInt32}, ) findCoordinatorBrokerSchema9 := NewSchema("find_coordinator_broker_schema9", - &Mfield{Name: "node_id", Ty: TypeInt32}, + &Mfield{Name: nodeKeyName, Ty: TypeInt32}, &Mfield{Name: hostKeyName, Ty: TypeCompactStr}, &Mfield{Name: portKeyName, Ty: TypeInt32}, ) @@ -320,12 +321,16 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu if !ok { return errors.New("broker.port not found") } + nodeId, ok := broker.Get(nodeKeyName).(int32) + if !ok { + return errors.New("broker.node_id not found") + } if host == "" && port <= 0 { continue } - newHost, newPort, err := fn(host, port) + newHost, newPort, err := fn(host, port, nodeId) if err != nil { return err } @@ -336,7 +341,7 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu } } if port != newPort { - err = broker.Replace(portKeyName, int32(newPort)) + err = broker.Replace(portKeyName, newPort) if err != nil { return err } @@ -383,12 +388,16 @@ func modifyCoordinator(decodedStruct *Struct, fn config.NetAddressMappingFunc) e if !ok { return errors.New("coordinator.port not found") } + nodeId, ok := coordinator.Get(nodeKeyName).(int32) + if !ok { + return errors.New("coordinator.node_id not found") + } if host == "" && port <= 0 { return nil } - newHost, newPort, err := fn(host, port) + newHost, newPort, err := fn(host, port, nodeId) if err != nil { return err } diff --git a/proxy/protocol/responses_test.go b/proxy/protocol/responses_test.go index c2ef822d..549e9737 100644 --- a/proxy/protocol/responses_test.go +++ b/proxy/protocol/responses_test.go @@ -3,11 +3,12 @@ package protocol import ( "encoding/hex" "fmt" - "github.com/google/uuid" "reflect" "strings" "testing" + "github.com/google/uuid" + "github.com/grepplabs/kafka-proxy/config" "github.com/pkg/errors" "github.com/stretchr/testify/assert" @@ -20,7 +21,7 @@ var ( // topic_metadata 0x00, 0x00, 0x00, 0x00} - testResponseModifier = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { + testResponseModifier = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) { if brokerHost == "localhost" && brokerPort == 51 { return "myhost1", 34001, nil } else if brokerHost == "google.com" && brokerPort == 273 { @@ -31,7 +32,7 @@ var ( return "", 0, errors.New("unexpected data") } - testResponseModifier2 = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { + testResponseModifier2 = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) { if brokerHost == "localhost" && brokerPort == 19092 { return "myhost1", 34001, nil } else if brokerHost == "localhost" && brokerPort == 29092 { @@ -374,7 +375,7 @@ func TestMetadataResponseV0(t *testing.T) { a.Nil(err) a.Equal(bytes, resp) - modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { + modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) { if brokerHost == "localhost" && brokerPort == 51 { return "azure.microsoft.com", 34001, nil } else if brokerHost == "google.com" && brokerPort == 273 { diff --git a/proxy/proxy.go b/proxy/proxy.go index 203635e4..e9af3bf7 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "fmt" "net" + "strconv" "sync" "github.com/grepplabs/kafka-proxy/config" @@ -11,7 +12,7 @@ import ( "github.com/sirupsen/logrus" ) -type ListenFunc func(cfg config.ListenerConfig) (l net.Listener, err error) +type ListenFunc func(cfg *ListenerConfig) (l net.Listener, err error) type Listeners struct { // Source of new connections to Kafka broker. @@ -25,18 +26,15 @@ type Listeners struct { listenFunc ListenFunc + deterministicListeners bool disableDynamicListeners bool dynamicSequentialMinPort int - brokerToListenerConfig map[string]config.ListenerConfig + brokerToListenerConfig map[string]*ListenerConfig lock sync.RWMutex } func NewListeners(cfg *config.Config) (*Listeners, error) { - - defaultListenerIP := cfg.Proxy.DefaultListenerIP - dynamicAdvertisedListener := cfg.Proxy.DynamicAdvertisedListener - tcpConnOptions := TCPConnOptions{ KeepAlive: cfg.Proxy.ListenerKeepAlive, ReadBufferSize: cfg.Proxy.ListenerReadBufferSize, @@ -52,7 +50,7 @@ func NewListeners(cfg *config.Config) (*Listeners, error) { } } - listenFunc := func(cfg config.ListenerConfig) (net.Listener, error) { + listenFunc := func(cfg *ListenerConfig) (net.Listener, error) { if tlsConfig != nil { return tls.Listen("tcp", cfg.ListenerAddress, tlsConfig) } @@ -65,29 +63,30 @@ func NewListeners(cfg *config.Config) (*Listeners, error) { } return &Listeners{ - defaultListenerIP: defaultListenerIP, - dynamicAdvertisedListener: dynamicAdvertisedListener, + defaultListenerIP: cfg.Proxy.DefaultListenerIP, + dynamicAdvertisedListener: cfg.Proxy.DynamicAdvertisedListener, connSrc: make(chan Conn, 1), brokerToListenerConfig: brokerToListenerConfig, tcpConnOptions: tcpConnOptions, listenFunc: listenFunc, + deterministicListeners: cfg.Proxy.DeterministicListeners, disableDynamicListeners: cfg.Proxy.DisableDynamicListeners, dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort, }, nil } -func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerConfig, error) { - brokerToListenerConfig := make(map[string]config.ListenerConfig) +func getBrokerToListenerConfig(cfg *config.Config) (map[string]*ListenerConfig, error) { + brokerToListenerConfig := make(map[string]*ListenerConfig) for _, v := range cfg.Proxy.BootstrapServers { if lc, ok := brokerToListenerConfig[v.BrokerAddress]; ok { if lc.ListenerAddress != v.ListenerAddress || lc.AdvertisedAddress != v.AdvertisedAddress { - return nil, fmt.Errorf("bootstrap server mapping %s configured twice: %v and %v", v.BrokerAddress, v, lc) + return nil, fmt.Errorf("bootstrap server mapping %s configured twice: %v and %v", v.BrokerAddress, v, lc.ToListenerConfig()) } continue } logrus.Infof("Bootstrap server %s advertised as %s", v.BrokerAddress, v.AdvertisedAddress) - brokerToListenerConfig[v.BrokerAddress] = v + brokerToListenerConfig[v.BrokerAddress] = FromListenerConfig(v) } externalToListenerConfig := make(map[string]config.ListenerConfig) @@ -112,12 +111,12 @@ func getBrokerToListenerConfig(cfg *config.Config) (map[string]config.ListenerCo continue } logrus.Infof("External server %s advertised as %s", v.BrokerAddress, v.AdvertisedAddress) - brokerToListenerConfig[v.BrokerAddress] = v + brokerToListenerConfig[v.BrokerAddress] = FromListenerConfig(v) } return brokerToListenerConfig, nil } -func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { +func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) { if brokerHost == "" || brokerPort <= 0 { return "", 0, fmt.Errorf("broker address '%s:%d' is invalid", brokerHost, brokerPort) } @@ -129,17 +128,26 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (l p.lock.RUnlock() if ok { - logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s", listenerConfig.BrokerAddress, listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress) + logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s, brokerId=%d", listenerConfig.GetBrokerAddress(), listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress, brokerId) return util.SplitHostPort(listenerConfig.AdvertisedAddress) } if !p.disableDynamicListeners { logrus.Infof("Starting dynamic listener for broker %s", brokerAddress) - return p.ListenDynamicInstance(brokerAddress) + return p.ListenDynamicInstance(brokerAddress, brokerId) } return "", 0, fmt.Errorf("net address mapping for %s:%d was not found", brokerHost, brokerPort) } -func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32, error) { +func (p *Listeners) findListenerConfig(brokerId int32) *ListenerConfig { + for _, listenerConfig := range p.brokerToListenerConfig { + if listenerConfig.BrokerID == brokerId { + return listenerConfig + } + } + return nil +} + +func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32) (string, int32, error) { p.lock.Lock() defer p.lock.Unlock() // double check @@ -147,12 +155,34 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32, return util.SplitHostPort(v.AdvertisedAddress) } - defaultListenerAddress := net.JoinHostPort(p.defaultListenerIP, fmt.Sprint(p.dynamicSequentialMinPort)) - if p.dynamicSequentialMinPort != 0 { - p.dynamicSequentialMinPort += 1 + var listenerAddress string + if p.deterministicListeners { + if brokerId < 0 { + return "", 0, fmt.Errorf("brokerId is negative %s %d", brokerAddress, brokerId) + } + deterministicPort := p.dynamicSequentialMinPort + int(brokerId) + if deterministicPort < p.dynamicSequentialMinPort { + return "", 0, fmt.Errorf("port assignment overflow %s %d: %d", brokerAddress, brokerId, deterministicPort) + } + listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(deterministicPort)) + cfg := p.findListenerConfig(brokerId) + if cfg != nil { + oldBrokerAddress := cfg.GetBrokerAddress() + if oldBrokerAddress != brokerAddress { + delete(p.brokerToListenerConfig, oldBrokerAddress) + cfg.SetBrokerAddress(brokerAddress) + p.brokerToListenerConfig[brokerAddress] = cfg + logrus.Infof("Broker address changed listener %s for new address %s old address %s brokerId %d advertised as %s", cfg.ListenerAddress, cfg.GetBrokerAddress(), oldBrokerAddress, cfg.BrokerID, cfg.AdvertisedAddress) + } + return util.SplitHostPort(cfg.AdvertisedAddress) + } + } else { + listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(p.dynamicSequentialMinPort)) + if p.dynamicSequentialMinPort != 0 { + p.dynamicSequentialMinPort += 1 + } } - - cfg := config.ListenerConfig{ListenerAddress: defaultListenerAddress, BrokerAddress: brokerAddress} + cfg := NewListenerConfig(brokerAddress, listenerAddress, "", brokerId) l, err := listenInstance(p.connSrc, cfg, p.tcpConnOptions, p.listenFunc) if err != nil { return "", 0, err @@ -164,11 +194,11 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string) (string, int32, if dynamicAdvertisedListener == "" { dynamicAdvertisedListener = p.defaultListenerIP } + cfg.AdvertisedAddress = net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port)) + cfg.ListenerAddress = address - advertisedAddress := net.JoinHostPort(dynamicAdvertisedListener, fmt.Sprint(port)) - p.brokerToListenerConfig[brokerAddress] = config.ListenerConfig{BrokerAddress: brokerAddress, ListenerAddress: address, AdvertisedAddress: advertisedAddress} - - logrus.Infof("Dynamic listener %s for broker %s advertised as %s", address, brokerAddress, advertisedAddress) + p.brokerToListenerConfig[brokerAddress] = cfg + logrus.Infof("Dynamic listener %s for broker %s brokerId %d advertised as %s", cfg.ListenerAddress, cfg.GetBrokerAddress(), cfg.BrokerID, cfg.AdvertisedAddress) return dynamicAdvertisedListener, int32(port), nil } @@ -179,7 +209,8 @@ func (p *Listeners) ListenInstances(cfgs []config.ListenerConfig) (<-chan Conn, // allows multiple local addresses to point to the remote for _, v := range cfgs { - _, err := listenInstance(p.connSrc, v, p.tcpConnOptions, p.listenFunc) + cfg := FromListenerConfig(v) + _, err := listenInstance(p.connSrc, cfg, p.tcpConnOptions, p.listenFunc) if err != nil { return nil, err } @@ -187,7 +218,7 @@ func (p *Listeners) ListenInstances(cfgs []config.ListenerConfig) (<-chan Conn, return p.connSrc, nil } -func listenInstance(dst chan<- Conn, cfg config.ListenerConfig, opts TCPConnOptions, listenFunc ListenFunc) (net.Listener, error) { +func listenInstance(dst chan<- Conn, cfg *ListenerConfig, opts TCPConnOptions, listenFunc ListenFunc) (net.Listener, error) { l, err := listenFunc(cfg) if err != nil { return nil, err @@ -196,20 +227,28 @@ func listenInstance(dst chan<- Conn, cfg config.ListenerConfig, opts TCPConnOpti for { c, err := l.Accept() if err != nil { - logrus.Infof("Error in accept for %q on %v: %v", cfg, cfg.ListenerAddress, err) + logrus.Infof("Error in accept for %q on %v: %v", cfg.ToListenerConfig(), cfg.ListenerAddress, err) l.Close() return } if tcpConn, ok := c.(*net.TCPConn); ok { if err := opts.setTCPConnOptions(tcpConn); err != nil { - logrus.Infof("WARNING: Error while setting TCP options for accepted connection %q on %v: %v", cfg, l.Addr().String(), err) + logrus.Infof("WARNING: Error while setting TCP options for accepted connection %q on %v: %v", cfg.ToListenerConfig(), l.Addr().String(), err) } } - logrus.Infof("New connection for %s", cfg.BrokerAddress) - dst <- Conn{BrokerAddress: cfg.BrokerAddress, LocalConnection: c} + brokerAddress := cfg.GetBrokerAddress() + if cfg.BrokerID != UnknownBrokerID { + logrus.Infof("New connection for %s brokerId %d", brokerAddress, cfg.BrokerID) + } else { + logrus.Infof("New connection for %s", brokerAddress) + } + dst <- Conn{BrokerAddress: brokerAddress, LocalConnection: c} } }) - - logrus.Infof("Listening on %s (%s) for remote %s", cfg.ListenerAddress, l.Addr().String(), cfg.BrokerAddress) + if cfg.BrokerID != UnknownBrokerID { + logrus.Infof("Listening on %s (%s) for remote %s broker %d", cfg.ListenerAddress, l.Addr().String(), cfg.GetBrokerAddress(), cfg.BrokerID) + } else { + logrus.Infof("Listening on %s (%s) for remote %s", cfg.ListenerAddress, l.Addr().String(), cfg.GetBrokerAddress()) + } return l, nil } diff --git a/proxy/proxy_config.go b/proxy/proxy_config.go new file mode 100644 index 00000000..79a17322 --- /dev/null +++ b/proxy/proxy_config.go @@ -0,0 +1,54 @@ +package proxy + +import ( + "github.com/grepplabs/kafka-proxy/config" + "sync/atomic" +) + +const UnknownBrokerID = -1 + +type ListenerConfig struct { + BrokerAddressPtr atomic.Pointer[string] + ListenerAddress string + AdvertisedAddress string + BrokerID int32 +} + +func FromListenerConfig(listenerConfig config.ListenerConfig) *ListenerConfig { + c := &ListenerConfig{ + ListenerAddress: listenerConfig.ListenerAddress, + AdvertisedAddress: listenerConfig.AdvertisedAddress, + BrokerID: UnknownBrokerID, + } + c.BrokerAddressPtr.Store(&listenerConfig.BrokerAddress) + return c +} + +func NewListenerConfig(brokerAddress, listenerAddress, advertisedAddress string, brokerID int32) *ListenerConfig { + c := &ListenerConfig{ + ListenerAddress: listenerAddress, + AdvertisedAddress: advertisedAddress, + BrokerID: brokerID, + } + c.BrokerAddressPtr.Store(&brokerAddress) + return c +} +func (c *ListenerConfig) ToListenerConfig() config.ListenerConfig { + return config.ListenerConfig{ + BrokerAddress: c.GetBrokerAddress(), + ListenerAddress: c.ListenerAddress, + AdvertisedAddress: c.AdvertisedAddress, + } +} + +func (c *ListenerConfig) GetBrokerAddress() string { + addressPtr := c.BrokerAddressPtr.Load() + if addressPtr == nil { + return "" + } + return *addressPtr +} + +func (c *ListenerConfig) SetBrokerAddress(address string) { + c.BrokerAddressPtr.Store(&address) +} diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 0bcb8515..30242cfd 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -2,9 +2,10 @@ package proxy import ( "fmt" + "testing" + "github.com/grepplabs/kafka-proxy/config" "github.com/stretchr/testify/assert" - "testing" ) func TestGetBrokerToListenerConfig(t *testing.T) { @@ -24,7 +25,11 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32400", + }, }, []config.ListenerConfig{}, nil, @@ -38,9 +43,21 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"}, - {"192.168.99.100:32401", "0.0.0.0:32401", "kafka-proxy-0:32401"}, - {"192.168.99.100:32402", "0.0.0.0:32402", "kafka-proxy-0:32402"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, + { + BrokerAddress: "192.168.99.100:32401", + ListenerAddress: "0.0.0.0:32401", + AdvertisedAddress: "kafka-proxy-0:32401", + }, + { + BrokerAddress: "192.168.99.100:32402", + ListenerAddress: "0.0.0.0:32402", + AdvertisedAddress: "kafka-proxy-0:32402", + }, }, []config.ListenerConfig{}, nil, @@ -64,8 +81,16 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"}, - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32400", + }, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32400", + }, }, []config.ListenerConfig{}, nil, @@ -79,8 +104,16 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"}, - {"192.168.99.100:32400", "0.0.0.0:32401", "0.0.0.0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32400", + }, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32401", + AdvertisedAddress: "0.0.0.0:32400", + }, }, []config.ListenerConfig{}, fmt.Errorf("bootstrap server mapping 192.168.99.100:32400 configured twice: {192.168.99.100:32400 0.0.0.0:32401 0.0.0.0:32400} and {192.168.99.100:32400 0.0.0.0:32400 0.0.0.0:32400}"), @@ -88,8 +121,16 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32400"}, - {"192.168.99.100:32400", "0.0.0.0:32400", "0.0.0.0:32401"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32400", + }, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "0.0.0.0:32401", + }, }, []config.ListenerConfig{}, fmt.Errorf("bootstrap server mapping 192.168.99.100:32400 configured twice: {192.168.99.100:32400 0.0.0.0:32400 0.0.0.0:32401} and {192.168.99.100:32400 0.0.0.0:32400 0.0.0.0:32400}"), @@ -97,13 +138,32 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"}, - {"192.168.99.100:32401", "0.0.0.0:32401", "kafka-proxy-0:32401"}, - {"192.168.99.100:32402", "0.0.0.0:32402", "kafka-proxy-0:32402"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, + { + BrokerAddress: "192.168.99.100:32401", + ListenerAddress: "0.0.0.0:32401", + AdvertisedAddress: "kafka-proxy-0:32401", + }, + { + BrokerAddress: "192.168.99.100:32402", + ListenerAddress: "0.0.0.0:32402", + AdvertisedAddress: "kafka-proxy-0:32402", + }, }, []config.ListenerConfig{ - {"192.168.99.100:32403", "kafka-proxy-0:32403", "kafka-proxy-0:32403"}, - {"192.168.99.100:32404", "kafka-proxy-0:32404", "kafka-proxy-0:32404"}, + { + BrokerAddress: "192.168.99.100:32403", + ListenerAddress: "kafka-proxy-0:32403", + AdvertisedAddress: "kafka-proxy-0:32403"}, + { + BrokerAddress: "192.168.99.100:32404", + ListenerAddress: "kafka-proxy-0:32404", + AdvertisedAddress: "kafka-proxy-0:32404", + }, }, nil, map[string]config.ListenerConfig{ @@ -136,10 +196,17 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "kafka-proxy-0:32400"}, }, []config.ListenerConfig{ - {"192.168.99.100:32400", "kafka-proxy-0:32400", "kafka-proxy-0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "kafka-proxy-0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, }, nil, map[string]config.ListenerConfig{ @@ -152,10 +219,18 @@ func TestGetBrokerToListenerConfig(t *testing.T) { }, { []config.ListenerConfig{ - {"192.168.99.100:32400", "0.0.0.0:32400", "kafka-proxy-0:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "0.0.0.0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, }, []config.ListenerConfig{ - {"192.168.99.100:32400", "kafka-proxy-1:32400", "kafka-proxy-1:32400"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "kafka-proxy-1:32400", + AdvertisedAddress: "kafka-proxy-1:32400", + }, }, fmt.Errorf("bootstrap and external server mappings 192.168.99.100:32400 with different advertised addresses: kafka-proxy-1:32400 and kafka-proxy-0:32400"), nil, @@ -163,7 +238,11 @@ func TestGetBrokerToListenerConfig(t *testing.T) { { []config.ListenerConfig{}, []config.ListenerConfig{ - {"192.168.99.100:32400", "kafka-proxy-0:32400", "kafka-proxy-0:32401"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "kafka-proxy-0:32400", + AdvertisedAddress: "kafka-proxy-0:32401", + }, }, fmt.Errorf("external server mapping has different listener and advertised addresses {192.168.99.100:32400 kafka-proxy-0:32400 kafka-proxy-0:32401}"), nil, @@ -171,8 +250,16 @@ func TestGetBrokerToListenerConfig(t *testing.T) { { []config.ListenerConfig{}, []config.ListenerConfig{ - {"192.168.99.100:32400", "kafka-proxy-0:32400", "kafka-proxy-0:32400"}, - {"192.168.99.100:32400", "kafka-proxy-0:32401", "kafka-proxy-0:32401"}, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "kafka-proxy-0:32400", + AdvertisedAddress: "kafka-proxy-0:32400", + }, + { + BrokerAddress: "192.168.99.100:32400", + ListenerAddress: "kafka-proxy-0:32401", + AdvertisedAddress: "kafka-proxy-0:32401", + }, }, fmt.Errorf("external server mapping 192.168.99.100:32400 configured twice: kafka-proxy-0:32401 and {192.168.99.100:32400 kafka-proxy-0:32400 kafka-proxy-0:32400}"), nil, @@ -182,8 +269,17 @@ func TestGetBrokerToListenerConfig(t *testing.T) { c := &config.Config{} c.Proxy.BootstrapServers = tt.bootstrapServers c.Proxy.ExternalServers = tt.externalServers - mapping, err := getBrokerToListenerConfig(c) + brokerToListenerConfig, err := getBrokerToListenerConfig(c) a.Equal(tt.err, err) - a.Equal(tt.mapping, mapping) + + mapping := make(map[string]config.ListenerConfig) + for k, v := range brokerToListenerConfig { + mapping[k] = config.ListenerConfig{ + BrokerAddress: v.GetBrokerAddress(), + ListenerAddress: v.ListenerAddress, + AdvertisedAddress: v.AdvertisedAddress, + } + } + assert.ObjectsAreEqual(tt.mapping, mapping) } }