From 86e358942372644e5bce80dd46c4d7d1a0b8938f Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 1 Apr 2025 14:36:28 +0200 Subject: [PATCH 1/5] Expose storeOffset Api to the Env Signed-off-by: Gabriele Santomaggio --- README.md | 318 ++++++++++++++++++++------------- pkg/stream/client.go | 13 ++ pkg/stream/environment.go | 18 ++ pkg/stream/environment_test.go | 21 +++ 4 files changed, 242 insertions(+), 128 deletions(-) diff --git a/README.md b/README.md index 5a71b5b1..1eb16a10 100644 --- a/README.md +++ b/README.md @@ -22,9 +22,9 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv * [Multi hosts](#multi-hosts) * [Load Balancer](#load-balancer) * [TLS](#tls) - * [Sasl Mechanisms](#sasl-mechanisms) - * [Streams](#streams) - * [Statistics](#streams-statistics) + * [Sasl Mechanisms](#sasl-mechanisms) + * [Streams](#streams) + * [Statistics](#streams-statistics) * [Publish messages](#publish-messages) * [`Send` vs `BatchSend`](#send-vs-batchsend) * [Publish Confirmation](#publish-confirmation) @@ -38,10 +38,10 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv * [Consume Filtering](#consume-filtering) * [Single Active Consumer](#single-active-consumer) * [Handle Close](#handle-close) - * [Reliable Producer and Reliable Consumer](#reliable-producer-and-reliable-consumer) + * [Reliable Producer and Reliable Consumer](#reliable-producer-and-reliable-consumer) * [Super Stream](#super-stream) - [Performance test tool](#performance-test-tool) - * [Performance test tool Docker](#performance-test-tool-docker) + * [Performance test tool Docker](#performance-test-tool-docker) - [Build form source](#build-form-source) - [Project status](#project-status) @@ -55,7 +55,8 @@ The main structure is the `Environment` that contains the `Producer` and `Consum `Producer` and `Consumer` are the main interfaces to interact with the RabbitMQ Stream Queues.
They do not support automatic reconnection after a disconnection, but they expose events to detect it.<br/>
-The client provides the `ReliableProducer` and `ReliableConsumer` that support the auto-reconnect in case of disconnection.
+The client provides the `ReliableProducer` and `ReliableConsumer`, which support automatic reconnection in case of
+disconnection.<br/>
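A minimal sketch of the reliable producer (assuming the `ha` package API; see the examples directory for the complete usage):

```golang
// import "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
// the ReliableProducer wraps a standard producer and re-dials on disconnection
rProducer, err := ha.NewReliableProducer(env, "my-stream",
	stream.NewProducerOptions(),
	func(messageConfirm []*stream.ConfirmationStatus) {
		for _, confirm := range messageConfirm {
			if !confirm.IsConfirmed() {
				// the message was not stored: decide whether to send it again
			}
		}
	})
if err != nil {
	// handle the error
}
```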
See also the [Reliable Producer and Reliable Consumer](#reliable-producer-and-reliable-consumer) section. ### Installing @@ -65,21 +66,25 @@ See also the [Reliable Producer and Reliable Consumer](#reliable-producer-and-re ``` imports: + ```golang -"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package -"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages +"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package +"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly ``` ### Run server with Docker --- You may need a server to test locally. Let's start the broker: + ```shell docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672\ -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost -rabbit loopback_users "none"' \ rabbitmq:4-management ``` + The broker should start in a few seconds. When it’s ready, enable the `stream` plugin and `stream_management`: + ```shell docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management ``` @@ -89,7 +94,8 @@ Stream uri: `rabbitmq-stream://guest:guest@localhost:5552` ### Getting started for impatient -- [Getting started with reliable producer/consumer](./examples/reliable_getting_started/reliable_getting_started.go) example. +- [Getting started with reliable producer/consumer](./examples/reliable_getting_started/reliable_getting_started.go) + example. - [Getting started with standard producer/consumer](./examples/getting_started/getting_started.go) example. - Getting started Video tutorial: @@ -109,23 +115,26 @@ See [best practices](./best_practices/README.md) for more details. ### Connect Standard way to connect single node: + ```golang env, err := stream.NewEnvironment( - stream.NewEnvironmentOptions(). - SetHost("localhost"). - SetPort(5552). - SetUser("guest"). - SetPassword("guest")) - CheckErr(err) +stream.NewEnvironmentOptions(). +SetHost("localhost"). +SetPort(5552). +SetUser("guest"). +SetPassword("guest")) +CheckErr(err) ``` you can define the number of producers per connections, the default value is 1: + ```golang stream.NewEnvironmentOptions(). SetMaxProducersPerClient(2)) ``` you can define the number of consumers per connections, the default value is 1: + ```golang stream.NewEnvironmentOptions(). SetMaxConsumersPerClient(2)) @@ -141,12 +150,12 @@ It is possible to define multi hosts, in case one fails to connect the clients t ```golang addresses := []string{ - "rabbitmq-stream://guest:guest@host1:5552/%2f", - "rabbitmq-stream://guest:guest@host2:5552/%2f", - "rabbitmq-stream://guest:guest@host3:5552/%2f"} +"rabbitmq-stream://guest:guest@host1:5552/%2f", +"rabbitmq-stream://guest:guest@host2:5552/%2f", +"rabbitmq-stream://guest:guest@host3:5552/%2f"} env, err := stream.NewEnvironment( - stream.NewEnvironmentOptions().SetUris(addresses)) +stream.NewEnvironmentOptions().SetUris(addresses)) ``` ### Load Balancer @@ -156,14 +165,14 @@ in case of load balancer you can use the `stream.AddressResolver` parameter in t ```golang addressResolver := stream.AddressResolver{ - Host: "load-balancer-ip", - Port: 5552, - } +Host: "load-balancer-ip", +Port: 5552, +} env, err := stream.NewEnvironment( - stream.NewEnvironmentOptions(). - SetHost(addressResolver.Host). - SetPort(addressResolver.Port). - SetAddressResolver(addressResolver). 
+stream.NewEnvironmentOptions(). +SetHost(addressResolver.Host). +SetPort(addressResolver.Port). +SetAddressResolver(addressResolver). ``` In this configuration the client tries the connection until reach the right node. @@ -175,40 +184,43 @@ See also "Using a load balancer" example in the [examples](./examples/) director ### TLS To configure TLS you need to set the `IsTLS` parameter: + ```golang env, err := stream.NewEnvironment( - stream.NewEnvironmentOptions(). - SetHost("localhost"). - SetPort(5551). // standard TLS port - SetUser("guest"). - SetPassword("guest"). - IsTLS(true). - SetTLSConfig(&tls.Config{}), - ) +stream.NewEnvironmentOptions(). +SetHost("localhost"). +SetPort(5551). // standard TLS port +SetUser("guest"). +SetPassword("guest"). +IsTLS(true). +SetTLSConfig(&tls.Config{}), +) ``` The `tls.Config` is the standard golang tls library https://pkg.go.dev/crypto/tls
See also "Getting started TLS" example in the [examples](./examples/) directory.
It is also possible to configure TLS using the Schema URI like: + ```golang env, err := stream.NewEnvironment( - stream.NewEnvironmentOptions(). - SetUri("rabbitmq-stream+tls://guest:guest@localhost:5551/"). - SetTLSConfig(&tls.Config{}), +stream.NewEnvironmentOptions(). +SetUri("rabbitmq-stream+tls://guest:guest@localhost:5551/"). +SetTLSConfig(&tls.Config{}), ) ``` ### Sasl Mechanisms To configure SASL you need to set the `SaslMechanism` parameter `Environment.SetSaslConfiguration`: + ```golang cfg := new(tls.Config) cfg.ServerName = "my_server_name" cfg.RootCAs = x509.NewCertPool() if ca, err := os.ReadFile("certs/ca_certificate.pem"); err == nil { - cfg.RootCAs.AppendCertsFromPEM(ca) +cfg.RootCAs.AppendCertsFromPEM(ca) } if cert, err := tls.LoadX509KeyPair("certs/client/cert.pem", "certs/client/key.pem"); err == nil { @@ -216,22 +228,23 @@ cfg.Certificates = append(cfg.Certificates, cert) } env, err := stream.NewEnvironment(stream.NewEnvironmentOptions(). - SetUri("rabbitmq-stream+tls://my_server_name:5551/"). - IsTLS(true). - SetSaslConfiguration(stream.SaslConfigurationExternal). // SASL EXTERNAL - SetTLSConfig(cfg)) +SetUri("rabbitmq-stream+tls://my_server_name:5551/"). +IsTLS(true). +SetSaslConfiguration(stream.SaslConfigurationExternal). // SASL EXTERNAL +SetTLSConfig(cfg)) ``` ### Streams To define streams you need to use the the `environment` interfaces `DeclareStream` and `DeleteStream`. -It is highly recommended to define stream retention policies during the stream creation, like `MaxLengthBytes` or `MaxAge`: +It is highly recommended to define stream retention policies during the stream creation, like `MaxLengthBytes` or +`MaxAge`: ```golang err = env.DeclareStream(streamName, - stream.NewStreamOptions(). - SetMaxLengthBytes(stream.ByteCapacity{}.GB(2))) +stream.NewStreamOptions(). +SetMaxLengthBytes(stream.ByteCapacity{}.GB(2))) ``` The function `DeclareStream` doesn't return errors if a stream is already defined with the same parameters. @@ -266,39 +279,43 @@ committedChunkId, err := statsAfter.CommittedChunkId() ### Publish messages To publish a message you need a `*stream.Producer` instance: + ```golang -producer, err := env.NewProducer("my-stream", nil) +producer, err := env.NewProducer("my-stream", nil) ``` With `ProducerOptions` is possible to customize the Producer behaviour. - The client provides two interfaces to send messages. `send`: + ```golang var message message.StreamMessage message = amqp.NewMessage([]byte("hello")) err = producer.Send(message) ``` + and `BatchSend`: + ```golang var messages []message.StreamMessage for z := 0; z < 10; z++ { - messages = append(messages, amqp.NewMessage([]byte("hello"))) +messages = append(messages, amqp.NewMessage([]byte("hello"))) } err = producer.BatchSend(messages) ``` ### `Send` vs `BatchSend` -The `BatchSend` is the primitive to send the messages. It is up to the user to manage the aggregation. +The `BatchSend` is the primitive to send the messages. It is up to the user to manage the aggregation. `Send` introduces a smart layer to publish messages and internally uses `BatchSend`. -Starting from version 1.5.0, the `Send` uses a dynamic send. +Starting from version 1.5.0, the `Send` uses a dynamic send. The client sends the message buffer regardless of any timeout.
What should you use?
The `Send` method is the best choice for most use cases:<br/>
+
- It is asynchronous
- It aggregates the messages into batches automatically, with low latency
- It splits the messages automatically when the size is bigger than `requestedMaxFrameSize`<br/>
The `BatchSend` is useful in case you need to manage the aggregation by yourself.
It gives you more control over the aggregation process:
+
- It is synchronous
- It is up to the user to manage the aggregation
- It is up to the user to split the messages when the size is bigger than `requestedMaxFrameSize`
- It can be faster than `Send` when the user manages the aggregation well.

#### Throughput vs Latency
+ With both methods you can have low-latency and/or high-throughput.
-The `Send` is the best choice for low-latency without care about aggregation. 
+The `Send` is the best choice when you want low latency and do not need to manage the aggregation yourself.
With `BatchSend` you have more control.<br/>
The performance test tool can help you compare `Send` and `BatchSend`.<br/>
@@ -330,21 +349,22 @@ chPublishConfirm := producer.NotifyPublishConfirmation() handlePublishConfirm(chPublishConfirm) func handlePublishConfirm(confirms stream.ChannelPublishConfirm) { - go func() { - for confirmed := range confirms { - for _, msg := range confirmed { - if msg.IsConfirmed() { - fmt.Printf("message %s stored \n ", msg.GetMessage().GetData()) - } else { - fmt.Printf("message %s failed \n ", msg.GetMessage().GetData()) - } - } - } - }() +go func () { +for confirmed := range confirms { +for _, msg := range confirmed { +if msg.IsConfirmed() { +fmt.Printf("message %s stored \n ", msg.GetMessage().GetData()) +} else { +fmt.Printf("message %s failed \n ", msg.GetMessage().GetData()) +} +} +} +}() } ``` In the MessageStatus struct you can find two `publishingId`: + ```golang //first one messageStatus.GetMessage().GetPublishingId() @@ -355,31 +375,33 @@ messageStatus.GetPublishingId() The first one is provided by the user for special cases like Deduplication. The second one is assigned automatically by the client. In case the user specifies the `publishingId` with: + ```golang msg = amqp.NewMessage([]byte("mymessage")) msg.SetPublishingId(18) // <--- ``` - The filed: `messageStatus.GetMessage().HasPublishingId()` is true and
the values `messageStatus.GetMessage().GetPublishingId()` and `messageStatus.GetPublishingId()` are the same.

- See also "Getting started" example in the [examples](./examples/) directory

### Deduplication

Deduplication is a feature that avoids duplicate messages in a stream.<br/>
It is enabled by setting a producer name in the producer options:<br/>
+ ```golang producer, err := env.NewProducer(streamName, stream.NewProducerOptions().SetName("my_producer")) ``` + The stream plugin can handle deduplication data, see this blog post for more details: https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/
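As a hedged sketch (the names below are illustrative): deduplication pairs the producer name with a monotonically increasing `publishingId`, and a message re-sent with an already-stored id is discarded by the broker:

```golang
// assumes a producer created with stream.NewProducerOptions().SetName("my_producer")
for i := 0; i < 10; i++ {
	msg := amqp.NewMessage([]byte(fmt.Sprintf("message_%d", i)))
	msg.SetPublishingId(int64(i)) // must grow monotonically for a given producer name
	if err := producer.Send(msg); err != nil {
		// handle the error
	}
}
```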
You can find a "Deduplication" example in the [examples](./examples/) directory.
Run it more than once: the message count will always be 10.

To retrieve the last sequence id for a producer you can use:

```
publishingId, err := producer.GetLastPublishingId()
```

### Sub Entries Batching

A sub-entry is one "slot" in a publishing frame,
meaning outbound messages are not only batched in publishing frames, but in sub-entries as well.
Use this feature to increase throughput at the cost of increased latency.<br/>
You can find a "Sub Entries Batching" example in the [examples](./examples/) directory.
-Default compression is `None` (no compression) but you can define different kind of compressions: `GZIP`,`SNAPPY`,`LZ4`,`ZSTD`
+Default compression is `None` (no compression), but you can choose among different kinds of compression: `GZIP`, `SNAPPY`, `LZ4`,
+`ZSTD`<br/>
Compression is valid only if `SubEntrySize > 1`

```golang
producer, err := env.NewProducer(streamName, stream.NewProducerOptions().
SetSubEntrySize(100).
SetCompression(stream.Compression{}.Gzip()))
```

### Publish Filtering

Stream filtering is a new feature in RabbitMQ 3.13. It allows saving bandwidth between the broker and consuming
applications when those applications need only a subset of the messages of a stream.
See this [blog post](https://www.rabbitmq.com/blog/2023/10/16/stream-filtering) for more details.
The blog post also contains a Java example, but the Go client is similar.
See the [Filtering](./examples/filtering/filtering.go) example in the [examples](./examples/) directory.

### Consume messages

In order to consume messages from a stream you need to use the `NewConsumer` interface, for example:

```golang
handleMessages := func (consumerContext stream.ConsumerContext, message *amqp.Message) {
fmt.Printf("consumer name: %s, text: %s \n ", consumerContext.Consumer.GetName(), message.Data)
}

consumer, err := env.NewConsumer(
"my-stream",
handleMessages,
....
```

With `ConsumerOptions` it is possible to customize the consumer behaviour.

```golang
stream.NewConsumerOptions().
SetConsumerName("my_consumer"). // set a consumer name
SetCRCCheck(false). // Enable/Disable the CRC control.
SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning
```

Disabling the CRC control can increase the performance.

See also "Offset Start" example in the [examples](./examples/) directory

Close the consumer:
`consumer.Close()` removes the consumer from the server. The TCP connection is closed if there are no
other consumers.

### Manual Track Offset

The server can store the last delivered offset for a given consumer, in this way:

```golang
handleMessages := func (consumerContext stream.ConsumerContext, message *amqp.Message) {
if atomic.AddInt32(&count, 1)%1000 == 0 {
err := consumerContext.Consumer.StoreOffset() // commit all messages up to the current message's offset
....

consumer, err := env.NewConsumer(
..
stream.NewConsumerOptions().
SetConsumerName("my_consumer"). // <------
```

A consumer must have a name to be able to store offsets.<br/>
Note: *AVOID storing the offset for every single message; it will reduce the performance*

See also "Offset Tracking" example in the [examples](./examples/) directory

The server can also store a previously delivered offset rather than the current delivered offset, in this way:
+
```golang
processMessageAsync := func (consumer stream.Consumer, message *amqp.Message, offset int64) {
....
err := consumer.StoreCustomOffset(offset) // commit all messages up to this offset
....
```
+
This is useful in situations where we have to process messages asynchronously and we cannot block the original message
handler, which means we cannot store the current or latest delivered offset as we saw in the `handleMessages` function
above.
+It is possible to store the offset using the `Environment` interface:
+This method is useful when you want to store the offset for a consumer that is not active anymore.<br/>
+Note: *AVOID storing the offset for every single message; it will reduce the performance. Also, this method opens and closes a
+connection each time*.<br/>
+
```golang
env.StoreOffset(consumerName, streamName, 123)
```

The `StoreOffset` method does not return application errors: for example, if the stream does not exist, the API does not
return an error.<br/>
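A round-trip sketch, mirroring the test added in this patch: store an offset through the `Environment`, then read it back with `QueryOffset`, which, unlike `StoreOffset`, does return an error when the offset does not exist:

```golang
// store the offset for "my_consumer" without an active consumer instance
if err := env.StoreOffset("my_consumer", streamName, 123); err != nil {
	// only connection-level errors end up here
}

offset, err := env.QueryOffset("my_consumer", streamName)
if err != nil {
	// the offset (or the stream) does not exist
}
// offset is now 123
```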
+ ### Automatic Track Offset The following snippet shows how to enable automatic tracking with the defaults: + ```golang stream.NewConsumerOptions(). - SetConsumerName("my_consumer"). - SetAutoCommit(stream.NewAutoCommitStrategy() ... +SetConsumerName("my_consumer"). +SetAutoCommit(stream.NewAutoCommitStrategy() ... ``` + `nil` is also a valid value. Default values will be used ```golang stream.NewConsumerOptions(). - SetConsumerName("my_consumer"). - SetAutoCommit(nil) ... +SetConsumerName("my_consumer"). +SetAutoCommit(nil) ... ``` + Set the consumer name (mandatory for offset tracking)
The automatic tracking strategy has the following available settings: + - message count before storage: the client will store the offset after the specified number of messages,
-right after the execution of the message handler. The default is every 10,000 messages. + right after the execution of the message handler. The default is every 10,000 messages. - flush interval: the client will make sure to store the last received offset at the specified interval.
-This avoids having pending, not stored offsets in case of inactivity. The default is 5 seconds. + This avoids having pending, not stored offsets in case of inactivity. The default is 5 seconds. Those settings are configurable, as shown in the following snippet: + ```golang stream.NewConsumerOptions(). - // set a consumerOffsetNumber name - SetConsumerName("my_consumer"). - SetAutoCommit(stream.NewAutoCommitStrategy(). - SetCountBeforeStorage(50). // store each 50 messages stores - SetFlushInterval(10*time.Second)). // store each 10 seconds - SetOffset(stream.OffsetSpecification{}.First())) +// set a consumerOffsetNumber name +SetConsumerName("my_consumer"). +SetAutoCommit(stream.NewAutoCommitStrategy(). +SetCountBeforeStorage(50). // store each 50 messages stores +SetFlushInterval(10*time.Second)). // store each 10 seconds +SetOffset(stream.OffsetSpecification{}.First())) ``` See also "Automatic Offset Tracking" example in the [examples](./examples/) directory @@ -506,14 +554,17 @@ See also "Automatic Offset Tracking" example in the [examples](./examples/) dire ### Get consumer offset It is possible to query the consumer offset using: + ```golang offset, err := env.QueryOffset("consumer_name", "streamName") ``` -An error is returned if the offset doesn't exist. +An error is returned if the offset doesn't exist. ### Consume Filtering -Stream filtering is a new feature in RabbitMQ 3.13. It allows to save bandwidth between the broker and consuming applications when those applications need only a subset of the messages of a stream. + +Stream filtering is a new feature in RabbitMQ 3.13. It allows to save bandwidth between the broker and consuming +applications when those applications need only a subset of the messages of a stream. See this [blog post](https://www.rabbitmq.com/blog/2023/10/16/stream-filtering) for more details. The blog post also contains a Java example but the Go client is similar. See the [Filtering](./examples/filtering/filtering.go) example in the [examples](./examples/) directory. @@ -524,13 +575,14 @@ The Single Active Consumer pattern ensures that only one consumer processes mess See the [Single Active Consumer](./examples/single_active_consumer) example. To create a consumer with the Single Active Consumer pattern, you need to set the `SingleActiveConsumer` option: + ```golang consumerName := "MyFirstGroupConsumer" - consumerUpdate := func(isActive bool) stream.OffsetSpecification {..} - stream.NewConsumerOptions(). - SetConsumerName(consumerName). - SetSingleActiveConsumer( - stream.NewSingleActiveConsumer(consumerUpdate)) +consumerUpdate := func (isActive bool) stream.OffsetSpecification {..} +stream.NewConsumerOptions(). +SetConsumerName(consumerName). +SetSingleActiveConsumer( +stream.NewSingleActiveConsumer(consumerUpdate)) ``` The `ConsumerUpdate` function is called when the consumer is promoted.
@@ -543,25 +595,26 @@ offset to restart the consumer. The `ConsumerName` is mandatory to enable the SAC. It is the way to create different group of consumers
Different groups of consumers can consume the same stream at the same time.
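For instance, a hedged sketch of a `ConsumerUpdate` callback that restarts from the last stored offset (assuming offsets were stored under the same consumer name):

```golang
consumerUpdate := func(isActive bool) stream.OffsetSpecification {
	// on promotion, restart from the last stored offset for this consumer name
	offset, err := env.QueryOffset("MyFirstGroupConsumer", "my-stream")
	if err != nil {
		// no stored offset yet: start from the beginning of the stream
		return stream.OffsetSpecification{}.First()
	}
	return stream.OffsetSpecification{}.Offset(offset + 1)
}
```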
-The `NewConsumerOptions().SetOffset()` is not necessary when the SAC is active the `ConsumerUpdate` function
+The `NewConsumerOptions().SetOffset()` is not necessary: when the SAC is active, the `ConsumerUpdate` function
replaces the value.

See also this post for more
details: https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams

### Handle Close

Client provides an interface to handle the producer/consumer close.

```golang
channelClose := consumer.NotifyClose()
defer consumerClose(channelClose)

func consumerClose(channelClose stream.ChannelClose) {
event := <-channelClose
fmt.Printf("Consumer: %s closed on the stream: %s, reason: %s \n", event.Name, event.StreamName, event.Reason)
}
```

In this way it is possible to handle the fail-over.

### Reliable Producer and Reliable Consumer

The `ReliableProducer` and `ReliableConsumer` are built on top of the standard producer/consumer.
Both use the standard events to handle the close, so you can write your own code to handle the fail-over.<br/>
Features: + - [`Both`] auto-reconnect in case of disconnection. - [`Both`] check if stream exists, if not they close the `ReliableProducer` and `ReliableConsumer`. - [`Both`] check if the stream has a valid leader and replicas, if not they retry until the stream is ready. @@ -584,41 +638,47 @@ Each partition is a separate stream, but the client sees the Super Stream as a s You can find a "Super Stream" example in the [examples](./examples/super_stream) directory.
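As a hedged sketch of the producer side (assuming the super-stream API used by the examples in that directory; names may differ slightly between client versions):

```golang
// assumption: a three-partition super stream with hash-based routing
err := env.DeclareSuperStream("invoices", stream.NewPartitionsOptions(3))
if err != nil {
	// handle the error
}

superProducer, err := env.NewSuperStreamProducer("invoices",
	stream.NewSuperStreamProducerOptions(
		stream.NewHashRoutingStrategy(func(message message.StreamMessage) string {
			// messages with the same key always land on the same partition
			return message.GetApplicationProperties()["id"].(string)
		})))
```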
-In this blog post you can find more details: https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams +In this blog post you can find more +details: https://www.rabbitmq.com/blog/2022/07/13/rabbitmq-3-11-feature-preview-super-streams + +You can read also the java stream-client blog +post: https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#super-streams -You can read also the java stream-client blog post: https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#super-streams - The code is written in Java but the same concepts are valid for the Go client. - The Go client has the same features as the Java client. -Super Stream supports [publish-filtering](#publish-filtering) and [consume-filtering](#consume-filtering) features. +Super Stream supports [publish-filtering](#publish-filtering) and [consume-filtering](#consume-filtering) features. Offset tracking is supported for the Super Stream consumer.
-In the same way as the standard stream, you can use the `SetAutoCommit` or `SetManualCommit` option to enable/disable the automatic offset tracking.
+In the same way as the standard stream, you can use the `SetAutoCommit` or `SetManualCommit` option to enable/disable +the automatic offset tracking.
In the super stream consumer's message handler it is possible to identify the partition, the consumer and the offset:<br/>
+
```golang
handleMessages := func (consumerContext stream.ConsumerContext, message *amqp.Message) {
....
consumerContext.Consumer.GetName() // consumer name
consumerContext.Consumer.GetOffset() // current offset
consumerContext.Consumer.GetStreamName() // stream name (partition name)
....
}
```

Manual tracking API:

- `consumerContext.Consumer.StoreOffset()`: stores the current offset.
- `consumerContext.Consumer.StoreCustomOffset(xxx)`: stores a custom offset.

Like the standard stream, you should avoid storing the offset for every single message: it will reduce the performance.

### Performance test tool

The performance test tool is useful to execute tests.
It is in the [perfTest](./perfTest) directory.<br/>
-See also the [Java Performance](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#the-performance-tool) tool - +See also +the [Java Performance](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#the-performance-tool) +tool ### Build form source @@ -627,9 +687,11 @@ make build ``` To execute the tests you need a docker image, you can use: + ```shell make rabbitmq-server ``` + to run a ready rabbitmq-server with stream enabled for tests. then `make test` diff --git a/pkg/stream/client.go b/pkg/stream/client.go index a620a4ae..bf676cf4 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -824,6 +824,19 @@ func (c *Client) queryOffset(consumerName string, streamName string) (int64, err return offset.(int64), nil } +func (c *Client) StoreOffset(consumerName string, streamName string, offset int64) error { + length := 2 + 2 + 2 + len(consumerName) + 2 + + len(streamName) + 8 + var b = bytes.NewBuffer(make([]byte, 0, length+4)) + writeProtocolHeader(b, length, commandStoreOffset) + + writeString(b, consumerName) + writeString(b, streamName) + + writeLong(b, offset) + return c.socket.writeAndFlush(b.Bytes()) +} + func (c *Client) DeclareSubscriber(streamName string, messagesHandler MessagesHandler, options *ConsumerOptions) (*Consumer, error) { diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 17903f07..907ba8da 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -836,6 +836,24 @@ func (env *Environment) QueryPartitions(superStreamName string) ([]string, error return client.QueryPartitions(superStreamName) } +// StoreOffset stores the offset for a consumer for a stream +// You should use the StoreOffset method of the Consumer interface +// to store the offset for a consumer +// The StoreOffset should not be called for each message. 
+// the best practice is to store after a batch of messages +// StoreOffset does not return any application error, if the stream does not exist or the consumer does not exist +// the error is logged in the server +func (env *Environment) StoreOffset(consumerName string, streamName string, offset int64) error { + client, err := env.newReconnectClient() + defer func(client *Client) { + _ = client.Close() + }(client) + if err != nil { + return err + } + return client.StoreOffset(consumerName, streamName, offset) +} + func (env *Environment) QueryRoute(superStream string, routingKey string) ([]string, error) { client, err := env.newReconnectClient() defer func(client *Client) { diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index 55f12386..359caa62 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -476,4 +476,25 @@ var _ = Describe("Environment test", func() { }) + Describe("Query Offset should return the value from Store Offset", func() { + env, err := NewEnvironment(NewEnvironmentOptions()) + Expect(err).NotTo(HaveOccurred()) + streamName := uuid.New().String() + Expect(env.DeclareStream(streamName, nil)).NotTo(HaveOccurred()) + const consumerName = "my_consumer" + Expect(env.StoreOffset(consumerName, streamName, 123)).NotTo(HaveOccurred()) + off, err := env.QueryOffset(consumerName, streamName) + Expect(err).NotTo(HaveOccurred()) + Expect(off).To(Equal(int64(123))) + Expect(env.DeleteStream(streamName)).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + }) + + Describe("Query Offset should not return any error in case of stream does not exist", func() { + env, err := NewEnvironment(NewEnvironmentOptions()) + Expect(err).NotTo(HaveOccurred()) + Expect(env.StoreOffset("my_consumer", "stream_doesnt_exist", 123)).NotTo(HaveOccurred()) + Expect(env.Close()).NotTo(HaveOccurred()) + }) + }) From 1b2eb094e14485c261b21f30ef54697f867bbf37 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 1 Apr 2025 14:47:52 +0200 Subject: [PATCH 2/5] documentation Signed-off-by: Gabriele Santomaggio --- README.md | 77 +++++++++++++++++++++++++++---------------------------- 1 file changed, 38 insertions(+), 39 deletions(-) diff --git a/README.md b/README.md index 1eb16a10..600f2f62 100644 --- a/README.md +++ b/README.md @@ -118,12 +118,11 @@ Standard way to connect single node: ```golang env, err := stream.NewEnvironment( -stream.NewEnvironmentOptions(). -SetHost("localhost"). -SetPort(5552). -SetUser("guest"). -SetPassword("guest")) -CheckErr(err) + stream.NewEnvironmentOptions(). + SetHost("localhost"). + SetPort(5552). + SetUser("guest"). + SetPassword("guest")) ``` you can define the number of producers per connections, the default value is 1: @@ -169,10 +168,10 @@ Host: "load-balancer-ip", Port: 5552, } env, err := stream.NewEnvironment( -stream.NewEnvironmentOptions(). -SetHost(addressResolver.Host). -SetPort(addressResolver.Port). -SetAddressResolver(addressResolver). + stream.NewEnvironmentOptions(). + SetHost(addressResolver.Host). + SetPort(addressResolver.Port). + SetAddressResolver(addressResolver). ``` In this configuration the client tries the connection until reach the right node. @@ -188,12 +187,12 @@ To configure TLS you need to set the `IsTLS` parameter: ```golang env, err := stream.NewEnvironment( stream.NewEnvironmentOptions(). -SetHost("localhost"). -SetPort(5551). // standard TLS port -SetUser("guest"). -SetPassword("guest"). -IsTLS(true). -SetTLSConfig(&tls.Config{}), + SetHost("localhost"). 
+ SetPort(5551). // standard TLS port + SetUser("guest"). + SetPassword("guest"). + IsTLS(true). + SetTLSConfig(&tls.Config{}), ) ``` @@ -220,18 +219,18 @@ cfg.ServerName = "my_server_name" cfg.RootCAs = x509.NewCertPool() if ca, err := os.ReadFile("certs/ca_certificate.pem"); err == nil { -cfg.RootCAs.AppendCertsFromPEM(ca) + cfg.RootCAs.AppendCertsFromPEM(ca) } if cert, err := tls.LoadX509KeyPair("certs/client/cert.pem", "certs/client/key.pem"); err == nil { -cfg.Certificates = append(cfg.Certificates, cert) + cfg.Certificates = append(cfg.Certificates, cert) } env, err := stream.NewEnvironment(stream.NewEnvironmentOptions(). -SetUri("rabbitmq-stream+tls://my_server_name:5551/"). -IsTLS(true). -SetSaslConfiguration(stream.SaslConfigurationExternal). // SASL EXTERNAL -SetTLSConfig(cfg)) + SetUri("rabbitmq-stream+tls://my_server_name:5551/"). + IsTLS(true). + SetSaslConfiguration(stream.SaslConfigurationExternal). // SASL EXTERNAL + SetTLSConfig(cfg)) ``` ### Streams @@ -300,7 +299,7 @@ and `BatchSend`: ```golang var messages []message.StreamMessage for z := 0; z < 10; z++ { -messages = append(messages, amqp.NewMessage([]byte("hello"))) + messages = append(messages, amqp.NewMessage([]byte("hello"))) } err = producer.BatchSend(messages) ``` @@ -350,15 +349,15 @@ handlePublishConfirm(chPublishConfirm) func handlePublishConfirm(confirms stream.ChannelPublishConfirm) { go func () { -for confirmed := range confirms { -for _, msg := range confirmed { -if msg.IsConfirmed() { -fmt.Printf("message %s stored \n ", msg.GetMessage().GetData()) -} else { -fmt.Printf("message %s failed \n ", msg.GetMessage().GetData()) -} -} -} + for confirmed := range confirms { + for _, msg := range confirmed { + if msg.IsConfirmed() { + fmt.Printf("message %s stored \n ", msg.GetMessage().GetData()) + } else { + fmt.Printf("message %s failed \n ", msg.GetMessage().GetData()) + } + } + } }() } ``` @@ -402,7 +401,7 @@ Run it more than time, the messages count will be always 10. To retrieve the last sequence id for producer you can use: -``` +```golang publishingId, err := producer.GetLastPublishingId() ``` @@ -419,8 +418,8 @@ Compression is valid only is `SubEntrySize > 1` ```golang producer, err := env.NewProducer(streamName, stream.NewProducerOptions(). -SetSubEntrySize(100). -SetCompression(stream.Compression{}.Gzip())) + SetSubEntrySize(100). + SetCompression(stream.Compression{}.Gzip())) ``` ### Publish Filtering @@ -437,12 +436,12 @@ In order to consume messages from a stream you need to use the `NewConsumer` int ```golang handleMessages := func (consumerContext stream.ConsumerContext, message *amqp.Message) { -fmt.Printf("consumer name: %s, text: %s \n ", consumerContext.Consumer.GetName(), message.Data) -} + fmt.Printf("consumer name: %s, text: %s \n ", consumerContext.Consumer.GetName(), message.Data) + } consumer, err := env.NewConsumer( -"my-stream", -handleMessages, + "my-stream", + handleMessages, .... 
``` From 2857f13084db2fb44b1b4fc63eddbb99ccde0d2c Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 1 Apr 2025 14:57:59 +0200 Subject: [PATCH 3/5] go mod tidy Signed-off-by: Gabriele Santomaggio --- go.mod | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index ef226bda..669f9ba1 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/rabbitmq/rabbitmq-stream-go-client -go 1.22.0 +go 1.23.0 + toolchain go1.24.1 require ( From 62e72a00ed629d8fe17cb7d8ba9a06c7f8a5e1a3 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 1 Apr 2025 15:31:13 +0200 Subject: [PATCH 4/5] go mod tidy Signed-off-by: Gabriele Santomaggio --- go.mod | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.mod b/go.mod index 669f9ba1..7275be30 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module github.com/rabbitmq/rabbitmq-stream-go-client go 1.23.0 -toolchain go1.24.1 - require ( github.com/golang/snappy v1.0.0 github.com/google/uuid v1.6.0 From 8c3072f212bec03d69d425858d171f7ba94e81b3 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 1 Apr 2025 15:39:13 +0200 Subject: [PATCH 5/5] update go version 1.23 Signed-off-by: Gabriele Santomaggio --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 42016507..679e7192 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.22 as builder +FROM golang:1.23 as builder ENV GOPATH=/go GOOS=linux CGO_ENABLED=0 WORKDIR /go/src/github.com/rabbitmq/rabbitmq-stream-go-client COPY go.mod go.sum VERSION ./