Skip to content

Add example with reliable* #379

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ Stream uri: `rabbitmq-stream://guest:guest@localhost:5552`

### Getting started for impatient

See [getting started](./examples/getting_started.go) example.
- [Getting started with reliable producer/consumer](./examples/reliable_getting_started/reliable_getting_started.go) example.
- [Getting started with standard producer/consumer](./examples/getting_started/getting_started.go) example.
- Getting started Video tutorial:

[![Getting Started](https://img.youtube.com/vi/8qfvl6FgC50/0.jpg)](https://www.youtube.com/watch?v=8qfvl6FgC50)

### Examples

Expand Down
4 changes: 2 additions & 2 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Stream examples
===

- [Getting started](./getting_started.go) - A good point to start.
- [Reliable getting started](./getting_started/getting_started.go) - The structures you need to start.
- [Getting started](./getting_started/getting_started.go) - Producer and Consumer example without reconnection
- [Offset Start](./offsetStart/offset.go) - How to set different points to start consuming
- [Offset Tracking](./offsetTracking/offsetTracking.go) - Manually store the consumer offset
- [Automatic Offset Tracking](./automaticOffsetTracking/automaticOffsetTracking.go) - Automatic store the consumer offset
Expand Down
108 changes: 108 additions & 0 deletions examples/reliable_getting_started/reliable_getting_started.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package main

import (
"errors"
"fmt"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func main() {
fmt.Printf("Getting started with Streaming client for RabbitMQ\n")

// Create the environment. You can set the log level to DEBUG for more information
// stream.SetLevelInfo(logs.DEBUG)
// the environment is the connection to the broker(s)
env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().
SetHost("localhost").
SetPort(5552))
if err != nil {
fmt.Printf("Error creating environment: %v\n", err)
return
}

// Create a stream
streamName := "my-stream"
// It is highly recommended to define the stream retention policy
err = env.DeclareStream(streamName, stream.NewStreamOptions().
SetMaxLengthBytes(stream.ByteCapacity{}.GB(2)))

// ignore the error if the stream already exists
if err != nil && !errors.Is(err, stream.StreamAlreadyExists) {
fmt.Printf("Error declaring stream: %v\n", err)
return
}

// declare the reliable consumer using the package ha
consumer, err := ha.NewReliableConsumer(env, streamName,
// start from the beginning of the stream
stream.NewConsumerOptions().
SetOffset(stream.OffsetSpecification{}.First()),
// handler where the messages will be processed
func(consumerContext stream.ConsumerContext, message *amqp.Message) {
fmt.Printf("Message received: %s\n", message.GetData())
})

if err != nil {
fmt.Printf("Error creating consumer: %v\n", err)
return
}

// Create the reliable producer using the package ha
producer, err := ha.NewReliableProducer(env, streamName,
// we leave the default options
stream.NewProducerOptions(),
// handler for the confirmation of the messages
func(messageConfirm []*stream.ConfirmationStatus) {
for _, msg := range messageConfirm {
if msg.IsConfirmed() {
fmt.Printf("message %s confirmed \n", msg.GetMessage().GetData())
} else {
fmt.Printf("message %s failed \n", msg.GetMessage().GetData())
}
}
})

if err != nil {
fmt.Printf("Error creating producer: %v\n", err)
return
}

// Send a message
for i := 0; i < 10; i++ {
err = producer.Send(amqp.NewMessage([]byte(fmt.Sprintf("Hello stream:%d", i))))
if err != nil {
fmt.Printf("Error sending message: %v\n", err)
return
}
}

// press any key to exit
fmt.Printf("Press any close the producer, consumer and environment\n")
_, _ = fmt.Scanln()

//// Close the producer
err = producer.Close()
if err != nil {
fmt.Printf("Error closing producer: %v\n", err)
}

// Close the consumer
err = consumer.Close()
if err != nil {
fmt.Printf("Error closing consumer: %v\n", err)
}

err = env.DeleteStream(streamName)
if err != nil {
fmt.Printf("Error deleting stream: %v\n", err)
}

// Close the environment
err = env.Close()
if err != nil {
fmt.Printf("Error closing environment: %s\n", err)
}

}
3 changes: 2 additions & 1 deletion pkg/ha/ha_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) {
p.reconnectionSignal.L.Lock()
p.reconnectionSignal.Broadcast()
p.reconnectionSignal.L.Unlock()
logs.LogInfo("[Reliable] - %s reconnection signal sent", p.getInfo())
logs.LogDebug("[Reliable] - %s reconnection signal sent", p.getInfo())

}()
}

Expand Down