From 6077958f81e43c0a51e3189c83609e5f98a9ccc8 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 25 Nov 2024 15:35:18 +0100 Subject: [PATCH 01/22] latency on perftest Signed-off-by: Gabriele Santomaggio --- .github/workflows/build_and_test.yml | 20 +++++++ examples/tls/getting_started_tls.go | 41 +++++++++----- perfTest/cmd/commands.go | 2 + perfTest/cmd/silent.go | 84 ++++++++++++++++++---------- 4 files changed, 102 insertions(+), 45 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index ea87633d..d5158a53 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -87,3 +87,23 @@ jobs: - name: Install GNU make run: choco install make - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }} + publish: + runs-on: ubuntu-latest + needs: [test] + steps: + - uses: docker/setup-buildx-action@v2 + - uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + - uses: actions/checkout@v3 + - name: Publish Docker Image + run: | + set -x + VERSION=latest + export VERSION + if [[ ! $GITHUB_REF =~ "/tags/" ]] + then + VERSION=dev + fi + make perf-test-docker-push diff --git a/examples/tls/getting_started_tls.go b/examples/tls/getting_started_tls.go index 9586b240..b457a3e2 100644 --- a/examples/tls/getting_started_tls.go +++ b/examples/tls/getting_started_tls.go @@ -4,7 +4,6 @@ import ( "bufio" "crypto/tls" "fmt" - "github.com/google/uuid" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" @@ -48,13 +47,18 @@ func main() { fmt.Println("Getting started with Streaming TLS client for RabbitMQ") fmt.Println("Connecting to RabbitMQ streaming ...") + addressResolver := stream.AddressResolver{ + Host: "35.234.132.231", + Port: 5551, + } // Connect to the broker ( or brokers ) env, err := stream.NewEnvironment( stream.NewEnvironmentOptions(). - SetHost("localhost"). - SetPort(5551). // standard TLS port - SetUser("guest"). - SetPassword("guest"). + SetAddressResolver(addressResolver). + SetPort(addressResolver.Port). // standard TLS port + SetHost(addressResolver.Host). + SetUser("remote"). + SetPassword("remote"). IsTLS(true). // use tls.Config to customize the TLS configuration // for tests you may need InsecureSkipVerify: true @@ -73,12 +77,12 @@ func main() { // err = env.DeclareStream(streamName, nil) // it is the best practise to define a size, 1GB for example: - streamName := uuid.New().String() - err = env.DeclareStream(streamName, - &stream.StreamOptions{ - MaxLengthBytes: stream.ByteCapacity{}.GB(2), - }, - ) + streamName := "perf-test-go" + //err = env.DeclareStream(streamName, + // &stream.StreamOptions{ + // MaxLengthBytes: stream.ByteCapacity{}.GB(2), + // }, + //) CheckErr(err) @@ -92,7 +96,7 @@ func main() { // the send method automatically aggregates the messages // based on batch size - for i := 0; i < 1000; i++ { + for i := 0; i < 10; i++ { err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i)))) CheckErr(err) } @@ -107,8 +111,17 @@ func main() { // //}, nil) // if you need to track the offset you need a consumer name like: + consumed := 0 handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) { - fmt.Printf("consumer name: %s, text: %s \n ", consumerContext.Consumer.GetName(), message.Data) + + consumed++ + if consumed%1000 == 0 { + + fmt.Printf("name: %s, offset %d, chunk entities count: %d, total: %d \n ", + consumerContext.Consumer.GetName(), consumerContext.Consumer.GetOffset(), consumerContext.GetEntriesCount(), consumed) + + } + } consumer, err := env.NewConsumer( @@ -128,7 +141,7 @@ func main() { err = consumer.Close() time.Sleep(200 * time.Millisecond) CheckErr(err) - err = env.DeleteStream(streamName) + //err = env.DeleteStream(streamName) CheckErr(err) err = env.Close() CheckErr(err) diff --git a/perfTest/cmd/commands.go b/perfTest/cmd/commands.go index 0ae7e0ce..3902bfa3 100644 --- a/perfTest/cmd/commands.go +++ b/perfTest/cmd/commands.go @@ -46,6 +46,7 @@ var ( crcCheck bool runDuration int initialCredits int + isBatchSend bool ) func init() { @@ -76,6 +77,7 @@ func setupCli(baseCmd *cobra.Command) { baseCmd.PersistentFlags().StringVarP(&maxSegmentSizeBytes, "stream-max-segment-size-bytes", "", "500MB", "Stream segment size bytes, e.g. 10MB, 1GB, etc.") baseCmd.PersistentFlags().StringVarP(&consumerOffset, "consumer-offset", "", "first", "Staring consuming, ex: first,last,next or random") baseCmd.PersistentFlags().IntVarP(&initialCredits, "initial-credits", "", 10, "Consumer initial credits") + baseCmd.PersistentFlags().BoolVarP(&isBatchSend, "batch-send", "", false, "Enable batch send") baseCmd.AddCommand(versionCmd) baseCmd.AddCommand(newSilent()) } diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go index 6616d7b1..7bc9c5cd 100644 --- a/perfTest/cmd/silent.go +++ b/perfTest/cmd/silent.go @@ -36,8 +36,10 @@ func newSilent() *cobra.Command { } var ( - publisherMessageCount int32 - consumerMessageCount int32 + publisherMessageCount int32 + consumerMessageCount int32 + //consumerMessageCountPerLatency int32 + totalLatency int64 confirmedMessageCount int32 notConfirmedMessageCount int32 consumersCloseCount int32 @@ -77,9 +79,15 @@ func printStats() { PMessagesPerSecond := float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000 CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000 + //latency := float64(totalLatency) / float64(atomic.LoadInt32(&consumerMessageCount)) + averageLatency := int64(0) + if atomic.LoadInt32(&consumerMessageCount) > 0 { + averageLatency = totalLatency / int64(atomic.LoadInt32(&consumerMessageCount)) + } + ConfirmedMessagesPerSecond := float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000 - logInfo("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v |", - PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent)) + logInfo("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms", + PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent), averageLatency) } } @@ -273,28 +281,9 @@ func startPublisher(streamName string) error { return err } - var arr []message.StreamMessage - var body []byte - for z := 0; z < batchSize; z++ { - - if fixedBody > 0 { - body = make([]byte, fixedBody) - } else { - if variableBody > 0 { - rand.Seed(time.Now().UnixNano()) - body = make([]byte, rand.Intn(variableBody)) - } - } - n := time.Now().UnixNano() - var buff = make([]byte, 8) - binary.BigEndian.PutUint64(buff, uint64(n)) - /// added to calculate the latency - msg := amqp.NewMessage(append(buff, body...)) - arr = append(arr, msg) - } - - go func(prod *ha.ReliableProducer, messages []message.StreamMessage) { + go func(prod *ha.ReliableProducer) { for { + if rate > 0 { rateWithBatchSize := float64(rate) / float64(batchSize) sleepAfterMessage := float64(time.Second) / rateWithBatchSize @@ -313,21 +302,50 @@ func startPublisher(streamName string) error { } time.Sleep(time.Duration(sleep) * time.Millisecond) } + messages := buildMessages() - atomic.AddInt64(&messagesSent, int64(len(arr))) - for _, streamMessage := range arr { - err = prod.Send(streamMessage) + atomic.AddInt64(&messagesSent, int64(len(messages))) + if isBatchSend { + err = prod.BatchSend(messages) checkErr(err) + } else { + for _, streamMessage := range messages { + err = prod.Send(streamMessage) + checkErr(err) + } } - atomic.AddInt32(&publisherMessageCount, int32(len(arr))) + + atomic.AddInt32(&publisherMessageCount, int32(len(messages))) } - }(rPublisher, arr) + }(rPublisher) return nil } +func buildMessages() []message.StreamMessage { + var arr []message.StreamMessage + for z := 0; z < batchSize; z++ { + //var body []byte + if fixedBody > 0 { + // body = make([]byte, fixedBody) + } else { + if variableBody > 0 { + rand.Seed(time.Now().UnixNano()) + // body = make([]byte, rand.Intn(variableBody)) + } + } + var buff = make([]byte, 8) + sentTime := time.Now().UnixMilli() + binary.BigEndian.PutUint64(buff, uint64(sentTime)) + /// added to calculate the latency + msg := amqp.NewMessage(buff) + arr = append(arr, msg) + } + return arr +} + func startPublishers() error { logInfo("Starting %d publishers...", publishers) @@ -362,8 +380,12 @@ func handleConsumerClose(channelClose stream.ChannelClose) { func startConsumer(consumerName string, streamName string) error { handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) { - atomic.AddInt32(&consumerMessageCount, 1) + sentTime := binary.BigEndian.Uint64(message.GetData()[:8]) // Decode the timestamp + startTimeFromMessage := time.UnixMilli(int64(sentTime)) + latency := time.Now().Sub(startTimeFromMessage).Milliseconds() + totalLatency += latency + atomic.AddInt32(&consumerMessageCount, 1) } offsetSpec := stream.OffsetSpecification{}.Last() switch consumerOffset { From e8e74203f7078637f2b1b9fa6b4ed52a08a41d2a Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 25 Nov 2024 15:37:00 +0100 Subject: [PATCH 02/22] restore tls [skip ci] Signed-off-by: Gabriele Santomaggio --- examples/tls/getting_started_tls.go | 43 ++++++++++------------------- 1 file changed, 15 insertions(+), 28 deletions(-) diff --git a/examples/tls/getting_started_tls.go b/examples/tls/getting_started_tls.go index b457a3e2..88d99115 100644 --- a/examples/tls/getting_started_tls.go +++ b/examples/tls/getting_started_tls.go @@ -4,6 +4,7 @@ import ( "bufio" "crypto/tls" "fmt" + "github.com/google/uuid" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" @@ -47,18 +48,13 @@ func main() { fmt.Println("Getting started with Streaming TLS client for RabbitMQ") fmt.Println("Connecting to RabbitMQ streaming ...") - addressResolver := stream.AddressResolver{ - Host: "35.234.132.231", - Port: 5551, - } // Connect to the broker ( or brokers ) env, err := stream.NewEnvironment( stream.NewEnvironmentOptions(). - SetAddressResolver(addressResolver). - SetPort(addressResolver.Port). // standard TLS port - SetHost(addressResolver.Host). - SetUser("remote"). - SetPassword("remote"). + SetHost("localhost"). + SetPort(5551). // standard TLS port + SetUser("guest"). + SetPassword("guest"). IsTLS(true). // use tls.Config to customize the TLS configuration // for tests you may need InsecureSkipVerify: true @@ -77,12 +73,12 @@ func main() { // err = env.DeclareStream(streamName, nil) // it is the best practise to define a size, 1GB for example: - streamName := "perf-test-go" - //err = env.DeclareStream(streamName, - // &stream.StreamOptions{ - // MaxLengthBytes: stream.ByteCapacity{}.GB(2), - // }, - //) + streamName := uuid.New().String() + err = env.DeclareStream(streamName, + &stream.StreamOptions{ + MaxLengthBytes: stream.ByteCapacity{}.GB(2), + }, + ) CheckErr(err) @@ -96,7 +92,7 @@ func main() { // the send method automatically aggregates the messages // based on batch size - for i := 0; i < 10; i++ { + for i := 0; i < 1000; i++ { err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i)))) CheckErr(err) } @@ -111,24 +107,15 @@ func main() { // //}, nil) // if you need to track the offset you need a consumer name like: - consumed := 0 handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) { - - consumed++ - if consumed%1000 == 0 { - - fmt.Printf("name: %s, offset %d, chunk entities count: %d, total: %d \n ", - consumerContext.Consumer.GetName(), consumerContext.Consumer.GetOffset(), consumerContext.GetEntriesCount(), consumed) - - } - + fmt.Printf("consumer name: %s, text: %s \n ", consumerContext.Consumer.GetName(), message.Data) } consumer, err := env.NewConsumer( streamName, handleMessages, stream.NewConsumerOptions(). - SetConsumerName("my_consumer"). // set a consumer name + SetConsumerName("my_consumer"). // set a consumer name SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning CheckErr(err) channelClose := consumer.NotifyClose() @@ -141,7 +128,7 @@ func main() { err = consumer.Close() time.Sleep(200 * time.Millisecond) CheckErr(err) - //err = env.DeleteStream(streamName) + err = env.DeleteStream(streamName) CheckErr(err) err = env.Close() CheckErr(err) From d8405872c82b3c522b6d8372ba0ca5ac1e3045d2 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 25 Nov 2024 16:45:10 +0100 Subject: [PATCH 03/22] Fix reset latency Signed-off-by: Gabriele Santomaggio --- examples/tls/getting_started_tls.go | 2 +- perfTest/cmd/silent.go | 17 ++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/examples/tls/getting_started_tls.go b/examples/tls/getting_started_tls.go index 88d99115..9586b240 100644 --- a/examples/tls/getting_started_tls.go +++ b/examples/tls/getting_started_tls.go @@ -115,7 +115,7 @@ func main() { streamName, handleMessages, stream.NewConsumerOptions(). - SetConsumerName("my_consumer"). // set a consumer name + SetConsumerName("my_consumer"). // set a consumer name SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning CheckErr(err) channelClose := consumer.NotifyClose() diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go index 7bc9c5cd..847dae3a 100644 --- a/perfTest/cmd/silent.go +++ b/perfTest/cmd/silent.go @@ -76,18 +76,15 @@ func printStats() { select { case _ = <-ticker.C: v := time.Now().Sub(start).Milliseconds() - - PMessagesPerSecond := float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000 - CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000 - //latency := float64(totalLatency) / float64(atomic.LoadInt32(&consumerMessageCount)) averageLatency := int64(0) if atomic.LoadInt32(&consumerMessageCount) > 0 { + PMessagesPerSecond := float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000 + CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000 averageLatency = totalLatency / int64(atomic.LoadInt32(&consumerMessageCount)) + ConfirmedMessagesPerSecond := float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000 + logInfo("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms", + PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent), averageLatency) } - - ConfirmedMessagesPerSecond := float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000 - logInfo("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms", - PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent), averageLatency) } } @@ -100,6 +97,7 @@ func printStats() { atomic.SwapInt32(&publisherMessageCount, 0) atomic.SwapInt32(&consumerMessageCount, 0) + atomic.SwapInt64(&totalLatency, 0) atomic.SwapInt32(&confirmedMessageCount, 0) atomic.SwapInt32(¬ConfirmedMessageCount, 0) start = time.Now() @@ -159,8 +157,9 @@ func startSimulation() error { err := initStreams() checkErr(err) + // simulEnvironment, err = stream.NewEnvironment(stream.NewEnvironmentOptions(). - SetUris(rabbitmqBrokerUrl). + SetUri(rabbitmqBrokerUrl[0]). SetMaxProducersPerClient(publishersPerClient). SetMaxConsumersPerClient(consumersPerClient)) checkErr(err) From 54c8c334a31a1006036faafa2b06174622c0c06b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 25 Nov 2024 16:52:51 +0100 Subject: [PATCH 04/22] Add log Signed-off-by: Gabriele Santomaggio --- perfTest/cmd/silent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go index 847dae3a..5675453e 100644 --- a/perfTest/cmd/silent.go +++ b/perfTest/cmd/silent.go @@ -94,7 +94,7 @@ func printStats() { for { select { case _ = <-tickerReset.C: - + logInfo("Resetting counters...") atomic.SwapInt32(&publisherMessageCount, 0) atomic.SwapInt32(&consumerMessageCount, 0) atomic.SwapInt64(&totalLatency, 0) From 9be47e51d2c5fd3d782b85c5a1d033fb4b8dbda7 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 25 Nov 2024 17:06:05 +0100 Subject: [PATCH 05/22] Add reset log Signed-off-by: Gabriele Santomaggio --- perfTest/cmd/silent.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go index 5675453e..a763e632 100644 --- a/perfTest/cmd/silent.go +++ b/perfTest/cmd/silent.go @@ -94,12 +94,12 @@ func printStats() { for { select { case _ = <-tickerReset.C: - logInfo("Resetting counters...") - atomic.SwapInt32(&publisherMessageCount, 0) + logInfo("***********Resetting counters***********") atomic.SwapInt32(&consumerMessageCount, 0) - atomic.SwapInt64(&totalLatency, 0) - atomic.SwapInt32(&confirmedMessageCount, 0) atomic.SwapInt32(¬ConfirmedMessageCount, 0) + atomic.SwapInt32(&confirmedMessageCount, 0) + atomic.SwapInt32(&publisherMessageCount, 0) + atomic.SwapInt64(&totalLatency, 0) start = time.Now() } } From a6d062647ecb26186487bf49bd38262af52e68e7 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 25 Nov 2024 17:10:19 +0100 Subject: [PATCH 06/22] remove needs Signed-off-by: Gabriele Santomaggio --- .github/workflows/build_and_test.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index d5158a53..da665c45 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -89,7 +89,8 @@ jobs: - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }} publish: runs-on: ubuntu-latest - needs: [test] + # tmp + # needs: [test] steps: - uses: docker/setup-buildx-action@v2 - uses: docker/login-action@v2 From 0bf99f95bdd7b38821f215795b04319bc61d109f Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 25 Nov 2024 17:30:36 +0100 Subject: [PATCH 07/22] add controls Signed-off-by: Gabriele Santomaggio --- perfTest/cmd/silent.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go index a763e632..880905e6 100644 --- a/perfTest/cmd/silent.go +++ b/perfTest/cmd/silent.go @@ -76,15 +76,21 @@ func printStats() { select { case _ = <-ticker.C: v := time.Now().Sub(start).Milliseconds() + PMessagesPerSecond := float64(0) + if publisherMessageCount > 0 { + PMessagesPerSecond = float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000 + } + averageLatency := int64(0) + CMessagesPerSecond := float64(0) + ConfirmedMessagesPerSecond := float64(0) if atomic.LoadInt32(&consumerMessageCount) > 0 { - PMessagesPerSecond := float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000 - CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000 + CMessagesPerSecond = float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000 averageLatency = totalLatency / int64(atomic.LoadInt32(&consumerMessageCount)) - ConfirmedMessagesPerSecond := float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000 - logInfo("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms", - PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent), averageLatency) + ConfirmedMessagesPerSecond = float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000 } + logInfo("+Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms", + PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent), averageLatency) } } From 922893d24baa0ae957fac9ede778f42efad902f7 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 26 Nov 2024 10:44:25 +0100 Subject: [PATCH 08/22] add client-provided-name Signed-off-by: Gabriele Santomaggio --- perfTest/cmd/commands.go | 2 ++ perfTest/cmd/silent.go | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/perfTest/cmd/commands.go b/perfTest/cmd/commands.go index 3902bfa3..b9df70bc 100644 --- a/perfTest/cmd/commands.go +++ b/perfTest/cmd/commands.go @@ -47,6 +47,7 @@ var ( runDuration int initialCredits int isBatchSend bool + clientProvidedName string ) func init() { @@ -78,6 +79,7 @@ func setupCli(baseCmd *cobra.Command) { baseCmd.PersistentFlags().StringVarP(&consumerOffset, "consumer-offset", "", "first", "Staring consuming, ex: first,last,next or random") baseCmd.PersistentFlags().IntVarP(&initialCredits, "initial-credits", "", 10, "Consumer initial credits") baseCmd.PersistentFlags().BoolVarP(&isBatchSend, "batch-send", "", false, "Enable batch send") + baseCmd.PersistentFlags().StringVarP(&clientProvidedName, "client-provided-name", "", "", "Client provided name") baseCmd.AddCommand(versionCmd) baseCmd.AddCommand(newSilent()) } diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go index 880905e6..0963ca4a 100644 --- a/perfTest/cmd/silent.go +++ b/perfTest/cmd/silent.go @@ -271,12 +271,11 @@ func startPublisher(streamName string) error { if compression == "zstd" { cp = stream.Compression{}.Zstd() } - producerOptions.SetSubEntrySize(subEntrySize).SetCompression(cp) - logInfo("Enable SubEntrySize: %d, compression: %s", subEntrySize, cp) } + producerOptions.SetClientProvidedName(clientProvidedName) rPublisher, err := ha.NewReliableProducer(simulEnvironment, streamName, producerOptions, @@ -420,6 +419,7 @@ func startConsumer(consumerName string, streamName string) error { handleMessages, stream.NewConsumerOptions(). SetConsumerName(consumerName). + SetClientProvidedName(clientProvidedName). SetOffset(offsetSpec). SetCRCCheck(crcCheck). SetInitialCredits(int16(initialCredits))) From d673796215bde675b262bd6b008af88e82474cc5 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 26 Nov 2024 11:40:26 +0100 Subject: [PATCH 09/22] formatting Signed-off-by: Gabriele Santomaggio --- perfTest/cmd/silent.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go index 0963ca4a..3bbcbb72 100644 --- a/perfTest/cmd/silent.go +++ b/perfTest/cmd/silent.go @@ -9,6 +9,8 @@ import ( "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" "github.com/spf13/cobra" + "golang.org/x/text/language" + gomsg "golang.org/x/text/message" "math/rand" "os" "sync" @@ -89,8 +91,9 @@ func printStats() { averageLatency = totalLatency / int64(atomic.LoadInt32(&consumerMessageCount)) ConfirmedMessagesPerSecond = float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000 } - logInfo("+Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms", - PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent), averageLatency) + p := gomsg.NewPrinter(language.English) + logInfo(p.Sprintf("+Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms", + PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent), averageLatency)) } } From 4d8a8a25b65418c4b47e035baa024146fdeac71d Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 26 Nov 2024 12:28:07 +0100 Subject: [PATCH 10/22] test Signed-off-by: Gabriele Santomaggio --- pkg/stream/brokers.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/stream/brokers.go b/pkg/stream/brokers.go index 3a7e5f59..777e5a21 100644 --- a/pkg/stream/brokers.go +++ b/pkg/stream/brokers.go @@ -52,9 +52,10 @@ func newTCPParameterDefault() *TCPParameters { RequestedHeartbeat: defaultHeartbeat, RequestedMaxFrameSize: 1048576, WriteBuffer: 8192, - ReadBuffer: 65536, - NoDelay: false, - tlsConfig: nil, + //ReadBuffer: 65536, + ReadBuffer: 2097152, + NoDelay: false, + tlsConfig: nil, } } From 4465dd9c1dd95219c22140b7bb135e29000af8a9 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 26 Nov 2024 13:50:14 +0100 Subject: [PATCH 11/22] test Signed-off-by: Gabriele Santomaggio --- perfTest/cmd/silent.go | 2 +- pkg/stream/brokers.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go index 3bbcbb72..25edc502 100644 --- a/perfTest/cmd/silent.go +++ b/perfTest/cmd/silent.go @@ -92,7 +92,7 @@ func printStats() { ConfirmedMessagesPerSecond = float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000 } p := gomsg.NewPrinter(language.English) - logInfo(p.Sprintf("+Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms", + logInfo(p.Sprintf("++Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms", PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent), averageLatency)) } } diff --git a/pkg/stream/brokers.go b/pkg/stream/brokers.go index 777e5a21..d9083b96 100644 --- a/pkg/stream/brokers.go +++ b/pkg/stream/brokers.go @@ -51,7 +51,7 @@ func newTCPParameterDefault() *TCPParameters { return &TCPParameters{ RequestedHeartbeat: defaultHeartbeat, RequestedMaxFrameSize: 1048576, - WriteBuffer: 8192, + WriteBuffer: 2097152, //ReadBuffer: 65536, ReadBuffer: 2097152, NoDelay: false, From 13ff7f898e27b7dc3ff0c5a55775f511995382d1 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 26 Nov 2024 14:04:04 +0100 Subject: [PATCH 12/22] to 22 Signed-off-by: Gabriele Santomaggio --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 3879ab8e..42016507 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.19 as builder +FROM golang:1.22 as builder ENV GOPATH=/go GOOS=linux CGO_ENABLED=0 WORKDIR /go/src/github.com/rabbitmq/rabbitmq-stream-go-client COPY go.mod go.sum VERSION ./ From 51088e3bb38ce8996d1e22ee28ec225a62019a72 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 26 Nov 2024 14:50:24 +0100 Subject: [PATCH 13/22] change tcp paramenters Signed-off-by: Gabriele Santomaggio --- pkg/stream/brokers.go | 6 +++--- pkg/stream/client.go | 7 +++++++ pkg/stream/server_frame.go | 3 +-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/pkg/stream/brokers.go b/pkg/stream/brokers.go index d9083b96..99041e3a 100644 --- a/pkg/stream/brokers.go +++ b/pkg/stream/brokers.go @@ -51,10 +51,10 @@ func newTCPParameterDefault() *TCPParameters { return &TCPParameters{ RequestedHeartbeat: defaultHeartbeat, RequestedMaxFrameSize: 1048576, - WriteBuffer: 2097152, + WriteBuffer: 65536, //ReadBuffer: 65536, - ReadBuffer: 2097152, - NoDelay: false, + ReadBuffer: 65536, + NoDelay: true, tlsConfig: nil, } } diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 2c92e717..9d24189e 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -163,6 +163,13 @@ func (c *Client) connect() error { return errorConnection } + err = connection.SetDeadline(time.Now().Add(5 * time.Second)) + if err != nil { + logs.LogError("Failed to SetDeadline due to %v", err) + return err + } + //connection.SetDeadline(time.Now().Add(c.socketCallTimeout)) + if err = connection.SetWriteBuffer(c.tcpParameters.WriteBuffer); err != nil { logs.LogError("Failed to SetWriteBuffer to %d due to %v", c.tcpParameters.WriteBuffer, err) return err diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 96f0d8e8..0d889957 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -321,8 +321,6 @@ func (c *Client) handleDeliver(r *bufio.Reader) { _, _ = readUInt(r) _, _ = readUInt(r) - c.credit(subscriptionId, 1) - var offsetLimit int64 = -1 // we can have two cases @@ -414,6 +412,7 @@ func (c *Client) handleDeliver(r *bufio.Reader) { chunk.offsetMessages = batchConsumingMessages if consumer.getStatus() == open { consumer.response.chunkForConsumer <- chunk + c.credit(subscriptionId, 1) } else { logs.LogDebug("The consumer %s for the stream %s is closed during the chunk dispatching. "+ "Messages won't dispatched", consumer.GetName(), consumer.GetStreamName()) From f00f129307f9e9bcc4eb0246c1e06676ae58440b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 26 Nov 2024 15:19:55 +0100 Subject: [PATCH 14/22] change tcp paramenters Signed-off-by: Gabriele Santomaggio --- .github/workflows/build_and_test.yml | 166 +++++++++++++-------------- perfTest/cmd/silent.go | 2 +- pkg/stream/client.go | 15 ++- 3 files changed, 94 insertions(+), 89 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index da665c45..18bee984 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -4,89 +4,89 @@ on: push: jobs: - test: - runs-on: ubuntu-latest - strategy: - fail-fast: true - matrix: - go: [ '1.22'] - steps: - - name: Checkout - uses: actions/checkout@v4 - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v3 - - name: Build and export - uses: docker/build-push-action@v5 - with: - context: . - file: CiDockerfile - tags: rabbitmq_tls:latest - outputs: type=docker,dest=/tmp/rabbitmq_tls.tar - - name: Upload artifact - uses: actions/upload-artifact@v4 - with: - name: rabbitmq_tls - path: /tmp/rabbitmq_tls.tar - - name: Download artifact - uses: actions/download-artifact@v4 - with: - name: rabbitmq_tls - path: /tmp - - name: Load image - run: | - docker load --input /tmp/rabbitmq_tls.tar - docker image ls -a - docker run -d --rm --name rabbitmq-stream-client-test \ - -p 5552:5552 -p 5672:5672 -p 5671:5671 -p 5551:5551 -p 15672:15672 \ - -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost" \ - rabbitmq_tls - - name: wait for running - run: | - docker exec rabbitmq-stream-client-test /bin/bash -c 'ps -aux' - docker exec rabbitmq-stream-client-test /bin/bash -c 'sleep 10' - docker exec rabbitmq-stream-client-test /bin/bash -c 'rabbitmqctl status' - docker exec rabbitmq-stream-client-test /bin/bash -c 'rabbitmqctl wait --pid 1 --timeout 70' - - uses: actions/checkout@v4 - - uses: actions/setup-go@v5 - id: setup_go - with: - go-version: ${{ matrix.go }} - check-latest: true - - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }} - - uses: actions/checkout@main - - uses: codecov/codecov-action@v4 - with: - fail_ci_if_error: false # optional (default = false) - files: ./coverage.txt - flags: unittests - name: codecov-umbrella # optional - verbose: true # optional (default = false) - env: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - test-win32: - runs-on: windows-latest - strategy: - matrix: - go: [ '1.22'] - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-go@v5 - id: setup_go - with: - go-version: ${{ matrix.go }} - check-latest: true - - name: Cache installers - uses: actions/cache@v4 - with: - # Note: the cache path is relative to the workspace directory - # https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action - path: ~/installers - key: ${{ runner.os }}-v0-${{ hashFiles('.ci/versions.json') }} - - name: Install and start RabbitMQ - run: ./.ci/install.ps1 - - name: Install GNU make - run: choco install make - - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }} + # test: + # runs-on: ubuntu-latest + # strategy: + # fail-fast: true + # matrix: + # go: [ '1.22'] + # steps: + # - name: Checkout + # uses: actions/checkout@v4 + # - name: Set up Docker Buildx + # uses: docker/setup-buildx-action@v3 + # - name: Build and export + # uses: docker/build-push-action@v5 + # with: + # context: . + # file: CiDockerfile + # tags: rabbitmq_tls:latest + # outputs: type=docker,dest=/tmp/rabbitmq_tls.tar + # - name: Upload artifact + # uses: actions/upload-artifact@v4 + # with: + # name: rabbitmq_tls + # path: /tmp/rabbitmq_tls.tar + # - name: Download artifact + # uses: actions/download-artifact@v4 + # with: + # name: rabbitmq_tls + # path: /tmp + # - name: Load image + # run: | + # docker load --input /tmp/rabbitmq_tls.tar + # docker image ls -a + # docker run -d --rm --name rabbitmq-stream-client-test \ + # -p 5552:5552 -p 5672:5672 -p 5671:5671 -p 5551:5551 -p 15672:15672 \ + # -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost" \ + # rabbitmq_tls + # - name: wait for running + # run: | + # docker exec rabbitmq-stream-client-test /bin/bash -c 'ps -aux' + # docker exec rabbitmq-stream-client-test /bin/bash -c 'sleep 10' + # docker exec rabbitmq-stream-client-test /bin/bash -c 'rabbitmqctl status' + # docker exec rabbitmq-stream-client-test /bin/bash -c 'rabbitmqctl wait --pid 1 --timeout 70' + # - uses: actions/checkout@v4 + # - uses: actions/setup-go@v5 + # id: setup_go + # with: + # go-version: ${{ matrix.go }} + # check-latest: true + # - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }} + # - uses: actions/checkout@main + # - uses: codecov/codecov-action@v4 + # with: + # fail_ci_if_error: false # optional (default = false) + # files: ./coverage.txt + # flags: unittests + # name: codecov-umbrella # optional + # verbose: true # optional (default = false) + # env: + # CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + # test-win32: + # runs-on: windows-latest + # strategy: + # matrix: + # go: [ '1.22'] + # steps: + # - uses: actions/checkout@v4 + # - uses: actions/setup-go@v5 + # id: setup_go + # with: + # go-version: ${{ matrix.go }} + # check-latest: true + # - name: Cache installers + # uses: actions/cache@v4 + # with: + # # Note: the cache path is relative to the workspace directory + # # https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action + # path: ~/installers + # key: ${{ runner.os }}-v0-${{ hashFiles('.ci/versions.json') }} + # - name: Install and start RabbitMQ + # run: ./.ci/install.ps1 + # - name: Install GNU make + # run: choco install make + # - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }} publish: runs-on: ubuntu-latest # tmp diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go index 25edc502..826a7ac9 100644 --- a/perfTest/cmd/silent.go +++ b/perfTest/cmd/silent.go @@ -92,7 +92,7 @@ func printStats() { ConfirmedMessagesPerSecond = float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000 } p := gomsg.NewPrinter(language.English) - logInfo(p.Sprintf("++Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms", + logInfo(p.Sprintf("1-Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms", PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent), averageLatency)) } } diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 9d24189e..e8012197 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -163,11 +163,16 @@ func (c *Client) connect() error { return errorConnection } - err = connection.SetDeadline(time.Now().Add(5 * time.Second)) - if err != nil { - logs.LogError("Failed to SetDeadline due to %v", err) - return err - } + //err = connection.SetKeepAlive(true) + //if err != nil { + // logs.LogError("Failed to SetKeepAlive due to %v", err) + // return err + //} + //err = connection.SetDeadline(time.Now().Add(5 * time.Second)) + //if err != nil { + // logs.LogError("Failed to SetDeadline due to %v", err) + // return err + //} //connection.SetDeadline(time.Now().Add(c.socketCallTimeout)) if err = connection.SetWriteBuffer(c.tcpParameters.WriteBuffer); err != nil { From 619dd2fe2a600e8d0ad3d55d0fc579e4d8882eb5 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 26 Nov 2024 15:42:33 +0100 Subject: [PATCH 15/22] change tcp paramenters Signed-off-by: Gabriele Santomaggio --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 916814bc..f156d648 100644 --- a/Makefile +++ b/Makefile @@ -76,7 +76,7 @@ perf-test-docker-build: perf-test-build $(BUILDKIT) build -t pivotalrabbitmq/go-stream-perf-test:$(VERSION) . perf-test-docker-push: perf-test-docker-build - $(BUILDKIT) push pivotalrabbitmq/go-stream-perf-test:$(VERSION) + $(BUILDKIT) push pivotalrabbitmq/go-stream-perf-test:1 RABBITMQ_OCI ?= rabbitmq:3-management BUILDKIT_RUN_ARGS ?= --pull always From b21e366aa1e7b2113f6570d27f64cb912a2a4c16 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 26 Nov 2024 15:44:23 +0100 Subject: [PATCH 16/22] change tcp paramenters Signed-off-by: Gabriele Santomaggio --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index f156d648..68a59218 100644 --- a/Makefile +++ b/Makefile @@ -73,10 +73,10 @@ perf-test-build: BUILDKIT ?= docker perf-test-docker-build: perf-test-build - $(BUILDKIT) build -t pivotalrabbitmq/go-stream-perf-test:$(VERSION) . + $(BUILDKIT) build -t pivotalrabbitmq/go-stream-perf-test::v1 . perf-test-docker-push: perf-test-docker-build - $(BUILDKIT) push pivotalrabbitmq/go-stream-perf-test:1 + $(BUILDKIT) push pivotalrabbitmq/go-stream-perf-test:v1 RABBITMQ_OCI ?= rabbitmq:3-management BUILDKIT_RUN_ARGS ?= --pull always From 1dce7a1a614b43e2db53d948034ae867081126b1 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 26 Nov 2024 15:45:39 +0100 Subject: [PATCH 17/22] v1 Signed-off-by: Gabriele Santomaggio --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 68a59218..2036bc05 100644 --- a/Makefile +++ b/Makefile @@ -73,7 +73,7 @@ perf-test-build: BUILDKIT ?= docker perf-test-docker-build: perf-test-build - $(BUILDKIT) build -t pivotalrabbitmq/go-stream-perf-test::v1 . + $(BUILDKIT) build -t pivotalrabbitmq/go-stream-perf-test:v1 . perf-test-docker-push: perf-test-docker-build $(BUILDKIT) push pivotalrabbitmq/go-stream-perf-test:v1 From fd17da10bea4ed9e299aac783ae494973aa67d33 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 26 Nov 2024 16:30:50 +0100 Subject: [PATCH 18/22] v2 Signed-off-by: Gabriele Santomaggio --- Makefile | 4 ++-- pkg/stream/brokers.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index 2036bc05..947d5e98 100644 --- a/Makefile +++ b/Makefile @@ -73,10 +73,10 @@ perf-test-build: BUILDKIT ?= docker perf-test-docker-build: perf-test-build - $(BUILDKIT) build -t pivotalrabbitmq/go-stream-perf-test:v1 . + $(BUILDKIT) build -t pivotalrabbitmq/go-stream-perf-test:v2 . perf-test-docker-push: perf-test-docker-build - $(BUILDKIT) push pivotalrabbitmq/go-stream-perf-test:v1 + $(BUILDKIT) push pivotalrabbitmq/go-stream-perf-test:v2 RABBITMQ_OCI ?= rabbitmq:3-management BUILDKIT_RUN_ARGS ?= --pull always diff --git a/pkg/stream/brokers.go b/pkg/stream/brokers.go index 99041e3a..2eb30a47 100644 --- a/pkg/stream/brokers.go +++ b/pkg/stream/brokers.go @@ -51,9 +51,9 @@ func newTCPParameterDefault() *TCPParameters { return &TCPParameters{ RequestedHeartbeat: defaultHeartbeat, RequestedMaxFrameSize: 1048576, - WriteBuffer: 65536, + WriteBuffer: 8192, //ReadBuffer: 65536, - ReadBuffer: 65536, + ReadBuffer: 8192, // 64k NoDelay: true, tlsConfig: nil, } From 4082b1b920e271edaf289476dbf380e4ace07914 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 27 Nov 2024 14:10:40 +0100 Subject: [PATCH 19/22] restore ci Signed-off-by: Gabriele Santomaggio --- .github/workflows/build_and_test.yml | 171 +++++++++++++-------------- Makefile | 4 +- pkg/stream/brokers.go | 7 +- pkg/stream/server_frame.go | 5 +- 4 files changed, 94 insertions(+), 93 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 18bee984..f8f44132 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -4,93 +4,92 @@ on: push: jobs: - # test: - # runs-on: ubuntu-latest - # strategy: - # fail-fast: true - # matrix: - # go: [ '1.22'] - # steps: - # - name: Checkout - # uses: actions/checkout@v4 - # - name: Set up Docker Buildx - # uses: docker/setup-buildx-action@v3 - # - name: Build and export - # uses: docker/build-push-action@v5 - # with: - # context: . - # file: CiDockerfile - # tags: rabbitmq_tls:latest - # outputs: type=docker,dest=/tmp/rabbitmq_tls.tar - # - name: Upload artifact - # uses: actions/upload-artifact@v4 - # with: - # name: rabbitmq_tls - # path: /tmp/rabbitmq_tls.tar - # - name: Download artifact - # uses: actions/download-artifact@v4 - # with: - # name: rabbitmq_tls - # path: /tmp - # - name: Load image - # run: | - # docker load --input /tmp/rabbitmq_tls.tar - # docker image ls -a - # docker run -d --rm --name rabbitmq-stream-client-test \ - # -p 5552:5552 -p 5672:5672 -p 5671:5671 -p 5551:5551 -p 15672:15672 \ - # -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost" \ - # rabbitmq_tls - # - name: wait for running - # run: | - # docker exec rabbitmq-stream-client-test /bin/bash -c 'ps -aux' - # docker exec rabbitmq-stream-client-test /bin/bash -c 'sleep 10' - # docker exec rabbitmq-stream-client-test /bin/bash -c 'rabbitmqctl status' - # docker exec rabbitmq-stream-client-test /bin/bash -c 'rabbitmqctl wait --pid 1 --timeout 70' - # - uses: actions/checkout@v4 - # - uses: actions/setup-go@v5 - # id: setup_go - # with: - # go-version: ${{ matrix.go }} - # check-latest: true - # - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }} - # - uses: actions/checkout@main - # - uses: codecov/codecov-action@v4 - # with: - # fail_ci_if_error: false # optional (default = false) - # files: ./coverage.txt - # flags: unittests - # name: codecov-umbrella # optional - # verbose: true # optional (default = false) - # env: - # CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - # test-win32: - # runs-on: windows-latest - # strategy: - # matrix: - # go: [ '1.22'] - # steps: - # - uses: actions/checkout@v4 - # - uses: actions/setup-go@v5 - # id: setup_go - # with: - # go-version: ${{ matrix.go }} - # check-latest: true - # - name: Cache installers - # uses: actions/cache@v4 - # with: - # # Note: the cache path is relative to the workspace directory - # # https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action - # path: ~/installers - # key: ${{ runner.os }}-v0-${{ hashFiles('.ci/versions.json') }} - # - name: Install and start RabbitMQ - # run: ./.ci/install.ps1 - # - name: Install GNU make - # run: choco install make - # - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }} - publish: + test: runs-on: ubuntu-latest - # tmp - # needs: [test] + strategy: + fail-fast: true + matrix: + go: [ '1.22'] + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Build and export + uses: docker/build-push-action@v5 + with: + context: . + file: CiDockerfile + tags: rabbitmq_tls:latest + outputs: type=docker,dest=/tmp/rabbitmq_tls.tar + - name: Upload artifact + uses: actions/upload-artifact@v4 + with: + name: rabbitmq_tls + path: /tmp/rabbitmq_tls.tar + - name: Download artifact + uses: actions/download-artifact@v4 + with: + name: rabbitmq_tls + path: /tmp + - name: Load image + run: | + docker load --input /tmp/rabbitmq_tls.tar + docker image ls -a + docker run -d --rm --name rabbitmq-stream-client-test \ + -p 5552:5552 -p 5672:5672 -p 5671:5671 -p 5551:5551 -p 15672:15672 \ + -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost" \ + rabbitmq_tls + - name: wait for running + run: | + docker exec rabbitmq-stream-client-test /bin/bash -c 'ps -aux' + docker exec rabbitmq-stream-client-test /bin/bash -c 'sleep 10' + docker exec rabbitmq-stream-client-test /bin/bash -c 'rabbitmqctl status' + docker exec rabbitmq-stream-client-test /bin/bash -c 'rabbitmqctl wait --pid 1 --timeout 70' + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + id: setup_go + with: + go-version: ${{ matrix.go }} + check-latest: true + - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }} + - uses: actions/checkout@main + - uses: codecov/codecov-action@v4 + with: + fail_ci_if_error: false # optional (default = false) + files: ./coverage.txt + flags: unittests + name: codecov-umbrella # optional + verbose: true # optional (default = false) + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + test-win32: + runs-on: windows-latest + strategy: + matrix: + go: [ '1.22'] + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + id: setup_go + with: + go-version: ${{ matrix.go }} + check-latest: true + - name: Cache installers + uses: actions/cache@v4 + with: + # Note: the cache path is relative to the workspace directory + # https://docs.github.com/en/actions/using-workflows/caching-dependencies-to-speed-up-workflows#using-the-cache-action + path: ~/installers + key: ${{ runner.os }}-v0-${{ hashFiles('.ci/versions.json') }} + - name: Install and start RabbitMQ + run: ./.ci/install.ps1 + - name: Install GNU make + run: choco install make + - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }} + publish: + runs-on: ubuntu-latest + needs: [test] steps: - uses: docker/setup-buildx-action@v2 - uses: docker/login-action@v2 diff --git a/Makefile b/Makefile index 947d5e98..916814bc 100644 --- a/Makefile +++ b/Makefile @@ -73,10 +73,10 @@ perf-test-build: BUILDKIT ?= docker perf-test-docker-build: perf-test-build - $(BUILDKIT) build -t pivotalrabbitmq/go-stream-perf-test:v2 . + $(BUILDKIT) build -t pivotalrabbitmq/go-stream-perf-test:$(VERSION) . perf-test-docker-push: perf-test-docker-build - $(BUILDKIT) push pivotalrabbitmq/go-stream-perf-test:v2 + $(BUILDKIT) push pivotalrabbitmq/go-stream-perf-test:$(VERSION) RABBITMQ_OCI ?= rabbitmq:3-management BUILDKIT_RUN_ARGS ?= --pull always diff --git a/pkg/stream/brokers.go b/pkg/stream/brokers.go index 2eb30a47..a3e5d943 100644 --- a/pkg/stream/brokers.go +++ b/pkg/stream/brokers.go @@ -52,10 +52,9 @@ func newTCPParameterDefault() *TCPParameters { RequestedHeartbeat: defaultHeartbeat, RequestedMaxFrameSize: 1048576, WriteBuffer: 8192, - //ReadBuffer: 65536, - ReadBuffer: 8192, // 64k - NoDelay: true, - tlsConfig: nil, + ReadBuffer: 8192, + NoDelay: true, + tlsConfig: nil, } } diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 0d889957..62f55050 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -409,10 +409,13 @@ func (c *Client) handleDeliver(r *bufio.Reader) { } } + // request a credit for the next chunk + c.credit(subscriptionId, 1) + + // dispatch the messages with offset to the consumer chunk.offsetMessages = batchConsumingMessages if consumer.getStatus() == open { consumer.response.chunkForConsumer <- chunk - c.credit(subscriptionId, 1) } else { logs.LogDebug("The consumer %s for the stream %s is closed during the chunk dispatching. "+ "Messages won't dispatched", consumer.GetName(), consumer.GetStreamName()) From c8a345a551faa8b423f4736f184730dd9304769a Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 27 Nov 2024 14:26:15 +0100 Subject: [PATCH 20/22] set default batch send Signed-off-by: Gabriele Santomaggio --- perfTest/cmd/commands.go | 4 ++-- perfTest/cmd/silent.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/perfTest/cmd/commands.go b/perfTest/cmd/commands.go index b9df70bc..9cb263c8 100644 --- a/perfTest/cmd/commands.go +++ b/perfTest/cmd/commands.go @@ -46,7 +46,7 @@ var ( crcCheck bool runDuration int initialCredits int - isBatchSend bool + isAsyncSend bool clientProvidedName string ) @@ -78,7 +78,7 @@ func setupCli(baseCmd *cobra.Command) { baseCmd.PersistentFlags().StringVarP(&maxSegmentSizeBytes, "stream-max-segment-size-bytes", "", "500MB", "Stream segment size bytes, e.g. 10MB, 1GB, etc.") baseCmd.PersistentFlags().StringVarP(&consumerOffset, "consumer-offset", "", "first", "Staring consuming, ex: first,last,next or random") baseCmd.PersistentFlags().IntVarP(&initialCredits, "initial-credits", "", 10, "Consumer initial credits") - baseCmd.PersistentFlags().BoolVarP(&isBatchSend, "batch-send", "", false, "Enable batch send") + baseCmd.PersistentFlags().BoolVarP(&isAsyncSend, "async-send", "", false, "Enable the async send. By default it uses batchSend in this case is faster") baseCmd.PersistentFlags().StringVarP(&clientProvidedName, "client-provided-name", "", "", "Client provided name") baseCmd.AddCommand(versionCmd) baseCmd.AddCommand(newSilent()) diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go index 826a7ac9..3187fee2 100644 --- a/perfTest/cmd/silent.go +++ b/perfTest/cmd/silent.go @@ -312,14 +312,14 @@ func startPublisher(streamName string) error { messages := buildMessages() atomic.AddInt64(&messagesSent, int64(len(messages))) - if isBatchSend { - err = prod.BatchSend(messages) - checkErr(err) - } else { + if isAsyncSend { for _, streamMessage := range messages { err = prod.Send(streamMessage) checkErr(err) } + } else { + err = prod.BatchSend(messages) + checkErr(err) } atomic.AddInt32(&publisherMessageCount, int32(len(messages))) From 3c95441e41e2ca4bf35fdfaf6304927a1cd4c19f Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 27 Nov 2024 16:35:44 +0100 Subject: [PATCH 21/22] restore silent paramenters Signed-off-by: Gabriele Santomaggio --- perfTest/cmd/silent.go | 8 ++++---- pkg/stream/client.go | 12 ------------ 2 files changed, 4 insertions(+), 16 deletions(-) diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go index 3187fee2..a3f67ff0 100644 --- a/perfTest/cmd/silent.go +++ b/perfTest/cmd/silent.go @@ -334,20 +334,20 @@ func startPublisher(streamName string) error { func buildMessages() []message.StreamMessage { var arr []message.StreamMessage for z := 0; z < batchSize; z++ { - //var body []byte + var body []byte if fixedBody > 0 { - // body = make([]byte, fixedBody) + body = make([]byte, fixedBody) } else { if variableBody > 0 { rand.Seed(time.Now().UnixNano()) - // body = make([]byte, rand.Intn(variableBody)) + body = make([]byte, rand.Intn(variableBody)) } } var buff = make([]byte, 8) sentTime := time.Now().UnixMilli() binary.BigEndian.PutUint64(buff, uint64(sentTime)) /// added to calculate the latency - msg := amqp.NewMessage(buff) + msg := amqp.NewMessage(append(buff, body...)) arr = append(arr, msg) } return arr diff --git a/pkg/stream/client.go b/pkg/stream/client.go index e8012197..2c92e717 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -163,18 +163,6 @@ func (c *Client) connect() error { return errorConnection } - //err = connection.SetKeepAlive(true) - //if err != nil { - // logs.LogError("Failed to SetKeepAlive due to %v", err) - // return err - //} - //err = connection.SetDeadline(time.Now().Add(5 * time.Second)) - //if err != nil { - // logs.LogError("Failed to SetDeadline due to %v", err) - // return err - //} - //connection.SetDeadline(time.Now().Add(c.socketCallTimeout)) - if err = connection.SetWriteBuffer(c.tcpParameters.WriteBuffer); err != nil { logs.LogError("Failed to SetWriteBuffer to %d due to %v", c.tcpParameters.WriteBuffer, err) return err From d2d4d685f2da78f641e3e72e2518a227212869d0 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 27 Nov 2024 16:50:19 +0100 Subject: [PATCH 22/22] remove parameters Signed-off-by: Gabriele Santomaggio --- perfTest/cmd/silent.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go index a3f67ff0..2467736e 100644 --- a/perfTest/cmd/silent.go +++ b/perfTest/cmd/silent.go @@ -45,7 +45,6 @@ var ( confirmedMessageCount int32 notConfirmedMessageCount int32 consumersCloseCount int32 - messagesSent int64 //connections []*stream.Client simulEnvironment *stream.Environment ) @@ -92,8 +91,8 @@ func printStats() { ConfirmedMessagesPerSecond = float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000 } p := gomsg.NewPrinter(language.English) - logInfo(p.Sprintf("1-Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms", - PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent), averageLatency)) + logInfo(p.Sprintf("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %2v | %2v | latency: %d ms", + PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), averageLatency)) } } @@ -121,12 +120,12 @@ func decodeBody() string { if publishers > 0 { if fixedBody > 0 { - return fmt.Sprintf("Fixed Body: %d", fixedBody+8) + return fmt.Sprintf("Body sz: %d", fixedBody+8) } if variableBody > 0 { - return fmt.Sprintf("Variable Body: %d", variableBody) + return fmt.Sprintf("Body vsz: %d", variableBody) } - return fmt.Sprintf("Fixed Body: %d", len("simul_message")+8) + return fmt.Sprintf("Body sz: %d", 8) } else { return "ND" } @@ -135,12 +134,12 @@ func decodeBody() string { func decodeRate() string { if publishers > 0 { if rate > 0 { - return fmt.Sprintf("Fixed Rate: %d", rate) + return fmt.Sprintf("Rate Fx: %d", rate) } if variableRate > 0 { - return fmt.Sprintf("Variable Rate: %d", variableRate) + return fmt.Sprintf("Rate Vr: %d", variableRate) } - return "Full rate" + return "Full" } else { return "ND" } @@ -311,7 +310,6 @@ func startPublisher(streamName string) error { } messages := buildMessages() - atomic.AddInt64(&messagesSent, int64(len(messages))) if isAsyncSend { for _, streamMessage := range messages { err = prod.Send(streamMessage)