Skip to content

adding producer examples #240

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 59 additions & 35 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<br/>
<div align="center">
<strong>
A work-in-progress Rust client for RabbitMQ Streams
A Rust client for RabbitMQ Streams
</strong>
</div>

Expand All @@ -14,7 +14,7 @@
<img src="https://github.com/rabbitmq/rabbitmq-stream-rust-client/workflows/Tests/badge.svg"
alt="Tests status" />
</a>

<a href="https://crates.io/crates/rabbitmq-stream-client">
<img src="https://img.shields.io/crates/d/rabbitmq-stream-client.svg?style=flat-square"
alt="Download" />
Expand All @@ -32,39 +32,53 @@
<a href="https://codecov.io/gh/rabbitmq/rabbitmq-stream-rust-client">
<img src="https://codecov.io/gh/rabbitmq/rabbitmq-stream-rust-client/branch/main/graph/badge.svg?token=2DHIQ20BDE" alt="codecov"/>
</a>
</div>


Welcome to the documentation for the RabbitMQ Stream Rust Client. This guide provides comprehensive information on installation, usage, and examples.

</div>
## Table of Contents
1. [Introduction](#introduction)
2. [Installation](#installation)
3. [Getting Started](#getting-started)
4. [Usage](#usage)
- [Publishing Messages](#publishing-messages)
- [Consuming Messages](#consuming-messages)
- [Super Stream](#super-stream)
5. [Examples](#examples)
6. [Development](#development)
- [Compiling](#Compiling)
- [Running Tests](#running-tests)
- [Running Benchmarks](#running-benchmarks)
- [Contributing](#contributing)
- [License](#license)

## Introduction

# RabbitMQ Stream Client
The RabbitMQ Stream Rust Client is a library designed for integrating Rust applications with RabbitMQ streams efficiently. It supports high throughput and low latency message streaming.

This is a Rust client library for working with [RabbitMQ Streams](https://www.rabbitmq.com/docs/streams).
## Installation

### Installation

Install from [crates.io](https://crates.io/)
Install from [crates.io](https://crates.io/crates/rabbitmq-stream-client)

```toml
[dependencies]
rabbitmq-stream-client = "*"
```

### Quick Start

The main access point is `Environment`, which is used to connect to a node.
Then run `cargo build `to include it in your project.

#### Example
## Getting Started
This section covers the initial setup and necessary steps to incorporate the RabbitMQ Stream client into your Rust application.

##### Building the environment
Ensure RabbitMQ server with stream support is installed.
The main access point is `Environment`, which is used to connect to a node.

```rust,no_run
use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;
```

##### Building the environment with TLS
### Environment with TLS

```rust,no_run
use rabbitmq_stream_client::Environment;
Expand All @@ -86,7 +100,10 @@ let environment = Environment::builder()
.build()
```

##### Building the environment with a load balancer
### Environment with a load balancer


See the [documentation](https://www.rabbitmq.com/blog/2021/07/23/connecting-to-streams#with-a-load-balancer) about the stream and load-balancer.

```rust,no_run
use rabbitmq_stream_client::Environment;
Expand All @@ -99,21 +116,16 @@ let environment = Environment::builder()



##### Publishing messages
## Publishing messages

You can publish messages with three different methods:

* `send`: asynchronous, messages are automatically buffered internally and sent at once after a timeout expires. On confirmation a callback is triggered. See the [example](./examples/send_async.rs)
* `batch_send`: synchronous, the user buffers the messages and sends them. This is the fastest publishing method. On confirmation a callback is triggered. See the [example](./examples/batch_send.rs)
* `send_with_confirm`: synchronous, the caller wait till the message is confirmed. This is the slowest publishing method. See the [example](./examples/send_with_confirm.rs)

```rust,no_run
use rabbitmq_stream_client::{Environment, types::Message};
let environment = Environment::builder().build().await?;
let producer = environment.producer().name("myproducer").build("mystream").await?;
for i in 0..10 {
producer
.send_with_confirm(Message::builder().body(format!("message{}", i)).build())
.await?;
}
producer.close().await?;
```

##### Consuming messages
### Consuming messages

```rust,no_run
use rabbitmq_stream_client::{Environment};
Expand All @@ -136,9 +148,10 @@ sleep(Duration::from_secs(10)).await;
handle.close().await?;
```

### Superstreams

The client supports the superstream functionality.
### Super Stream

The client supports the super-stream functionality.

A super stream is a logical stream made of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting up the storage and the traffic on several cluster nodes.

Expand All @@ -152,16 +165,21 @@ See the [Super Stream Producer Example:](https://github.com/rabbitmq/rabbitmq-st

See the [Super Stream Consumer Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/receive_super_stream.rs)

### Development
### Examples

Refer to the [examples](./examples) directory for detailed code samples illustrating various use cases
like error handling, batch processing, super streams and different ways to send messages.

## Development

#### Compiling
### Compiling

```bash
git clone https://github.com/rabbitmq/rabbitmq-stream-rust-client .
make build
```

#### Running Tests
### Running Tests

To run tests you need to have a running RabbitMQ Stream node with a TLS configuration.
It is mandatory to use `make rabbitmq-server` to create a TLS configuration compatible with the tests.
Expand All @@ -172,9 +190,15 @@ make rabbitmq-server
make test
```

#### Running Benchmarks
### Running Benchmarks

```bash
make rabbitmq-server
make run-benchmark
```

## Contributing
Contributions are welcome! Please read our contributing guide to understand how to submit issues, enhancements, or patches.

## License
This project is licensed under the MIT License. See the LICENSE file for details.
59 changes: 18 additions & 41 deletions examples/batch_send.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
use futures::StreamExt;
use rabbitmq_stream_client::{
types::{ByteCapacity, Message, OffsetSpecification},
types::{ByteCapacity, Message},
Environment,
};
use tracing::{info, Level};
use tracing_subscriber::FmtSubscriber;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.finish();

tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
let environment = Environment::builder()
.host("localhost")
.port(5552)
.build()
.await?;

println!("creating batch_send stream");
let _ = environment.delete_stream("batch_send").await;
let message_count = 10;
let messages_to_batch = 100;
let iterations = 10000;
environment
.stream_creator()
.max_length(ByteCapacity::GB(2))
Expand All @@ -29,41 +24,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let producer = environment.producer().build("batch_send").await?;

let mut messages = Vec::with_capacity(message_count);
for i in 0..message_count {
let msg = Message::builder().body(format!("message{}", i)).build();
messages.push(msg);
for _ in 0..iterations {
println!("accumulating messages in buffer");
let mut messages = Vec::with_capacity(messages_to_batch);
for i in 0..messages_to_batch {
let msg = Message::builder().body(format!("message{}", i)).build();
messages.push(msg);
}

println!("sending in batch mode");
producer
.batch_send(messages, |confirmation_status| async move {
println!("Message confirmed with status {:?}", confirmation_status);
})
.await?;
}

producer
.batch_send(messages, |confirmation_status| async move {
info!("Message confirmed with status {:?}", confirmation_status);
})
.await?;

producer.close().await?;

let mut consumer = environment
.consumer()
.offset(OffsetSpecification::First)
.build("batch_send")
.await
.unwrap();

for _ in 0..message_count {
let delivery = consumer.next().await.unwrap()?;
info!(
"Got message : {:?} with offset {}",
delivery
.message()
.data()
.map(|data| String::from_utf8(data.to_vec())),
delivery.offset()
);
}

consumer.handle().close().await.unwrap();

environment.delete_stream("batch_send").await?;
Ok(())
}
69 changes: 69 additions & 0 deletions examples/batch_send_with_tracing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use futures::StreamExt;
use rabbitmq_stream_client::{
types::{ByteCapacity, Message, OffsetSpecification},
Environment,
};
use tracing::{info, Level};
use tracing_subscriber::FmtSubscriber;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.finish();

tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
let environment = Environment::builder()
.host("localhost")
.port(5552)
.build()
.await?;

let _ = environment.delete_stream("batch_send").await;
let message_count = 10;
environment
.stream_creator()
.max_length(ByteCapacity::GB(2))
.create("batch_send")
.await?;

let producer = environment.producer().build("batch_send").await?;

let mut messages = Vec::with_capacity(message_count);
for i in 0..message_count {
let msg = Message::builder().body(format!("message{}", i)).build();
messages.push(msg);
}

producer
.batch_send(messages, |confirmation_status| async move {
info!("Message confirmed with status {:?}", confirmation_status);
})
.await?;

producer.close().await?;

let mut consumer = environment
.consumer()
.offset(OffsetSpecification::First)
.build("batch_send")
.await
.unwrap();

for _ in 0..message_count {
let delivery = consumer.next().await.unwrap()?;
info!(
"Got message : {:?} with offset {}",
delivery
.message()
.data()
.map(|data| String::from_utf8(data.to_vec())),
delivery.offset()
);
}

consumer.handle().close().await.unwrap();

environment.delete_stream("batch_send").await?;
Ok(())
}
Loading
Loading