Skip to content

latency on perftest #363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,23 @@ jobs:
- name: Install GNU make
run: choco install make
- run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }}
publish:
runs-on: ubuntu-latest
needs: [test]
steps:
- uses: docker/setup-buildx-action@v2
- uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- uses: actions/checkout@v3
- name: Publish Docker Image
run: |
set -x
VERSION=latest
export VERSION
if [[ ! $GITHUB_REF =~ "/tags/" ]]
then
VERSION=dev
fi
make perf-test-docker-push
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.19 as builder
FROM golang:1.22 as builder
ENV GOPATH=/go GOOS=linux CGO_ENABLED=0
WORKDIR /go/src/github.com/rabbitmq/rabbitmq-stream-go-client
COPY go.mod go.sum VERSION ./
Expand Down
4 changes: 4 additions & 0 deletions perfTest/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ var (
crcCheck bool
runDuration int
initialCredits int
isAsyncSend bool
clientProvidedName string
)

func init() {
Expand Down Expand Up @@ -76,6 +78,8 @@ func setupCli(baseCmd *cobra.Command) {
baseCmd.PersistentFlags().StringVarP(&maxSegmentSizeBytes, "stream-max-segment-size-bytes", "", "500MB", "Stream segment size bytes, e.g. 10MB, 1GB, etc.")
baseCmd.PersistentFlags().StringVarP(&consumerOffset, "consumer-offset", "", "first", "Staring consuming, ex: first,last,next or random")
baseCmd.PersistentFlags().IntVarP(&initialCredits, "initial-credits", "", 10, "Consumer initial credits")
baseCmd.PersistentFlags().BoolVarP(&isAsyncSend, "async-send", "", false, "Enable the async send. By default it uses batchSend in this case is faster")
baseCmd.PersistentFlags().StringVarP(&clientProvidedName, "client-provided-name", "", "", "Client provided name")
baseCmd.AddCommand(versionCmd)
baseCmd.AddCommand(newSilent())
}
Expand Down
122 changes: 75 additions & 47 deletions perfTest/cmd/silent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
"github.com/spf13/cobra"
"golang.org/x/text/language"
gomsg "golang.org/x/text/message"
"math/rand"
"os"
"sync"
Expand Down Expand Up @@ -36,12 +38,13 @@ func newSilent() *cobra.Command {
}

var (
publisherMessageCount int32
consumerMessageCount int32
publisherMessageCount int32
consumerMessageCount int32
//consumerMessageCountPerLatency int32
totalLatency int64
confirmedMessageCount int32
notConfirmedMessageCount int32
consumersCloseCount int32
messagesSent int64
//connections []*stream.Client
simulEnvironment *stream.Environment
)
Expand Down Expand Up @@ -74,12 +77,22 @@ func printStats() {
select {
case _ = <-ticker.C:
v := time.Now().Sub(start).Milliseconds()
PMessagesPerSecond := float64(0)
if publisherMessageCount > 0 {
PMessagesPerSecond = float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000
}

PMessagesPerSecond := float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000
CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000
ConfirmedMessagesPerSecond := float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000
logInfo("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v |",
PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent))
averageLatency := int64(0)
CMessagesPerSecond := float64(0)
ConfirmedMessagesPerSecond := float64(0)
if atomic.LoadInt32(&consumerMessageCount) > 0 {
CMessagesPerSecond = float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000
averageLatency = totalLatency / int64(atomic.LoadInt32(&consumerMessageCount))
ConfirmedMessagesPerSecond = float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000
}
p := gomsg.NewPrinter(language.English)
logInfo(p.Sprintf("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %2v | %2v | latency: %d ms",
PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), averageLatency))
}
}

