Skip to content

Commit 49c7103

Browse files
authored
Filtering supported (#225)
* Filtering supported
1 parent fe54e8d commit 49c7103

File tree

23 files changed

+842
-34
lines changed

23 files changed

+842
-34
lines changed

examples/filtering.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use futures::StreamExt;
2+
use rabbitmq_stream_client::types::{Message, OffsetSpecification};
3+
use rabbitmq_stream_client::{Environment, FilterConfiguration};
4+
use tracing::info;
5+
6+
#[tokio::main]
7+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
8+
let environment = Environment::builder()
9+
.host("localhost")
10+
.port(5552)
11+
.build()
12+
.await?;
13+
14+
let message_count = 10;
15+
environment.stream_creator().create("test").await?;
16+
17+
let mut producer = environment
18+
.producer()
19+
.name("test_producer")
20+
.filter_value_extractor(|message| {
21+
String::from_utf8(message.data().unwrap().to_vec()).unwrap()
22+
})
23+
.build("test")
24+
.await?;
25+
26+
// publish filtering message
27+
for i in 0..message_count {
28+
producer
29+
.send_with_confirm(Message::builder().body(i.to_string()).build())
30+
.await?;
31+
}
32+
33+
producer.close().await?;
34+
35+
// publish filtering message
36+
let mut producer = environment
37+
.producer()
38+
.name("test_producer")
39+
.build("test")
40+
.await?;
41+
42+
// publish unset filter value
43+
for i in 0..message_count {
44+
producer
45+
.send_with_confirm(Message::builder().body(i.to_string()).build())
46+
.await?;
47+
}
48+
49+
producer.close().await?;
50+
51+
// filter configuration: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering
52+
let filter_configuration =
53+
FilterConfiguration::new(vec!["1".to_string()], false).post_filter(|message| {
54+
String::from_utf8(message.data().unwrap().to_vec()).unwrap_or("".to_string())
55+
== "1".to_string()
56+
});
57+
// let filter_configuration = FilterConfiguration::new(vec!["1".to_string()], true);
58+
59+
let mut consumer = environment
60+
.consumer()
61+
.offset(OffsetSpecification::First)
62+
.filter_input(Some(filter_configuration))
63+
.build("test")
64+
.await
65+
.unwrap();
66+
67+
let task = tokio::task::spawn(async move {
68+
loop {
69+
let delivery = consumer.next().await.unwrap().unwrap();
70+
info!(
71+
"Got message : {:?} with offset {}",
72+
delivery
73+
.message()
74+
.data()
75+
.map(|data| String::from_utf8(data.to_vec())),
76+
delivery.offset()
77+
);
78+
}
79+
});
80+
let _ = tokio::time::timeout(tokio::time::Duration::from_secs(3), task).await;
81+
82+
environment.delete_stream("test").await?;
83+
Ok(())
84+
}

examples/raw_client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3939
Message::builder()
4040
.body(format!("message {}", i).as_bytes().to_vec())
4141
.build(),
42+
1,
4243
)
4344
.await
4445
.unwrap();

protocol/src/codec/decoder.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,18 @@ impl Decoder for PublishedMessage {
106106
let (input, publishing_id) = u64::decode(input)?;
107107
let (input, body) = read_vec::<u8>(input)?;
108108
let (_, message) = Message::decode(&body)?;
109-
Ok((input, PublishedMessage::new(publishing_id, message)))
109+
Ok((input, PublishedMessage::new(publishing_id, message, None)))
110+
}
111+
112+
fn decode_version_2(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
113+
let (input, publishing_id) = u64::decode(input)?;
114+
let (input, body) = read_vec::<u8>(input)?;
115+
let (input, filter_value) = <Option<String>>::decode(input)?;
116+
let (_, message) = Message::decode(&body)?;
117+
Ok((
118+
input,
119+
PublishedMessage::new(publishing_id, message, filter_value),
120+
))
110121
}
111122
}
112123

protocol/src/codec/encoder.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,21 @@ impl Encoder for PublishedMessage {
109109
self.message.encode(writer)?;
110110
Ok(())
111111
}
112+
113+
fn encoded_size_version_2(&self) -> u32 {
114+
self.publishing_id.encoded_size()
115+
+ self.filter_value.encoded_size()
116+
+ 4
117+
+ self.message.encoded_size()
118+
}
119+
120+
fn encode_version_2(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
121+
self.publishing_id.encode(writer)?;
122+
self.filter_value.encode(writer)?;
123+
self.message.encoded_size().encode(writer)?;
124+
self.message.encode(writer)?;
125+
Ok(())
126+
}
112127
}
113128

