performances documentation #360

Merged
merged 1 commit into from Oct 30, 2024
2 changes: 1 addition & 1 deletion CiDockerfile
@@ -1,4 +1,4 @@
FROM rabbitmq:3.13-management
FROM rabbitmq:4-management

COPY .ci/conf/rabbitmq.conf /etc/rabbitmq/rabbitmq.conf
COPY .ci/conf/enabled_plugins /etc/rabbitmq/enabled_plugins
2 changes: 2 additions & 0 deletions README.md
@@ -300,6 +300,8 @@ The `BatchSend` is the primitive to send the messages, `Send` introduces a smart

The `Send` interface works in most of the cases. In some conditions it is about 15/20 slower than `BatchSend`. See also this [thread](https://groups.google.com/g/rabbitmq-users/c/IO_9-BbCzgQ).

See also "Client performances" example in the [examples](./examples/performances/) directory

### Publish Confirmation

For each publish, the server sends back to the client a confirmation or an error.
1 change: 1 addition & 0 deletions examples/README.md
@@ -16,4 +16,5 @@ Stream examples
- [Single Active Consumer](./single_active_consumer) - Single Active Consumer example
- [Reliable](./reliable) - Reliable Producer and Reliable Consumer example
- [Super Stream](./super_stream) - Super Stream example with Single Active Consumer
- [Client performances](./performances) - Client performances example

38 changes: 38 additions & 0 deletions examples/performances/README.md
@@ -0,0 +1,38 @@
## Client performances

This document describes how to tune the client parameters to:
- Increase the throughput
- And/or reduce the latency
- And/or reduce the disk space used by the messages.

### Throughput and Latency

The parameters that can be tuned are:
- `SetBatchSize(batchSize)` and `SetBatchPublishingDelay(100)` when using the `Send()` method
- The number of messages per batch when using the `BatchSend()` method

In this example you can play with the parameters and see their impact on performance.
There is no magic formula to find the best parameters: you need to test and find the best values for your use case. The sketch below shows where these options plug in.
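
A minimal sketch of a producer tuned for the `Send()` path; `env` and `streamName` are assumed to be set up as in `performances.go`, and the values are illustrative, not recommendations:

```
// Sketch: tune the client-side aggregation used by Send().
// Measure with your own workload before settling on values.
producer, err := env.NewProducer(streamName,
	stream.NewProducerOptions().
		SetBatchSize(200).            // max messages aggregated per frame by Send()
		SetBatchPublishingDelay(100)) // flush delay for partially filled batches
if err != nil {
	log.Fatal(err)
}
defer producer.Close()
```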

### How to run the example
```
go run performances.go async 1000000 100;
```
The arguments are: `sync` or `async` (use `BatchSend()` or `Send()`), the total number of messages to send, and the batch size.

### Disk space used by the messages

The client also supports sub-entry batching and compression:
`SetSubEntrySize(500).SetCompression(stream.Compression{}...`
These parameters can be used to reduce the disk space used by the messages, thanks to compression and sub-entry batching.
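
A sketch of the full call chain; Gzip is an assumption used only for illustration (the code in this PR only shows `stream.Compression{}.None()`), so pick whichever codec your client version exposes:

```
// Sketch: sub-entry batching plus compression to reduce disk usage.
// Gzip is an assumed codec choice for illustration purposes.
producer, err := env.NewProducer(streamName,
	stream.NewProducerOptions().
		SetSubEntrySize(500).
		SetCompression(stream.Compression{}.Gzip()))
if err != nil {
	log.Fatal(err)
}
```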


### Default values

The default producer values are meant to be a good trade-off between throughput and latency.
You can tune them to increase the throughput, reduce the latency, or reduce the disk space used by the messages.



### Load tests
To execute load tests, you can use the official load test tool:
https://github.com/rabbitmq/rabbitmq-stream-perf-test
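
For example, the perf-test project documents a Docker-based quick start along these lines; the image name and flags here are assumptions taken from that project, so check its README before running:

```
docker run -it --rm pivotalrabbitmq/stream-perf-test \
  --uris rabbitmq-stream://guest:guest@localhost:5552/%2f
```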
76 changes: 56 additions & 20 deletions examples/performances/performances.go
@@ -8,6 +8,7 @@ import (
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
"os"
"strconv"
"sync/atomic"
"time"
)
@@ -34,9 +35,17 @@ func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
}

func main() {
reader := bufio.NewReader(os.Stdin)

fmt.Println("RabbitMQ Sub Entry Batch example")
useSyncBatch := os.Args[1] == "sync"
useAsyncSend := os.Args[1] == "async"

messagesToSend, err := strconv.Atoi(os.Args[2])
CheckErr(err)
batchSize, err := strconv.Atoi(os.Args[3])
CheckErr(err)
messagesToSend = messagesToSend / batchSize

reader := bufio.NewReader(os.Stdin)
fmt.Println("RabbitMQ performance example")

// Connect to the broker ( or brokers )
env, err := stream.NewEnvironment(
@@ -46,7 +55,6 @@ func main() {
SetUser("guest").
SetPassword("guest"))
CheckErr(err)
fmt.Printf("------------------------------------------\n\n")
fmt.Println("Connected to the RabbitMQ server")

streamName := uuid.New().String()
@@ -56,38 +64,66 @@ func main() {
},
)
CheckErr(err)
fmt.Printf("------------------------------------------\n\n")
fmt.Printf("Created Stream: %s \n", streamName)

producer, err := env.NewProducer(streamName, stream.NewProducerOptions().
SetSubEntrySize(500).
SetCompression(stream.Compression{}.None()))
producer, err := env.NewProducer(streamName,
stream.NewProducerOptions().
SetBatchSize(batchSize).
SetBatchPublishingDelay(100))
CheckErr(err)

//optional publish confirmation channel
chPublishConfirm := producer.NotifyPublishConfirmation()
handlePublishConfirm(chPublishConfirm)
messagesToSend := 20_000
fmt.Printf("------------------------------------------\n\n")
fmt.Printf("Start sending %d messages, data size: %d bytes\n", messagesToSend, len("hello_world"))
batchSize := 100
var arr []message.StreamMessage
for i := 0; i < batchSize; i++ {
arr = append(arr, amqp.NewMessage([]byte("hello_world")))
fmt.Printf("Start sending %d messages, data size: %d bytes\n", messagesToSend*batchSize, len("hello_world"))
var averageLatency time.Duration
var messagesConsumed int32
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
atomic.AddInt32(&messagesConsumed, 1)
var latency time.Time
err := latency.UnmarshalBinary(message.Data[0])
CheckErr(err)
averageLatency += time.Since(latency)
}
_, err = env.NewConsumer(streamName, handleMessages, stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First()))
CheckErr(err)

start := time.Now()
for i := 0; i < messagesToSend; i++ {
err := producer.BatchSend(arr)
CheckErr(err)

// here the client sends the messages in batch and it is up to the user to aggregate the messages
if useSyncBatch {
var arr []message.StreamMessage
for i := 0; i < messagesToSend; i++ {
for i := 0; i < batchSize; i++ {
latency, err := time.Now().MarshalBinary()
CheckErr(err)
arr = append(arr, amqp.NewMessage(latency))
}
err := producer.BatchSend(arr)
CheckErr(err)
arr = arr[:0]
}
}

// here the client aggregates the messages based on the batch size and batch publishing delay
if useAsyncSend {
for i := 0; i < messagesToSend; i++ {
for i := 0; i < batchSize; i++ {
latency, err := time.Now().MarshalBinary()
CheckErr(err)
err = producer.Send(amqp.NewMessage(latency))
CheckErr(err)
}
}
}

duration := time.Since(start)
fmt.Println("Press any key to report and stop ")
_, _ = reader.ReadString('\n')
fmt.Printf("------------------------------------------\n\n")
fmt.Printf("Sent %d messages in %s \n", messagesToSend*100, duration)
fmt.Printf("Sent %d messages in %s. Confirmed: %d avarage latency: %s \n", messagesToSend*batchSize, duration, messagesConfirmed, averageLatency/time.Duration(messagesConsumed))
fmt.Printf("------------------------------------------\n\n")

fmt.Println("Press any key to stop ")
_, _ = reader.ReadString('\n')
time.Sleep(200 * time.Millisecond)
CheckErr(err)
err = env.DeleteStream(streamName)