diff --git a/README.md b/README.md
index cf96a2c4..472d2e93 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
- A work-in-progress Rust client for RabbitMQ Streams
+ A Rust client for RabbitMQ Streams
@@ -14,7 +14,7 @@
-
+
@@ -32,39 +32,53 @@
+
+Welcome to the documentation for the RabbitMQ Stream Rust Client. This guide provides comprehensive information on installation, usage, and examples.
-
+## Table of Contents
+1. [Introduction](#introduction)
+2. [Installation](#installation)
+3. [Getting Started](#getting-started)
+4. [Usage](#usage)
+ - [Publishing Messages](#publishing-messages)
+ - [Consuming Messages](#consuming-messages)
+ - [Super Stream](#super-stream)
+5. [Examples](#examples)
+6. [Development](#development)
+ - [Compiling](#Compiling)
+ - [Running Tests](#running-tests)
+ - [Running Benchmarks](#running-benchmarks)
+ - [Contributing](#contributing)
+ - [License](#license)
+## Introduction
-# RabbitMQ Stream Client
+The RabbitMQ Stream Rust Client is a library designed for integrating Rust applications with RabbitMQ streams efficiently. It supports high throughput and low latency message streaming.
-This is a Rust client library for working with [RabbitMQ Streams](https://www.rabbitmq.com/docs/streams).
+## Installation
-### Installation
-
-Install from [crates.io](https://crates.io/)
+Install from [crates.io](https://crates.io/crates/rabbitmq-stream-client)
```toml
[dependencies]
rabbitmq-stream-client = "*"
```
-### Quick Start
-
-The main access point is `Environment`, which is used to connect to a node.
+Then run `cargo build `to include it in your project.
-#### Example
+## Getting Started
+This section covers the initial setup and necessary steps to incorporate the RabbitMQ Stream client into your Rust application.
-##### Building the environment
+Ensure RabbitMQ server with stream support is installed.
+The main access point is `Environment`, which is used to connect to a node.
```rust,no_run
use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;
```
-
-##### Building the environment with TLS
+### Environment with TLS
```rust,no_run
use rabbitmq_stream_client::Environment;
@@ -86,7 +100,10 @@ let environment = Environment::builder()
.build()
```
-##### Building the environment with a load balancer
+### Environment with a load balancer
+
+
+See the [documentation](https://www.rabbitmq.com/blog/2021/07/23/connecting-to-streams#with-a-load-balancer) about the stream and load-balancer.
```rust,no_run
use rabbitmq_stream_client::Environment;
@@ -99,21 +116,16 @@ let environment = Environment::builder()
-##### Publishing messages
+## Publishing messages
+
+You can publish messages with three different methods:
+
+* `send`: asynchronous, messages are automatically buffered internally and sent at once after a timeout expires. On confirmation a callback is triggered. See the [example](./examples/send_async.rs)
+* `batch_send`: synchronous, the user buffers the messages and sends them. This is the fastest publishing method. On confirmation a callback is triggered. See the [example](./examples/batch_send.rs)
+* `send_with_confirm`: synchronous, the caller wait till the message is confirmed. This is the slowest publishing method. See the [example](./examples/send_with_confirm.rs)
-```rust,no_run
-use rabbitmq_stream_client::{Environment, types::Message};
-let environment = Environment::builder().build().await?;
-let producer = environment.producer().name("myproducer").build("mystream").await?;
-for i in 0..10 {
- producer
- .send_with_confirm(Message::builder().body(format!("message{}", i)).build())
- .await?;
-}
-producer.close().await?;
-```
-##### Consuming messages
+### Consuming messages
```rust,no_run
use rabbitmq_stream_client::{Environment};
@@ -136,9 +148,10 @@ sleep(Duration::from_secs(10)).await;
handle.close().await?;
```
-### Superstreams
-The client supports the superstream functionality.
+### Super Stream
+
+The client supports the super-stream functionality.
A super stream is a logical stream made of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting up the storage and the traffic on several cluster nodes.
@@ -152,16 +165,21 @@ See the [Super Stream Producer Example:](https://github.com/rabbitmq/rabbitmq-st
See the [Super Stream Consumer Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/receive_super_stream.rs)
-### Development
+### Examples
+
+Refer to the [examples](./examples) directory for detailed code samples illustrating various use cases
+like error handling, batch processing, super streams and different ways to send messages.
+
+## Development
-#### Compiling
+### Compiling
```bash
git clone https://github.com/rabbitmq/rabbitmq-stream-rust-client .
make build
```
-#### Running Tests
+### Running Tests
To run tests you need to have a running RabbitMQ Stream node with a TLS configuration.
It is mandatory to use `make rabbitmq-server` to create a TLS configuration compatible with the tests.
@@ -172,9 +190,15 @@ make rabbitmq-server
make test
```
-#### Running Benchmarks
+### Running Benchmarks
```bash
make rabbitmq-server
make run-benchmark
```
+
+## Contributing
+Contributions are welcome! Please read our contributing guide to understand how to submit issues, enhancements, or patches.
+
+## License
+This project is licensed under the MIT License. See the LICENSE file for details.
diff --git a/examples/batch_send.rs b/examples/batch_send.rs
index 166ab886..875d7902 100644
--- a/examples/batch_send.rs
+++ b/examples/batch_send.rs
@@ -1,26 +1,21 @@
use futures::StreamExt;
use rabbitmq_stream_client::{
- types::{ByteCapacity, Message, OffsetSpecification},
+ types::{ByteCapacity, Message},
Environment,
};
-use tracing::{info, Level};
-use tracing_subscriber::FmtSubscriber;
#[tokio::main]
async fn main() -> Result<(), Box> {
- let subscriber = FmtSubscriber::builder()
- .with_max_level(Level::TRACE)
- .finish();
-
- tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
let environment = Environment::builder()
.host("localhost")
.port(5552)
.build()
.await?;
+ println!("creating batch_send stream");
let _ = environment.delete_stream("batch_send").await;
- let message_count = 10;
+ let messages_to_batch = 100;
+ let iterations = 10000;
environment
.stream_creator()
.max_length(ByteCapacity::GB(2))
@@ -29,41 +24,23 @@ async fn main() -> Result<(), Box> {
let producer = environment.producer().build("batch_send").await?;
- let mut messages = Vec::with_capacity(message_count);
- for i in 0..message_count {
- let msg = Message::builder().body(format!("message{}", i)).build();
- messages.push(msg);
+ for _ in 0..iterations {
+ println!("accumulating messages in buffer");
+ let mut messages = Vec::with_capacity(messages_to_batch);
+ for i in 0..messages_to_batch {
+ let msg = Message::builder().body(format!("message{}", i)).build();
+ messages.push(msg);
+ }
+
+ println!("sending in batch mode");
+ producer
+ .batch_send(messages, |confirmation_status| async move {
+ println!("Message confirmed with status {:?}", confirmation_status);
+ })
+ .await?;
}
- producer
- .batch_send(messages, |confirmation_status| async move {
- info!("Message confirmed with status {:?}", confirmation_status);
- })
- .await?;
-
producer.close().await?;
- let mut consumer = environment
- .consumer()
- .offset(OffsetSpecification::First)
- .build("batch_send")
- .await
- .unwrap();
-
- for _ in 0..message_count {
- let delivery = consumer.next().await.unwrap()?;
- info!(
- "Got message : {:?} with offset {}",
- delivery
- .message()
- .data()
- .map(|data| String::from_utf8(data.to_vec())),
- delivery.offset()
- );
- }
-
- consumer.handle().close().await.unwrap();
-
- environment.delete_stream("batch_send").await?;
Ok(())
}
diff --git a/examples/batch_send_with_tracing.rs b/examples/batch_send_with_tracing.rs
new file mode 100644
index 00000000..166ab886
--- /dev/null
+++ b/examples/batch_send_with_tracing.rs
@@ -0,0 +1,69 @@
+use futures::StreamExt;
+use rabbitmq_stream_client::{
+ types::{ByteCapacity, Message, OffsetSpecification},
+ Environment,
+};
+use tracing::{info, Level};
+use tracing_subscriber::FmtSubscriber;
+
+#[tokio::main]
+async fn main() -> Result<(), Box> {
+ let subscriber = FmtSubscriber::builder()
+ .with_max_level(Level::TRACE)
+ .finish();
+
+ tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
+ let environment = Environment::builder()
+ .host("localhost")
+ .port(5552)
+ .build()
+ .await?;
+
+ let _ = environment.delete_stream("batch_send").await;
+ let message_count = 10;
+ environment
+ .stream_creator()
+ .max_length(ByteCapacity::GB(2))
+ .create("batch_send")
+ .await?;
+
+ let producer = environment.producer().build("batch_send").await?;
+
+ let mut messages = Vec::with_capacity(message_count);
+ for i in 0..message_count {
+ let msg = Message::builder().body(format!("message{}", i)).build();
+ messages.push(msg);
+ }
+
+ producer
+ .batch_send(messages, |confirmation_status| async move {
+ info!("Message confirmed with status {:?}", confirmation_status);
+ })
+ .await?;
+
+ producer.close().await?;
+
+ let mut consumer = environment
+ .consumer()
+ .offset(OffsetSpecification::First)
+ .build("batch_send")
+ .await
+ .unwrap();
+
+ for _ in 0..message_count {
+ let delivery = consumer.next().await.unwrap()?;
+ info!(
+ "Got message : {:?} with offset {}",
+ delivery
+ .message()
+ .data()
+ .map(|data| String::from_utf8(data.to_vec())),
+ delivery.offset()
+ );
+ }
+
+ consumer.handle().close().await.unwrap();
+
+ environment.delete_stream("batch_send").await?;
+ Ok(())
+}
diff --git a/examples/send_async.rs b/examples/send_async.rs
new file mode 100644
index 00000000..401fbeb6
--- /dev/null
+++ b/examples/send_async.rs
@@ -0,0 +1,85 @@
+use rabbitmq_stream_client::error::StreamCreateError;
+use rabbitmq_stream_client::types::{ByteCapacity, Message, ResponseCode};
+use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::Arc;
+use tokio::sync::Notify;
+
+#[tokio::main]
+async fn main() -> Result<(), Box> {
+ use rabbitmq_stream_client::Environment;
+ let environment = Environment::builder().build().await?;
+ let message_count = 1000000;
+ let stream = "hello-rust-stream";
+ let confirmed_messages = Arc::new(AtomicU32::new(0));
+ let notify_on_send = Arc::new(Notify::new());
+ let _ = environment
+ .stream_creator()
+ .max_length(ByteCapacity::GB(5))
+ .create(stream)
+ .await;
+
+ let delete_stream = environment.delete_stream(stream).await;
+
+ match delete_stream {
+ Ok(_) => {
+ println!("Successfully deleted stream {}", stream);
+ }
+ Err(err) => {
+ println!("Failed to delete stream {}. error {}", stream, err);
+ }
+ }
+
+ let create_response = environment
+ .stream_creator()
+ .max_length(ByteCapacity::GB(5))
+ .create(&stream)
+ .await;
+
+ if let Err(e) = create_response {
+ if let StreamCreateError::Create { stream, status } = e {
+ match status {
+ // we can ignore this error because the stream already exists
+ ResponseCode::StreamAlreadyExists => {}
+ err => {
+ println!("Error creating stream: {:?} {:?}", stream, err);
+ }
+ }
+ }
+ }
+ println!(
+ "Send stream example. Sending {} messages to the stream: {}",
+ message_count, stream
+ );
+
+ let producer = environment.producer().build(stream).await.unwrap();
+
+ for i in 0..message_count {
+ let counter = confirmed_messages.clone();
+ let notifier = notify_on_send.clone();
+ let msg = Message::builder()
+ .body(format!("stream message_{}", i))
+ .build();
+ producer
+ .send(msg, move |confirmation_status| {
+ let inner_counter = counter.clone();
+ let inner_notifier = notifier.clone();
+ println!("Message confirmed with status {:?}", confirmation_status);
+ async move {
+ if inner_counter.fetch_add(1, Ordering::Relaxed) == message_count - 1 {
+ inner_notifier.notify_one();
+ }
+ }
+ })
+ .await
+ .unwrap();
+ }
+
+ notify_on_send.notified().await;
+ println!(
+ "Successfully sent {} messages to the stream {}",
+ message_count, stream
+ );
+ let _ = producer.close().await;
+
+ Ok(())
+}
diff --git a/examples/send_with_confirm.rs b/examples/send_with_confirm.rs
new file mode 100644
index 00000000..9162c3dc
--- /dev/null
+++ b/examples/send_with_confirm.rs
@@ -0,0 +1,38 @@
+use rabbitmq_stream_client::error::StreamCreateError;
+use rabbitmq_stream_client::types::{ByteCapacity, Message, ResponseCode};
+
+#[tokio::main]
+async fn main() -> Result<(), Box> {
+ use rabbitmq_stream_client::Environment;
+ let environment = Environment::builder().build().await?;
+ let stream = "hello-rust-stream";
+ let number_of_messages = 1000000;
+ let create_response = environment
+ .stream_creator()
+ .max_length(ByteCapacity::GB(5))
+ .create(stream)
+ .await;
+
+ if let Err(e) = create_response {
+ if let StreamCreateError::Create { stream, status } = e {
+ match status {
+ // we can ignore this error because the stream already exists
+ ResponseCode::StreamAlreadyExists => {}
+ err => {
+ println!("Error creating stream: {:?} {:?}", stream, err);
+ }
+ }
+ }
+ }
+
+ let producer = environment.producer().build(stream).await?;
+
+ for i in 0..number_of_messages {
+ let msg = Message::builder()
+ .body(format!("stream message_{}", i))
+ .build();
+ producer.send_with_confirm(msg).await?;
+ }
+ producer.close().await?;
+ Ok(())
+}