Skip to content

Dynamic send #276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .tarpaulin.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ exclude-files=[
"benchmark/*",
"build.rs",
"src/lib.rs",
"src/bin/**/*.rs",
"tests/**/*",
"mod.rs"
]
Expand Down
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ tracing-subscriber = "0.3.1"
fake = { version = "3.0.0", features = ['derive'] }
chrono = "0.4.26"
serde_json = "1.0"
reqwest = { version = "0.12", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }

[features]
default = []
Expand All @@ -66,3 +68,6 @@ path="examples/superstreams/send_super_stream.rs"
name="environment_deserialization"
path="examples/environment_deserialization.rs"

[[bin]]
name = "perf-producer"
path = "src/bin/perf-producer.rs"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this file is doing perf-tests any possibility to move it in the tests folder? (maybe creating a folder like performances)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have different possibilities:

  • put it under bench folder where, typically, you put performance tests, commonly developed with criterion crate. Anyway, this actually isn't a benchmark in the same sense of criterion which invokes the function multiple time and measure the duration.
  • put it under tests folder and threat it as a integration test. Anyway, as a test, the output is not shown to the developer without the appropriate flag (--nocapture).
  • put it under bin and threat is as a binary, as in this PR. You can run it using cargo run --release --bin perf-producer and print the output easily.

Anyway, I don't have a strong opinion on that and I can follow a different solution. let me know how you want to proceed.

Copy link
Contributor

@DanielePalaia DanielePalaia Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion too. As a preference I would choose the bench folder. It can be also useful in case in the future we want to add other performance tests and group them together

2 changes: 1 addition & 1 deletion examples/send_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let create_response = environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create(&stream)
.create(stream)
.await;

