From 63b5c743f63edd097743a15feb6cada5f8b81bc5 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Mon, 4 Nov 2024 10:30:14 +0100 Subject: [PATCH 1/2] adding producer examples --- README.md | 23 ++++---- examples/batch_send.rs | 59 ++++++-------------- examples/batch_send_with_tracing.rs | 69 +++++++++++++++++++++++ examples/send_async.rs | 85 +++++++++++++++++++++++++++++ examples/send_with_confirm.rs | 38 +++++++++++++ 5 files changed, 220 insertions(+), 54 deletions(-) create mode 100644 examples/batch_send_with_tracing.rs create mode 100644 examples/send_async.rs create mode 100644 examples/send_with_confirm.rs diff --git a/README.md b/README.md index cf96a2c4..95cce1a2 100644 --- a/README.md +++ b/README.md @@ -97,23 +97,20 @@ let environment = Environment::builder() .build() ``` +### Publishing messages +You can publish messages with three different methods: -##### Publishing messages +* `send`: asynchronous, messages are automatically buffered internally and sent at once after a timeout expires. On confirmation a callback is triggered. +* `batch_send`: synchronous, the user buffers the messages and sends them. This is the fastest publishing method. On confirmation a callback is triggered. +* `send_with_confirm`: synchronous, the caller wait till the message is confirmed. This is the slowest publishing method. -```rust,no_run -use rabbitmq_stream_client::{Environment, types::Message}; -let environment = Environment::builder().build().await?; -let producer = environment.producer().name("myproducer").build("mystream").await?; -for i in 0..10 { - producer - .send_with_confirm(Message::builder().body(format!("message{}", i)).build()) - .await?; -} -producer.close().await?; -``` +On the [examples](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/) directory you can find diffent way to send the messages: +- [producer using send](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/send_async.rs) +- [producer using send_with_confirm](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/send_with_confirm.py) +- [producer using batch_send](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/batch_send.py) -##### Consuming messages +### Consuming messages ```rust,no_run use rabbitmq_stream_client::{Environment}; diff --git a/examples/batch_send.rs b/examples/batch_send.rs index 166ab886..875d7902 100644 --- a/examples/batch_send.rs +++ b/examples/batch_send.rs @@ -1,26 +1,21 @@ use futures::StreamExt; use rabbitmq_stream_client::{ - types::{ByteCapacity, Message, OffsetSpecification}, + types::{ByteCapacity, Message}, Environment, }; -use tracing::{info, Level}; -use tracing_subscriber::FmtSubscriber; #[tokio::main] async fn main() -> Result<(), Box> { - let subscriber = FmtSubscriber::builder() - .with_max_level(Level::TRACE) - .finish(); - - tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); let environment = Environment::builder() .host("localhost") .port(5552) .build() .await?; + println!("creating batch_send stream"); let _ = environment.delete_stream("batch_send").await; - let message_count = 10; + let messages_to_batch = 100; + let iterations = 10000; environment .stream_creator() .max_length(ByteCapacity::GB(2)) @@ -29,41 +24,23 @@ async fn main() -> Result<(), Box> { let producer = environment.producer().build("batch_send").await?; - let mut messages = Vec::with_capacity(message_count); - for i in 0..message_count { - let msg = Message::builder().body(format!("message{}", i)).build(); - messages.push(msg); + for _ in 0..iterations { + println!("accumulating messages in buffer"); + let mut messages = Vec::with_capacity(messages_to_batch); + for i in 0..messages_to_batch { + let msg = Message::builder().body(format!("message{}", i)).build(); + messages.push(msg); + } + + println!("sending in batch mode"); + producer + .batch_send(messages, |confirmation_status| async move { + println!("Message confirmed with status {:?}", confirmation_status); + }) + .await?; } - producer - .batch_send(messages, |confirmation_status| async move { - info!("Message confirmed with status {:?}", confirmation_status); - }) - .await?; - producer.close().await?; - let mut consumer = environment - .consumer() - .offset(OffsetSpecification::First) - .build("batch_send") - .await - .unwrap(); - - for _ in 0..message_count { - let delivery = consumer.next().await.unwrap()?; - info!( - "Got message : {:?} with offset {}", - delivery - .message() - .data() - .map(|data| String::from_utf8(data.to_vec())), - delivery.offset() - ); - } - - consumer.handle().close().await.unwrap(); - - environment.delete_stream("batch_send").await?; Ok(()) } diff --git a/examples/batch_send_with_tracing.rs b/examples/batch_send_with_tracing.rs new file mode 100644 index 00000000..166ab886 --- /dev/null +++ b/examples/batch_send_with_tracing.rs @@ -0,0 +1,69 @@ +use futures::StreamExt; +use rabbitmq_stream_client::{ + types::{ByteCapacity, Message, OffsetSpecification}, + Environment, +}; +use tracing::{info, Level}; +use tracing_subscriber::FmtSubscriber; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::TRACE) + .finish(); + + tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + let environment = Environment::builder() + .host("localhost") + .port(5552) + .build() + .await?; + + let _ = environment.delete_stream("batch_send").await; + let message_count = 10; + environment + .stream_creator() + .max_length(ByteCapacity::GB(2)) + .create("batch_send") + .await?; + + let producer = environment.producer().build("batch_send").await?; + + let mut messages = Vec::with_capacity(message_count); + for i in 0..message_count { + let msg = Message::builder().body(format!("message{}", i)).build(); + messages.push(msg); + } + + producer + .batch_send(messages, |confirmation_status| async move { + info!("Message confirmed with status {:?}", confirmation_status); + }) + .await?; + + producer.close().await?; + + let mut consumer = environment + .consumer() + .offset(OffsetSpecification::First) + .build("batch_send") + .await + .unwrap(); + + for _ in 0..message_count { + let delivery = consumer.next().await.unwrap()?; + info!( + "Got message : {:?} with offset {}", + delivery + .message() + .data() + .map(|data| String::from_utf8(data.to_vec())), + delivery.offset() + ); + } + + consumer.handle().close().await.unwrap(); + + environment.delete_stream("batch_send").await?; + Ok(()) +} diff --git a/examples/send_async.rs b/examples/send_async.rs new file mode 100644 index 00000000..401fbeb6 --- /dev/null +++ b/examples/send_async.rs @@ -0,0 +1,85 @@ +use rabbitmq_stream_client::error::StreamCreateError; +use rabbitmq_stream_client::types::{ByteCapacity, Message, ResponseCode}; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use tokio::sync::Notify; + +#[tokio::main] +async fn main() -> Result<(), Box> { + use rabbitmq_stream_client::Environment; + let environment = Environment::builder().build().await?; + let message_count = 1000000; + let stream = "hello-rust-stream"; + let confirmed_messages = Arc::new(AtomicU32::new(0)); + let notify_on_send = Arc::new(Notify::new()); + let _ = environment + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create(stream) + .await; + + let delete_stream = environment.delete_stream(stream).await; + + match delete_stream { + Ok(_) => { + println!("Successfully deleted stream {}", stream); + } + Err(err) => { + println!("Failed to delete stream {}. error {}", stream, err); + } + } + + let create_response = environment + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create(&stream) + .await; + + if let Err(e) = create_response { + if let StreamCreateError::Create { stream, status } = e { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); + } + } + } + } + println!( + "Send stream example. Sending {} messages to the stream: {}", + message_count, stream + ); + + let producer = environment.producer().build(stream).await.unwrap(); + + for i in 0..message_count { + let counter = confirmed_messages.clone(); + let notifier = notify_on_send.clone(); + let msg = Message::builder() + .body(format!("stream message_{}", i)) + .build(); + producer + .send(msg, move |confirmation_status| { + let inner_counter = counter.clone(); + let inner_notifier = notifier.clone(); + println!("Message confirmed with status {:?}", confirmation_status); + async move { + if inner_counter.fetch_add(1, Ordering::Relaxed) == message_count - 1 { + inner_notifier.notify_one(); + } + } + }) + .await + .unwrap(); + } + + notify_on_send.notified().await; + println!( + "Successfully sent {} messages to the stream {}", + message_count, stream + ); + let _ = producer.close().await; + + Ok(()) +} diff --git a/examples/send_with_confirm.rs b/examples/send_with_confirm.rs new file mode 100644 index 00000000..9162c3dc --- /dev/null +++ b/examples/send_with_confirm.rs @@ -0,0 +1,38 @@ +use rabbitmq_stream_client::error::StreamCreateError; +use rabbitmq_stream_client::types::{ByteCapacity, Message, ResponseCode}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + use rabbitmq_stream_client::Environment; + let environment = Environment::builder().build().await?; + let stream = "hello-rust-stream"; + let number_of_messages = 1000000; + let create_response = environment + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create(stream) + .await; + + if let Err(e) = create_response { + if let StreamCreateError::Create { stream, status } = e { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); + } + } + } + } + + let producer = environment.producer().build(stream).await?; + + for i in 0..number_of_messages { + let msg = Message::builder() + .body(format!("stream message_{}", i)) + .build(); + producer.send_with_confirm(msg).await?; + } + producer.close().await?; + Ok(()) +} From b8cdf53d0d2fc47484e9d0166274173ea12c90f7 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 4 Nov 2024 14:53:43 +0100 Subject: [PATCH 2/2] Update documentation Signed-off-by: Gabriele Santomaggio --- README.md | 87 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 57 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 95cce1a2..472d2e93 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@
- A work-in-progress Rust client for RabbitMQ Streams + A Rust client for RabbitMQ Streams
@@ -14,7 +14,7 @@ Tests status - + Download @@ -32,39 +32,53 @@ codecov + +Welcome to the documentation for the RabbitMQ Stream Rust Client. This guide provides comprehensive information on installation, usage, and examples. - +## Table of Contents +1. [Introduction](#introduction) +2. [Installation](#installation) +3. [Getting Started](#getting-started) +4. [Usage](#usage) + - [Publishing Messages](#publishing-messages) + - [Consuming Messages](#consuming-messages) + - [Super Stream](#super-stream) +5. [Examples](#examples) +6. [Development](#development) + - [Compiling](#Compiling) + - [Running Tests](#running-tests) + - [Running Benchmarks](#running-benchmarks) + - [Contributing](#contributing) + - [License](#license) +## Introduction -# RabbitMQ Stream Client +The RabbitMQ Stream Rust Client is a library designed for integrating Rust applications with RabbitMQ streams efficiently. It supports high throughput and low latency message streaming. -This is a Rust client library for working with [RabbitMQ Streams](https://www.rabbitmq.com/docs/streams). +## Installation -### Installation - -Install from [crates.io](https://crates.io/) +Install from [crates.io](https://crates.io/crates/rabbitmq-stream-client) ```toml [dependencies] rabbitmq-stream-client = "*" ``` -### Quick Start +Then run `cargo build `to include it in your project. -The main access point is `Environment`, which is used to connect to a node. +## Getting Started +This section covers the initial setup and necessary steps to incorporate the RabbitMQ Stream client into your Rust application. -#### Example - -##### Building the environment +Ensure RabbitMQ server with stream support is installed. +The main access point is `Environment`, which is used to connect to a node. ```rust,no_run use rabbitmq_stream_client::Environment; let environment = Environment::builder().build().await?; ``` - -##### Building the environment with TLS +### Environment with TLS ```rust,no_run use rabbitmq_stream_client::Environment; @@ -86,7 +100,10 @@ let environment = Environment::builder() .build() ``` -##### Building the environment with a load balancer +### Environment with a load balancer + + +See the [documentation](https://www.rabbitmq.com/blog/2021/07/23/connecting-to-streams#with-a-load-balancer) about the stream and load-balancer. ```rust,no_run use rabbitmq_stream_client::Environment; @@ -97,18 +114,16 @@ let environment = Environment::builder() .build() ``` -### Publishing messages + + +## Publishing messages You can publish messages with three different methods: -* `send`: asynchronous, messages are automatically buffered internally and sent at once after a timeout expires. On confirmation a callback is triggered. -* `batch_send`: synchronous, the user buffers the messages and sends them. This is the fastest publishing method. On confirmation a callback is triggered. -* `send_with_confirm`: synchronous, the caller wait till the message is confirmed. This is the slowest publishing method. +* `send`: asynchronous, messages are automatically buffered internally and sent at once after a timeout expires. On confirmation a callback is triggered. See the [example](./examples/send_async.rs) +* `batch_send`: synchronous, the user buffers the messages and sends them. This is the fastest publishing method. On confirmation a callback is triggered. See the [example](./examples/batch_send.rs) +* `send_with_confirm`: synchronous, the caller wait till the message is confirmed. This is the slowest publishing method. See the [example](./examples/send_with_confirm.rs) -On the [examples](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/) directory you can find diffent way to send the messages: -- [producer using send](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/send_async.rs) -- [producer using send_with_confirm](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/send_with_confirm.py) -- [producer using batch_send](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/batch_send.py) ### Consuming messages @@ -133,9 +148,10 @@ sleep(Duration::from_secs(10)).await; handle.close().await?; ``` -### Superstreams -The client supports the superstream functionality. +### Super Stream + +The client supports the super-stream functionality. A super stream is a logical stream made of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting up the storage and the traffic on several cluster nodes. @@ -149,16 +165,21 @@ See the [Super Stream Producer Example:](https://github.com/rabbitmq/rabbitmq-st See the [Super Stream Consumer Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/receive_super_stream.rs) -### Development +### Examples -#### Compiling +Refer to the [examples](./examples) directory for detailed code samples illustrating various use cases +like error handling, batch processing, super streams and different ways to send messages. + +## Development + +### Compiling ```bash git clone https://github.com/rabbitmq/rabbitmq-stream-rust-client . make build ``` -#### Running Tests +### Running Tests To run tests you need to have a running RabbitMQ Stream node with a TLS configuration. It is mandatory to use `make rabbitmq-server` to create a TLS configuration compatible with the tests. @@ -169,9 +190,15 @@ make rabbitmq-server make test ``` -#### Running Benchmarks +### Running Benchmarks ```bash make rabbitmq-server make run-benchmark ``` + +## Contributing +Contributions are welcome! Please read our contributing guide to understand how to submit issues, enhancements, or patches. + +## License +This project is licensed under the MIT License. See the LICENSE file for details.