From eb3e36e5d6914bb46f83651edb4dfe9f2416cf2a Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 28 Oct 2024 15:48:51 +0100 Subject: [PATCH] performances documentation Signed-off-by: Gabriele Santomaggio --- CiDockerfile | 2 +- README.md | 2 + examples/README.md | 1 + examples/performances/README.md | 38 ++++++++++++++ examples/performances/performances.go | 76 ++++++++++++++++++++------- 5 files changed, 98 insertions(+), 21 deletions(-) create mode 100644 examples/performances/README.md diff --git a/CiDockerfile b/CiDockerfile index 45f0553d..c24c67c2 100644 --- a/CiDockerfile +++ b/CiDockerfile @@ -1,4 +1,4 @@ -FROM rabbitmq:3.13-management +FROM rabbitmq:4-management COPY .ci/conf/rabbitmq.conf /etc/rabbitmq/rabbitmq.conf COPY .ci/conf/enabled_plugins /etc/rabbitmq/enabled_plugins diff --git a/README.md b/README.md index 55c7138e..ed03c043 100644 --- a/README.md +++ b/README.md @@ -300,6 +300,8 @@ The `BatchSend` is the primitive to send the messages, `Send` introduces a smart The `Send` interface works in most of the cases, In some condition is about 15/20 slower than `BatchSend`. See also this [thread](https://groups.google.com/g/rabbitmq-users/c/IO_9-BbCzgQ). +See also "Client performances" example in the [examples](./examples/performances/) directory + ### Publish Confirmation For each publish the server sends back to the client the confirmation or an error. 
diff --git a/examples/README.md b/examples/README.md index 5e950b93..3abbfd2f 100644 --- a/examples/README.md +++ b/examples/README.md @@ -16,4 +16,5 @@ Stream examples - [Single Active Consumer](./single_active_consumer) - Single Active Consumer example - [Reliable](./reliable) - Reliable Producer and Reliable Consumer example - [Super Stream](./super_stream) - Super Stream example with Single Active Consumer + - [Client performances](./performances) - Client performances example diff --git a/examples/performances/README.md b/examples/performances/README.md new file mode 100644 index 00000000..9059fe41 --- /dev/null +++ b/examples/performances/README.md @@ -0,0 +1,38 @@ +## Client performances + +This document describes how to tune the parameters of the client to: +- Increase the throughput +- And/or reduce the latency +- And/or reduce the disk space used by the messages. + +### Throughput and Latency + +The parameters that can be tuned are: +- `SetBatchSize(batchSize)` and `SetBatchPublishingDelay(100)` when using the `Send()` method +- The number of messages when using the `BatchSend()` method + +In this example you can play with the parameters to see the impact on performance. +There is no magic formula to find the best parameters; you need to test and find the best values for your use case. + +### How to run the example +``` +go run performances.go async 1000000 100; +``` + +### Disk space used by the messages + +The client also supports the batch entry size and the compression: +`SetSubEntrySize(500).SetCompression(stream.Compression{}...` +These parameters can be used to reduce the space used by the messages thanks to the compression and the batch entry size. + + +### Default values + +The default producer values are meant to be a good trade-off between throughput and latency. +You can tune the parameters to increase the throughput, reduce the latency or reduce the disk space used by the messages. 
+ + + +### Load tests +To execute load tests, you can use the official load test tool: +https://github.com/rabbitmq/rabbitmq-stream-perf-test \ No newline at end of file diff --git a/examples/performances/performances.go b/examples/performances/performances.go index f215b3c5..3bdadfb6 100644 --- a/examples/performances/performances.go +++ b/examples/performances/performances.go @@ -8,6 +8,7 @@ import ( "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" "os" + "strconv" "sync/atomic" "time" ) @@ -34,9 +35,17 @@ func handlePublishConfirm(confirms stream.ChannelPublishConfirm) { } func main() { - reader := bufio.NewReader(os.Stdin) - fmt.Println("RabbitMQ Sub Entry Batch example") + useSyncBatch := os.Args[1] == "sync" + useAsyncSend := os.Args[1] == "async" + + messagesToSend, err := strconv.Atoi(os.Args[2]) + CheckErr(err) + batchSize, err := strconv.Atoi(os.Args[3]) + messagesToSend = messagesToSend / batchSize + + reader := bufio.NewReader(os.Stdin) + fmt.Println("RabbitMQ performance example") // Connect to the broker ( or brokers ) env, err := stream.NewEnvironment( @@ -46,7 +55,6 @@ func main() { SetUser("guest"). SetPassword("guest")) CheckErr(err) - fmt.Printf("------------------------------------------\n\n") fmt.Println("Connected to the RabbitMQ server") streamName := uuid.New().String() @@ -56,38 +64,66 @@ func main() { }, ) CheckErr(err) - fmt.Printf("------------------------------------------\n\n") fmt.Printf("Created Stream: %s \n", streamName) - producer, err := env.NewProducer(streamName, stream.NewProducerOptions(). - SetSubEntrySize(500). - SetCompression(stream.Compression{}.None())) + producer, err := env.NewProducer(streamName, + stream.NewProducerOptions(). + SetBatchSize(batchSize). 
+ SetBatchPublishingDelay(100)) CheckErr(err) - //optional publish confirmation channel chPublishConfirm := producer.NotifyPublishConfirmation() handlePublishConfirm(chPublishConfirm) - messagesToSend := 20_000 fmt.Printf("------------------------------------------\n\n") - fmt.Printf("Start sending %d messages, data size: %d bytes\n", messagesToSend, len("hello_world")) - batchSize := 100 - var arr []message.StreamMessage - for i := 0; i < batchSize; i++ { - arr = append(arr, amqp.NewMessage([]byte("hello_world"))) + fmt.Printf("Start sending %d messages, data size: %d bytes\n", messagesToSend*batchSize, len("hello_world")) + var averageLatency time.Duration + var messagesConsumed int32 + handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) { + atomic.AddInt32(&messagesConsumed, 1) + var latency time.Time + err := latency.UnmarshalBinary(message.Data[0]) + CheckErr(err) + averageLatency += time.Since(latency) } + _, err = env.NewConsumer(streamName, handleMessages, stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First())) + CheckErr(err) start := time.Now() - for i := 0; i < messagesToSend; i++ { - err := producer.BatchSend(arr) - CheckErr(err) + + // here the client sends the messages in batch and it is up to the user to aggregate the messages + if useSyncBatch { + var arr []message.StreamMessage + for i := 0; i < messagesToSend; i++ { + for i := 0; i < batchSize; i++ { + latency, err := time.Now().MarshalBinary() + CheckErr(err) + arr = append(arr, amqp.NewMessage(latency)) + } + err := producer.BatchSend(arr) + CheckErr(err) + arr = arr[:0] + } + } + + // here the client aggregates the messages based on the batch size and batch publishing delay + if useAsyncSend { + for i := 0; i < messagesToSend; i++ { + for i := 0; i < batchSize; i++ { + latency, err := time.Now().MarshalBinary() + CheckErr(err) + err = producer.Send(amqp.NewMessage(latency)) + CheckErr(err) + } + } } + duration := time.Since(start) + 
fmt.Println("Press any key to report and stop ") + _, _ = reader.ReadString('\n') fmt.Printf("------------------------------------------\n\n") - fmt.Printf("Sent %d messages in %s \n", messagesToSend*100, duration) + fmt.Printf("Sent %d messages in %s. Confirmed: %d average latency: %s \n", messagesToSend*batchSize, duration, messagesConfirmed, averageLatency/time.Duration(messagesConsumed)) fmt.Printf("------------------------------------------\n\n") - fmt.Println("Press any key to stop ") - _, _ = reader.ReadString('\n') time.Sleep(200 * time.Millisecond) CheckErr(err) err = env.DeleteStream(streamName)