Skip to content

Commit ba13c06

Browse files
committed
super_stream implementation
1 parent 4a7cb45 commit ba13c06

File tree

5 files changed

+100
-0
lines changed

5 files changed

+100
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ thiserror = "1.0"
3636
async-trait = "0.1.51"
3737
rand = "0.8"
3838
dashmap = "5.3.4"
39+
murmur3 = "0.5.2"
3940

4041
[dev-dependencies]
4142
tracing-subscriber = "0.3.1"

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ pub mod error;
7979
mod offset_specification;
8080
mod producer;
8181
mod stream_creator;
82+
mod superstream;
8283

8384
pub type RabbitMQStreamResult<T> = Result<T, error::ClientError>;
8485

src/superstream.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
use crate::{
2+
client::Client,
3+
};
4+
5+
use murmur3::murmur3_32;
6+
use std::any::Any;
7+
use rabbitmq_stream_protocol::message::Message;
8+
use std::io::Cursor;
9+
10+
trait Metadata {
11+
12+
async fn partitions(&mut self) -> Vec<String>;
13+
async fn routes(&mut self, routing_key: String) -> Vec<String>;
14+
15+
}
16+
17+
struct DefaultSuperStreamMetadata {
18+
super_stream: String,
19+
client: Client,
20+
partitions: Vec<String>,
21+
routes: Vec<String>,
22+
}
23+
24+
impl Metadata for DefaultSuperStreamMetadata {
25+
26+
async fn partitions(&mut self) -> Vec<String> {
27+
28+
if self.partitions.len() == 0 {
29+
30+
let response = self.client.partitions(self.super_stream.clone()).await;
31+
32+
self.partitions = response.unwrap().streams;
33+
34+
}
35+
36+
return self.partitions.clone()
37+
38+
}
39+
async fn routes(&mut self, routing_key: String) -> Vec<String> {
40+
41+
if self.routes.len() == 0 {
42+
43+
let response = self.client.route(routing_key, self.super_stream.clone()).await;
44+
45+
self.routes = response.unwrap().streams;
46+
47+
}
48+
49+
return self.routes.clone()
50+
51+
}
52+
53+
}
54+
55+
trait RoutingStrategy {
56+
async fn routes(&self, message: Message, metadata: & mut impl Metadata) -> Vec<String>;
57+
}
58+
59+
struct RoutingKeyRoutingStrategy {
60+
routing_extractor: &'static dyn Fn(Message) -> String,
61+
}
62+
63+
impl RoutingStrategy for RoutingKeyRoutingStrategy {
64+
65+
async fn routes(&self, message: Message, metadata: & mut impl Metadata) -> Vec<String> {
66+
67+
let key = (self.routing_extractor)(message);
68+
69+
let routes = metadata.routes(key).await;
70+
71+
return routes;
72+
73+
}
74+
}
75+
76+
struct HashRoutingMurmurStrategy {
77+
routing_extractor: &'static dyn Fn(Message) -> String,
78+
}
79+
80+
impl RoutingStrategy for HashRoutingMurmurStrategy {
81+
82+
async fn routes(&self, message: Message, metadata: & mut impl Metadata) -> Vec<String> {
83+
84+
let mut streams: Vec<String> = Vec::new();
85+
86+
let key = (self.routing_extractor)(message);
87+
let hash_result = murmur3_32(&mut Cursor::new(key), 104729);
88+
89+
let number_of_partitions = metadata.partitions().await.len();
90+
let route = hash_result.unwrap() % number_of_partitions as u32;
91+
let partitions: Vec<String> = metadata.partitions().await;
92+
let stream = partitions.into_iter().nth(route as usize).unwrap();
93+
streams.push(stream);
94+
95+
return streams
96+
97+
}
98+
}

src/superstream_consumer.rs

Whitespace-only changes.

src/superstream_producer.rs

Whitespace-only changes.

0 commit comments

Comments
 (0)