From 3c728ef7a56374ff6b08b6e733417ef239211433 Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Sat, 14 Jun 2025 11:52:19 +0200 Subject: [PATCH 1/8] fix: handle compression errors --- pkg/stream/aggregation.go | 125 ++++++++++++++++++------------------ pkg/stream/buffer_writer.go | 15 +++-- pkg/stream/producer.go | 5 +- 3 files changed, 76 insertions(+), 69 deletions(-) diff --git a/pkg/stream/aggregation.go b/pkg/stream/aggregation.go index af358054..e31ef6b7 100644 --- a/pkg/stream/aggregation.go +++ b/pkg/stream/aggregation.go @@ -5,11 +5,12 @@ import ( "bytes" "compress/gzip" "fmt" + "io" + "github.com/golang/snappy" "github.com/klauspost/compress/zstd" "github.com/pierrec/lz4" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" - "io" ) const ( @@ -64,12 +65,11 @@ type subEntries struct { } type iCompress interface { - Compress(subEntries *subEntries) + Compress(subEntries *subEntries) error UnCompress(source *bufio.Reader, dataSize, uncompressedDataSize uint32) *bufio.Reader } func compressByValue(value byte) iCompress { - switch value { case GZIP: return compressGZIP{} @@ -84,10 +84,9 @@ func compressByValue(value byte) iCompress { return compressNONE{} } -type compressNONE struct { -} +type compressNONE struct{} -func (es compressNONE) Compress(subEntries *subEntries) { +func (es compressNONE) Compress(subEntries *subEntries) error { for _, entry := range subEntries.items { var tmp bytes.Buffer for _, msg := range entry.messages { @@ -98,6 +97,8 @@ func (es compressNONE) Compress(subEntries *subEntries) { entry.sizeInBytes += len(entry.dataInBytes) subEntries.totalSizeInBytes += len(entry.dataInBytes) } + + return nil } func (es compressNONE) UnCompress(source *bufio.Reader, _, _ uint32) *bufio.Reader { @@ -107,25 +108,32 @@ func (es compressNONE) UnCompress(source *bufio.Reader, _, _ uint32) *bufio.Read type compressGZIP struct { } -func (es compressGZIP) Compress(subEntries *subEntries) { +func (es compressGZIP) Compress(subEntries *subEntries) 
error { for _, entry := range subEntries.items { var tmp bytes.Buffer w := gzip.NewWriter(&tmp) + for _, msg := range entry.messages { - size := len(msg.messageBytes) - //w.Write(bytesFromInt((uint32(size) >> 24) & 0xFF)) - //w.Write(bytesFromInt((uint32(size) >> 16) & 0xFF)) - //w.Write(bytesFromInt((uint32(size) >> 8) & 0xFF)) - //w.Write(bytesFromInt((uint32(size) >> 0) & 0xFF)) - w.Write(bytesFromInt(uint32(size))) - w.Write(msg.messageBytes) + prefixedMsg := bytesLenghPrefixed(msg.messageBytes) + if _, err := w.Write(prefixedMsg); err != nil { + return fmt.Errorf("failed to write message size to gzip writer: %w", err) + } + } + + if err := w.Flush(); err != nil { + return fmt.Errorf("failed to flush gzip writer: %w", err) + } + + if err := w.Close(); err != nil { + return fmt.Errorf("failed to close gzip writer: %w", err) } - w.Flush() - w.Close() + entry.sizeInBytes += len(tmp.Bytes()) entry.dataInBytes = tmp.Bytes() subEntries.totalSizeInBytes += len(tmp.Bytes()) } + + return nil } func (es compressGZIP) UnCompress(source *bufio.Reader, dataSize, uncompressedDataSize uint32) *bufio.Reader { @@ -161,36 +169,33 @@ func (es compressGZIP) UnCompress(source *bufio.Reader, dataSize, uncompressedDa return bufio.NewReader(bytes.NewReader(uncompressedReader)) } -type compressSnappy struct { -} +type compressSnappy struct{} -func (es compressSnappy) Compress(subEntries *subEntries) { +func (es compressSnappy) Compress(subEntries *subEntries) error { for _, entry := range subEntries.items { var tmp bytes.Buffer w := snappy.NewBufferedWriter(&tmp) for _, msg := range entry.messages { - size := len(msg.messageBytes) - if _, err := w.Write(bytesFromInt(uint32(size))); err != nil { - logs.LogError("Error compressing with Snappy %v", err) - //return err // TODO we should return error if we cannot compress - } - if _, err := w.Write(msg.messageBytes); err != nil { - logs.LogError("Error compressing with Snappy %v", err) - //return err // TODO we should return error if we 
cannot compress + prefixedMsg := bytesLenghPrefixed(msg.messageBytes) + if _, err := w.Write(prefixedMsg); err != nil { + return fmt.Errorf("failed to write message size to snappy writer: %w", err) } } + if err := w.Flush(); err != nil { - logs.LogError("Error compressing with Snappy %v", err) - //return err // TODO we should return error if we cannot compress + return fmt.Errorf("failed to flush snappy writer: %w", err) } + if err := w.Close(); err != nil { - logs.LogError("Error compressing with Snappy %v", err) - //return err // TODO we should return error if we cannot compress + return fmt.Errorf("failed to close snappy writer: %w", err) } + entry.sizeInBytes += len(tmp.Bytes()) entry.dataInBytes = tmp.Bytes() subEntries.totalSizeInBytes += len(tmp.Bytes()) } + + return nil } func (es compressSnappy) UnCompress(source *bufio.Reader, dataSize, uncompressedDataSize uint32) *bufio.Reader { @@ -226,36 +231,34 @@ func (es compressSnappy) UnCompress(source *bufio.Reader, dataSize, uncompressed } -type compressLZ4 struct { -} +type compressLZ4 struct{} -func (es compressLZ4) Compress(subEntries *subEntries) { +func (es compressLZ4) Compress(subEntries *subEntries) error { for _, entry := range subEntries.items { var tmp bytes.Buffer w := lz4.NewWriter(&tmp) + for _, msg := range entry.messages { - size := len(msg.messageBytes) - if _, err := w.Write(bytesFromInt(uint32(size))); err != nil { - logs.LogError("Error compressing with LZ4 %v", err) - //return err // TODO we should return error if we cannot compress - } - if _, err := w.Write(msg.messageBytes); err != nil { - logs.LogError("Error compressing with LZ4 %v", err) - //return err // TODO we should return error if we cannot compress + prefixedMsg := bytesLenghPrefixed(msg.messageBytes) + if _, err := w.Write(prefixedMsg); err != nil { + return fmt.Errorf("failed to write message size to LZ4 writer: %w", err) } } + if err := w.Flush(); err != nil { - logs.LogError("Error compressing with LZ4 %v", err) - //return 
err // TODO we should return error if we cannot compress + return fmt.Errorf("failed to flush LZ4 writer: %w", err) } + if err := w.Close(); err != nil { - logs.LogError("Error compressing with LZ4 %v", err) - //return err // TODO we should return error if we cannot compress + return fmt.Errorf("failed to close LZ4 writer: %w", err) } + entry.sizeInBytes += len(tmp.Bytes()) entry.dataInBytes = tmp.Bytes() subEntries.totalSizeInBytes += len(tmp.Bytes()) } + + return nil } func (es compressLZ4) UnCompress(source *bufio.Reader, dataSize, uncompressedDataSize uint32) *bufio.Reader { @@ -288,41 +291,37 @@ func (es compressLZ4) UnCompress(source *bufio.Reader, dataSize, uncompressedDat } -type compressZSTD struct { -} +type compressZSTD struct{} -func (es compressZSTD) Compress(subEntries *subEntries) { +func (es compressZSTD) Compress(subEntries *subEntries) error { for _, entry := range subEntries.items { var tmp bytes.Buffer w, err := zstd.NewWriter(&tmp) if err != nil { - logs.LogError("Error creating ZSTD compression algorithm writer %v", err) - //return err // TODO we should return error if we cannot compress + return fmt.Errorf("error creating ZSTD compression algorithm writer %w", err) } for _, msg := range entry.messages { - size := len(msg.messageBytes) - if _, err := w.Write(bytesFromInt(uint32(size))); err != nil { - logs.LogError("Error compressing with ZSTD %v", err) - //return err // TODO we should return error if we cannot compress - } - if _, err := w.Write(msg.messageBytes); err != nil { - logs.LogError("Error compressing with ZSTD %v", err) - //return err // TODO we should return error if we cannot compress + prefixedMsg := bytesLenghPrefixed(msg.messageBytes) + if _, err := w.Write(prefixedMsg); err != nil { + return fmt.Errorf("failed to write message size to ZSTD writer: %w", err) } } + if err := w.Flush(); err != nil { - logs.LogError("Error compressing with ZSTD %v", err) - //return err // TODO we should return error if we cannot compress + return 
fmt.Errorf("failed to flush ZSTD writer: %w", err) } + if err := w.Close(); err != nil { - logs.LogError("Error compressing with ZSTD %v", err) - //return err // TODO we should return error if we cannot compress + return fmt.Errorf("failed to close ZSTD writer: %w", err) } + entry.sizeInBytes += len(tmp.Bytes()) entry.dataInBytes = tmp.Bytes() subEntries.totalSizeInBytes += len(tmp.Bytes()) } + + return nil } func (es compressZSTD) UnCompress(source *bufio.Reader, dataSize, uncompressedDataSize uint32) *bufio.Reader { diff --git a/pkg/stream/buffer_writer.go b/pkg/stream/buffer_writer.go index dc5d3180..f006e6b7 100644 --- a/pkg/stream/buffer_writer.go +++ b/pkg/stream/buffer_writer.go @@ -80,11 +80,6 @@ func writeBUInt(inputBuff *bufio.Writer, value uint32) { inputBuff.Write(buff) } -func bytesFromInt(value uint32) []byte { - var buff = make([]byte, 4) - binary.BigEndian.PutUint32(buff, value) - return buff -} func writeString(inputBuff *bytes.Buffer, value string) { writeUShort(inputBuff, uint16(len(value))) inputBuff.Write([]byte(value)) @@ -157,3 +152,13 @@ func sizeOfMapStringString(mapString map[string]string) int { } return size } + +func bytesLenghPrefixed(msg []byte) []byte { + size := len(msg) + buff := make([]byte, 4+size) + + binary.BigEndian.PutUint32(buff, uint32(size)) + copy(buff[4:], msg) + + return buff +} diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 60c72486..96b2f172 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -531,7 +531,10 @@ func (producer *Producer) aggregateEntities(msgs []*messageSequence, size int, c } } - compressByValue(compression.value).Compress(&subEntries) + err := compressByValue(compression.value).Compress(&subEntries) + if err != nil { + return subEntries, err + } return subEntries, nil } From 9d0af79bc7f071c3968a771fd0adc9b4f6d9186e Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Sat, 14 Jun 2025 12:07:46 +0200 Subject: [PATCH 2/8] fix: handle uncompression errors --- 
pkg/stream/aggregation.go | 115 +++++++++++++-------------------- pkg/stream/aggregation_test.go | 24 +++---- pkg/stream/server_frame.go | 10 ++- 3 files changed, 66 insertions(+), 83 deletions(-) diff --git a/pkg/stream/aggregation.go b/pkg/stream/aggregation.go index e31ef6b7..ac67a53e 100644 --- a/pkg/stream/aggregation.go +++ b/pkg/stream/aggregation.go @@ -10,7 +10,6 @@ import ( "github.com/golang/snappy" "github.com/klauspost/compress/zstd" "github.com/pierrec/lz4" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" ) const ( @@ -66,7 +65,7 @@ type subEntries struct { type iCompress interface { Compress(subEntries *subEntries) error - UnCompress(source *bufio.Reader, dataSize, uncompressedDataSize uint32) *bufio.Reader + UnCompress(source *bufio.Reader, dataSize, uncompressedDataSize uint32) (*bufio.Reader, error) } func compressByValue(value byte) iCompress { @@ -101,8 +100,8 @@ func (es compressNONE) Compress(subEntries *subEntries) error { return nil } -func (es compressNONE) UnCompress(source *bufio.Reader, _, _ uint32) *bufio.Reader { - return source +func (es compressNONE) UnCompress(source *bufio.Reader, _, _ uint32) (*bufio.Reader, error) { + return source, nil } type compressGZIP struct { @@ -136,37 +135,32 @@ func (es compressGZIP) Compress(subEntries *subEntries) error { return nil } -func (es compressGZIP) UnCompress(source *bufio.Reader, dataSize, uncompressedDataSize uint32) *bufio.Reader { - +func (es compressGZIP) UnCompress(source *bufio.Reader, dataSize, uncompressedDataSize uint32) (*bufio.Reader, error) { var zipperBuffer = make([]byte, dataSize) - /// empty - _, err := io.ReadFull(source, zipperBuffer) - /// array of compress data - - if err != nil { - logs.LogError("GZIP Error during reading buffer %s", err) + if _, err := io.ReadFull(source, zipperBuffer); err != nil { + return nil, fmt.Errorf("GZIP error during reading buffer %s", err) } reader, err := gzip.NewReader(bytes.NewBuffer(zipperBuffer)) - if err != nil { - 
logs.LogError("Error creating GZIP NewReader %s", err) + return nil, fmt.Errorf("error creating GZIP NewReader %s", err) } + //nolint:errcheck defer reader.Close() - /// headers ---> payload --> headers --> payload (compressed) + // headers --> payload --> headers --> payload (compressed) // Read in data. uncompressedReader, err := io.ReadAll(reader) if err != nil { - logs.LogError("Error during reading buffer %s", err) + return nil, fmt.Errorf("error during reading buffer %s", err) } + if uint32(len(uncompressedReader)) != uncompressedDataSize { - panic("uncompressedDataSize != count") + return nil, fmt.Errorf("uncompressedDataSize != count") } - /// headers ---> payload --> headers --> payload (compressed) --> uncompressed payload - - return bufio.NewReader(bytes.NewReader(uncompressedReader)) + // headers --> payload --> headers --> payload (compressed) --> uncompressed payload + return bufio.NewReader(bytes.NewReader(uncompressedReader)), nil } type compressSnappy struct{} @@ -198,37 +192,29 @@ func (es compressSnappy) Compress(subEntries *subEntries) error { return nil } -func (es compressSnappy) UnCompress(source *bufio.Reader, dataSize, uncompressedDataSize uint32) *bufio.Reader { - +func (es compressSnappy) UnCompress(source *bufio.Reader, dataSize, uncompressedDataSize uint32) (*bufio.Reader, error) { var zipperBuffer = make([]byte, dataSize) - /// empty - _, err := io.ReadFull(source, zipperBuffer) - /// array of compress data - - if err != nil { - logs.LogError("SNAPPY Error during reading buffer %s", err) + // array of compress data + if _, err := io.ReadFull(source, zipperBuffer); err != nil { + return nil, fmt.Errorf("SNAPPY error during reading buffer %s", err) } reader := snappy.NewReader(bytes.NewBuffer(zipperBuffer)) - if err != nil { - logs.LogError("Error creating SNAPPY NewReader %s", err) - } defer reader.Reset(nil) - /// headers ---> payload --> headers --> payload (compressed) + // headers --> payload --> headers --> payload (compressed) // 
Read in data. uncompressedReader, err := io.ReadAll(reader) if err != nil { - logs.LogError("Error during reading buffer %s", err) + return nil, fmt.Errorf("error during reading buffer %s", err) } + if uint32(len(uncompressedReader)) != uncompressedDataSize { - panic("uncompressedDataSize != count") + return nil, fmt.Errorf("uncompressedDataSize != count") } - /// headers ---> payload --> headers --> payload (compressed) --> uncompressed payload - - return bufio.NewReader(bytes.NewReader(uncompressedReader)) - + // headers --> payload --> headers --> payload (compressed) --> uncompressed payload + return bufio.NewReader(bytes.NewReader(uncompressedReader)), nil } type compressLZ4 struct{} @@ -261,33 +247,29 @@ func (es compressLZ4) Compress(subEntries *subEntries) error { return nil } -func (es compressLZ4) UnCompress(source *bufio.Reader, dataSize, uncompressedDataSize uint32) *bufio.Reader { - +func (es compressLZ4) UnCompress(source *bufio.Reader, dataSize, uncompressedDataSize uint32) (*bufio.Reader, error) { var zipperBuffer = make([]byte, dataSize) - /// empty - _, err := io.ReadFull(source, zipperBuffer) - /// array of compress data - - if err != nil { - logs.LogError("LZ4 Error during reading buffer %s", err) + // array of compress data + if _, err := io.ReadFull(source, zipperBuffer); err != nil { + return nil, fmt.Errorf("LZ4 error during reading buffer %s", err) } reader := lz4.NewReader(bytes.NewBuffer(zipperBuffer)) defer reader.Reset(nil) - /// headers ---> payload --> headers --> payload (compressed) + // headers --> payload --> headers --> payload (compressed) // Read in data. 
uncompressedReader, err := io.ReadAll(reader) if err != nil { - logs.LogError("Error during reading buffer %s", err) + return nil, fmt.Errorf("error during reading buffer %s", err) } + if uint32(len(uncompressedReader)) != uncompressedDataSize { - panic("uncompressedDataSize != count") + return nil, fmt.Errorf("uncompressedDataSize != count") } - /// headers ---> payload --> headers --> payload (compressed) --> uncompressed payload - - return bufio.NewReader(bytes.NewReader(uncompressedReader)) + // headers --> payload --> headers --> payload (compressed) --> uncompressed payload + return bufio.NewReader(bytes.NewReader(uncompressedReader)), nil } @@ -324,35 +306,30 @@ func (es compressZSTD) Compress(subEntries *subEntries) error { return nil } -func (es compressZSTD) UnCompress(source *bufio.Reader, dataSize, uncompressedDataSize uint32) *bufio.Reader { - +func (es compressZSTD) UnCompress(source *bufio.Reader, dataSize, uncompressedDataSize uint32) (*bufio.Reader, error) { var zipperBuffer = make([]byte, dataSize) - /// empty - _, err := io.ReadFull(source, zipperBuffer) - /// array of compress data - - if err != nil { - logs.LogError("ZSTD Error during reading buffer %s", err) + // array of compress data + if _, err := io.ReadFull(source, zipperBuffer); err != nil { + return nil, fmt.Errorf("ZSTD error during reading buffer %s", err) } reader, err := zstd.NewReader(bytes.NewBuffer(zipperBuffer)) if err != nil { - logs.LogError("Error creating ZSTD NewReader %s", err) + return nil, fmt.Errorf("error creating ZSTD NewReader %s", err) } - defer reader.Reset(nil) - /// headers ---> payload --> headers --> payload (compressed) + defer reader.Close() + // headers --> payload --> headers --> payload (compressed) // Read in data. 
uncompressedReader, err := io.ReadAll(reader) if err != nil { - logs.LogError("Error during reading buffer %s", err) + return nil, fmt.Errorf("error during reading buffer %s", err) } + if uint32(len(uncompressedReader)) != uncompressedDataSize { - panic("uncompressedDataSize != count") + return nil, fmt.Errorf("uncompressedDataSize != count") } - /// headers ---> payload --> headers --> payload (compressed) --> uncompressed payload - - return bufio.NewReader(bytes.NewReader(uncompressedReader)) - + // headers --> payload --> headers --> payload (compressed) --> uncompressed payload + return bufio.NewReader(bytes.NewReader(uncompressedReader)), nil } diff --git a/pkg/stream/aggregation_test.go b/pkg/stream/aggregation_test.go index 8ec901ff..2b5f10c7 100644 --- a/pkg/stream/aggregation_test.go +++ b/pkg/stream/aggregation_test.go @@ -3,12 +3,12 @@ package stream import ( "bufio" "bytes" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) var _ = Describe("Compression algorithms", func() { - var entries *subEntries BeforeEach(func() { @@ -35,46 +35,48 @@ var _ = Describe("Compression algorithms", func() { }) It("NONE", func() { - compressNONE{}.Compress(entries) + err := compressNONE{}.Compress(entries) + Expect(err).NotTo(HaveOccurred()) Expect(entries.totalSizeInBytes).To(Equal(entries.items[0].sizeInBytes)) Expect(entries.totalSizeInBytes).To(Equal(entries.items[0].unCompressedSize)) - }) It("GZIP", func() { gzip := compressGZIP{} - gzip.Compress(entries) + err := gzip.Compress(entries) + Expect(err).NotTo(HaveOccurred()) verifyCompression(gzip, entries) - }) It("SNAPPY", func() { snappy := compressSnappy{} - snappy.Compress(entries) + err := snappy.Compress(entries) + Expect(err).NotTo(HaveOccurred()) verifyCompression(snappy, entries) }) It("LZ4", func() { lz4 := compressLZ4{} - lz4.Compress(entries) + err := lz4.Compress(entries) + Expect(err).NotTo(HaveOccurred()) verifyCompression(lz4, entries) }) It("ZSTD", func() { zstd := compressZSTD{} - 
zstd.Compress(entries) + err := zstd.Compress(entries) + Expect(err).NotTo(HaveOccurred()) verifyCompression(zstd, entries) }) - }) func verifyCompression(algo iCompress, subEntries *subEntries) { - Expect(subEntries.totalSizeInBytes).To(SatisfyAll(BeNumerically("<", subEntries.items[0].unCompressedSize))) Expect(subEntries.totalSizeInBytes).To(Equal(subEntries.items[0].sizeInBytes)) bufferReader := bytes.NewReader(subEntries.items[0].dataInBytes) - algo.UnCompress(bufio.NewReader(bufferReader), + _, err := algo.UnCompress(bufio.NewReader(bufferReader), uint32(subEntries.totalSizeInBytes), uint32(subEntries.items[0].unCompressedSize)) + Expect(err).NotTo(HaveOccurred()) } diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index c0953529..0504286c 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -83,7 +83,6 @@ func (c *Client) handleResponse() { case commandDeliver: { c.handleDeliver(buffer) - } case commandQueryPublisherSequence: { @@ -296,6 +295,7 @@ func (c *Client) queryPublisherSequenceFrameHandler(readProtocol *ReaderProtocol res.code <- Code{id: readProtocol.ResponseCode} res.data <- sequence } + func (c *Client) handleDeliver(r *bufio.Reader) { subscriptionId := readByte(r) consumer, err := c.coordinator.GetConsumerById(subscriptionId) @@ -360,7 +360,7 @@ func (c *Client) handleDeliver(r *bufio.Reader) { var chunk chunkInfo chunk.numEntries = numEntries - /// headers ---> payload -> messages + // headers ---> payload -> messages if consumer.options.CRCCheck { checkSum := crc32.ChecksumIEEE(bytesBuffer) @@ -400,9 +400,13 @@ func (c *Client) handleDeliver(r *bufio.Reader) { dataSize, _ := readUInt(dataReader) numRecords -= uint32(numRecordsInBatch) compression := (entryType & 0x70) >> 4 //compression - uncompressedReader := compressByValue(compression).UnCompress(dataReader, + uncompressedReader, err := compressByValue(compression).UnCompress(dataReader, dataSize, uncompressedDataSize) + if err != nil { + // TODO: 
it should return error + logs.LogError("error during data uncompression %w", err) + } for numRecordsInBatch != 0 { batchConsumingMessages = c.decodeMessage(uncompressedReader, From e8e544902e1ed455a815bfa95c67966c1f712557 Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Sat, 14 Jun 2025 12:31:27 +0200 Subject: [PATCH 3/8] fix: handle error on bufio writer --- pkg/stream/buffer_writer.go | 74 +++++++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/pkg/stream/buffer_writer.go b/pkg/stream/buffer_writer.go index f006e6b7..a445b79e 100644 --- a/pkg/stream/buffer_writer.go +++ b/pkg/stream/buffer_writer.go @@ -16,13 +16,15 @@ func writeULong(inputBuff *bytes.Buffer, value uint64) { inputBuff.Write(buff) } -func writeBLong(inputBuff *bufio.Writer, value int64) { - writeBULong(inputBuff, uint64(value)) +func writeBLong(inputBuff *bufio.Writer, value int64) error { + return writeBULong(inputBuff, uint64(value)) } -func writeBULong(inputBuff *bufio.Writer, value uint64) { + +func writeBULong(inputBuff *bufio.Writer, value uint64) error { var buff = make([]byte, 8) binary.BigEndian.PutUint64(buff, value) - inputBuff.Write(buff) + _, err := inputBuff.Write(buff) + return err } func writeShort(inputBuff *bytes.Buffer, value int16) { @@ -35,18 +37,24 @@ func writeUShort(inputBuff *bytes.Buffer, value uint16) { inputBuff.Write(buff) } -func writeBShort(inputBuff *bufio.Writer, value int16) { - writeBUShort(inputBuff, uint16(value)) +func writeBShort(inputBuff *bufio.Writer, value int16) error { + return writeBUShort(inputBuff, uint16(value)) } -func writeBUShort(inputBuff *bufio.Writer, value uint16) { +func writeBUShort(inputBuff *bufio.Writer, value uint16) error { var buff = make([]byte, 2) binary.BigEndian.PutUint16(buff, value) - inputBuff.Write(buff) + _, err := inputBuff.Write(buff) + return err } -func writeBString(inputBuff *bufio.Writer, value string) { - writeBUShort(inputBuff, uint16(len(value))) - 
inputBuff.Write([]byte(value)) +func writeBString(inputBuff *bufio.Writer, value string) error { + err := writeBUShort(inputBuff, uint16(len(value))) + if err != nil { + return err + } + + _, err = inputBuff.Write([]byte(value)) + return err } func writeByte(inputBuff *bytes.Buffer, value byte) { @@ -55,10 +63,11 @@ func writeByte(inputBuff *bytes.Buffer, value byte) { inputBuff.Write(buff) } -func writeBByte(inputBuff *bufio.Writer, value byte) { +func writeBByte(inputBuff *bufio.Writer, value byte) error { var buff = make([]byte, 1) buff[0] = value - inputBuff.Write(buff) + _, err := inputBuff.Write(buff) + return err } func writeInt(inputBuff *bytes.Buffer, value int) { @@ -70,14 +79,15 @@ func writeUInt(inputBuff *bytes.Buffer, value uint32) { inputBuff.Write(buff) } -func writeBInt(inputBuff *bufio.Writer, value int) { - writeBUInt(inputBuff, uint32(value)) +func writeBInt(inputBuff *bufio.Writer, value int) error { + return writeBUInt(inputBuff, uint32(value)) } -func writeBUInt(inputBuff *bufio.Writer, value uint32) { +func writeBUInt(inputBuff *bufio.Writer, value uint32) error { var buff = make([]byte, 4) binary.BigEndian.PutUint32(buff, value) - inputBuff.Write(buff) + _, err := inputBuff.Write(buff) + return err } func writeString(inputBuff *bytes.Buffer, value string) { @@ -105,7 +115,7 @@ func writeBytes(inputBuff *bytes.Buffer, value []byte) { inputBuff.Write(value) } -// writeProtocolHeader protocol utils functions +// writeProtocolHeader protocol utils functions func writeProtocolHeader(inputBuff *bytes.Buffer, length int, command uint16, correlationId ...int) { @@ -121,20 +131,30 @@ func writeProtocolHeader(inputBuff *bytes.Buffer, func writeBProtocolHeader(inputBuff *bufio.Writer, length int, command int16, - correlationId ...int) { - writeBProtocolHeaderVersion(inputBuff, length, command, version1, correlationId...) + correlationId ...int) error { + return writeBProtocolHeaderVersion(inputBuff, length, command, version1, correlationId...) 
} -func writeBProtocolHeaderVersion(inputBuff *bufio.Writer, - length int, command int16, version int16, - correlationId ...int) { +func writeBProtocolHeaderVersion(inputBuff *bufio.Writer, length int, command int16, + version int16, correlationId ...int) error { + + if err := writeBInt(inputBuff, length); err != nil { + return err + } + if err := writeBShort(inputBuff, command); err != nil { + return err + } + if err := writeBShort(inputBuff, version); err != nil { + return err + } - writeBInt(inputBuff, length) - writeBShort(inputBuff, command) - writeBShort(inputBuff, version) if len(correlationId) > 0 { - writeBInt(inputBuff, correlationId[0]) + if err := writeBInt(inputBuff, correlationId[0]); err != nil { + return err + } } + + return nil } func sizeOfStringArray(array []string) int { From 498465ddc65b42373b2ea9e7bf671dc7ffc73baa Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Sat, 14 Jun 2025 12:45:51 +0200 Subject: [PATCH 4/8] fix: handle error in producer --- pkg/stream/producer.go | 134 ++++++++++++++++++++++++++++++----------- 1 file changed, 100 insertions(+), 34 deletions(-) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 96b2f172..760876b1 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -471,28 +471,57 @@ func (producer *Producer) internalBatchSend(messagesSequence []*messageSequence) return producer.internalBatchSendProdId(messagesSequence, producer.GetID()) } -func (producer *Producer) simpleAggregation(messagesSequence []*messageSequence, - b *bufio.Writer) { +func (producer *Producer) simpleAggregation(messagesSequence []*messageSequence, b *bufio.Writer) error { for _, msg := range messagesSequence { r := msg.messageBytes - writeBLong(b, msg.publishingId) // publishingId - writeBInt(b, len(r)) // len - b.Write(r) + // publishingId + if err := writeBLong(b, msg.publishingId); err != nil { + return err + } + + // len + if err := writeBInt(b, len(r)); err != nil { + return err + } + + if _, err := 
b.Write(r); err != nil { + return err + } } + + return nil } -func (producer *Producer) subEntryAggregation(aggregation subEntries, b *bufio.Writer, compression Compression) { +func (producer *Producer) subEntryAggregation(aggregation subEntries, b *bufio.Writer, compression Compression) error { /// 51 messages // aggregation.items == (5 --> [10] messages) + (1 --> [1]message) for _, entry := range aggregation.items { - writeBLong(b, entry.publishingId) - writeBByte(b, 0x80| - compression.value<<4) // 1=SubBatchEntryType:1,CompressionType:3,Reserved:4, - writeBShort(b, int16(len(entry.messages))) - writeBInt(b, entry.unCompressedSize) - writeBInt(b, entry.sizeInBytes) - b.Write(entry.dataInBytes) + if err := writeBLong(b, entry.publishingId); err != nil { + return fmt.Errorf("failed to write publishingId: %w", err) + } + // 1=SubBatchEntryType:1,CompressionType:3,Reserved:4, + if err := writeBByte(b, 0x80|compression.value<<4); err != nil { + return fmt.Errorf("failed to write type and compression byte: %w", err) + } + + if err := writeBShort(b, int16(len(entry.messages))); err != nil { + return fmt.Errorf("failed to write message count: %w", err) + } + + if err := writeBInt(b, entry.unCompressedSize); err != nil { + return fmt.Errorf("failed to write uncompressed size: %w", err) + } + + if err := writeBInt(b, entry.sizeInBytes); err != nil { + return fmt.Errorf("failed to write size in bytes: %w", err) + } + + if _, err := b.Write(entry.dataInBytes); err != nil { + return fmt.Errorf("failed to write data in bytes: %w", err) + } } + + return nil } func (producer *Producer) aggregateEntities(msgs []*messageSequence, size int, compression Compression) (subEntries, error) { @@ -576,29 +605,44 @@ func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSeq frameHeaderLength := initBufferPublishSize length := frameHeaderLength + msgLen - writeBProtocolHeader(producer.options.client.socket.writer, length, commandPublish) - 
writeBByte(producer.options.client.socket.writer, producerID) + if err := writeBProtocolHeader(producer.options.client.socket.writer, length, commandPublish); err != nil { + return fmt.Errorf("failed to write protocol header: %w", err) + } + + if err := writeBByte(producer.options.client.socket.writer, producerID); err != nil { + return fmt.Errorf("failed to write producer ID: %w", err) + } + numberOfMessages := len(messagesSequence) numberOfMessages = numberOfMessages / producer.options.SubEntrySize if len(messagesSequence)%producer.options.SubEntrySize != 0 { numberOfMessages += 1 } - writeBInt(producer.options.client.socket.writer, numberOfMessages) //toExcluded - fromInclude + //toExcluded - fromInclude + if err := writeBInt(producer.options.client.socket.writer, numberOfMessages); err != nil { + return fmt.Errorf("failed to write number of messages: %w", err) + } if producer.options.isSubEntriesBatching() { - producer.subEntryAggregation(aggregation, producer.options.client.socket.writer, producer.options.Compression) + err := producer.subEntryAggregation(aggregation, producer.options.client.socket.writer, producer.options.Compression) + if err != nil { + return fmt.Errorf("failed to write sub entry aggregation: %w", err) + } } if !producer.options.isSubEntriesBatching() { - producer.simpleAggregation(messagesSequence, producer.options.client.socket.writer) + err := producer.simpleAggregation(messagesSequence, producer.options.client.socket.writer) + if err != nil { + return fmt.Errorf("failed to write simple aggregation: %w", err) + } } - err := producer.options.client.socket.writer.Flush() //writeAndFlush(b.Bytes()) + err := producer.options.client.socket.writer.Flush() if err != nil { - logs.LogError("Producer BatchSend error during flush: %s", err) - return err + return fmt.Errorf("producer BatchSend error during flush: %w", err) } + return nil } @@ -732,33 +776,55 @@ func (producer *Producer) sendWithFilter(messagesSequence []*messageSequence, pr 
frameHeaderLength := initBufferPublishSize var msgLen int for _, msg := range messagesSequence { - msgLen += len(msg.messageBytes) + 8 + 4 + msgLen += len(msg.messageBytes) + 8 + 4 // 8 for publishingId, 4 for message length if msg.filterValue != "" { - msgLen += 2 + len(msg.filterValue) + msgLen += 2 + len(msg.filterValue) // 2 for string length, then string bytes } } length := frameHeaderLength + msgLen - writeBProtocolHeaderVersion(producer.options.client.socket.writer, length, commandPublish, version2) - writeBByte(producer.options.client.socket.writer, producerID) + if err := writeBProtocolHeaderVersion(producer.options.client.socket.writer, length, commandPublish, version2); err != nil { + return fmt.Errorf("failed to write protocol header version: %w", err) + } + + if err := writeBByte(producer.options.client.socket.writer, producerID); err != nil { + return fmt.Errorf("failed to write producer ID: %w", err) + } + numberOfMessages := len(messagesSequence) - writeBInt(producer.options.client.socket.writer, numberOfMessages) + if err := writeBInt(producer.options.client.socket.writer, numberOfMessages); err != nil { + return fmt.Errorf("failed to write number of messages: %w", err) + } for _, msg := range messagesSequence { - writeBLong(producer.options.client.socket.writer, msg.publishingId) + if err := writeBLong(producer.options.client.socket.writer, msg.publishingId); err != nil { + return fmt.Errorf("failed to write publishing ID for message: %w", err) + } + if msg.filterValue != "" { - writeBString(producer.options.client.socket.writer, msg.filterValue) + if err := writeBString(producer.options.client.socket.writer, msg.filterValue); err != nil { + return fmt.Errorf("failed to write filter value for message: %w", err) + } } else { - writeBInt(producer.options.client.socket.writer, -1) + if err := writeBInt(producer.options.client.socket.writer, -1); err != nil { + return fmt.Errorf("failed to write -1 for filter value: %w", err) + } } - 
writeBInt(producer.options.client.socket.writer, len(msg.messageBytes)) // len - _, err := producer.options.client.socket.writer.Write(msg.messageBytes) - if err != nil { - return err + + if err := writeBInt(producer.options.client.socket.writer, len(msg.messageBytes)); err != nil { + return fmt.Errorf("failed to write message length: %w", err) + } + + if _, err := producer.options.client.socket.writer.Write(msg.messageBytes); err != nil { + return fmt.Errorf("failed to write message bytes: %w", err) } } - return producer.options.client.socket.writer.Flush() + if err := producer.options.client.socket.writer.Flush(); err != nil { + return fmt.Errorf("failed to flush writer: %w", err) + } + + return nil } func (c *Client) deletePublisher(publisherId byte) error { From af5cdf9f8ea83c2ecec3d88127db895898eb463d Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Sat, 14 Jun 2025 12:54:56 +0200 Subject: [PATCH 5/8] fix: remove unused error on client close --- pkg/stream/client.go | 3 +-- pkg/stream/consumer.go | 20 ++++++++++---------- pkg/stream/coordinator.go | 8 +++++--- pkg/stream/environment.go | 14 +++----------- pkg/stream/environment_test.go | 11 ++++++----- pkg/stream/producer.go | 2 +- pkg/stream/server_frame.go | 2 +- pkg/stream/super_stream_producer_test.go | 13 +++++++------ pkg/stream/super_stream_test.go | 5 +++-- 9 files changed, 37 insertions(+), 41 deletions(-) diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 6b17ce79..86450adb 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -470,7 +470,7 @@ func (c *Client) closeHartBeat() { } -func (c *Client) Close() error { +func (c *Client) Close() { c.closeHartBeat() c.coordinator.Producers().Range(func(_, p any) bool { producer := p.(*Producer) @@ -522,7 +522,6 @@ func (c *Client) Close() error { _ = c.coordinator.RemoveResponseById(res.correlationid) } c.getSocket().shutdown(nil) - return nil } func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) 
(*Producer, error) { diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index f04bdbbe..c9262c43 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -3,10 +3,11 @@ package stream import ( "bytes" "fmt" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" - logs "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" "sync" "time" + + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + logs "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" ) type Consumer struct { @@ -316,26 +317,27 @@ func (c *Client) credit(subscriptionId byte, credit int16) { } func (consumer *Consumer) Close() error { - if consumer.getStatus() == closed { return AlreadyClosed } - return consumer.close(Event{ + + consumer.close(Event{ Command: CommandUnsubscribe, StreamName: consumer.GetStreamName(), Name: consumer.GetName(), Reason: UnSubscribe, Err: nil, }) -} -func (consumer *Consumer) close(reason Event) error { + return nil +} +func (consumer *Consumer) close(reason Event) { if consumer.options == nil { // the config is usually set. 
this check is just to avoid panic and to make some test // easier to write logs.LogDebug("consumer options is nil, the close will be ignored") - return nil + return } consumer.cacheStoreOffset() @@ -371,14 +373,12 @@ func (consumer *Consumer) close(reason Event) error { _, _ = consumer.options.client.coordinator.ExtractConsumerById(consumer.ID) if consumer.options != nil && consumer.options.client.coordinator.ConsumersCount() == 0 { - _ = consumer.options.client.Close() + consumer.options.client.Close() } if consumer.onClose != nil { consumer.onClose() } - - return nil } func (consumer *Consumer) cacheStoreOffset() { diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index 1183b1fe..7bb60958 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -2,11 +2,12 @@ package stream import ( "fmt" - "github.com/pkg/errors" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" "strconv" "sync" "time" + + "github.com/pkg/errors" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" ) type Coordinator struct { @@ -86,7 +87,8 @@ func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) if err != nil { return err } - return consumer.close(reason) + consumer.close(reason) + return nil } func (coordinator *Coordinator) Consumers() *sync.Map { diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 9d886010..0212b8ff 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -50,12 +50,7 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) { client := newClient("go-stream-locator", nil, options.TCPParameters, options.SaslConfiguration, options.RPCTimeout) - defer func(client *Client) { - err := client.Close() - if err != nil { - return - } - }(client) + defer client.Close() // we put a limit to the heartbeat. 
// it doesn't make sense to have a heartbeat less than 3 seconds @@ -275,7 +270,7 @@ func (env *Environment) Close() error { _ = env.producers.close() _ = env.consumers.close() if env.locator.client != nil { - _ = env.locator.client.Close() + env.locator.client.Close() } env.closed = true return nil @@ -581,10 +576,7 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP logs.LogDebug("connectionProperties host %s doesn't match with the advertised_host %s, advertised_port %s .. retry", clientResult.connectionProperties.host, leader.advHost, leader.advPort) - err := clientResult.Close() - if err != nil { - return nil, err - } + clientResult.Close() clientResult = cc.newClientForProducer(clientProvidedName, leader, tcpParameters, saslConfiguration, rpcTimeout) err = clientResult.connect() if err != nil { diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index 3520a3d7..2393901e 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -2,10 +2,11 @@ package stream import ( "crypto/tls" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" "sync" "time" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/google/uuid" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" @@ -496,22 +497,22 @@ var _ = Describe("Environment test", func() { Expect(err).NotTo(HaveOccurred()) streamName := uuid.New().String() // here we force the client closing - Expect(env.locator.client.Close()).NotTo(HaveOccurred()) + env.locator.client.Close() Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred()) Expect(env.locator.client.socket.isOpen()).To(BeTrue()) const consumerName = "my_consumer_1" // here we force the client closing - Expect(env.locator.client.Close()).NotTo(HaveOccurred()) + env.locator.client.Close() Expect(env.StoreOffset(consumerName, streamName, 123)).NotTo(HaveOccurred()) Expect(env.locator.client.socket.isOpen()).To(BeTrue()) // here we force the client closing - Expect(env.locator.client.Close()).NotTo(HaveOccurred()) + env.locator.client.Close() off, err := env.QueryOffset(consumerName, streamName) Expect(err).NotTo(HaveOccurred()) Expect(env.locator.client.socket.isOpen()).To(BeTrue()) Expect(off).To(Equal(int64(123))) // here we force the client closing - Expect(env.locator.client.Close()).NotTo(HaveOccurred()) + env.locator.client.Close() Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred()) Expect(env.locator.client.socket.isOpen()).To(BeTrue()) Expect(env.Close()).NotTo(HaveOccurred()) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 760876b1..7fe23355 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -711,7 +711,7 @@ func (producer *Producer) close(reason Event) error { _, _ = producer.options.client.coordinator.ExtractProducerById(producer.id) if producer.options.client.coordinator.ProducersCount() == 0 { - _ = producer.options.client.Close() + producer.options.client.Close() } if producer.onClose != nil { diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 0504286c..6327289f 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -37,7 +37,7 @@ func (c *Client) handleResponse() { frameLen, err := 
readUInt(buffer) if err != nil { logs.LogDebug("Read connection failed: %s", err) - _ = c.Close() + c.Close() break } diff --git a/pkg/stream/super_stream_producer_test.go b/pkg/stream/super_stream_producer_test.go index 0102a8cd..f5505155 100644 --- a/pkg/stream/super_stream_producer_test.go +++ b/pkg/stream/super_stream_producer_test.go @@ -2,15 +2,16 @@ package stream import ( "fmt" + "math/rand" + "sync" + "time" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/test-helper" - "math/rand" - "sync" - "time" + test_helper "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/test-helper" ) type TestingRandomStrategy struct { @@ -344,7 +345,7 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func() Expect(err).NotTo(HaveOccurred()) Expect(route).To(Equal([]string{})) - Expect(env.locator.client.Close()).NotTo(HaveOccurred()) + env.locator.client.Close() Expect(env.DeleteSuperStream(superStream)).NotTo(HaveOccurred()) Expect(env.Close()).NotTo(HaveOccurred()) }) @@ -358,7 +359,7 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func() route, err := env.locator.client.queryRoute("not-found", "italy") Expect(err).To(HaveOccurred()) Expect(route).To(BeNil()) - Expect(env.locator.client.Close()).NotTo(HaveOccurred()) + env.locator.client.Close() Expect(env.Close()).NotTo(HaveOccurred()) }) diff --git a/pkg/stream/super_stream_test.go b/pkg/stream/super_stream_test.go index 7181abed..96cbbe3b 100644 --- a/pkg/stream/super_stream_test.go +++ b/pkg/stream/super_stream_test.go @@ -1,9 +1,10 @@ package stream import ( + "time" + . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" - "time" ) type testSuperStreamOption struct { @@ -82,7 +83,7 @@ var _ = Describe("Super Stream Client", Label("super-stream"), func() { err = testEnvironment.locator.client.DeclareSuperStream("valid name", newTestSuperStreamOption([]string{"valid "}, []string{""}, nil)) Expect(err).To(HaveOccurred()) - Expect(testEnvironment.locator.client.Close()).NotTo(HaveOccurred()) + testEnvironment.locator.client.Close() }) It("Create Super stream two times and delete it with client", Label("super-stream"), func() { From ea681b2d9f2f6e9df57d3fcf6f44608f527aebc2 Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Sat, 14 Jun 2025 12:58:55 +0200 Subject: [PATCH 6/8] fix: generic error handling --- pkg/stream/stream_suite_test.go | 10 +++++++--- pkg/stream/super_stream_producer_test.go | 3 ++- pkg/system_integration/http.go | 8 +++++--- pkg/test-helper/http_utils.go | 8 +++++--- 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/pkg/stream/stream_suite_test.go b/pkg/stream/stream_suite_test.go index 864c8879..f549e54d 100644 --- a/pkg/stream/stream_suite_test.go +++ b/pkg/stream/stream_suite_test.go @@ -2,10 +2,11 @@ package stream_test import ( "fmt" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" "net/http" "testing" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" ) const testVhost = "rabbitmq-streams-go-test" @@ -48,7 +49,10 @@ func httpCall(method, url string) error { if err != nil { return err } - resp.Body.Close() + err = resp.Body.Close() + if err != nil { + return err + } if resp.StatusCode < 200 || resp.StatusCode >= 300 { return fmt.Errorf("http error (%d): %s", resp.StatusCode, resp.Status) diff --git a/pkg/stream/super_stream_producer_test.go b/pkg/stream/super_stream_producer_test.go index f5505155..b4831446 100644 --- a/pkg/stream/super_stream_producer_test.go +++ b/pkg/stream/super_stream_producer_test.go @@ -48,7 +48,8 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func() msg := amqp.NewMessage(make([]byte, 0)) msg.ApplicationProperties = map[string]interface{}{"routingKey": key} - msg.MarshalBinary() + _, err := msg.MarshalBinary() + Expect(err).NotTo(HaveOccurred()) routing, err := routingMurmur.Route(msg, partitions) Expect(err).NotTo(HaveOccurred()) Expect(routing).To(HaveLen(1)) diff --git a/pkg/system_integration/http.go b/pkg/system_integration/http.go index df52d7e9..96a08d00 100644 --- a/pkg/system_integration/http.go +++ b/pkg/system_integration/http.go @@ -2,10 +2,11 @@ package system_integration import ( "encoding/json" - "github.com/pkg/errors" - "io/ioutil" + "io" "net/http" "strconv" + + "github.com/pkg/errors" ) type queue struct { @@ -40,10 +41,11 @@ func httpGet(url, username, password string) (string, error) { return "", err3 } + //nolint:errcheck defer resp.Body.Close() if resp.StatusCode == 200 { // OK - bodyBytes, err2 := ioutil.ReadAll(resp.Body) + bodyBytes, err2 := io.ReadAll(resp.Body) if err2 != nil { return "", err2 } diff --git a/pkg/test-helper/http_utils.go b/pkg/test-helper/http_utils.go index b7b87fd3..a8758771 100644 --- a/pkg/test-helper/http_utils.go +++ b/pkg/test-helper/http_utils.go @@ -2,10 +2,11 @@ package test_helper import ( "encoding/json" - "github.com/pkg/errors" - "io/ioutil" + "io" "net/http" "strconv" + + 
"github.com/pkg/errors" ) type client_properties struct { @@ -86,10 +87,11 @@ func baseCall(url, username, password string, method string) (string, error) { return "", err3 } + //nolint:errcheck defer resp.Body.Close() if resp.StatusCode == 200 { // OK - bodyBytes, err2 := ioutil.ReadAll(resp.Body) + bodyBytes, err2 := io.ReadAll(resp.Body) if err2 != nil { return "", err2 } From 43486a7ce50311ac048959da5eba7968b080faac Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 16 Jun 2025 10:49:47 +0200 Subject: [PATCH 7/8] change test Signed-off-by: Gabriele Santomaggio --- pkg/stream/super_stream_producer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/stream/super_stream_producer_test.go b/pkg/stream/super_stream_producer_test.go index b4831446..64bd3a43 100644 --- a/pkg/stream/super_stream_producer_test.go +++ b/pkg/stream/super_stream_producer_test.go @@ -453,7 +453,7 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func() Expect(env.Close()).NotTo(HaveOccurred()) }) - It("should return an error when the producer is already connected for a partition", func() { + It("should return an error when the producer is already connected for a partition", Focus, func() { // Test is to validate the error when the producer is already connected env, err := NewEnvironment(nil) @@ -525,7 +525,7 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func() Eventually(partitionCloseEvent).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(Receive()) // Verify that the partition was successfully reconnected - Expect(superProducer.getProducers()).To(HaveLen(partitionsCount)) + Eventually(superProducer.getProducers()).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(HaveLen(partitionsCount)) reconnectedProducer := superProducer.getProducer(partitionToClose) Expect(reconnectedProducer).NotTo(BeNil()) From ba0216c2b7c3d01b9f43cd6efc7704546b06d061 Mon 
Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 16 Jun 2025 10:52:26 +0200 Subject: [PATCH 8/8] change test Signed-off-by: Gabriele Santomaggio --- pkg/stream/super_stream_producer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/stream/super_stream_producer_test.go b/pkg/stream/super_stream_producer_test.go index 64bd3a43..0b673a94 100644 --- a/pkg/stream/super_stream_producer_test.go +++ b/pkg/stream/super_stream_producer_test.go @@ -453,7 +453,7 @@ var _ = Describe("Super Stream Producer", Label("super-stream-producer"), func() Expect(env.Close()).NotTo(HaveOccurred()) }) - It("should return an error when the producer is already connected for a partition", Focus, func() { + It("should return an error when the producer is already connected for a partition", func() { // Test is to validate the error when the producer is already connected env, err := NewEnvironment(nil)