Skip to content

Commit adedde3

Browse files
committed
implementing super_stream consumer
1 parent aa085c3 commit adedde3

File tree

7 files changed

+165
-14
lines changed

7 files changed

+165
-14
lines changed

src/environment.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::{
1313
stream_creator::StreamCreator,
1414
superstream::RoutingStrategy,
1515
superstream_producer::SuperStreamProducerBuilder,
16+
superstream_consumer::SuperStreamConsumerBuilder,
1617
RabbitMQStreamResult,
1718
};
1819

@@ -72,6 +73,14 @@ impl Environment {
7273
filter_configuration: None,
7374
}
7475
}
76+
77+
pub fn super_stream_consumer(&self) -> SuperStreamConsumerBuilder {
78+
SuperStreamConsumerBuilder {
79+
environment: self.clone(),
80+
offset_specification: OffsetSpecification::Next,
81+
}
82+
}
83+
7584
pub(crate) async fn create_client(&self) -> RabbitMQStreamResult<Client> {
7685
Client::connect(self.options.client_options.clone()).await
7786
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ mod offset_specification;
8080
mod producer;
8181
mod stream_creator;
8282
mod superstream;
83+
mod superstream_consumer;
8384
mod superstream_producer;
8485

8586
pub type RabbitMQStreamResult<T> = Result<T, error::ClientError>;
@@ -98,6 +99,7 @@ pub mod types {
9899
pub use crate::stream_creator::StreamCreator;
99100
pub use crate::superstream::HashRoutingMurmurStrategy;
100101
pub use crate::superstream::RoutingKeyRoutingStrategy;
102+
pub use crate::superstream_consumer::SuperStreamConsumer;
101103
pub use crate::superstream::RoutingStrategy;
102104
pub use rabbitmq_stream_protocol::message::Message;
103105
pub use rabbitmq_stream_protocol::{Response, ResponseCode, ResponseKind};

src/superstream.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,6 @@ use murmur3::murmur3_32;
44
use rabbitmq_stream_protocol::message::Message;
55
use std::io::Cursor;
66

7-
/*
8-
pub enum RouteType {
9-
Hash = 0,
10-
Key = 1,
11-
}
12-
*/
13-
147
#[derive(Clone)]
158
pub struct DefaultSuperStreamMetadata {
169
pub super_stream: String,

src/superstream_consumer.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use std::sync::Arc;
2+
3+
use rabbitmq_stream_protocol::{commands::subscribe::OffsetSpecification, message::Message};
4+
5+
use crate::superstream::DefaultSuperStreamMetadata;
6+
use crate::{error::ConsumerCreateError, Client, Consumer, Environment};
7+
8+
type FilterPredicate = Option<Arc<dyn Fn(&Message) -> bool + Send + Sync>>;
9+
10+
/// API for consuming RabbitMQ stream messages
11+
#[derive(Clone)]
12+
pub struct SuperStreamConsumer {
13+
pub internal: Arc<SuperStreamConsumerInternal>,
14+
}
15+
16+
struct SuperStreamConsumerInternal {
17+
client: Client,
18+
super_stream: String,
19+
offset_specification: OffsetSpecification,
20+
pub consumers: Vec<Consumer>,
21+
}
22+
23+
/// Builder for [`Consumer`]
24+
pub struct SuperStreamConsumerBuilder {
25+
pub(crate) environment: Environment,
26+
pub(crate) offset_specification: OffsetSpecification,
27+
}
28+
29+
impl SuperStreamConsumerBuilder {
30+
pub async fn build(
31+
self,
32+
super_stream: &str,
33+
) -> Result<SuperStreamConsumer, ConsumerCreateError> {
34+
// Connect to the user specified node first, then look for a random replica to connect to instead.
35+
// This is recommended for load balancing purposes.
36+
let client = self.environment.create_client().await?;
37+
38+
let mut super_stream_metadata = DefaultSuperStreamMetadata {
39+
super_stream: super_stream.to_string(),
40+
client: client.clone(),
41+
partitions: Vec::new(),
42+
routes: Vec::new(),
43+
};
44+
let partitions = super_stream_metadata.partitions().await;
45+
let mut consumers: Vec<Consumer> = Vec::new();
46+
47+
for partition in partitions.into_iter() {
48+
let consumer = self
49+
.environment
50+
.consumer()
51+
.offset(self.offset_specification.clone())
52+
.build(partition.as_str())
53+
.await
54+
.unwrap();
55+
56+
consumers.push(consumer);
57+
}
58+
59+
let super_stream_consumer_internal = Arc::new(SuperStreamConsumerInternal {
60+
super_stream: super_stream.to_string(),
61+
client: client.clone(),
62+
offset_specification: self.offset_specification.clone(),
63+
consumers,
64+
});
65+
66+
Ok(SuperStreamConsumer {
67+
internal: super_stream_consumer_internal,
68+
})
69+
}
70+
71+
pub fn offset(mut self, offset_specification: OffsetSpecification) -> Self {
72+
self.offset_specification = offset_specification;
73+
self
74+
}
75+
}
76+
77+
impl SuperStreamConsumer {
78+
79+
pub async fn get_consumers(&self) -> &Vec<Consumer> {
80+
return &self.internal.consumers
81+
}
82+
}

src/superstream_producer.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::future::Future;
1212
use std::marker::PhantomData;
1313
use std::sync::Arc;
1414

15-
type FilterValueExtractor = Arc<dyn Fn(&Message) -> String + 'static + Send + Sync>;
15+
//type FilterValueExtractor = Arc<dyn Fn(&Message) -> String + 'static + Send + Sync>;
1616

1717
#[derive(Clone)]
1818
pub struct SuperStreamProducer<T>(
@@ -62,8 +62,8 @@ impl SuperStreamProducer<NoDedup> {
6262

6363
for route in routes.into_iter() {
6464
if !self.1.contains_key(route.as_str()) {
65-
let producer = self.0.environment.producer().build(route.as_str()).await;
66-
self.1.insert(route.clone(), producer.unwrap());
65+
let producer = self.0.environment.producer().build(route.as_str()).await;
66+
self.1.insert(route.clone(), producer.unwrap());
6767
}
6868

6969
let producer = self.1.get(route.as_str()).unwrap();
@@ -72,7 +72,6 @@ impl SuperStreamProducer<NoDedup> {
7272
Ok(())
7373
}
7474

75-
7675
pub async fn close(self) -> Result<(), ProducerCloseError> {
7776
self.0.client.close().await?;
7877

tests/integration/consumer_test.rs

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,20 @@ use std::time::Duration;
33
use crate::common::TestEnvironment;
44
use fake::{Fake, Faker};
55
use futures::StreamExt;
6+
use tokio::task;
67
use rabbitmq_stream_client::{
78
error::{
89
ClientError, ConsumerCloseError, ConsumerDeliveryError, ConsumerStoreOffsetError,
910
ProducerCloseError,
1011
},
11-
types::{Delivery, Message, OffsetSpecification},
12+
types::{Delivery, Message, OffsetSpecification, SuperStreamConsumer},
1213
Consumer, FilterConfiguration, NoDedup, Producer,
1314
};
1415

1516
use rabbitmq_stream_protocol::ResponseCode;
1617
use std::sync::Arc;
18+
use std::sync::atomic::{AtomicU32, Ordering};
19+
use rabbitmq_stream_client::types::{HashRoutingMurmurStrategy, RoutingStrategy};
1720

1821
#[tokio::test(flavor = "multi_thread")]
1922
async fn consumer_test() {
@@ -54,6 +57,69 @@ async fn consumer_test() {
5457
producer.close().await.unwrap();
5558
}
5659

60+
fn hash_strategy_value_extractor(message: &Message) -> String {
61+
let s = String::from_utf8(Vec::from(message.data().unwrap())).expect("Found invalid UTF-8");
62+
return s;
63+
}
64+
65+
#[tokio::test(flavor = "multi_thread")]
66+
async fn super_stream_consumer_test() {
67+
let env = TestEnvironment::create_super_stream().await;
68+
let reference: String = Faker.fake();
69+
70+
let message_count = 10;
71+
let mut super_stream_producer = env
72+
.env
73+
.super_stream_producer(RoutingStrategy::HashRoutingStrategy(
74+
HashRoutingMurmurStrategy {
75+
routing_extractor: &hash_strategy_value_extractor,
76+
},
77+
))
78+
.build(&env.super_stream)
79+
.await
80+
.unwrap();
81+
82+
static super_stream_consumer: SuperStreamConsumer = env
83+
.env
84+
.super_stream_consumer()
85+
.offset(OffsetSpecification::Next)
86+
.build(&env.stream)
87+
.await
88+
.unwrap();
89+
90+
for n in 0..message_count {
91+
let msg = Message::builder().body(format!("message{}", n)).build();
92+
let _ = super_stream_producer
93+
.send(msg, |confirmation_status| async move {
94+
println!("Message confirmed with status {:?}", confirmation_status);
95+
})
96+
.await
97+
.unwrap();
98+
}
99+
100+
101+
let received_messages = Arc::new(AtomicU32::new(0));
102+
let consumers = super_stream_consumer.get_consumers().await;
103+
104+
let mut tasks = Vec::new();
105+
for mut consumer in consumers.into_iter() {
106+
let received_messages_outer = received_messages.clone();
107+
tasks.push(task::spawn(async move {
108+
let inner_received_messages = received_messages_outer.clone();
109+
let delivery = consumer.next().await.unwrap();
110+
let _ = String::from_utf8(delivery.unwrap().message().data().unwrap().to_vec()).unwrap();
111+
inner_received_messages.fetch_add(1, Ordering::Relaxed);
112+
113+
}));
114+
}
115+
116+
futures::future::join_all(tasks).await;
117+
118+
assert!(received_messages.fetch_add(1, Ordering::Relaxed) == message_count);
119+
//consumer.handle().close().await.unwrap();
120+
super_stream_producer.close().await.unwrap();
121+
}
122+
57123
#[tokio::test(flavor = "multi_thread")]
58124
async fn consumer_test_offset_specification_offset() {
59125
let env = TestEnvironment::create().await;

tests/integration/producer_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ fn hash_strategy_value_extractor(message: &Message) -> String {
403403
#[tokio::test(flavor = "multi_thread")]
404404
async fn key_super_steam_producer_test() {
405405
let env = TestEnvironment::create_super_stream().await;
406-
let mut confirmed_messages = Arc::new(AtomicU32::new(0));
406+
let confirmed_messages = Arc::new(AtomicU32::new(0));
407407
let notify_on_send = Arc::new(Notify::new());
408408
let message_count = 100;
409409

@@ -443,7 +443,7 @@ async fn key_super_steam_producer_test() {
443443
#[tokio::test(flavor = "multi_thread")]
444444
async fn hash_super_steam_producer_test() {
445445
let env = TestEnvironment::create_super_stream().await;
446-
let mut confirmed_messages = Arc::new(AtomicU32::new(0));
446+
let confirmed_messages = Arc::new(AtomicU32::new(0));
447447
let notify_on_send = Arc::new(Notify::new());
448448
let message_count = 100;
449449

0 commit comments

Comments
 (0)