Skip to content

Commit f6bffdb

Browse files
committed
super_stream_producer first basic version
1 parent e518122 commit f6bffdb

File tree

2 files changed

+78
-55
lines changed

2 files changed

+78
-55
lines changed

src/superstream.rs

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,16 @@ pub enum RouteType {
99
Key = 1,
1010
}
1111

12-
trait Metadata {
13-
async fn partitions(&mut self) -> Vec<String>;
14-
async fn routes(&mut self, routing_key: String) -> Vec<String>;
12+
#[derive(Clone)]
13+
pub struct DefaultSuperStreamMetadata {
14+
pub super_stream: String,
15+
pub client: Client,
16+
pub partitions: Vec<String>,
17+
pub routes: Vec<String>,
1518
}
1619

17-
struct DefaultSuperStreamMetadata {
18-
super_stream: String,
19-
client: Client,
20-
partitions: Vec<String>,
21-
routes: Vec<String>,
22-
}
23-
24-
impl Metadata for DefaultSuperStreamMetadata {
25-
async fn partitions(&mut self) -> Vec<String> {
20+
impl DefaultSuperStreamMetadata {
21+
pub async fn partitions(&mut self) -> Vec<String> {
2622
if self.partitions.len() == 0 {
2723
let response = self.client.partitions(self.super_stream.clone()).await;
2824

@@ -31,7 +27,7 @@ impl Metadata for DefaultSuperStreamMetadata {
3127

3228
return self.partitions.clone();
3329
}
34-
async fn routes(&mut self, routing_key: String) -> Vec<String> {
30+
pub async fn routes(&mut self, routing_key: String) -> Vec<String> {
3531
if self.routes.len() == 0 {
3632
let response = self
3733
.client
@@ -45,16 +41,17 @@ impl Metadata for DefaultSuperStreamMetadata {
4541
}
4642
}
4743

48-
pub trait RoutingStrategy {
49-
async fn routes(&self, message: Message, metadata: &mut impl Metadata) -> Vec<String>;
50-
}
51-
52-
struct RoutingKeyRoutingStrategy {
44+
#[derive(Clone)]
45+
pub struct RoutingKeyRoutingStrategy {
5346
routing_extractor: &'static dyn Fn(Message) -> String,
5447
}
5548

56-
impl RoutingStrategy for RoutingKeyRoutingStrategy {
57-
async fn routes(&self, message: Message, metadata: &mut impl Metadata) -> Vec<String> {
49+
impl RoutingKeyRoutingStrategy {
50+
pub async fn routes(
51+
&self,
52+
message: Message,
53+
metadata: &mut DefaultSuperStreamMetadata,
54+
) -> Vec<String> {
5855
let key = (self.routing_extractor)(message);
5956

6057
let routes = metadata.routes(key).await;
@@ -63,12 +60,17 @@ impl RoutingStrategy for RoutingKeyRoutingStrategy {
6360
}
6461
}
6562

66-
struct HashRoutingMurmurStrategy {
63+
#[derive(Clone)]
64+
pub struct HashRoutingMurmurStrategy {
6765
routing_extractor: &'static dyn Fn(Message) -> String,
6866
}
6967

70-
impl RoutingStrategy for HashRoutingMurmurStrategy {
71-
async fn routes(&self, message: Message, metadata: &mut impl Metadata) -> Vec<String> {
68+
impl HashRoutingMurmurStrategy {
69+
pub async fn routes(
70+
&self,
71+
message: Message,
72+
metadata: &mut DefaultSuperStreamMetadata,
73+
) -> Vec<String> {
7274
let mut streams: Vec<String> = Vec::new();
7375

7476
let key = (self.routing_extractor)(message);
@@ -83,3 +85,9 @@ impl RoutingStrategy for HashRoutingMurmurStrategy {
8385
return streams;
8486
}
8587
}
88+
89+
#[derive(Clone)]
90+
pub enum RoutingStrategy {
91+
HashRoutingStrategy(HashRoutingMurmurStrategy),
92+
RoutingKeyStrategy(RoutingKeyRoutingStrategy),
93+
}

src/superstream_producer.rs

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,77 +2,85 @@ use crate::{
22
client::Client,
33
environment::Environment,
44
error::{ProducerCreateError, ProducerPublishError},
5-
producer::{ConfirmationStatus, Producer},
6-
superstream::{RouteType, RoutingStrategy},
5+
producer::{ConfirmationStatus, Producer, NoDedup},
6+
superstream::{DefaultSuperStreamMetadata, RoutingStrategy},
77
};
88
use rabbitmq_stream_protocol::message::Message;
9+
use std::collections::HashMap;
910
use std::future::Future;
1011
use std::marker::PhantomData;
1112
use std::sync::Arc;
1213

1314
type FilterValueExtractor = Arc<dyn Fn(&Message) -> String + 'static + Send + Sync>;
1415

1516
#[derive(Clone)]
16-
pub struct NoDedup {}
17-
18-
pub struct Dedup {}
19-
20-
#[derive(Clone)]
21-
pub struct SuperStreamProducer<T>(Arc<SuperStreamProducerInternal>, PhantomData<T>);
17+
pub struct SuperStreamProducer<T>(Arc<SuperStreamProducerInternal<T>>, PhantomData<T>);
2218

2319
/// Builder for [`SuperStreamProducer`]
24-
pub struct SuperStreamProducerBuilder<'a> {
20+
pub struct SuperStreamProducerBuilder<T> {
2521
pub(crate) environment: Environment,
2622
pub(crate) name: Option<String>,
2723
pub filter_value_extractor: Option<FilterValueExtractor>,
28-
pub routing_strategy: &'a dyn RoutingStrategy,
24+
pub routing_strategy: RoutingStrategy,
25+
pub(crate) data: PhantomData<T>,
2926
}
3027

31-
pub struct SuperStreamProducerInternal<'a> {
28+
pub struct SuperStreamProducerInternal<T> {
29+
pub(crate) environment: Environment,
3230
client: Client,
3331
super_stream: String,
3432
publish_version: u16,
35-
producers: Vec<Producer()>,
33+
producers: HashMap<String, Producer<T>>,
3634
filter_value_extractor: Option<FilterValueExtractor>,
37-
routing_strategy: &'a dyn RoutingStrategy,
35+
super_stream_metadata: DefaultSuperStreamMetadata,
36+
routing_strategy: RoutingStrategy,
3837
}
3938

4039
impl SuperStreamProducer<NoDedup> {
4140
pub async fn send<Fut>(
4241
&self,
4342
message: Message,
44-
cb: impl Fn(Result<ConfirmationStatus, ProducerPublishError>) -> Fut + Send + Sync + 'static,
45-
) -> Result<(), ProducerPublishError>
46-
where
43+
cb: impl Fn(Result<ConfirmationStatus, ProducerPublishError>) -> Fut + Send + Sync + 'static + Clone,
44+
) where
4745
Fut: Future<Output = ()> + Send + Sync + 'static,
4846
{
49-
self.do_send(message, cb).await
50-
}
51-
}
47+
let routes = match self.0.routing_strategy.clone() {
48+
RoutingStrategy::HashRoutingStrategy(routing_strategy) => {
49+
routing_strategy
50+
.routes(message.clone(), &mut self.0.super_stream_metadata.clone())
51+
.await
52+
}
53+
RoutingStrategy::RoutingKeyStrategy(routing_strategy) => {
54+
routing_strategy
55+
.routes(message.clone(), &mut self.0.super_stream_metadata.clone())
56+
.await
57+
}
58+
};
5259

53-
impl SuperStreamProducer<Dedup> {
54-
pub async fn send<Fut>(
55-
&self,
56-
message: Message,
57-
cb: impl Fn(Result<ConfirmationStatus, ProducerPublishError>) -> Fut + Send + Sync + 'static,
58-
) -> Result<(), ProducerPublishError>
59-
where
60-
Fut: Future<Output = ()> + Send + Sync + 'static,
61-
{
62-
self.do_send(message, cb).await
60+
for route in routes.into_iter() {
61+
if !self.0.producers.contains_key(route.as_str()) {
62+
let producer = self.0.environment.producer().build(route.as_str()).await;
63+
self.0.producers.clone().insert(route.clone(), producer.unwrap().clone());
64+
}
65+
66+
let producer = self.0.producers.get(route.as_str()).unwrap();
67+
_ = producer
68+
.send(message.clone(), cb.clone())
69+
.await;
70+
}
6371
}
6472
}
6573

6674
impl<T> SuperStreamProducerBuilder<T> {
6775
pub async fn build(
6876
self,
6977
super_stream: &str,
70-
route_type: RouteType,
78+
route_type: RoutingStrategy,
7179
) -> Result<SuperStreamProducer<T>, ProducerCreateError> {
7280
// Connect to the user specified node first, then look for the stream leader.
7381
// The leader is the recommended node for writing, because writing to a replica will redundantly pass these messages
7482
// to the leader anyway - it is the only one capable of writing.
75-
let mut client = self.environment.create_client().await?;
83+
let client = self.environment.create_client().await?;
7684

7785
let mut publish_version = 1;
7886

@@ -84,14 +92,21 @@ impl<T> SuperStreamProducerBuilder<T> {
8492
}
8593
}
8694

87-
let producers = Vec::new();
95+
let producers = HashMap::new();
8896

8997
let super_stream_producer = SuperStreamProducerInternal {
98+
environment: self.environment.clone(),
9099
super_stream: super_stream.to_string(),
91100
client,
92101
publish_version,
93102
filter_value_extractor: self.filter_value_extractor,
94103
routing_strategy: route_type,
104+
super_stream_metadata: DefaultSuperStreamMetadata {
105+
super_stream: super_stream.to_string(),
106+
client: self.environment.create_client().await?,
107+
partitions: Vec::new(),
108+
routes: Vec::new(),
109+
},
95110
producers,
96111
};
97112

0 commit comments

Comments
 (0)