From 132e54c7cd823dd3080a36b1f07c1acbe0d35a8f Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Mon, 4 Nov 2024 16:20:20 +0100 Subject: [PATCH 1/3] improving filtering examples + others --- README.md | 27 +++- examples/filtering.rs | 84 ------------ examples/filtering/receive_with_filtering.rs | 78 +++++++++++ examples/filtering/send_with_filtering.rs | 122 ++++++++++++++++++ .../receive_super_stream.rs | 0 .../{ => superstreams}/send_super_stream.rs | 0 6 files changed, 222 insertions(+), 89 deletions(-) delete mode 100644 examples/filtering.rs create mode 100644 examples/filtering/receive_with_filtering.rs create mode 100644 examples/filtering/send_with_filtering.rs rename examples/{ => superstreams}/receive_super_stream.rs (100%) rename examples/{ => superstreams}/send_super_stream.rs (100%) diff --git a/README.md b/README.md index 472d2e93..696893fe 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ Welcome to the documentation for the RabbitMQ Stream Rust Client. This guide pro - [Publishing Messages](#publishing-messages) - [Consuming Messages](#consuming-messages) - [Super Stream](#super-stream) + - [Filtering](#filtering) 5. [Examples](#examples) 6. [Development](#development) - [Compiling](#Compiling) @@ -121,11 +122,11 @@ let environment = Environment::builder() You can publish messages with three different methods: * `send`: asynchronous, messages are automatically buffered internally and sent at once after a timeout expires. On confirmation a callback is triggered. See the [example](./examples/send_async.rs) -* `batch_send`: synchronous, the user buffers the messages and sends them. This is the fastest publishing method. On confirmation a callback is triggered. See the [example](./examples/batch_send.rs) +* `batch_send`: asynchronous, the user buffers the messages and sends them. This is the fastest publishing method. On confirmation a callback is triggered. See the [example](./examples/batch_send.rs) * `send_with_confirm`: synchronous, the caller wait till the message is confirmed. This is the slowest publishing method. See the [example](./examples/send_with_confirm.rs) -### Consuming messages +## Consuming messages ```rust,no_run use rabbitmq_stream_client::{Environment}; @@ -149,7 +150,7 @@ handle.close().await?; ``` -### Super Stream +## Super Stream The client supports the super-stream functionality. @@ -161,9 +162,25 @@ You can use SuperStreamProducer and SuperStreamConsumer classes which internally Have a look to the examples to see on how to work with super streams. -See the [Super Stream Producer Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/send_super_stream.rs) +See the [Super Stream Producer Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/superstreams/send_super_stream.rs) + +See the [Super Stream Consumer Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/superstreams/receive_super_stream.rs) + + +## Filtering + +Filtering is a new streaming feature enabled from RabbitMQ 3.13 based on Bloom filter. RabbitMQ Stream provides a server-side filtering feature that avoids reading all the messages of a stream and filtering only on the client side. This helps to save network bandwidth when a consuming application needs only a subset of messages. + +See the Java documentation for more details (Same concepts apply here): + +[Filtering - Java Doc](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#filtering) + +See Rust filtering examples: + +See the [Producer with filtering Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/filtering/send_with_filtering.rs) + +See the [Consumer with filtering Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/filtering/receive_with_filtering.rs) -See the [Super Stream Consumer Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/receive_super_stream.rs) ### Examples diff --git a/examples/filtering.rs b/examples/filtering.rs deleted file mode 100644 index 448a2b4b..00000000 --- a/examples/filtering.rs +++ /dev/null @@ -1,84 +0,0 @@ -use futures::StreamExt; -use rabbitmq_stream_client::types::{Message, OffsetSpecification}; -use rabbitmq_stream_client::{Environment, FilterConfiguration}; -use tracing::info; - -#[tokio::main] -async fn main() -> Result<(), Box> { - let environment = Environment::builder() - .host("localhost") - .port(5552) - .build() - .await?; - - let message_count = 10; - environment.stream_creator().create("test").await?; - - let mut producer = environment - .producer() - .name("test_producer") - .filter_value_extractor(|message| { - String::from_utf8(message.data().unwrap().to_vec()).unwrap() - }) - .build("test") - .await?; - - // publish filtering message - for i in 0..message_count { - producer - .send_with_confirm(Message::builder().body(i.to_string()).build()) - .await?; - } - - producer.close().await?; - - // publish filtering message - let mut producer = environment - .producer() - .name("test_producer") - .build("test") - .await?; - - // publish unset filter value - for i in 0..message_count { - producer - .send_with_confirm(Message::builder().body(i.to_string()).build()) - .await?; - } - - producer.close().await?; - - // filter configuration: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering - let filter_configuration = - FilterConfiguration::new(vec!["1".to_string()], false).post_filter(|message| { - String::from_utf8(message.data().unwrap().to_vec()).unwrap_or("".to_string()) - == "1".to_string() - }); - // let filter_configuration = FilterConfiguration::new(vec!["1".to_string()], true); - - let mut consumer = environment - .consumer() - .offset(OffsetSpecification::First) - .filter_input(Some(filter_configuration)) - .build("test") - .await - .unwrap(); - - let task = tokio::task::spawn(async move { - loop { - let delivery = consumer.next().await.unwrap().unwrap(); - info!( - "Got message : {:?} with offset {}", - delivery - .message() - .data() - .map(|data| String::from_utf8(data.to_vec())), - delivery.offset() - ); - } - }); - let _ = tokio::time::timeout(tokio::time::Duration::from_secs(3), task).await; - - environment.delete_stream("test").await?; - Ok(()) -} diff --git a/examples/filtering/receive_with_filtering.rs b/examples/filtering/receive_with_filtering.rs new file mode 100644 index 00000000..e7a0cacc --- /dev/null +++ b/examples/filtering/receive_with_filtering.rs @@ -0,0 +1,78 @@ +/* Receives just Messages where filter California is applied. +This is assured by having added to the vector filter_values of FilterConfiguration the value California +and by the post_filter function to skip false positives +*/ + +use futures::StreamExt; +use rabbitmq_stream_client::error::StreamCreateError; +use rabbitmq_stream_client::types::ResponseCode; +use rabbitmq_stream_client::types::{ByteCapacity, OffsetSpecification}; +use rabbitmq_stream_client::{Environment, FilterConfiguration}; +use std::convert::TryInto; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let stream = "test_stream_filtering"; + let environment = Environment::builder() + .host("localhost") + .port(5552) + .build() + .await?; + + let create_response = environment + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create(stream) + .await; + + if let Err(e) = create_response { + if let StreamCreateError::Create { stream, status } = e { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); + } + } + } + } + + // filter configuration: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering + // We are telling the Consumer to ask the server just messages with filter California + // The post_filler is Optional and needed to skip false positives + let filter_configuration = FilterConfiguration::new(vec!["California".to_string()], false) + .post_filter(|message| { + let region: String = message + .application_properties() + .unwrap() + .get("region") + .unwrap() + .clone() + .try_into() + .unwrap(); + + region == "California".to_string() + }); + + let mut consumer = environment + .consumer() + .offset(OffsetSpecification::First) + .filter_input(Some(filter_configuration)) + .build(stream) + .await + .unwrap(); + + // Just Messages with filter California will appear + while let Some(delivery) = consumer.next().await { + let d = delivery.unwrap(); + println!( + "Got message : {:?} with offset {}", + d.message() + .data() + .map(|data| String::from_utf8(data.to_vec())), + d.offset() + ); + } + + Ok(()) +} diff --git a/examples/filtering/send_with_filtering.rs b/examples/filtering/send_with_filtering.rs new file mode 100644 index 00000000..de8d95ee --- /dev/null +++ b/examples/filtering/send_with_filtering.rs @@ -0,0 +1,122 @@ +/* Send 100 messages with filter of Region "California" and 100 messages with filter "Texas" +Filters are specified in the application_properties of the messages using the custom field "Region" +in the filter_value_extractor callback +The main thread wait on a condition variable until all the messages have been confirmed */ + +use rabbitmq_stream_client::error::StreamCreateError; +use rabbitmq_stream_client::types::ResponseCode; +use rabbitmq_stream_client::types::{ByteCapacity, Message}; +use rabbitmq_stream_client::Environment; +use std::convert::TryInto; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use tokio::sync::Notify; + +// This callback instruct the Producer on what filter we want to apply +// In this case we are returning the value of the application_property "region" value +fn filter_value_extractor(message: &Message) -> String { + message + .application_properties() + .unwrap() + .get("region") + .unwrap() + .clone() + .try_into() + .unwrap() +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let confirmed_messages = Arc::new(AtomicU32::new(0)); + let notify_on_send = Arc::new(Notify::new()); + let stream = "test_stream_filtering"; + + let environment = Environment::builder() + .host("localhost") + .port(5552) + .build() + .await?; + + let message_count = 200; + let create_response = environment + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create(&stream) + .await; + + if let Err(e) = create_response { + if let StreamCreateError::Create { stream, status } = e { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); + } + } + } + } + + let mut producer = environment + .producer() + .name("test_producer") + // we are telling the producer to use the callback filter_value_extractor to compute the filter + .filter_value_extractor(filter_value_extractor) + .build("test_stream_filtering") + .await?; + + // Sending first 200 messages with filter California + for i in 0..message_count { + let counter = confirmed_messages.clone(); + let notifier = notify_on_send.clone(); + + let msg = Message::builder() + .body(format!("super stream message_{}", i)) + .application_properties() + .insert("region", "California") + .message_builder() + .build(); + + producer + .send(msg, move |_| { + let inner_counter = counter.clone(); + let inner_notifier = notifier.clone(); + async move { + if inner_counter.fetch_add(1, Ordering::Relaxed) == (message_count * 2) - 1 { + inner_notifier.notify_one(); + } + } + }) + .await + .unwrap(); + } + + // Sending 200 messages with filter Texas + for i in 0..message_count { + let counter = confirmed_messages.clone(); + let notifier = notify_on_send.clone(); + let msg = Message::builder() + .body(format!("super stream message_{}", i)) + .application_properties() + .insert("region", "Texas") + .message_builder() + .build(); + + producer + .send(msg, move |_| { + let inner_counter = counter.clone(); + let inner_notifier = notifier.clone(); + async move { + if inner_counter.fetch_add(1, Ordering::Relaxed) == (message_count * 2) - 1 { + inner_notifier.notify_one(); + } + } + }) + .await + .unwrap(); + } + + notify_on_send.notified().await; + producer.close().await?; + + Ok(()) +} diff --git a/examples/receive_super_stream.rs b/examples/superstreams/receive_super_stream.rs similarity index 100% rename from examples/receive_super_stream.rs rename to examples/superstreams/receive_super_stream.rs diff --git a/examples/send_super_stream.rs b/examples/superstreams/send_super_stream.rs similarity index 100% rename from examples/send_super_stream.rs rename to examples/superstreams/send_super_stream.rs From f3b66c7941c49794d4c9c59b3a30c10f3f633072 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Tue, 5 Nov 2024 10:26:00 +0100 Subject: [PATCH 2/3] adding Consuming part in the README --- README.md | 44 ++++++++++++++---------------- examples/simple-consumer.rs | 54 +++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 24 deletions(-) create mode 100644 examples/simple-consumer.rs diff --git a/README.md b/README.md index 696893fe..0972e646 100644 --- a/README.md +++ b/README.md @@ -128,27 +128,23 @@ You can publish messages with three different methods: ## Consuming messages -```rust,no_run -use rabbitmq_stream_client::{Environment}; -use futures::StreamExt; -use tokio::task; -use tokio::time::{sleep, Duration}; -let environment = Environment::builder().build().await?; -let mut consumer = environment.consumer().build("mystream").await?; -let handle = consumer.handle(); -task::spawn(async move { - while let Some(delivery) = consumer.next().await { - let d = delivery.unwrap(); - println!("Got message: {:#?} with offset: {}", - d.message().data().map(|data| String::from_utf8(data.to_vec()).unwrap()), - d.offset(),); - } - }); -// wait 10 second and then close the consumer -sleep(Duration::from_secs(10)).await; -handle.close().await?; -``` +As streams never delete any messages, any consumer can start reading/consuming from any point in the log + +See the Consuming section part of the streaming doc for further info (Most of the examples refer to Java but applies for ths library too): + +[Consuming messages from a stream](https://www.rabbitmq.com/docs/streams#consuming) + +See also the Rust streaming tutorial-2 on how consume messages starting from different positions and how to enable Server-Side Offset Tracking too: + +[RabbitMQ Streams - Rust tutorial 2](https://www.rabbitmq.com/tutorials/tutorial-two-rust-stream) + +and the relative examples from the tutorials: + +[Rust tutorials examples](https://github.com/rabbitmq/rabbitmq-tutorials/tree/main/rust-stream) + +See also a simple example here on how to consume from a stream: +[Consuming messages from a stream example](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/simple-consume.rs) ## Super Stream @@ -162,9 +158,9 @@ You can use SuperStreamProducer and SuperStreamConsumer classes which internally Have a look to the examples to see on how to work with super streams. -See the [Super Stream Producer Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/superstreams/send_super_stream.rs) +See the [Super Stream Producer Example](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/superstreams/send_super_stream.rs) -See the [Super Stream Consumer Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/superstreams/receive_super_stream.rs) +See the [Super Stream Consumer Example](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/superstreams/receive_super_stream.rs) ## Filtering @@ -177,9 +173,9 @@ See the Java documentation for more details (Same concepts apply here): See Rust filtering examples: -See the [Producer with filtering Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/filtering/send_with_filtering.rs) +See the [Producer with filtering Example](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/filtering/send_with_filtering.rs) -See the [Consumer with filtering Example:](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/filtering/receive_with_filtering.rs) +See the [Consumer with filtering Example](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/filtering/receive_with_filtering.rs) ### Examples diff --git a/examples/simple-consumer.rs b/examples/simple-consumer.rs new file mode 100644 index 00000000..829757bf --- /dev/null +++ b/examples/simple-consumer.rs @@ -0,0 +1,54 @@ +use futures::StreamExt; +use rabbitmq_stream_client::error::StreamCreateError; +use rabbitmq_stream_client::types::{ByteCapacity, OffsetSpecification, ResponseCode}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + use rabbitmq_stream_client::Environment; + let environment = Environment::builder().build().await?; + let stream = "hello-rust-stream"; + + let create_response = environment + .stream_creator() + .max_length(ByteCapacity::GB(5)) + .create(stream) + .await; + + if let Err(e) = create_response { + if let StreamCreateError::Create { stream, status } = e { + match status { + // we can ignore this error because the stream already exists + ResponseCode::StreamAlreadyExists => {} + err => { + println!("Error creating stream: {:?} {:?}", stream, err); + } + } + } + } + + let mut consumer = environment + .consumer() + .offset(OffsetSpecification::First) + .build(stream) + .await + .unwrap(); + + while let delivery = consumer.next().await.unwrap() { + let delivery = delivery.unwrap(); + println!( + "Got message: {:#?} from stream: {} with offset: {}", + delivery + .message() + .data() + .map(|data| String::from_utf8(data.to_vec()).unwrap()) + .unwrap(), + delivery.stream(), + delivery.offset() + ); + } + + let _ = consumer.handle().close().await; + + println!("Super stream consumer stopped"); + Ok(()) +} From 810becafaa251c2d2303405ee497e316d9681246 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Tue, 5 Nov 2024 11:04:54 +0100 Subject: [PATCH 3/3] fixing PR comments --- README.md | 10 +++++----- examples/filtering/send_with_filtering.rs | 1 - 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 0972e646..72d2b51a 100644 --- a/README.md +++ b/README.md @@ -144,7 +144,7 @@ and the relative examples from the tutorials: See also a simple example here on how to consume from a stream: -[Consuming messages from a stream example](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/simple-consume.rs) +[Consuming messages from a stream example](./examples/simple-consume.rs) ## Super Stream @@ -158,9 +158,9 @@ You can use SuperStreamProducer and SuperStreamConsumer classes which internally Have a look to the examples to see on how to work with super streams. -See the [Super Stream Producer Example](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/superstreams/send_super_stream.rs) +See the [Super Stream Producer Example](./examples/superstreams/send_super_stream.rs) -See the [Super Stream Consumer Example](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/superstreams/receive_super_stream.rs) +See the [Super Stream Consumer Example](./examples/superstreams/receive_super_stream.rs) ## Filtering @@ -173,9 +173,9 @@ See the Java documentation for more details (Same concepts apply here): See Rust filtering examples: -See the [Producer with filtering Example](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/filtering/send_with_filtering.rs) +See the [Producer with filtering Example](./examples/filtering/send_with_filtering.rs) -See the [Consumer with filtering Example](https://github.com/rabbitmq/rabbitmq-stream-rust-client/blob/main/examples/filtering/receive_with_filtering.rs) +See the [Consumer with filtering Example](./examples/filtering/receive_with_filtering.rs) ### Examples diff --git a/examples/filtering/send_with_filtering.rs b/examples/filtering/send_with_filtering.rs index de8d95ee..c165dbda 100644 --- a/examples/filtering/send_with_filtering.rs +++ b/examples/filtering/send_with_filtering.rs @@ -58,7 +58,6 @@ async fn main() -> Result<(), Box> { let mut producer = environment .producer() - .name("test_producer") // we are telling the producer to use the callback filter_value_extractor to compute the filter .filter_value_extractor(filter_value_extractor) .build("test_stream_filtering")