From 1dbb06cbb894f0ddd71019815f58745d58e62c9d Mon Sep 17 00:00:00 2001 From: Tommaso Allevi Date: Fri, 7 Feb 2025 16:37:56 +0100 Subject: [PATCH 1/5] Update rustls-pemfile --- Cargo.toml | 2 +- src/client/mod.rs | 20 ++++++++++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 42dc063a..4b1ca457 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ members = [ [dependencies] tokio-rustls = {version="0.24.1", features=["dangerous_configuration"]} -rustls-pemfile = "1.0.4" +rustls-pemfile = "2.2.0" rabbitmq-stream-protocol = { version = "0.7", path = "protocol" } tokio = { version = "1.29.1", features = ["full"] } tokio-util = { version = "0.7.3", features = ["codec"] } diff --git a/src/client/mod.rs b/src/client/mod.rs index 98bab85b..a1efbf4e 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -765,10 +765,12 @@ impl Client { let mut roots = rustls::RootCertStore::empty(); let cert_bytes = std::fs::read(root_ca_cert.unwrap()); - let root_cert_store = rustls_pemfile::certs(&mut cert_bytes.unwrap().as_ref()).unwrap(); + let root_cert_store: Result, _> = + rustls_pemfile::certs(&mut cert_bytes.unwrap().as_ref()).collect(); + let root_cert_store = root_cert_store.unwrap(); root_cert_store - .iter() + .into_iter() .for_each(|cert| roots.add(&rustls::Certificate(cert.to_vec())).unwrap()); Ok(roots) } @@ -777,8 +779,11 @@ impl Client { client_cert: &Path, ) -> std::io::Result> { let mut pem = BufReader::new(File::open(client_cert)?); - let certs = rustls_pemfile::certs(&mut pem)?; - let certs = certs.into_iter().map(rustls::Certificate); + let certs: Result, _> = rustls_pemfile::certs(&mut pem).collect(); + let certs = certs?; + let certs = certs + .into_iter() + .map(|cert| rustls::Certificate(cert.to_vec())); Ok(certs.collect()) } @@ -786,8 +791,11 @@ impl Client { client_private_key: &Path, ) -> std::io::Result> { let mut pem = BufReader::new(File::open(client_private_key)?); - let keys = rustls_pemfile::pkcs8_private_keys(&mut pem)?; - let keys = keys.into_iter().map(PrivateKey); + let keys: Result, _> = rustls_pemfile::pkcs8_private_keys(&mut pem).collect(); + let keys = keys?; + let keys = keys + .into_iter() + .map(|c| PrivateKey(c.secret_pkcs8_der().to_vec())); Ok(keys.collect()) } From ca865ae67a59f9605cbbbeea11076949b3e439ef Mon Sep 17 00:00:00 2001 From: Tommaso Allevi Date: Fri, 7 Feb 2025 17:16:06 +0100 Subject: [PATCH 2/5] Update tokio-rustls --- Cargo.toml | 2 +- src/client/mod.rs | 75 +++++++++++++++++++++++++++++------------------ 2 files changed, 47 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4b1ca457..e7d89044 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ members = [ [dependencies] -tokio-rustls = {version="0.24.1", features=["dangerous_configuration"]} +tokio-rustls = { version = "0.26.1" } rustls-pemfile = "2.2.0" rabbitmq-stream-protocol = { version = "0.7", path = "protocol" } tokio = { version = "1.29.1", features = ["full"] } diff --git a/src/client/mod.rs b/src/client/mod.rs index a1efbf4e..e2e40dc9 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -19,8 +19,6 @@ use pin_project::pin_project; use rabbitmq_stream_protocol::commands::exchange_command_versions::{ ExchangeCommandVersionsRequest, ExchangeCommandVersionsResponse, }; -use rustls::PrivateKey; -use rustls::ServerName; use std::{fs::File, io::BufReader, path::Path}; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; @@ -28,6 +26,7 @@ use tokio::io::ReadBuf; use tokio::sync::RwLock; use tokio::{net::TcpStream, sync::Notify}; use tokio_rustls::client::TlsStream; +use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer, ServerName}; use tokio_rustls::rustls::ClientConfig; use tokio_rustls::{rustls, TlsConnector}; @@ -547,7 +546,7 @@ impl Client { } let connector = TlsConnector::from(Arc::new(config)); - let domain = ServerName::try_from(broker.host.as_str()).unwrap(); + let domain = ServerName::try_from(broker.host.clone()).unwrap(); let conn = connector.connect(domain, stream).await?; GenericTcpStream::SecureTcp(conn) } else { @@ -771,31 +770,26 @@ impl Client { root_cert_store .into_iter() - .for_each(|cert| roots.add(&rustls::Certificate(cert.to_vec())).unwrap()); + .for_each(|cert| roots.add(cert).unwrap()); Ok(roots) } async fn build_client_certificates( client_cert: &Path, - ) -> std::io::Result> { + ) -> std::io::Result>> { let mut pem = BufReader::new(File::open(client_cert)?); - let certs: Result, _> = rustls_pemfile::certs(&mut pem).collect(); - let certs = certs?; - let certs = certs - .into_iter() - .map(|cert| rustls::Certificate(cert.to_vec())); - Ok(certs.collect()) + rustls_pemfile::certs(&mut pem) + .map(|c| c.map(|c| c.into_owned())) + .collect() } async fn build_client_private_keys( client_private_key: &Path, - ) -> std::io::Result> { + ) -> std::io::Result>> { let mut pem = BufReader::new(File::open(client_private_key)?); let keys: Result, _> = rustls_pemfile::pkcs8_private_keys(&mut pem).collect(); let keys = keys?; - let keys = keys - .into_iter() - .map(|c| PrivateKey(c.secret_pkcs8_der().to_vec())); + let keys = keys.into_iter().map(|c| PrivateKeyDer::from(c)); Ok(keys.collect()) } @@ -808,7 +802,6 @@ impl Client { if client_certificate_path.is_empty() { config = ClientConfig::builder() - .with_safe_defaults() .with_root_certificates(roots.clone()) .with_no_client_auth(); } else { @@ -820,7 +813,6 @@ impl Client { Self::build_client_private_keys(Path::new(&broker.tls.get_client_keys_path())) .await?; config = ClientConfig::builder() - .with_safe_defaults() .with_root_certificates(roots.clone()) .with_client_auth_cert(client_certs, client_keys.into_iter().next().unwrap()) .unwrap(); @@ -831,30 +823,55 @@ impl Client { async fn build_tls_client_configuration_untrusted() -> Result { mod danger { - use std::time::SystemTime; - - use tokio_rustls::rustls; - use tokio_rustls::rustls::client::{ServerCertVerified, ServerCertVerifier}; + use rustls::client::danger::HandshakeSignatureValid; + use rustls::client::danger::ServerCertVerified; + use tokio_rustls::rustls::{ + self, client::danger::ServerCertVerifier, pki_types::ServerName, + }; + #[derive(Debug)] pub struct NoCertificateVerification {} impl ServerCertVerifier for NoCertificateVerification { + fn verify_tls12_signature( + &self, + _: &[u8], + _: &rustls::pki_types::CertificateDer<'_>, + _: &rustls::DigitallySignedStruct, + ) -> Result { + Ok(HandshakeSignatureValid::assertion()) + } + + fn verify_tls13_signature( + &self, + _: &[u8], + _: &rustls::pki_types::CertificateDer<'_>, + _: &rustls::DigitallySignedStruct, + ) -> Result { + Ok(HandshakeSignatureValid::assertion()) + } + + fn supported_verify_schemes(&self) -> Vec { + // I know know if this is correct + vec![] + } + fn verify_server_cert( &self, - _end_entity: &rustls::Certificate, - _intermediates: &[rustls::Certificate], - _server_name: &rustls::ServerName, - _scts: &mut dyn Iterator, - _ocsp_response: &[u8], - _now: SystemTime, - ) -> Result { + _: &rustls::pki_types::CertificateDer<'_>, + _: &[rustls::pki_types::CertificateDer<'_>], + _: &ServerName<'_>, + _: &[u8], + _: rustls::pki_types::UnixTime, + ) -> Result + { Ok(ServerCertVerified::assertion()) } } } let config = ClientConfig::builder() - .with_safe_defaults() + .dangerous() .with_custom_certificate_verifier(Arc::new(danger::NoCertificateVerification {})) .with_no_client_auth(); From 8c32a3f4688d1ff1431c4d682099f034cb550033 Mon Sep 17 00:00:00 2001 From: Tommaso Allevi Date: Fri, 7 Feb 2025 19:22:35 +0100 Subject: [PATCH 3/5] Fix clippy --- src/client/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index e2e40dc9..6cf0530f 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -779,7 +779,7 @@ impl Client { ) -> std::io::Result>> { let mut pem = BufReader::new(File::open(client_cert)?); rustls_pemfile::certs(&mut pem) - .map(|c| c.map(|c| c.into_owned())) + .map(|c| c.map(CertificateDer::into_owned)) .collect() } @@ -789,7 +789,7 @@ impl Client { let mut pem = BufReader::new(File::open(client_private_key)?); let keys: Result, _> = rustls_pemfile::pkcs8_private_keys(&mut pem).collect(); let keys = keys?; - let keys = keys.into_iter().map(|c| PrivateKeyDer::from(c)); + let keys = keys.into_iter().map(PrivateKeyDer::from); Ok(keys.collect()) } From 3479111659964508ac3f815c49aa45ac38c5f827 Mon Sep 17 00:00:00 2001 From: Tommaso Allevi Date: Mon, 10 Feb 2025 10:24:21 +0100 Subject: [PATCH 4/5] Fix test --- src/client/mod.rs | 17 ++++++- tests/integration/consumer_test.rs | 69 +++++++++++++-------------- tests/integration/environment_test.rs | 1 + 3 files changed, 50 insertions(+), 37 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index 6cf0530f..ed04c7b2 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -852,8 +852,23 @@ impl Client { } fn supported_verify_schemes(&self) -> Vec { + use rustls::SignatureScheme; // I know know if this is correct - vec![] + vec![ + SignatureScheme::RSA_PKCS1_SHA1, + SignatureScheme::ECDSA_SHA1_Legacy, + SignatureScheme::RSA_PKCS1_SHA256, + SignatureScheme::ECDSA_NISTP256_SHA256, + SignatureScheme::RSA_PKCS1_SHA384, + SignatureScheme::ECDSA_NISTP384_SHA384, + SignatureScheme::RSA_PKCS1_SHA512, + SignatureScheme::ECDSA_NISTP521_SHA512, + SignatureScheme::RSA_PSS_SHA256, + SignatureScheme::RSA_PSS_SHA384, + SignatureScheme::RSA_PSS_SHA512, + SignatureScheme::ED25519, + SignatureScheme::ED448, + ] } fn verify_server_cert( diff --git a/tests/integration/consumer_test.rs b/tests/integration/consumer_test.rs index 9b8d7db1..dfcaa56b 100644 --- a/tests/integration/consumer_test.rs +++ b/tests/integration/consumer_test.rs @@ -11,10 +11,8 @@ use rabbitmq_stream_client::{ types::{Delivery, Message, OffsetSpecification, SuperStreamConsumer}, Consumer, FilterConfiguration, NoDedup, Producer, }; -use std::collections::HashMap; use crate::producer_test::routing_key_strategy_value_extractor; -use rabbitmq_stream_client::types::MessageContext; use rabbitmq_stream_client::types::{ HashRoutingMurmurStrategy, RoutingKeyRoutingStrategy, RoutingStrategy, }; @@ -97,7 +95,7 @@ async fn super_stream_consumer_test() { for n in 0..message_count { let msg = Message::builder().body(format!("message{}", n)).build(); let _ = super_stream_producer - .send(msg, |confirmation_status| async move {}) + .send(msg, |_confirmation_status| async move {}) .await .unwrap(); } @@ -105,7 +103,7 @@ async fn super_stream_consumer_test() { let mut received_messages = 0; let handle = super_stream_consumer.handle(); - while let _ = super_stream_consumer.next().await.unwrap() { + while let Some(_) = super_stream_consumer.next().await { received_messages = received_messages + 1; if received_messages == 10 { break; @@ -499,7 +497,6 @@ async fn consumer_test_with_filtering() { #[tokio::test(flavor = "multi_thread")] async fn super_stream_consumer_test_with_filtering() { let env = TestEnvironment::create_super_stream().await; - let reference: String = Faker.fake(); let message_count = 10; let mut super_stream_producer = env @@ -722,12 +719,12 @@ async fn super_stream_single_active_consumer_test() { for n in 0..message_count { let msg = Message::builder().body(format!("message{}", n)).build(); let _ = super_stream_producer - .send(msg, |confirmation_status| async move {}) + .send(msg, |_confirmation_status| async move {}) .await .unwrap(); } - let mut received_messages = Arc::new(AtomicU32::new(1)); + let received_messages = Arc::new(AtomicU32::new(1)); let handle_consumer_1 = super_stream_consumer.handle(); let handle_consumer_2 = super_stream_consumer_2.handle(); let handle_consumer_3 = super_stream_consumer_3.handle(); @@ -737,7 +734,7 @@ async fn super_stream_single_active_consumer_test() { task::spawn(async move { let received_messages_int = received_message_outer.clone(); let notify_received_messages_inner = notify_received_messages_outer.clone(); - while let _ = super_stream_consumer.next().await.unwrap() { + while let Some(_) = super_stream_consumer.next().await { let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); if message_count == rec_msg { notify_received_messages_inner.notify_one(); @@ -751,7 +748,7 @@ async fn super_stream_single_active_consumer_test() { task::spawn(async move { let received_messages_int = received_message_outer.clone(); let notify_received_messages_inner = notify_received_messages_outer.clone(); - while let _ = super_stream_consumer_2.next().await.unwrap() { + while let Some(_) = super_stream_consumer_2.next().await { let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); if message_count == rec_msg { notify_received_messages_inner.notify_one(); @@ -765,7 +762,7 @@ async fn super_stream_single_active_consumer_test() { task::spawn(async move { let received_messages_int = received_message_outer.clone(); let notify_received_messages_inner = notify_received_messages_outer.clone(); - while let _ = super_stream_consumer_3.next().await.unwrap() { + while let Some(_) = super_stream_consumer_3.next().await { let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); if message_count == rec_msg { notify_received_messages_inner.notify_one(); @@ -804,21 +801,21 @@ async fn super_stream_single_active_consumer_test_with_callback() { let notify_received_messages = Arc::new(Notify::new()); - let mut result_stream_name_1 = Arc::new(Mutex::new(String::from(""))); - let mut result_stream_name_2 = Arc::new(Mutex::new(String::from(""))); - let mut result_stream_name_3 = Arc::new(Mutex::new(String::from(""))); + let result_stream_name_1 = Arc::new(Mutex::new(String::from(""))); + let result_stream_name_2 = Arc::new(Mutex::new(String::from(""))); + let result_stream_name_3 = Arc::new(Mutex::new(String::from(""))); - let mut result_name_1 = Arc::new(Mutex::new(String::from(""))); - let mut result_name_2 = Arc::new(Mutex::new(String::from(""))); - let mut result_name_3 = Arc::new(Mutex::new(String::from(""))); + let result_name_1 = Arc::new(Mutex::new(String::from(""))); + let result_name_2 = Arc::new(Mutex::new(String::from(""))); + let result_name_3 = Arc::new(Mutex::new(String::from(""))); - let mut result_stream_name_outer = result_stream_name_1.clone(); - let mut result_stream_name_2_outer = result_stream_name_2.clone(); - let mut result_stream_name_3_outer = result_stream_name_3.clone(); + let result_stream_name_outer = result_stream_name_1.clone(); + let result_stream_name_2_outer = result_stream_name_2.clone(); + let result_stream_name_3_outer = result_stream_name_3.clone(); - let mut result_name_1_outer = result_name_1.clone(); - let mut result_name_2_outer = result_name_2.clone(); - let mut result_name_3_outer = result_name_3.clone(); + let result_name_1_outer = result_name_1.clone(); + let result_name_2_outer = result_name_2.clone(); + let result_name_3_outer = result_name_3.clone(); let mut super_stream_consumer: SuperStreamConsumer = env .env @@ -826,9 +823,9 @@ async fn super_stream_single_active_consumer_test_with_callback() { .name(super_stream_consumer_name) .enable_single_active_consumer(true) .offset(OffsetSpecification::First) - .consumer_update(move |active, message_context| { - let mut result_stream_name_int = result_stream_name_outer.clone(); - let mut result_consumer_name_int = result_name_1_outer.clone(); + .consumer_update(move |_active, message_context| { + let result_stream_name_int = result_stream_name_outer.clone(); + let result_consumer_name_int = result_name_1_outer.clone(); async move { *result_stream_name_int.lock().unwrap() = message_context.stream().clone(); *result_consumer_name_int.lock().unwrap() = message_context.name().clone(); @@ -846,9 +843,9 @@ async fn super_stream_single_active_consumer_test_with_callback() { .name("super-stream-with-sac-enabled") .enable_single_active_consumer(true) .offset(OffsetSpecification::First) - .consumer_update(move |active, message_context| { - let mut result_stream_name_int = result_stream_name_2_outer.clone(); - let mut result_consumer_name_int = result_name_2_outer.clone(); + .consumer_update(move |_active, message_context| { + let result_stream_name_int = result_stream_name_2_outer.clone(); + let result_consumer_name_int = result_name_2_outer.clone(); async move { *result_stream_name_int.lock().unwrap() = message_context.stream().clone(); *result_consumer_name_int.lock().unwrap() = message_context.name().clone(); @@ -865,9 +862,9 @@ async fn super_stream_single_active_consumer_test_with_callback() { .name("super-stream-with-sac-enabled") .enable_single_active_consumer(true) .offset(OffsetSpecification::First) - .consumer_update(move |active, message_context| { - let mut result_stream_name_int = result_stream_name_3_outer.clone(); - let mut result_consumer_name_int = result_name_3_outer.clone(); + .consumer_update(move |_active, message_context| { + let result_stream_name_int = result_stream_name_3_outer.clone(); + let result_consumer_name_int = result_name_3_outer.clone(); async move { *result_stream_name_int.lock().unwrap() = message_context.stream().clone(); *result_consumer_name_int.lock().unwrap() = message_context.name().clone(); @@ -881,12 +878,12 @@ async fn super_stream_single_active_consumer_test_with_callback() { for n in 0..message_count { let msg = Message::builder().body(format!("message{}", n)).build(); let _ = super_stream_producer - .send(msg, |confirmation_status| async move {}) + .send(msg, |_confirmation_status| async move {}) .await .unwrap(); } - let mut received_messages = Arc::new(AtomicU32::new(1)); + let received_messages = Arc::new(AtomicU32::new(1)); let handle_consumer_1 = super_stream_consumer.handle(); let handle_consumer_2 = super_stream_consumer_2.handle(); let handle_consumer_3 = super_stream_consumer_3.handle(); @@ -896,7 +893,7 @@ async fn super_stream_single_active_consumer_test_with_callback() { task::spawn(async move { let received_messages_int = received_message_outer.clone(); let notify_received_messages_inner = notify_received_messages_outer.clone(); - while let _ = super_stream_consumer.next().await.unwrap() { + while let Some(_) = super_stream_consumer.next().await { let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); if message_count == rec_msg { notify_received_messages_inner.notify_one(); @@ -910,7 +907,7 @@ async fn super_stream_single_active_consumer_test_with_callback() { task::spawn(async move { let received_messages_int = received_message_outer.clone(); let notify_received_messages_inner = notify_received_messages_outer.clone(); - while let _ = super_stream_consumer_2.next().await.unwrap() { + while let Some(_) = super_stream_consumer_2.next().await { let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); if message_count == rec_msg { notify_received_messages_inner.notify_one(); @@ -924,7 +921,7 @@ async fn super_stream_single_active_consumer_test_with_callback() { task::spawn(async move { let received_messages_int = received_message_outer.clone(); let notify_received_messages_inner = notify_received_messages_outer.clone(); - while let _ = super_stream_consumer_3.next().await.unwrap() { + while let Some(_) = super_stream_consumer_3.next().await { let rec_msg = received_messages_int.fetch_add(1, Ordering::Relaxed); if message_count == rec_msg { notify_received_messages_inner.notify_one(); diff --git a/tests/integration/environment_test.rs b/tests/integration/environment_test.rs index d8cc87be..8ce64e70 100644 --- a/tests/integration/environment_test.rs +++ b/tests/integration/environment_test.rs @@ -166,6 +166,7 @@ async fn environment_tls_connection_trust_certificates() { .build() .await; + env.as_ref().unwrap(); assert!(matches!(env, Ok(Environment { .. }))); } From e7941d4fdfb8b9c0acd89f73f5219f876d49fd74 Mon Sep 17 00:00:00 2001 From: Tommaso Allevi Date: Mon, 10 Feb 2025 10:57:37 +0100 Subject: [PATCH 5/5] Remove line --- tests/integration/environment_test.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/environment_test.rs b/tests/integration/environment_test.rs index 8ce64e70..d8cc87be 100644 --- a/tests/integration/environment_test.rs +++ b/tests/integration/environment_test.rs @@ -166,7 +166,6 @@ async fn environment_tls_connection_trust_certificates() { .build() .await; - env.as_ref().unwrap(); assert!(matches!(env, Ok(Environment { .. }))); }