Skip to content

Commit cac809b

Browse files
authored
Deserialize ClientOption with serde (#271)
* Move TlsConfiguration under client::options * Rework TlsConfiguration * ClientOption is deserializable * TlsConfigurationBuilder enables tls on every method * Add example. Implement ByteCapacity deserialization * Remove serde from default * Simplify ByteCapacity deserialization
1 parent 4f39945 commit cac809b

File tree

11 files changed

+1020
-327
lines changed

11 files changed

+1020
-327
lines changed

Cargo.toml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ keywords = ["AMQP", "IoT", "messaging", "streams"]
1010
categories = ["network-programming"]
1111
readme = "README.md"
1212

13-
13+
[package.metadata."docs.rs"]
14+
all-features = true
1415

1516
[workspace]
1617
members = [
@@ -37,11 +38,17 @@ async-trait = "0.1.51"
3738
rand = "0.8"
3839
dashmap = "6.1.0"
3940
murmur3 = "0.5.2"
41+
serde = { version = "1.0", features = ["derive"], optional = true }
4042

4143
[dev-dependencies]
4244
tracing-subscriber = "0.3.1"
4345
fake = { version = "3.0.0", features = ['derive'] }
4446
chrono = "0.4.26"
47+
serde_json = "1.0"
48+
49+
[features]
50+
default = []
51+
serde = ["dep:serde"]
4552

4653
[[example]]
4754
name="receive_super_stream"
@@ -55,3 +62,7 @@ path="examples/superstreams/send_super_stream_routing_key.rs"
5562
[[example]]
5663
name="send_super_stream"
5764
path="examples/superstreams/send_super_stream.rs"
65+
[[example]]
66+
name="environment_deserialization"
67+
path="examples/environment_deserialization.rs"
68+

Makefile

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@ fmt:
55
cargo fmt --all -- --check
66

77
clippy:
8-
cargo clippy --all -- -D warnings
8+
cargo clippy --all --all-features -- -D warnings
99

1010
build: fmt clippy
11+
cargo build --all-features
1112
cargo build
1213

1314
test: build
14-
cargo test --all -- --nocapture
15+
cargo test --all --all-features -- --nocapture
1516

1617
watch: build
1718
cargo watch -x 'test --all -- --nocapture'
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#[cfg(feature = "serde")]
2+
mod example {
3+
use tracing::{info, Level};
4+
use tracing_subscriber::FmtSubscriber;
5+
6+
use futures::StreamExt;
7+
use rabbitmq_stream_client::{
8+
types::{ByteCapacity, Message, OffsetSpecification},
9+
ClientOptions, Environment,
10+
};
11+
use serde::*;
12+
13+
#[derive(Deserialize)]
14+
struct QueueConfig {
15+
#[serde(flatten)]
16+
rabbitmq: ClientOptions,
17+
stream_name: String,
18+
capabity: ByteCapacity,
19+
producer_client_name: String,
20+
consumer_client_name: String,
21+
}
22+
23+
#[derive(Deserialize)]
24+
struct MyConfig {
25+
queue_config: QueueConfig,
26+
}
27+
28+
pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
29+
let subscriber = FmtSubscriber::builder()
30+
.with_max_level(Level::TRACE)
31+
.finish();
32+
33+
tracing::subscriber::set_global_default(subscriber)
34+
.expect("setting default subscriber failed");
35+
36+
// The configuration is loadable from a file.
37+
// Here, just for semplification, we use a JSON string.
38+
// NB: we use `serde` internally, so you can use any format supported by serde.
39+
let j = r#"
40+
{
41+
"queue_config": {
42+
"host": "localhost",
43+
"tls": {
44+
"enabled": false
45+
},
46+
"stream_name": "test",
47+
"capabity": "5GB",
48+
"producer_client_name": "producer",
49+
"consumer_client_name": "consumer"
50+
}
51+
}
52+
"#;
53+
let my_config: MyConfig = serde_json::from_str(j).unwrap();
54+
let environment = Environment::from_client_option(my_config.queue_config.rabbitmq).await?;
55+
56+
let message_count = 10;
57+
environment
58+
.stream_creator()
59+
.max_length(my_config.queue_config.capabity)
60+
.create(&my_config.queue_config.stream_name)
61+
.await?;
62+
63+
let producer = environment
64+
.producer()
65+
.client_provided_name(&my_config.queue_config.producer_client_name)
66+
.build(&my_config.queue_config.stream_name)
67+
.await?;
68+
69+
for i in 0..message_count {
70+
producer
71+
.send_with_confirm(Message::builder().body(format!("message{}", i)).build())
72+
.await?;
73+
}
74+
75+
producer.close().await?;
76+
77+
let mut consumer = environment
78+
.consumer()
79+
.client_provided_name(&my_config.queue_config.consumer_client_name)
80+
.offset(OffsetSpecification::First)
81+
.build(&my_config.queue_config.stream_name)
82+
.await
83+
.unwrap();
84+
85+
for _ in 0..message_count {
86+
let delivery = consumer.next().await.unwrap()?;
87+
info!(
88+
"Got message : {:?} with offset {}",
89+
delivery
90+
.message()
91+
.data()
92+
.map(|data| String::from_utf8(data.to_vec())),
93+
delivery.offset()
94+
);
95+
}
96+
97+
consumer.handle().close().await.unwrap();
98+
99+
environment.delete_stream("test").await?;
100+
101+
Ok(())
102+
}
103+
}
104+
#[cfg(not(feature = "serde"))]
105+
mod example {
106+
pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
107+
panic!("This example requires the 'serde' feature to be enabled. Add `--features serde` to the cargo command.");
108+
}
109+
}
110+
111+
#[tokio::main]
112+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
113+
example::run().await?;
114+
115+
Ok(())
116+
}

