diff --git a/README.md b/README.md index b39de3ee..5a71b5b1 100644 --- a/README.md +++ b/README.md @@ -89,7 +89,11 @@ Stream uri: `rabbitmq-stream://guest:guest@localhost:5552` ### Getting started for impatient -See [getting started](./examples/getting_started.go) example. +- [Getting started with reliable producer/consumer](./examples/reliable_getting_started/reliable_getting_started.go) example. +- [Getting started with standard producer/consumer](./examples/getting_started/getting_started.go) example. +- Getting started Video tutorial: + +[![Getting Started](https://img.youtube.com/vi/8qfvl6FgC50/0.jpg)](https://www.youtube.com/watch?v=8qfvl6FgC50) ### Examples diff --git a/examples/README.md b/examples/README.md index 96315143..95608649 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,7 +1,7 @@ Stream examples === - - - [Getting started](./getting_started.go) - A good point to start. + - [Reliable getting started](./getting_started/getting_started.go) - The structures you need to start. + - [Getting started](./getting_started/getting_started.go) - Producer and Consumer example without reconnection - [Offset Start](./offsetStart/offset.go) - How to set different points to start consuming - [Offset Tracking](./offsetTracking/offsetTracking.go) - Manually store the consumer offset - [Automatic Offset Tracking](./automaticOffsetTracking/automaticOffsetTracking.go) - Automatic store the consumer offset diff --git a/examples/getting_started.go b/examples/getting_started/getting_started.go similarity index 100% rename from examples/getting_started.go rename to examples/getting_started/getting_started.go diff --git a/examples/reliable_getting_started/reliable_getting_started.go b/examples/reliable_getting_started/reliable_getting_started.go new file mode 100644 index 00000000..76a6911e --- /dev/null +++ b/examples/reliable_getting_started/reliable_getting_started.go @@ -0,0 +1,108 @@ +package main + +import ( + "errors" + "fmt" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" +) + +func main() { + fmt.Printf("Getting started with Streaming client for RabbitMQ\n") + + // Create the environment. You can set the log level to DEBUG for more information + // stream.SetLevelInfo(logs.DEBUG) + // the environment is the connection to the broker(s) + env, err := stream.NewEnvironment(stream.NewEnvironmentOptions(). + SetHost("localhost"). + SetPort(5552)) + if err != nil { + fmt.Printf("Error creating environment: %v\n", err) + return + } + + // Create a stream + streamName := "my-stream" + // It is highly recommended to define the stream retention policy + err = env.DeclareStream(streamName, stream.NewStreamOptions(). + SetMaxLengthBytes(stream.ByteCapacity{}.GB(2))) + + // ignore the error if the stream already exists + if err != nil && !errors.Is(err, stream.StreamAlreadyExists) { + fmt.Printf("Error declaring stream: %v\n", err) + return + } + + // declare the reliable consumer using the package ha + consumer, err := ha.NewReliableConsumer(env, streamName, + // start from the beginning of the stream + stream.NewConsumerOptions(). + SetOffset(stream.OffsetSpecification{}.First()), + // handler where the messages will be processed + func(consumerContext stream.ConsumerContext, message *amqp.Message) { + fmt.Printf("Message received: %s\n", message.GetData()) + }) + + if err != nil { + fmt.Printf("Error creating consumer: %v\n", err) + return + } + + // Create the reliable producer using the package ha + producer, err := ha.NewReliableProducer(env, streamName, + // we leave the default options + stream.NewProducerOptions(), + // handler for the confirmation of the messages + func(messageConfirm []*stream.ConfirmationStatus) { + for _, msg := range messageConfirm { + if msg.IsConfirmed() { + fmt.Printf("message %s confirmed \n", msg.GetMessage().GetData()) + } else { + fmt.Printf("message %s failed \n", msg.GetMessage().GetData()) + } + } + }) + + if err != nil { + fmt.Printf("Error creating producer: %v\n", err) + return + } + + // Send a message + for i := 0; i < 10; i++ { + err = producer.Send(amqp.NewMessage([]byte(fmt.Sprintf("Hello stream:%d", i)))) + if err != nil { + fmt.Printf("Error sending message: %v\n", err) + return + } + } + + // press any key to exit + fmt.Printf("Press any close the producer, consumer and environment\n") + _, _ = fmt.Scanln() + + //// Close the producer + err = producer.Close() + if err != nil { + fmt.Printf("Error closing producer: %v\n", err) + } + + // Close the consumer + err = consumer.Close() + if err != nil { + fmt.Printf("Error closing consumer: %v\n", err) + } + + err = env.DeleteStream(streamName) + if err != nil { + fmt.Printf("Error deleting stream: %v\n", err) + } + + // Close the environment + err = env.Close() + if err != nil { + fmt.Printf("Error closing environment: %s\n", err) + } + +} diff --git a/pkg/ha/ha_publisher.go b/pkg/ha/ha_publisher.go index 8f12ef34..3aa8bd10 100644 --- a/pkg/ha/ha_publisher.go +++ b/pkg/ha/ha_publisher.go @@ -48,7 +48,8 @@ func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) { p.reconnectionSignal.L.Lock() p.reconnectionSignal.Broadcast() p.reconnectionSignal.L.Unlock() - logs.LogInfo("[Reliable] - %s reconnection signal sent", p.getInfo()) + logs.LogDebug("[Reliable] - %s reconnection signal sent", p.getInfo()) + }() }