if let Err(e) = create_response {
Expand Down
4 changes: 2 additions & 2 deletions examples/tls_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ async fn start_publisher(
env: Environment,
stream: &String,
) -> Result<(), Box<dyn std::error::Error>> {
let _ = env.stream_creator().create(&stream).await;
let _ = env.stream_creator().create(stream).await;

let producer = env.producer().batch_size(BATCH_SIZE).build(&stream).await?;
let producer = env.producer().batch_size(BATCH_SIZE).build(stream).await?;

let is_batch_send = true;
tokio::task::spawn(async move {
Expand Down
2 changes: 1 addition & 1 deletion protocol/src/message/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl MessageBuilder {
pub fn application_properties(self) -> ApplicationPropertiesBuider {
ApplicationPropertiesBuider(self)
}
pub fn publising_id(mut self, publishing_id: u64) -> Self {
pub fn publishing_id(mut self, publishing_id: u64) -> Self {
self.0.publishing_id = Some(publishing_id);
self
}
Expand Down
175 changes: 175 additions & 0 deletions src/bin/perf-producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#![allow(dead_code)]

use std::{
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};

use rabbitmq_stream_client::{
types::{ByteCapacity, Message, OffsetSpecification},
Environment,
};
use tokio::{sync::mpsc::UnboundedSender, time::sleep};
use tokio_stream::StreamExt;

static ONE_SECOND: Duration = Duration::from_secs(1);
static ONE_MINUTE: Duration = Duration::from_secs(60);

struct Metric {
created_at: u128,
received_at: SystemTime,
}

#[derive(Debug)]
struct Stats {
average_latency: f32,
messages_received: usize,
}

#[tokio::main]
async fn main() {
let stream_name = "perf-stream";

let environment = Environment::builder().build().await.unwrap();
let _ = environment.delete_stream(stream_name).await;
environment
.stream_creator()
.max_length(ByteCapacity::GB(5))
.create(stream_name)
.await
.unwrap();

let environment = Arc::new(environment);

let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
let consumer_env = environment.clone();
let consumer_handler = tokio::spawn(async move {
start_consumer(consumer_env, stream_name, sender).await;
});

let produced_messages = AtomicU32::new(0);
let producer_env = environment.clone();
let producer_handler = tokio::spawn(async move {
start_producer(producer_env, stream_name, &produced_messages).await;
});

let run_for = Duration::from_secs(5 * 60);

tokio::spawn(async move {
sleep(run_for).await;
producer_handler.abort();
sleep(Duration::from_secs(1)).await;
consumer_handler.abort();
});

let minutes = run_for.as_secs() / 60;

let mut now = Instant::now();
// 5 minutes of metrics
let mut metrics = Vec::with_capacity(50 * 60 * minutes as usize);
while let Some(metric) = receiver.recv().await {
if now.elapsed() > ONE_MINUTE {
now = Instant::now();

let last_metrics = metrics;
metrics = Vec::with_capacity(50 * 60 * minutes as usize);
tokio::spawn(async move {
let stats = calculate_stats(last_metrics).await;
println!("stats: {:?}", stats);
});
}
metrics.push(metric);
}

let stats = calculate_stats(metrics).await;
println!("stats: {:?}", stats);
}

Check warning on line 89 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L33-L89

Added lines #L33 - L89 were not covered by tests

async fn calculate_stats(metrics: Vec<Metric>) -> Stats {
let mut total_latency = 0;
let metric_count = metrics.len();
for metric in metrics {
let created_at = SystemTime::UNIX_EPOCH + Duration::from_millis(metric.created_at as u64);
let received_at = metric.received_at;
let delta = received_at.duration_since(created_at).unwrap();
total_latency += delta.as_millis();
}

Check warning on line 99 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L91-L99

Added lines #L91 - L99 were not covered by tests

Stats {
average_latency: total_latency as f32 / metric_count as f32,
messages_received: metric_count,
}
}

Check warning on line 105 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L101-L105

Added lines #L101 - L105 were not covered by tests

async fn start_consumer(
environment: Arc<Environment>,
stream_name: &str,
sender: UnboundedSender<Metric>,
) {
let mut consumer = environment
.consumer()
.offset(OffsetSpecification::First)
.build(stream_name)
.await
.unwrap();
while let Some(Ok(delivery)) = consumer.next().await {
let produced_at = delivery
.message()
.data()
.map(|data| {
u128::from_be_bytes([
data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
data[8], data[9], data[10], data[11], data[12], data[13], data[14], data[15],
])
})
.unwrap();
let metric = Metric {
created_at: produced_at,
received_at: SystemTime::now(),
};
sender.send(metric).unwrap();

Check warning on line 133 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L107-L133

Added lines #L107 - L133 were not covered by tests
}
}

Check warning on line 135 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L135

Added line #L135 was not covered by tests

async fn start_producer(
environment: Arc<Environment>,
stream_name: &str,
produced_messages: &AtomicU32,
) {
let message_per_second = 50_usize;
let producer = environment.producer().build(stream_name).await.unwrap();

Check warning on line 143 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L137-L143

Added lines #L137 - L143 were not covered by tests

loop {
let start = Instant::now();
let messages = create_messages(message_per_second);
let messages_sent = messages.len() as u32;
for message in messages {
producer.send(message, |_| async {}).await.unwrap();

Check warning on line 150 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L146-L150

Added lines #L146 - L150 were not covered by tests
}
produced_messages.fetch_add(messages_sent, Ordering::Relaxed);

let elapsed = start.elapsed();

if ONE_SECOND > elapsed {
sleep(ONE_SECOND - elapsed).await;
}

Check warning on line 158 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L152-L158

Added lines #L152 - L158 were not covered by tests
}
}

fn create_messages(message_count_per_batch: usize) -> Vec<Message> {
(0..message_count_per_batch)
.map(|_| {
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
let since_the_epoch = since_the_epoch.as_millis();
Message::builder()
.body(since_the_epoch.to_be_bytes())
.build()
})
.collect()
}

Check warning on line 175 in src/bin/perf-producer.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/perf-producer.rs#L162-L175

Added lines #L162 - L175 were not covered by tests
28 changes: 22 additions & 6 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
use tokio_rustls::client::TlsStream;

use tokio_util::codec::Framed;
use tracing::trace;
use tracing::{trace, warn};

use crate::{error::ClientError, RabbitMQStreamResult};
pub use message::ClientMessage;
Expand Down Expand Up @@ -275,8 +275,12 @@
state.handler = Some(Arc::new(handler));
}

pub fn is_closed(&self) -> bool {
self.channel.is_closed()
}

pub async fn close(&self) -> RabbitMQStreamResult<()> {
if self.channel.is_closed() {
if self.is_closed() {
return Err(ClientError::AlreadyClosed);
}
let _: CloseResponse = self
Expand All @@ -286,12 +290,17 @@
.await?;

let mut state = self.state.write().await;

// This stop the tokio task that performs heartbeats
state.heartbeat_task.take();

drop(state);

self.force_drop_connection().await
}

async fn force_drop_connection(&self) -> RabbitMQStreamResult<()> {
self.channel.close().await
}

pub async fn subscribe(
&self,
subscription_id: u8,
Expand Down Expand Up @@ -622,7 +631,7 @@
M: FnOnce(u32) -> R,
{
let Some((correlation_id, mut receiver)) = self.dispatcher.response_channel() else {
trace!("Connection si closed here");
trace!("Connection is closed here");
return Err(ClientError::ConnectionClosed);
};

Expand Down Expand Up @@ -711,9 +720,16 @@
let heartbeat_task = tokio::spawn(async move {
loop {
trace!("Sending heartbeat");
let _ = channel.send(HeartBeatCommand::default().into()).await;
if channel
.send(HeartBeatCommand::default().into())
.await
.is_err()
{
break;

Check warning on line 728 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L728

Added line #L728 was not covered by tests
}
tokio::time::sleep(Duration::from_secs(heartbeat_interval.into())).await;
}
warn!("Heartbeat task stopped. Force closing connection");

Check warning on line 732 in src/client/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/client/mod.rs#L732

Added line #L732 was not covered by tests
})
.into();
state.heartbeat_task = Some(heartbeat_task);
Expand Down
7 changes: 6 additions & 1 deletion src/client/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ impl ClientOptionsBuilder {
self
}

pub fn client_provided_name(mut self, client_provided_name: String) -> Self {
self.0.client_provided_name = client_provided_name;
self
}

pub fn collector(mut self, collector: Arc<dyn MetricsCollector>) -> Self {
self.0.collector = collector;
self
Expand Down Expand Up @@ -587,7 +592,7 @@ mod tests {
assert_eq!(options.heartbeat, 10000);
assert_eq!(options.max_frame_size, 1);
assert!(matches!(options.tls, TlsConfiguration::Untrusted));
assert_eq!(options.load_balancer_mode, true);
assert!(options.load_balancer_mode);
}

#[cfg(feature = "serde")]
Expand Down
3 changes: 1 addition & 2 deletions src/environment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;

use crate::types::OffsetSpecification;
use crate::{client::TlsConfiguration, producer::NoDedup};
Expand Down Expand Up @@ -197,7 +196,6 @@ impl Environment {
environment: self.clone(),
name: None,
batch_size: 100,
batch_publishing_delay: Duration::from_millis(100),
data: PhantomData,
filter_value_extractor: None,
client_provided_name: String::from("rust-stream-producer"),
Expand Down Expand Up @@ -328,6 +326,7 @@ impl EnvironmentBuilder {
self.0.client_options.heartbeat = heartbeat;
self
}

pub fn metrics_collector(
mut self,
collector: impl MetricsCollector + 'static,
Expand Down
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ pub enum ProducerPublishError {
Confirmation { stream: String },
#[error(transparent)]
Client(#[from] ClientError),
#[error("Failed to publish message, timeout")]
Timeout,
}

#[derive(Error, Debug)]
Expand Down
Loading
Loading