examples/simple-consumer.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3333
.await
3434
.unwrap();
3535

36-
while let delivery = consumer.next().await.unwrap() {
37-
let delivery = delivery.unwrap();
36+
while let Some(Ok(delivery)) = consumer.next().await {
3837
println!(
3938
"Got message: {:#?} from stream: {} with offset: {}",
4039
delivery

examples/tls_producer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1717
// it will start a rabbitmq server with compatible TLS certificates
1818
let tls_configuration: TlsConfiguration = TlsConfiguration::builder()
1919
.add_root_certificates(String::from(".ci/certs/ca_certificate.pem"))
20-
.build();
20+
.build()?;
2121

2222
// Use this configuration if you want to trust the certificates
2323
// without providing the root certificate and the client certificates

src/byte_capacity.rs

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,96 @@ impl ByteCapacity {
2222
}
2323
}
2424
}
25+
26+
#[cfg(feature = "serde")]
27+
impl<'de> serde::Deserialize<'de> for ByteCapacity {
28+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
29+
where
30+
D: serde::Deserializer<'de>,
31+
{
32+
use serde::Deserialize;
33+
34+
#[derive(Deserialize)]
35+
#[serde(untagged)]
36+
enum StringOrNumber {
37+
String(String),
38+
Number(u64),
39+
}
40+
41+
macro_rules! match_suffix {
42+
($str: ident, $suf: expr, $variant: expr) => {
43+
if $str.ends_with($suf) {
44+
let num = $str
45+
.trim_end_matches($suf)
46+
.parse()
47+
.map_err(serde::de::Error::custom)?;
48+
return Ok(($variant)(num));
49+
}
50+
};
51+
}
52+
53+
let s_or_n = StringOrNumber::deserialize(deserializer)?;
54+
55+
match s_or_n {
56+
StringOrNumber::String(str) => {
57+
match_suffix!(str, "TB", ByteCapacity::TB);
58+
match_suffix!(str, "GB", ByteCapacity::GB);
59+
match_suffix!(str, "MB", ByteCapacity::MB);
60+
match_suffix!(str, "KB", ByteCapacity::KB);
61+
match_suffix!(str, "B", ByteCapacity::B);
62+
63+
let num = str.parse().map_err(|_| {
64+
serde::de::Error::custom(
65+
"Expect a number or a string with a TB|GB|MB|KB|B suffix",
66+
)
67+
})?;
68+
Ok(ByteCapacity::B(num))
69+
}
70+
StringOrNumber::Number(num) => Ok(ByteCapacity::B(num)),
71+
}
72+
}
73+
}
74+
75+
#[cfg(feature = "serde")]
76+
mod tests {
77+
#[test]
78+
fn test_deserilize_byte_capacity() {
79+
use crate::types::ByteCapacity;
80+
81+
assert!(matches!(
82+
serde_json::from_str::<ByteCapacity>("\"5GB\""),
83+
Ok(ByteCapacity::GB(5))
84+
));
85+
assert!(matches!(
86+
serde_json::from_str::<ByteCapacity>("\"5TB\""),
87+
Ok(ByteCapacity::TB(5))
88+
));
89+
assert!(matches!(
90+
serde_json::from_str::<ByteCapacity>("\"5MB\""),
91+
Ok(ByteCapacity::MB(5))
92+
));
93+
assert!(matches!(
94+
serde_json::from_str::<ByteCapacity>("\"5KB\""),
95+
Ok(ByteCapacity::KB(5))
96+
));
97+
assert!(matches!(
98+
serde_json::from_str::<ByteCapacity>("\"5B\""),
99+
Ok(ByteCapacity::B(5))
100+
));
101+
assert!(matches!(
102+
serde_json::from_str::<ByteCapacity>("\"5\""),
103+
Ok(ByteCapacity::B(5))
104+
));
105+
assert!(matches!(
106+
serde_json::from_str::<ByteCapacity>("5"),
107+
Ok(ByteCapacity::B(5))
108+
));
109+
let err = serde_json::from_str::<ByteCapacity>("\"Wrong string format\"")
110+
.err()
111+
.expect("Expect an error");
112+
assert_eq!(
113+
err.to_string(),
114+
"Expect a number or a string with a TB|GB|MB|KB|B suffix"
115+
);
116+
}
117+
}

0 commit comments

Comments
 (0)