diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index ea87633d..f8f44132 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -87,3 +87,23 @@ jobs:
       - name: Install GNU make
         run: choco install make
       - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }}
+  publish:
+    runs-on: ubuntu-latest
+    needs: [test]
+    steps:
+      - uses: docker/setup-buildx-action@v2
+      - uses: docker/login-action@v2
+        with:
+          username: ${{ secrets.DOCKERHUB_USERNAME }}
+          password: ${{ secrets.DOCKERHUB_TOKEN }}
+      - uses: actions/checkout@v3
+      - name: Publish Docker Image
+        run: |
+          set -x
+          VERSION=latest
+          export VERSION
+          if [[ ! $GITHUB_REF =~ "/tags/" ]]
+          then
+            VERSION=dev
+          fi
+          make perf-test-docker-push
diff --git a/Dockerfile b/Dockerfile
index 3879ab8e..42016507 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM golang:1.19 as builder
+FROM golang:1.22 as builder
 ENV GOPATH=/go GOOS=linux CGO_ENABLED=0
 WORKDIR /go/src/github.com/rabbitmq/rabbitmq-stream-go-client
 COPY go.mod go.sum VERSION ./
diff --git a/perfTest/cmd/commands.go b/perfTest/cmd/commands.go
index 0ae7e0ce..9cb263c8 100644
--- a/perfTest/cmd/commands.go
+++ b/perfTest/cmd/commands.go
@@ -46,6 +46,8 @@ var (
 	crcCheck           bool
 	runDuration        int
 	initialCredits     int
+	isAsyncSend        bool
+	clientProvidedName string
 )
 
 func init() {
@@ -76,6 +78,8 @@ func setupCli(baseCmd *cobra.Command) {
 	baseCmd.PersistentFlags().StringVarP(&maxSegmentSizeBytes, "stream-max-segment-size-bytes", "", "500MB", "Stream segment size bytes, e.g. 10MB, 1GB, etc.")
 	baseCmd.PersistentFlags().StringVarP(&consumerOffset, "consumer-offset", "", "first", "Staring consuming, ex: first,last,next or random")
 	baseCmd.PersistentFlags().IntVarP(&initialCredits, "initial-credits", "", 10, "Consumer initial credits")
+	baseCmd.PersistentFlags().BoolVarP(&isAsyncSend, "async-send", "", false, "Enable the async send. By default it uses batchSend in this case is faster")
+	baseCmd.PersistentFlags().StringVarP(&clientProvidedName, "client-provided-name", "", "", "Client provided name")
 	baseCmd.AddCommand(versionCmd)
 	baseCmd.AddCommand(newSilent())
 }
diff --git a/perfTest/cmd/silent.go b/perfTest/cmd/silent.go
index 6616d7b1..2467736e 100644
--- a/perfTest/cmd/silent.go
+++ b/perfTest/cmd/silent.go
@@ -9,6 +9,8 @@ import (
 	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
 	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
 	"github.com/spf13/cobra"
+	"golang.org/x/text/language"
+	gomsg "golang.org/x/text/message"
 	"math/rand"
 	"os"
 	"sync"
@@ -36,12 +38,13 @@ func newSilent() *cobra.Command {
 }
 
 var (
-	publisherMessageCount    int32
-	consumerMessageCount     int32
+	publisherMessageCount int32
+	consumerMessageCount  int32
+	//consumerMessageCountPerLatency int32
+	totalLatency             int64
 	confirmedMessageCount    int32
 	notConfirmedMessageCount int32
 	consumersCloseCount      int32
-	messagesSent             int64
 	//connections []*stream.Client
 	simulEnvironment *stream.Environment
 )
@@ -74,12 +77,22 @@ func printStats() {
 			select {
 			case _ = <-ticker.C:
 				v := time.Now().Sub(start).Milliseconds()
+				PMessagesPerSecond := float64(0)
+				if publisherMessageCount > 0 {
+					PMessagesPerSecond = float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000
+				}
 
-				PMessagesPerSecond := float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000
-				CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000
-				ConfirmedMessagesPerSecond := float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000
-				logInfo("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v |",
-					PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent))
+				averageLatency := int64(0)
+				CMessagesPerSecond := float64(0)
+				ConfirmedMessagesPerSecond := float64(0)
+				if atomic.LoadInt32(&consumerMessageCount) > 0 {
+					CMessagesPerSecond = float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000
+					averageLatency = totalLatency / int64(atomic.LoadInt32(&consumerMessageCount))
+					ConfirmedMessagesPerSecond = float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000
+				}
+				p := gomsg.NewPrinter(language.English)
+				logInfo(p.Sprintf("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %2v | %2v | latency: %d ms",
+					PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), averageLatency))
 			}
 		}
@@ -89,11 +102,12 @@ func printStats() {
 		for {
 			select {
 			case _ = <-tickerReset.C:
-
-				atomic.SwapInt32(&publisherMessageCount, 0)
+				logInfo("***********Resetting counters***********")
 				atomic.SwapInt32(&consumerMessageCount, 0)
-				atomic.SwapInt32(&confirmedMessageCount, 0)
 				atomic.SwapInt32(&notConfirmedMessageCount, 0)
+				atomic.SwapInt32(&confirmedMessageCount, 0)
+				atomic.SwapInt32(&publisherMessageCount, 0)
+				atomic.SwapInt64(&totalLatency, 0)
 				start = time.Now()
 			}
 		}
@@ -106,12 +120,12 @@ func decodeBody() string {
 	if publishers > 0 {
 
 		if fixedBody > 0 {
-			return fmt.Sprintf("Fixed Body: %d", fixedBody+8)
+			return fmt.Sprintf("Body sz: %d", fixedBody+8)
 		}
 		if variableBody > 0 {
-			return fmt.Sprintf("Variable Body: %d", variableBody)
+			return fmt.Sprintf("Body vsz: %d", variableBody)
 		}
-		return fmt.Sprintf("Fixed Body: %d", len("simul_message")+8)
+		return fmt.Sprintf("Body sz: %d", 8)
 	} else {
 		return "ND"
 	}
@@ -120,12 +134,12 @@ func decodeBody() string {
 func decodeRate() string {
 	if publishers > 0 {
 		if rate > 0 {
-			return fmt.Sprintf("Fixed Rate: %d", rate)
+			return fmt.Sprintf("Rate Fx: %d", rate)
 		}
 		if variableRate > 0 {
-			return fmt.Sprintf("Variable Rate: %d", variableRate)
+			return fmt.Sprintf("Rate Vr: %d", variableRate)
 		}
-		return "Full rate"
+		return "Full"
 	} else {
 		return "ND"
 	}
@@ -151,8 +165,9 @@ func startSimulation() error {
 	err := initStreams()
 	checkErr(err)
 
+	//
 	simulEnvironment, err = stream.NewEnvironment(stream.NewEnvironmentOptions().
-		SetUris(rabbitmqBrokerUrl).
+		SetUri(rabbitmqBrokerUrl[0]).
 		SetMaxProducersPerClient(publishersPerClient).
 		SetMaxConsumersPerClient(consumersPerClient))
 	checkErr(err)
@@ -258,12 +273,11 @@ func startPublisher(streamName string) error {
 		if compression == "zstd" {
 			cp = stream.Compression{}.Zstd()
 		}
-
 		producerOptions.SetSubEntrySize(subEntrySize).SetCompression(cp)
-
 		logInfo("Enable SubEntrySize: %d, compression: %s", subEntrySize, cp)
 	}
 
+	producerOptions.SetClientProvidedName(clientProvidedName)
 	rPublisher, err := ha.NewReliableProducer(simulEnvironment,
 		streamName,
 		producerOptions,
@@ -273,28 +287,9 @@ func startPublisher(streamName string) error {
 		return err
 	}
 
-	var arr []message.StreamMessage
-	var body []byte
-	for z := 0; z < batchSize; z++ {
-
-		if fixedBody > 0 {
-			body = make([]byte, fixedBody)
-		} else {
-			if variableBody > 0 {
-				rand.Seed(time.Now().UnixNano())
-				body = make([]byte, rand.Intn(variableBody))
-			}
-		}
-		n := time.Now().UnixNano()
-		var buff = make([]byte, 8)
-		binary.BigEndian.PutUint64(buff, uint64(n))
-		/// added to calculate the latency
-		msg := amqp.NewMessage(append(buff, body...))
-		arr = append(arr, msg)
-	}
-
-	go func(prod *ha.ReliableProducer, messages []message.StreamMessage) {
+	go func(prod *ha.ReliableProducer) {
 		for {
+
 			if rate > 0 {
 				rateWithBatchSize := float64(rate) / float64(batchSize)
 				sleepAfterMessage := float64(time.Second) / rateWithBatchSize
@@ -313,21 +308,49 @@ func startPublisher(streamName string) error {
 				}
 				time.Sleep(time.Duration(sleep) * time.Millisecond)
 			}
+			messages := buildMessages()
 
-			atomic.AddInt64(&messagesSent, int64(len(arr)))
-			for _, streamMessage := range arr {
-				err = prod.Send(streamMessage)
+			if isAsyncSend {
+				for _, streamMessage := range messages {
+					err = prod.Send(streamMessage)
+					checkErr(err)
+				}
+			} else {
+				err = prod.BatchSend(messages)
 				checkErr(err)
 			}
-			atomic.AddInt32(&publisherMessageCount, int32(len(arr)))
+
+			atomic.AddInt32(&publisherMessageCount, int32(len(messages)))
 		}
-	}(rPublisher, arr)
+	}(rPublisher)
 	return nil
 }
 
+func buildMessages() []message.StreamMessage {
+	var arr []message.StreamMessage
+	for z := 0; z < batchSize; z++ {
+		var body []byte
+		if fixedBody > 0 {
+			body = make([]byte, fixedBody)
+		} else {
+			if variableBody > 0 {
+				rand.Seed(time.Now().UnixNano())
+				body = make([]byte, rand.Intn(variableBody))
+			}
+		}
+		var buff = make([]byte, 8)
+		sentTime := time.Now().UnixMilli()
+		binary.BigEndian.PutUint64(buff, uint64(sentTime))
+		/// added to calculate the latency
+		msg := amqp.NewMessage(append(buff, body...))
+		arr = append(arr, msg)
+	}
+	return arr
+}
+
 func startPublishers() error {
 
 	logInfo("Starting %d publishers...", publishers)
@@ -362,8 +385,12 @@ func handleConsumerClose(channelClose stream.ChannelClose) {
 func startConsumer(consumerName string, streamName string) error {
 
 	handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
-		atomic.AddInt32(&consumerMessageCount, 1)
+		sentTime := binary.BigEndian.Uint64(message.GetData()[:8]) // Decode the timestamp
+		startTimeFromMessage := time.UnixMilli(int64(sentTime))
+		latency := time.Now().Sub(startTimeFromMessage).Milliseconds()
+		totalLatency += latency
+		atomic.AddInt32(&consumerMessageCount, 1)
 	}
 	offsetSpec := stream.OffsetSpecification{}.Last()
 	switch consumerOffset {
@@ -393,6 +420,7 @@ func startConsumer(consumerName string, streamName string) error {
 		handleMessages,
 		stream.NewConsumerOptions().
 			SetConsumerName(consumerName).
+			SetClientProvidedName(clientProvidedName).
 			SetOffset(offsetSpec).
 			SetCRCCheck(crcCheck).
 			SetInitialCredits(int16(initialCredits)))
diff --git a/pkg/stream/brokers.go b/pkg/stream/brokers.go
index 3a7e5f59..a3e5d943 100644
--- a/pkg/stream/brokers.go
+++ b/pkg/stream/brokers.go
@@ -52,8 +52,8 @@ func newTCPParameterDefault() *TCPParameters {
 		RequestedHeartbeat:    defaultHeartbeat,
 		RequestedMaxFrameSize: 1048576,
 		WriteBuffer:           8192,
-		ReadBuffer:            65536,
-		NoDelay:               false,
+		ReadBuffer:            8192,
+		NoDelay:               true,
 		tlsConfig:             nil,
 	}
 }
diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go
index 96f0d8e8..62f55050 100644
--- a/pkg/stream/server_frame.go
+++ b/pkg/stream/server_frame.go
@@ -321,8 +321,6 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
 	_, _ = readUInt(r)
 	_, _ = readUInt(r)
 
-	c.credit(subscriptionId, 1)
-
 	var offsetLimit int64 = -1
 
 	// we can have two cases
@@ -411,6 +409,10 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
 		}
 	}
 
+	// request a credit for the next chunk
+	c.credit(subscriptionId, 1)
+
+	// dispatch the messages with offset to the consumer
 	chunk.offsetMessages = batchConsumingMessages
 	if consumer.getStatus() == open {
 		consumer.response.chunkForConsumer <- chunk
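
The silent.go changes measure end-to-end latency by prepending the send time, as a big-endian uint64 of milliseconds, to every message body in `buildMessages()` and decoding it again in the consumer's `handleMessages` callback. A minimal standalone sketch of that round trip is below; the helper names `encodeSendTime` and `decodeLatencyMillis` are illustrative only and do not exist in the repository.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

// encodeSendTime prepends the current time in milliseconds, big-endian,
// to the message body, mirroring what buildMessages() does in the patch.
func encodeSendTime(body []byte) []byte {
	buff := make([]byte, 8)
	binary.BigEndian.PutUint64(buff, uint64(time.Now().UnixMilli()))
	return append(buff, body...)
}

// decodeLatencyMillis reads the timestamp back from the first 8 bytes and
// returns the elapsed milliseconds, as the consumer callback does.
func decodeLatencyMillis(payload []byte) int64 {
	sentTime := int64(binary.BigEndian.Uint64(payload[:8]))
	return time.Since(time.UnixMilli(sentTime)).Milliseconds()
}

func main() {
	payload := encodeSendTime([]byte("hello"))
	time.Sleep(5 * time.Millisecond)
	fmt.Printf("latency: %d ms\n", decodeLatencyMillis(payload))
}
```

In the patch, each decoded latency is accumulated into `totalLatency` and divided by `consumerMessageCount` once per stats tick, which is why the reset ticker also swaps `totalLatency` back to zero.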