Skip to content

kafka 3.8 / 3.9 support #178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$BUILDPLATFORM golang:1.22-alpine3.19 as builder
FROM --platform=$BUILDPLATFORM golang:1.22-alpine3.19 AS builder
RUN apk add alpine-sdk ca-certificates

ARG TARGETOS
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.all
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$BUILDPLATFORM golang:1.22-alpine3.19 as builder
FROM --platform=$BUILDPLATFORM golang:1.22-alpine3.19 AS builder
RUN apk add alpine-sdk ca-certificates

ARG TARGETOS
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ VERSION ?= $(shell git describe --tags --always --dirty)
GOPKGS = $(shell go list ./... | grep -v /vendor/)
BUILD_FLAGS ?=
LDFLAGS ?= -X github.com/grepplabs/kafka-proxy/config.Version=$(VERSION) -w -s
TAG ?= "v0.3.11"
TAG ?= "v0.3.12"
GOOS ?= $(if $(TARGETOS),$(TARGETOS),linux)
GOARCH ?= $(if $(TARGETARCH),$(TARGETARCH),amd64)
GOARM ?= $(TARGETVARIANT)
Expand Down
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,19 @@ As not every Kafka release adds new messages/versions which are relevant to the
| 0.2.9 | to 2.8.0 |
| 0.3.1 | to 3.4.0 |
| 0.3.11 | to 3.7.0 |
| 0.3.12 | to 3.9.0 |

### Install binary release

1. Download the latest release

Linux

curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.3.11/kafka-proxy-v0.3.11-linux-amd64.tar.gz | tar xz
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.3.12/kafka-proxy-v0.3.12-linux-amd64.tar.gz | tar xz

macOS

curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.3.11/kafka-proxy-v0.3.11-darwin-amd64.tar.gz | tar xz
curl -Ls https://github.com/grepplabs/kafka-proxy/releases/download/v0.3.12/kafka-proxy-v0.3.12-darwin-amd64.tar.gz | tar xz

2. Move the binary in to your PATH.