Expand All @@ -89,11 +102,12 @@ func printStats() {
for {
select {
case _ = <-tickerReset.C:

atomic.SwapInt32(&publisherMessageCount, 0)
logInfo("***********Resetting counters***********")
atomic.SwapInt32(&consumerMessageCount, 0)
atomic.SwapInt32(&confirmedMessageCount, 0)
atomic.SwapInt32(&notConfirmedMessageCount, 0)
atomic.SwapInt32(&confirmedMessageCount, 0)
atomic.SwapInt32(&publisherMessageCount, 0)
atomic.SwapInt64(&totalLatency, 0)
start = time.Now()
}
}
Expand All @@ -106,12 +120,12 @@ func decodeBody() string {
if publishers > 0 {

if fixedBody > 0 {
return fmt.Sprintf("Fixed Body: %d", fixedBody+8)
return fmt.Sprintf("Body sz: %d", fixedBody+8)
}
if variableBody > 0 {
return fmt.Sprintf("Variable Body: %d", variableBody)
return fmt.Sprintf("Body vsz: %d", variableBody)
}
return fmt.Sprintf("Fixed Body: %d", len("simul_message")+8)
return fmt.Sprintf("Body sz: %d", 8)
} else {
return "ND"
}
Expand All @@ -120,12 +134,12 @@ func decodeBody() string {
func decodeRate() string {
if publishers > 0 {
if rate > 0 {
return fmt.Sprintf("Fixed Rate: %d", rate)
return fmt.Sprintf("Rate Fx: %d", rate)
}
if variableRate > 0 {
return fmt.Sprintf("Variable Rate: %d", variableRate)
return fmt.Sprintf("Rate Vr: %d", variableRate)
}
return "Full rate"
return "Full"
} else {
return "ND"
}
Expand All @@ -151,8 +165,9 @@ func startSimulation() error {
err := initStreams()
checkErr(err)

//
simulEnvironment, err = stream.NewEnvironment(stream.NewEnvironmentOptions().
SetUris(rabbitmqBrokerUrl).
SetUri(rabbitmqBrokerUrl[0]).
SetMaxProducersPerClient(publishersPerClient).
SetMaxConsumersPerClient(consumersPerClient))
checkErr(err)
Expand Down Expand Up @@ -258,12 +273,11 @@ func startPublisher(streamName string) error {
if compression == "zstd" {
cp = stream.Compression{}.Zstd()
}

producerOptions.SetSubEntrySize(subEntrySize).SetCompression(cp)

logInfo("Enable SubEntrySize: %d, compression: %s", subEntrySize, cp)
}

producerOptions.SetClientProvidedName(clientProvidedName)
rPublisher, err := ha.NewReliableProducer(simulEnvironment,
streamName,
producerOptions,
Expand All @@ -273,28 +287,9 @@ func startPublisher(streamName string) error {
return err
}

var arr []message.StreamMessage
var body []byte
for z := 0; z < batchSize; z++ {

if fixedBody > 0 {
body = make([]byte, fixedBody)
} else {
if variableBody > 0 {
rand.Seed(time.Now().UnixNano())
body = make([]byte, rand.Intn(variableBody))
}
}
n := time.Now().UnixNano()
var buff = make([]byte, 8)
binary.BigEndian.PutUint64(buff, uint64(n))
/// added to calculate the latency
msg := amqp.NewMessage(append(buff, body...))
arr = append(arr, msg)
}

go func(prod *ha.ReliableProducer, messages []message.StreamMessage) {
go func(prod *ha.ReliableProducer) {
for {

if rate > 0 {
rateWithBatchSize := float64(rate) / float64(batchSize)
sleepAfterMessage := float64(time.Second) / rateWithBatchSize
Expand All @@ -313,21 +308,49 @@ func startPublisher(streamName string) error {
}
time.Sleep(time.Duration(sleep) * time.Millisecond)
}
messages := buildMessages()

atomic.AddInt64(&messagesSent, int64(len(arr)))
for _, streamMessage := range arr {
err = prod.Send(streamMessage)
if isAsyncSend {
for _, streamMessage := range messages {
err = prod.Send(streamMessage)
checkErr(err)
}
} else {
err = prod.BatchSend(messages)
checkErr(err)
}
atomic.AddInt32(&publisherMessageCount, int32(len(arr)))

atomic.AddInt32(&publisherMessageCount, int32(len(messages)))

}
}(rPublisher, arr)
}(rPublisher)

return nil

}

func buildMessages() []message.StreamMessage {
var arr []message.StreamMessage
for z := 0; z < batchSize; z++ {
var body []byte
if fixedBody > 0 {
body = make([]byte, fixedBody)
} else {
if variableBody > 0 {
rand.Seed(time.Now().UnixNano())
body = make([]byte, rand.Intn(variableBody))
}
}
var buff = make([]byte, 8)
sentTime := time.Now().UnixMilli()
binary.BigEndian.PutUint64(buff, uint64(sentTime))
/// added to calculate the latency
msg := amqp.NewMessage(append(buff, body...))
arr = append(arr, msg)
}
return arr
}

func startPublishers() error {

logInfo("Starting %d publishers...", publishers)
Expand Down Expand Up @@ -362,8 +385,12 @@ func handleConsumerClose(channelClose stream.ChannelClose) {
func startConsumer(consumerName string, streamName string) error {

handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
atomic.AddInt32(&consumerMessageCount, 1)

sentTime := binary.BigEndian.Uint64(message.GetData()[:8]) // Decode the timestamp
startTimeFromMessage := time.UnixMilli(int64(sentTime))
latency := time.Now().Sub(startTimeFromMessage).Milliseconds()
totalLatency += latency
atomic.AddInt32(&consumerMessageCount, 1)
}
offsetSpec := stream.OffsetSpecification{}.Last()
switch consumerOffset {
Expand Down Expand Up @@ -393,6 +420,7 @@ func startConsumer(consumerName string, streamName string) error {
handleMessages,
stream.NewConsumerOptions().
SetConsumerName(consumerName).
SetClientProvidedName(clientProvidedName).
SetOffset(offsetSpec).
SetCRCCheck(crcCheck).
SetInitialCredits(int16(initialCredits)))
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/brokers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func newTCPParameterDefault() *TCPParameters {
RequestedHeartbeat: defaultHeartbeat,
RequestedMaxFrameSize: 1048576,
WriteBuffer: 8192,
ReadBuffer: 65536,
NoDelay: false,
ReadBuffer: 8192,
NoDelay: true,
tlsConfig: nil,
}
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/stream/server_frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,6 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
_, _ = readUInt(r)
_, _ = readUInt(r)

c.credit(subscriptionId, 1)

var offsetLimit int64 = -1

// we can have two cases
Expand Down Expand Up @@ -411,6 +409,10 @@ func (c *Client) handleDeliver(r *bufio.Reader) {

}
}
// request a credit for the next chunk
c.credit(subscriptionId, 1)

// dispatch the messages with offset to the consumer
chunk.offsetMessages = batchConsumingMessages
if consumer.getStatus() == open {
consumer.response.chunkForConsumer <- chunk
Expand Down
Loading