Skip to content

Commit c05e420

Browse files
adding producer examples (#240)
* adding producer examples * Update documentation Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> Co-authored-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 30df99f commit c05e420

File tree

5 files changed

+269
-76
lines changed

5 files changed

+269
-76
lines changed

README.md

Lines changed: 59 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<br/>
33
<div align="center">
44
<strong>
5-
A work-in-progress Rust client for RabbitMQ Streams
5+
A Rust client for RabbitMQ Streams
66
</strong>
77
</div>
88

@@ -14,7 +14,7 @@
1414
<img src="https://github.com/rabbitmq/rabbitmq-stream-rust-client/workflows/Tests/badge.svg"
1515
alt="Tests status" />
1616
</a>
17-
17+
1818
<a href="https://crates.io/crates/rabbitmq-stream-client">
1919
<img src="https://img.shields.io/crates/d/rabbitmq-stream-client.svg?style=flat-square"
2020
alt="Download" />
@@ -32,39 +32,53 @@
3232
<a href="https://codecov.io/gh/rabbitmq/rabbitmq-stream-rust-client">
3333
<img src="https://codecov.io/gh/rabbitmq/rabbitmq-stream-rust-client/branch/main/graph/badge.svg?token=2DHIQ20BDE" alt="codecov"/>
3434
</a>
35+
</div>
3536

3637

38+
Welcome to the documentation for the RabbitMQ Stream Rust Client. This guide provides comprehensive information on installation, usage, and examples.
3739

38-
</div>
40+
## Table of Contents
41+
1. [Introduction](#introduction)
42+
2. [Installation](#installation)
43+
3. [Getting Started](#getting-started)
44+
4. [Usage](#usage)
45+
- [Publishing Messages](#publishing-messages)
46+
- [Consuming Messages](#consuming-messages)
47+
- [Super Stream](#super-stream)
48+
5. [Examples](#examples)
49+
6. [Development](#development)
50+
- [Compiling](#Compiling)
51+
- [Running Tests](#running-tests)
52+
- [Running Benchmarks](#running-benchmarks)
53+
- [Contributing](#contributing)
54+
- [License](#license)
3955

56+
## Introduction
4057

41-
# RabbitMQ Stream Client
58+
The RabbitMQ Stream Rust Client is a library designed for integrating Rust applications with RabbitMQ streams efficiently. It supports high throughput and low latency message streaming.
4259

43-
This is a Rust client library for working with [RabbitMQ Streams](https://www.rabbitmq.com/docs/streams).
60+
## Installation
4461

45-
### Installation
46-
47-
Install from [crates.io](https://crates.io/)
62+
Install from [crates.io](https://crates.io/crates/rabbitmq-stream-client)
4863

4964
```toml
5065
[dependencies]
5166
rabbitmq-stream-client = "*"
5267
```
5368

54-
### Quick Start
55-
56-
The main access point is `Environment`, which is used to connect to a node.
69+
Then run `cargo build `to include it in your project.
5770

58-
#### Example
71+
## Getting Started
72+
This section covers the initial setup and necessary steps to incorporate the RabbitMQ Stream client into your Rust application.
5973

60-
##### Building the environment
74+
Ensure RabbitMQ server with stream support is installed.
75+
The main access point is `Environment`, which is used to connect to a node.
6176

6277
```rust,no_run
6378
use rabbitmq_stream_client::Environment;
6479
let environment = Environment::builder().build().await?;
6580
```
66-
67-
##### Building the environment with TLS
81+
### Environment with TLS
6882

6983
```rust,no_run
7084
use rabbitmq_stream_client::Environment;
@@ -86,7 +100,10 @@ let environment = Environment::builder()
86100
.build()
87101
```
88102

89-
##### Building the environment with a load balancer
103+
### Environment with a load balancer
104+
105+
106+
See the [documentation](https://www.rabbitmq.com/blog/2021/07/23/connecting-to-streams#with-a-load-balancer) about the stream and load-balancer.
90107

91108
```rust,no_run
92109
use rabbitmq_stream_client::Environment;
@@ -99,21 +116,16 @@ let environment = Environment::builder()
99116

100117

101118

102-
##### Publishing messages
119+
## Publishing messages
120+
121+
You can publish messages with three different methods:
122+
123+
* `send`: asynchronous, messages are automatically buffered internally and sent at once after a timeout expires. On confirmation a callback is triggered. See the [example](./examples/send_async.rs)
124+
* `batch_send`: synchronous, the user buffers the messages and sends them. This is the fastest publishing method. On confirmation a callback is triggered. See the [example](./examples/batch_send.rs)
125+
* `send_with_confirm`: synchronous, the caller wait till the message is confirmed. This is the slowest publishing method. See the [example](./examples/send_with_confirm.rs)
103126

104-
```rust,no_run
105-
use rabbitmq_stream_client::{Environment, types::Message};
106-
let environment = Environment::builder().build().await?;
107-
let producer = environment.producer().name("myproducer").build("mystream").await?;
108-
for i in 0..10 {
109-
producer
110-
.send_with_confirm(Message::builder().body(format!("message{}", i)).build())
111-
.await?;
112-
}
113-
producer.close().await?;
114-
```
115127

116-
##### Consuming messages
128+
### Consuming messages
117129

118130
```rust,no_run
119131
use rabbitmq_stream_client::{Environment};
@@ -136,9 +148,10 @@ sleep(Duration::from_secs(10)).await;
136148
handle.close().await?;
137149
```
138150

139-
### Superstreams
140151

141-
The client supports the superstream functionality.
152+
### Super Stream
153+
154+
The client supports the super-stream functionality.
142155

143156
A super stream is a logical stream made of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting up the storage and the traffic on several cluster nodes.
144157

@@ -152,16 +165,21 @@ See the [Super Stream Producer Example:](https://github.com/rabbitmq/rabbitmq-st
152165

153166
See the [Super Stream Consumer Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/receive_super_stream.rs)
154167

155-
### Development
168+
### Examples
169+
170+
Refer to the [examples](./examples) directory for detailed code samples illustrating various use cases
171+
like error handling, batch processing, super streams and different ways to send messages.
172+
173+
## Development
156174

157-
#### Compiling
175+
### Compiling
158176

159177
```bash
160178
git clone https://github.com/rabbitmq/rabbitmq-stream-rust-client .
161179
make build
162180
```
163181

164-
#### Running Tests
182+
### Running Tests
165183

166184
To run tests you need to have a running RabbitMQ Stream node with a TLS configuration.
167185
It is mandatory to use `make rabbitmq-server` to create a TLS configuration compatible with the tests.
@@ -172,9 +190,15 @@ make rabbitmq-server
172190
make test
173191
```
174192

175-
#### Running Benchmarks
193+
### Running Benchmarks
176194

177195
```bash
178196
make rabbitmq-server
179197
make run-benchmark
180198
```
199+
200+
## Contributing
201+
Contributions are welcome! Please read our contributing guide to understand how to submit issues, enhancements, or patches.
202+
203+
## License
204+
This project is licensed under the MIT License. See the LICENSE file for details.

examples/batch_send.rs

Lines changed: 18 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,21 @@
11
use futures::StreamExt;
22
use rabbitmq_stream_client::{
3-
types::{ByteCapacity, Message, OffsetSpecification},
3+
types::{ByteCapacity, Message},
44
Environment,
55
};
6-
use tracing::{info, Level};
7-
use tracing_subscriber::FmtSubscriber;
86

97
#[tokio::main]
108
async fn main() -> Result<(), Box<dyn std::error::Error>> {
11-
let subscriber = FmtSubscriber::builder()
12-
.with_max_level(Level::TRACE)
13-
.finish();
14-
15-
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
169
let environment = Environment::builder()
1710
.host("localhost")
1811
.port(5552)
1912
.build()
2013
.await?;
2114

15+
println!("creating batch_send stream");
2216
let _ = environment.delete_stream("batch_send").await;
23-
let message_count = 10;
17+
let messages_to_batch = 100;
18+
let iterations = 10000;
2419
environment
2520
.stream_creator()
2621
.max_length(ByteCapacity::GB(2))
@@ -29,41 +24,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2924

3025
let producer = environment.producer().build("batch_send").await?;
3126

32-
let mut messages = Vec::with_capacity(message_count);
33-
for i in 0..message_count {
34-
let msg = Message::builder().body(format!("message{}", i)).build();
35-
messages.push(msg);
27+
for _ in 0..iterations {
28+
println!("accumulating messages in buffer");
29+
let mut messages = Vec::with_capacity(messages_to_batch);
30+
for i in 0..messages_to_batch {
31+
let msg = Message::builder().body(format!("message{}", i)).build();
32+
messages.push(msg);
33+
}
34+
35+
println!("sending in batch mode");
36+
producer
37+
.batch_send(messages, |confirmation_status| async move {
38+
println!("Message confirmed with status {:?}", confirmation_status);
39+
})
40+
.await?;
3641
}
3742

38-
producer
39-
.batch_send(messages, |confirmation_status| async move {
40-
info!("Message confirmed with status {:?}", confirmation_status);
41-
})
42-
.await?;
43-
4443
producer.close().await?;
4544

46-
let mut consumer = environment
47-
.consumer()
48-
.offset(OffsetSpecification::First)
49-
.build("batch_send")
50-
.await
51-
.unwrap();
52-
53-
for _ in 0..message_count {
54-
let delivery = consumer.next().await.unwrap()?;
55-
info!(
56-
"Got message : {:?} with offset {}",
57-
delivery
58-
.message()
59-
.data()
60-
.map(|data| String::from_utf8(data.to_vec())),
61-
delivery.offset()
62-
);
63-
}
64-
65-
consumer.handle().close().await.unwrap();
66-
67-
environment.delete_stream("batch_send").await?;
6845
Ok(())
6946
}

examples/batch_send_with_tracing.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use futures::StreamExt;
2+
use rabbitmq_stream_client::{
3+
types::{ByteCapacity, Message, OffsetSpecification},
4+
Environment,
5+
};
6+
use tracing::{info, Level};
7+
use tracing_subscriber::FmtSubscriber;
8+
9+
#[tokio::main]
10+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
11+
let subscriber = FmtSubscriber::builder()
12+
.with_max_level(Level::TRACE)
13+
.finish();
14+
15+
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
16+
let environment = Environment::builder()
17+
.host("localhost")
18+
.port(5552)
19+
.build()
20+
.await?;
21+
22+
let _ = environment.delete_stream("batch_send").await;
23+
let message_count = 10;
24+
environment
25+
.stream_creator()
26+
.max_length(ByteCapacity::GB(2))
27+
.create("batch_send")
28+
.await?;
29+
30+
let producer = environment.producer().build("batch_send").await?;
31+
32+
let mut messages = Vec::with_capacity(message_count);
33+
for i in 0..message_count {
34+
let msg = Message::builder().body(format!("message{}", i)).build();
35+
messages.push(msg);
36+
}
37+
38+
producer
39+
.batch_send(messages, |confirmation_status| async move {
40+
info!("Message confirmed with status {:?}", confirmation_status);
41+
})
42+
.await?;
43+
44+
producer.close().await?;
45+
46+
let mut consumer = environment
47+
.consumer()
48+
.offset(OffsetSpecification::First)
49+
.build("batch_send")
50+
.await
51+
.unwrap();
52+
53+
for _ in 0..message_count {
54+
let delivery = consumer.next().await.unwrap()?;
55+
info!(
56+
"Got message : {:?} with offset {}",
57+
delivery
58+
.message()
59+
.data()
60+
.map(|data| String::from_utf8(data.to_vec())),
61+
delivery.offset()
62+
);
63+
}
64+
65+
consumer.handle().close().await.unwrap();
66+
67+
environment.delete_stream("batch_send").await?;
68+
Ok(())
69+
}

0 commit comments

Comments
 (0)