114129
impl Encoder for Vec<PublishedMessage> {
@@ -123,6 +138,20 @@ impl Encoder for Vec<PublishedMessage> {
123138
}
124139
Ok(())
125140
}
141+
142+
fn encoded_size_version_2(&self) -> u32 {
143+
4 + self
144+
.iter()
145+
.fold(0, |acc, v| acc + v.encoded_size_version_2())
146+
}
147+
148+
fn encode_version_2(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
149+
writer.write_u32::<BigEndian>(self.len() as u32)?;
150+
for x in self {
151+
x.encode_version_2(writer)?;
152+
}
153+
Ok(())
154+
}
126155
}
127156

128157
impl Encoder for &str {

protocol/src/codec/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,20 @@ pub mod encoder;
88
pub trait Encoder {
99
fn encoded_size(&self) -> u32;
1010
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError>;
11+
fn encoded_size_version_2(&self) -> u32 {
12+
self.encoded_size()
13+
}
14+
fn encode_version_2(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
15+
self.encode(writer)
16+
}
1117
}
1218

1319
pub trait Decoder
1420
where
1521
Self: Sized,
1622
{
1723
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError>;
24+
fn decode_version_2(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
25+
Decoder::decode(input)
26+
}
1827
}
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
use std::io::Write;
2+
3+
use crate::{
4+
codec::{decoder::read_vec, Decoder, Encoder},
5+
error::{DecodeError, EncodeError},
6+
protocol::commands::COMMAND_EXCHANGE_COMMAND_VERSIONS,
7+
response::{FromResponse, ResponseCode},
8+
};
9+
10+
use super::Command;
11+
use byteorder::{BigEndian, WriteBytesExt};
12+
13+
#[cfg(test)]
14+
use fake::Fake;
15+
16+
#[cfg_attr(test, derive(fake::Dummy))]
17+
#[derive(PartialEq, Eq, Debug)]
18+
pub struct ExchangeCommandVersion(u16, u16, u16);
19+
20+
impl ExchangeCommandVersion {
21+
pub fn new(key: u16, min_version: u16, max_version: u16) -> Self {
22+
ExchangeCommandVersion(key, min_version, max_version)
23+
}
24+
}
25+
26+
impl Encoder for ExchangeCommandVersion {
27+
fn encoded_size(&self) -> u32 {
28+
self.0.encoded_size() + self.1.encoded_size() + self.2.encoded_size()
29+
}
30+
31+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
32+
self.0.encode(writer)?;
33+
self.1.encode(writer)?;
34+
self.2.encode(writer)?;
35+
36+
Ok(())
37+
}
38+
}
39+
40+
impl Decoder for ExchangeCommandVersion {
41+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
42+
let (input, key) = u16::decode(input)?;
43+
let (input, min_version) = u16::decode(input)?;
44+
let (input, max_version) = u16::decode(input)?;
45+
Ok((input, ExchangeCommandVersion(key, min_version, max_version)))
46+
}
47+
}
48+
49+
impl Encoder for Vec<ExchangeCommandVersion> {
50+
fn encoded_size(&self) -> u32 {
51+
4 + self.iter().fold(0, |acc, v| acc + v.encoded_size())
52+
}
53+
54+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
55+
writer.write_u32::<BigEndian>(self.len() as u32)?;
56+
for x in self {
57+
x.encode(writer)?;
58+
}
59+
Ok(())
60+
}
61+
}
62+
63+
impl Decoder for Vec<ExchangeCommandVersion> {
64+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
65+
let (input, result) = read_vec(input)?;
66+
Ok((input, result))
67+
}
68+
}
69+
70+
#[cfg_attr(test, derive(fake::Dummy))]
71+
#[derive(PartialEq, Eq, Debug)]
72+
pub struct ExchangeCommandVersionsRequest {
73+
pub(crate) correlation_id: u32,
74+
commands: Vec<ExchangeCommandVersion>,
75+
}
76+
77+
impl ExchangeCommandVersionsRequest {
78+
pub fn new(correlation_id: u32, commands: Vec<ExchangeCommandVersion>) -> Self {
79+
Self {
80+
correlation_id,
81+
commands,
82+
}
83+
}
84+
}
85+
86+
impl Encoder for ExchangeCommandVersionsRequest {
87+
fn encoded_size(&self) -> u32 {
88+
self.correlation_id.encoded_size() + self.commands.encoded_size()
89+
}
90+
91+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
92+
self.correlation_id.encode(writer)?;
93+
self.commands.encode(writer)?;
94+
Ok(())
95+
}
96+
}
97+
98+
impl Command for ExchangeCommandVersionsRequest {
99+
fn key(&self) -> u16 {
100+
COMMAND_EXCHANGE_COMMAND_VERSIONS
101+
}
102+
}
103+
104+
impl Decoder for ExchangeCommandVersionsRequest {
105+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
106+
let (input, correlation_id) = u32::decode(input)?;
107+
let (input, commands) = <Vec<ExchangeCommandVersion>>::decode(input)?;
108+
Ok((
109+
input,
110+
ExchangeCommandVersionsRequest {
111+
correlation_id,
112+
commands,
113+
},
114+
))
115+
}
116+
}
117+
118+
#[cfg_attr(test, derive(fake::Dummy))]
119+
#[derive(PartialEq, Eq, Debug)]
120+
pub struct ExchangeCommandVersionsResponse {
121+
pub(crate) correlation_id: u32,
122+
response_code: ResponseCode,
123+
commands: Vec<ExchangeCommandVersion>,
124+
}
125+
126+
impl ExchangeCommandVersionsResponse {
127+
pub fn new(
128+
correlation_id: u32,
129+
response_code: ResponseCode,
130+
commands: Vec<ExchangeCommandVersion>,
131+
) -> Self {
132+
Self {
133+
correlation_id,
134+
response_code,
135+
commands,
136+
}
137+
}
138+
139+
pub fn code(&self) -> &ResponseCode {
140+
&self.response_code
141+
}
142+
143+
pub fn is_ok(&self) -> bool {
144+
self.response_code == ResponseCode::Ok
145+
}
146+
147+
pub fn key_version(&self, key_command: u16) -> (u16, u16) {
148+
for i in &self.commands {
149+
match i {
150+
ExchangeCommandVersion(match_key_command, min_version, max_version) => {
151+
if *match_key_command == key_command {
152+
return (*min_version, *max_version);
153+
}
154+
}
155+
}
156+
}
157+
158+
(1, 1)
159+
}
160+
}
161+
162+
impl Encoder for ExchangeCommandVersionsResponse {
163+
fn encoded_size(&self) -> u32 {
164+
self.correlation_id.encoded_size()
165+
+ self.response_code.encoded_size()
166+
+ self.commands.encoded_size()
167+
}
168+
169+
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
170+
self.correlation_id.encode(writer)?;
171+
self.response_code.encode(writer)?;
172+
self.commands.encode(writer)?;
173+
Ok(())
174+
}
175+
}
176+
177+
impl Decoder for ExchangeCommandVersionsResponse {
178+
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
179+
let (input, correlation_id) = u32::decode(input)?;
180+
let (input, response_code) = ResponseCode::decode(input)?;
181+
let (input, commands) = <Vec<ExchangeCommandVersion>>::decode(input)?;
182+
183+
Ok((
184+
input,
185+
ExchangeCommandVersionsResponse {
186+
correlation_id,
187+
response_code,
188+
commands,
189+
},
190+
))
191+
}
192+
}
193+
194+
impl FromResponse for ExchangeCommandVersionsResponse {
195+
fn from_response(response: crate::Response) -> Option<Self> {
196+
match response.kind {
197+
crate::ResponseKind::ExchangeCommandVersions(exchange_command_versions) => {
198+
Some(exchange_command_versions)
199+
}
200+
_ => None,
201+
}
202+
}
203+
}
204+
205+
#[cfg(test)]
206+
mod tests {
207+
208+
use crate::commands::tests::command_encode_decode_test;
209+
210+
use super::{ExchangeCommandVersionsRequest, ExchangeCommandVersionsResponse};
211+
212+
#[test]
213+
fn exchange_command_versions_request_test() {
214+
command_encode_decode_test::<ExchangeCommandVersionsRequest>();
215+
}
216+
217+
#[test]
218+
fn exchange_command_versions_response_test() {
219+
command_encode_decode_test::<ExchangeCommandVersionsResponse>();
220+
}
221+
}

0 commit comments

Comments
 (0)