Expand All @@ -69,7 +70,7 @@ Docker images are available on [Docker Hub](https://hub.docker.com/r/grepplabs/k

You can launch a kafka-proxy container for trying it out with

docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.3.11 \
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.3.12 \
server \
--bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \
--bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \
Expand All @@ -88,7 +89,7 @@ Docker images with precompiled plugins located in `/opt/kafka-proxy/bin/` are ta

You can launch a kafka-proxy container with auth-ldap plugin for trying it out with

docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.3.11-all \
docker run --rm -p 30001-30003:30001-30003 grepplabs/kafka-proxy:0.3.12-all \
server \
--bootstrap-server-mapping "localhost:19092,0.0.0.0:30001" \
--bootstrap-server-mapping "localhost:29092,0.0.0.0:30002" \
Expand Down
3 changes: 1 addition & 2 deletions cmd/plugin-auth-ldap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/hashicorp/go-plugin"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io/ioutil"
"net"
"net/url"
"os"
Expand Down Expand Up @@ -317,7 +316,7 @@ func getTlsConfig(caCertFile string, insecureSkipVerify bool) (*tls.Config, erro
if caCertFile == "" {
return &tls.Config{InsecureSkipVerify: insecureSkipVerify}, nil
} else {
certData, err := ioutil.ReadFile(caCertFile)
certData, err := os.ReadFile(caCertFile)
if err != nil {
return nil, errors.Wrapf(err, "reading certificate file %s", caCertFile)
}
Expand Down
4 changes: 2 additions & 2 deletions config/jaas.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package config
import (
"errors"
"fmt"
"io/ioutil"
"os"
"regexp"
"strings"
)
Expand All @@ -19,7 +19,7 @@ type JaasCredentials struct {
}

func NewJaasCredentialFromFile(filename string) (*JaasCredentials, error) {
bytes, err := ioutil.ReadFile(filename)
bytes, err := os.ReadFile(filename)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions config/jaas_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package config

import (
"github.com/stretchr/testify/assert"
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/assert"
)

func TestExtractsJaasCredentials(t *testing.T) {
Expand All @@ -30,7 +30,7 @@ func TestExtractsJaasCredentialsFromFile(t *testing.T) {
password="veyaiThai5que0ieb5le";
};
`
tmpFile, err := ioutil.TempFile("", "kafka-proxy-jaas-test")
tmpFile, err := os.CreateTemp("", "kafka-proxy-jaas-test")
assert.Nil(t, err)
defer os.Remove(tmpFile.Name())
defer tmpFile.Close()
Expand Down
4 changes: 2 additions & 2 deletions pkg/libs/googleid/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"encoding/json"
"fmt"
"golang.org/x/net/context/ctxhttp"
"io/ioutil"
"io"
"math/big"
"net/http"
"time"
Expand Down Expand Up @@ -59,7 +59,7 @@ func GetCerts(ctx context.Context) (*Certs, error) {
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/libs/googleid/service_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
"golang.org/x/oauth2/google"
"golang.org/x/oauth2/jws"
"google.golang.org/api/oauth2/v2"
"io/ioutil"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"
)
Expand All @@ -32,7 +33,7 @@ type ServiceAccountTokenSource struct {
}

func NewServiceAccountTokenSource(credentialsFile string, targetAudience string) (*ServiceAccountTokenSource, error) {
data, err := ioutil.ReadFile(credentialsFile)
data, err := os.ReadFile(credentialsFile)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -130,7 +131,7 @@ func doExchange(ctx context.Context, token string) ([]byte, error) {
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/libs/oidc-provider/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"sync"
"time"
Expand Down Expand Up @@ -210,7 +209,7 @@ func getTokenResponse(token string, status int) (apis.TokenResponse, error) {
}

func getTokenSource(credentialsFilePath string, targetAud string) (idTokenSource, error) {
data, err := ioutil.ReadFile(credentialsFilePath)
data, err := os.ReadFile(credentialsFilePath)

if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/libs/oidc/password_grant.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"io/ioutil"
"os"

"github.com/sirupsen/logrus"
"golang.org/x/oauth2"
Expand All @@ -28,7 +28,7 @@ type PasswordGrantTokenSource struct {
}

func NewPasswordGrantTokenSource(credentialsFile string, targetAudience string) (*PasswordGrantTokenSource, error) {
data, err := ioutil.ReadFile(credentialsFile)
data, err := os.ReadFile(credentialsFile)
source := &PasswordGrantTokenSource{}

if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/libs/oidc/service_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"io/ioutil"
"os"

"github.com/sirupsen/logrus"
"golang.org/x/oauth2/clientcredentials"
Expand All @@ -26,7 +26,7 @@ type ServiceAccountTokenSource struct {
}

func NewServiceAccountTokenSource(credentialsFile string, targetAudience string) (*ServiceAccountTokenSource, error) {
data, err := ioutil.ReadFile(credentialsFile)
data, err := os.ReadFile(credentialsFile)
source := &ServiceAccountTokenSource{}

if err != nil {
Expand Down
23 changes: 11 additions & 12 deletions pkg/libs/util/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package util
import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"io/ioutil"
"os"
"path/filepath"
"sync/atomic"
Expand All @@ -16,18 +15,18 @@ func TestWatchRegularFileChange(t *testing.T) {

a := assert.New(t)

dirName, err := ioutil.TempDir("", "watcher-test-")
dirName, err := os.MkdirTemp("", "watcher-test-")
a.Nil(err)
defer os.Remove(dirName)

targetSecret, err := ioutil.TempFile(dirName, "secret-")
targetSecret, err := os.CreateTemp(dirName, "secret-")
a.Nil(err)
defer os.Remove(targetSecret.Name())

_, err = targetSecret.WriteString("secret1")
a.Nil(err)

data, err := ioutil.ReadFile(targetSecret.Name())
data, err := os.ReadFile(targetSecret.Name())
a.Nil(err)
a.Equal("secret1", string(data))

Expand Down Expand Up @@ -59,7 +58,7 @@ func TestWatchRegularFileChange(t *testing.T) {
opsFinal := atomic.LoadInt32(&ops)
a.Equal(int32(1), opsFinal)

data, err = ioutil.ReadFile(targetSecret.Name())
data, err = os.ReadFile(targetSecret.Name())
a.Nil(err)
a.Equal("secret1addition", string(data))
}
Expand All @@ -74,23 +73,23 @@ func TestWatchLinkedFileChange(t *testing.T) {
*/
a := assert.New(t)

dirName, err := ioutil.TempDir("", "watcher-test-")
dirName, err := os.MkdirTemp("", "watcher-test-")
a.Nil(err)
defer os.Remove(dirName)

dirTmp1, err := ioutil.TempDir(dirName, "tmp1-")
dirTmp1, err := os.MkdirTemp(dirName, "tmp1-")
a.Nil(err)
defer os.Remove(dirTmp1)

dirTmp2, err := ioutil.TempDir(dirName, "tmp2-")
dirTmp2, err := os.MkdirTemp(dirName, "tmp2-")
a.Nil(err)
defer os.Remove(dirTmp2)

targetSecret1, err := ioutil.TempFile(dirTmp1, "secret-")
targetSecret1, err := os.CreateTemp(dirTmp1, "secret-")
a.Nil(err)
defer os.Remove(targetSecret1.Name())

targetSecret2, err := ioutil.TempFile(dirTmp2, "secret-")
targetSecret2, err := os.CreateTemp(dirTmp2, "secret-")
a.Nil(err)
defer os.Remove(targetSecret2.Name())

Expand All @@ -110,7 +109,7 @@ func TestWatchLinkedFileChange(t *testing.T) {
a.Nil(err)
defer os.Remove(secretLink)

data, err := ioutil.ReadFile(secretLink)
data, err := os.ReadFile(secretLink)
a.Nil(err)
a.Equal("secret1", string(data))

Expand Down Expand Up @@ -145,7 +144,7 @@ func TestWatchLinkedFileChange(t *testing.T) {
opsFinal := atomic.LoadInt32(&ops)
a.Equal(int32(1), opsFinal)

data, err = ioutil.ReadFile(secretLink)
data, err = os.ReadFile(secretLink)
a.Nil(err)
a.Equal("secret2", string(data))
}
2 changes: 1 addition & 1 deletion proxy/processor_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (handler *DefaultRequestHandler) mustReply(requestKeyVersion *protocol.Requ
return false, nil, err
}

case 3, 4, 5, 6, 7, 8, 9, 10:
case 3, 4, 5, 6, 7, 8, 9, 10, 11:
// CorrelationID + ClientID
if err = acksReader.ReadAndDiscardHeaderV1Part(reader); err != nil {
return false, nil, err
Expand Down
26 changes: 26 additions & 0 deletions proxy/protocol/request_key_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,32 @@ func (r *RequestKeyVersion) ResponseHeaderVersion() int16 {
return 1
case 74: // ListClientMetricsResources
return 1
case 75: // DescribeTopicPartitions
return 1
case 76: // ShareGroupHeartbeat
return 1
case 77: // ShareGroupDescribe
return 1
case 78: // ShareFetch
return 1
case 79: // ShareAcknowledge
return 1
case 80: // AddRaftVoter
return 1
case 81: // RemoveRaftVoter
return 1
case 82: // UpdateRaftVoter
return 1
case 83: // InitializeShareGroupState
return 1
case 84: // ReadShareGroupState
return 1
case 85: // WriteShareGroupState
return 1
case 86: // DeleteShareGroupState
return 1
case 87: // ReadShareGroupStateSummary
return 1
default:
// throw new UnsupportedVersionException("Unsupported API key " + apiKey);
return -1
Expand Down
3 changes: 1 addition & 2 deletions proxy/protocol/request_produce_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package protocol
import (
"encoding/binary"
"io"
"io/ioutil"
)

type RequestAcksReader struct {
Expand All @@ -18,7 +17,7 @@ func (r RequestAcksReader) readAndDiscardNullableString(reader io.Reader) (err e
return errInvalidStringLength
}
if length > 0 {
if _, err = io.CopyN(ioutil.Discard, reader, int64(length)); err != nil {
if _, err = io.CopyN(io.Discard, reader, int64(length)); err != nil {
return err
}
}
Expand Down
3 changes: 1 addition & 2 deletions proxy/protocol/response_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"github.com/pkg/errors"
"io"
"io/ioutil"
)

type ResponseHeader struct {
Expand Down Expand Up @@ -78,7 +77,7 @@ func (r *ResponseHeaderTaggedFields) MaybeRead(reader io.Reader) ([]byte, error)
} else if size == 0 {
continue
} else {
if _, err := io.CopyN(ioutil.Discard, reader, int64(size)); err != nil {
if _, err := io.CopyN(io.Discard, reader, int64(size)); err != nil {
return nil, errors.Wrap(err, "error while reading tagged field data")
}
}
Expand Down
5 changes: 4 additions & 1 deletion proxy/protocol/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,10 @@ func createFindCoordinatorResponseSchemaVersions() []Schema {
&CompactArray{Name: coordinatorsKeyName, Ty: findCoordinatorCoordinatorsSchema4},
&SchemaTaggedFields{Name: "response_tagged_fields"},
)
return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2, findCoordinatorResponseV3, findCoordinatorResponseV4}
findCoordinatorResponseV5 := findCoordinatorResponseV4
findCoordinatorResponseV6 := findCoordinatorResponseV5

return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2, findCoordinatorResponseV3, findCoordinatorResponseV4, findCoordinatorResponseV5, findCoordinatorResponseV6}
}

func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error {
Expand Down
Loading
Loading