diff --git a/best_practices/README.md b/best_practices/README.md index cb3512f1..01507d0a 100644 --- a/best_practices/README.md +++ b/best_practices/README.md @@ -1,10 +1,9 @@ -Client best practices -===================== +# Client best practices The scope of this document is to provide a set of best practices for the client applications that use the Go client library.
- #### General recommendations + - Messages are not thread-safe, you should not share the same message between different go-routines or different Send/BatchSend calls. - Use the producer name only if you need deduplication. - Avoid to store the consumer offset to the server too often. @@ -21,12 +20,15 @@ No particular tuning is required. Just follow the [Getting started](../examples/ Each connection can support multiple producers and consumers, you can reduce the number of connections by using the same connection for multiple producers and consumers.
With: + ```golang SetMaxConsumersPerClient(10). SetMaxConsumersPerClient(10) ``` + The TCP connection will be shared between the producers and consumers. Note about consumers: One slow consumer can block the others, so it is important: + - To have a good balance between the number of consumers and the speed of the consumers. - work application side to avoid slow consumers, for example, by using a go-routines/buffers. @@ -35,7 +37,7 @@ Note about consumers: One slow consumer can block the others, so it is important To achieve high throughput, you should use one producer per connection, and one consumer per connection. This will avoid lock contention between the producers when sending messages and between the consumers when receiving messages. -The method `Send` is usually enough to achieve high throughput. +The method `Send` is usually enough to achieve high throughput. In some case you can use the `BatchSend` method. See the `Send` vs `BatchSend` documentation for more details. #### Low latency @@ -46,11 +48,11 @@ The method `Send` is the best choice to achieve low latency. Default values are You can change the `BatchSize` parameter to increase or reduce the max number of messages sent in one batch. Note: Since the client uses dynamic send, the `BatchSize` parameter is a hint to the client, the client can send less than the `BatchSize`. -#### Store several text based messages +#### Store several text based messages In case you want to store logs, text-based or big messages, you can use the `Sub Entries Batching` method. Where it is possible to store multiple messages in one entry and compress the entry with different algorithms. -It is useful to reduce the disk space and the network bandwidth. +It is useful to reduce the disk space and the network bandwidth. See the `Sub Entries Batching` documentation for more details.
#### Store several small messages @@ -58,13 +60,11 @@ See the `Sub Entries Batching` documentation for more details.
In case you want to store a lot of small messages, you can use the `BatchSend` method. Where it is possible to store multiple messages in one entry. This will avoid creating small chunks on the server side.
- #### Avoid duplications In case you want to store messages with deduplication, you need to set the producer name and the deduplication id. See the `Deduplication` documentation for more details.
- #### Consumer fail over In case you want to have a consumer fail over, you can use the `Single Active Consumer` method. @@ -75,32 +75,34 @@ Where only one consumer is active at a time, and the other consumers are in stan The client library provides a reliable producer and consumer, where the producer and consumer can recover from a connection failure. See the `Reliable` documentation for more details.
- #### Scaling the streams In case you want to scale the streams, you can use the `Super Stream` method. Where you can have multiple streams and only one stream is active at a time. See the `Super Stream` documentation for more details.
- #### Filtering the data when consuming In case you want to filter the data when consuming, you can use the `Stream Filtering` method. Where you can filter the data based on the metadata. See the `Stream Filtering` documentation for more details.
- #### Using a load balancer In case you want to use a load balancer, you can use the `Using a load balancer` method. In Kubernetes, you can use the service name as load balancer dns. See the `Using a load balancer` documentation for more details.
+#### Configure TCP parameters +By default, this client uses optimized TCP read and write buffer sizes to achieve the best performance. +In some environments, this optimization may cause latency issues. To restore the default OS parameters, you can call: +```go +env, err := stream.NewEnvironment(stream.NewEnvironmentOptions(). + SetWriteBuffer(-1). + SetReadBuffer(-1) +) +``` - - - - - +See these issues [Issue #293](https://github.com/rabbitmq/rabbitmq-stream-go-client/issues/293) and [PR #374](https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/374), to get more insight. diff --git a/pkg/stream/client.go b/pkg/stream/client.go index e8700c7b..a620a4ae 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -6,7 +6,6 @@ import ( "crypto/tls" "errors" "fmt" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" "math/rand" "net" "net/url" @@ -15,6 +14,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" ) // SaslConfiguration see @@ -161,14 +162,20 @@ func (c *Client) connect() error { return errorConnection } - if err = connection.SetWriteBuffer(c.tcpParameters.WriteBuffer); err != nil { - logs.LogError("Failed to SetWriteBuffer to %d due to %v", c.tcpParameters.WriteBuffer, err) - return err + if c.tcpParameters.WriteBuffer > 0 { + if err = connection.SetWriteBuffer(c.tcpParameters.WriteBuffer); err != nil { + logs.LogError("Failed to SetWriteBuffer to %d due to %v", c.tcpParameters.WriteBuffer, err) + return err + } } - if err = connection.SetReadBuffer(c.tcpParameters.ReadBuffer); err != nil { - logs.LogError("Failed to SetReadBuffer to %d due to %v", c.tcpParameters.ReadBuffer, err) - return err + + if c.tcpParameters.ReadBuffer > 0 { + if err = connection.SetReadBuffer(c.tcpParameters.ReadBuffer); err != nil { + logs.LogError("Failed to SetReadBuffer to %d due to %v", c.tcpParameters.ReadBuffer, err) + return err + } } + if err = connection.SetNoDelay(c.tcpParameters.NoDelay); err != nil { logs.LogError("Failed to SetNoDelay to %b due to %v", c.tcpParameters.NoDelay, err) return err