Dynamic send #276

Merged: 12 commits, Feb 26, 2025
Changes from 6 commits
3 changes: 3 additions & 0 deletions Cargo.toml
@@ -66,3 +66,6 @@ path="examples/superstreams/send_super_stream.rs"
name="environment_deserialization"
path="examples/environment_deserialization.rs"

+ [[bin]]
+ name = "perf-producer"
+ path = "src/bin/perf-producer.rs"
Contributor
Since this file runs perf tests, would it be possible to move it into the tests folder? (Maybe creating a folder like performances.)

Collaborator Author
We have a few possibilities:

  • Put it under a bench folder, where performance tests typically live, commonly written with the criterion crate. However, this isn't a benchmark in the same sense as criterion, which invokes a function many times and measures its duration (see the sketch at the end of this comment).
  • Put it under the tests folder and treat it as an integration test. As a test, though, its output is not shown to the developer without the appropriate flag (--nocapture).
  • Put it under bin and treat it as a binary, as in this PR. You can run it using cargo run --release --bin perf-producer and print the output easily.

Anyway, I don't have a strong opinion on this and can follow a different solution. Let me know how you want to proceed.
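For comparison, a minimal criterion sketch (hypothetical, not part of this PR; send_one_message is a placeholder name for whatever producer call we would measure):

use criterion::{criterion_group, criterion_main, Criterion};

// Placeholder for the code path under test.
fn send_one_message() {
    // ... build and send a single message ...
}

fn bench_send(c: &mut Criterion) {
    // criterion runs the function many times and reports statistical timings,
    // unlike the long-running binary in this PR.
    c.bench_function("send_one_message", |b| b.iter(send_one_message));
}

criterion_group!(benches, bench_send);
criterion_main!(benches);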

Contributor
@DanielePalaia DanielePalaia Feb 25, 2025
I don't have a strong opinion either. As a preference I would choose the bench folder; it could also be useful if, in the future, we want to add other performance tests and group them together. The Cargo.toml change would look roughly like the sketch below.
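A sketch of the manifest change, assuming the file moves to a benches folder (the benches/ path is an assumption; harness = false keeps the file's own main instead of the default bench harness):

[[bench]]
name = "perf-producer"
path = "benches/perf-producer.rs"
harness = false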

175 changes: 175 additions & 0 deletions src/bin/perf-producer.rs
@@ -0,0 +1,175 @@
#![allow(dead_code)]

use std::{
    sync::{
        atomic::{AtomicU32, Ordering},
        Arc,
    },
    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};

use rabbitmq_stream_client::{
    types::{ByteCapacity, Message, OffsetSpecification},
    Environment,
};
use tokio::{sync::mpsc::UnboundedSender, time::sleep};
use tokio_stream::StreamExt;

static ONE_SECOND: Duration = Duration::from_secs(1);
static ONE_MINUTE: Duration = Duration::from_secs(60);

struct Metric {
    created_at: u128,
    received_at: SystemTime,
}

#[derive(Debug)]
struct Stats {
    average_latency: f32,
    messages_received: usize,
}

#[tokio::main]
async fn main() {
    let stream_name = "perf-stream";

    let environment = Environment::builder().build().await.unwrap();
    let _ = environment.delete_stream(stream_name).await;
    environment
        .stream_creator()
        .max_length(ByteCapacity::GB(5))
        .create(stream_name)
        .await
        .unwrap();

    let environment = Arc::new(environment);

    let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
    let consumer_env = environment.clone();
    let consumer_handler = tokio::spawn(async move {
        start_consumer(consumer_env, stream_name, sender).await;
    });

    let produced_messages = AtomicU32::new(0);
    let producer_env = environment.clone();
    let producer_handler = tokio::spawn(async move {
        start_producer(producer_env, stream_name, &produced_messages).await;
    });

    let run_for = Duration::from_secs(5 * 60);

    tokio::spawn(async move {
        sleep(run_for).await;
        producer_handler.abort();
        sleep(Duration::from_secs(1)).await;
        consumer_handler.abort();
    });

    let minutes = run_for.as_secs() / 60;

    let mut now = Instant::now();
    // 5 minutes of metrics
    let mut metrics = Vec::with_capacity(50 * 60 * minutes as usize);
    while let Some(metric) = receiver.recv().await {
        if now.elapsed() > ONE_MINUTE {
            now = Instant::now();

            let last_metrics = metrics;
            metrics = Vec::with_capacity(50 * 60 * minutes as usize);
            tokio::spawn(async move {
                let stats = calculate_stats(last_metrics).await;
                println!("stats: {:?}", stats);
            });
        }
        metrics.push(metric);
    }

    let stats = calculate_stats(metrics).await;
    println!("stats: {:?}", stats);
}

async fn calculate_stats(metrics: Vec<Metric>) -> Stats {
    let mut total_latency = 0;
    let metric_count = metrics.len();
    for metric in metrics {
        let created_at = SystemTime::UNIX_EPOCH + Duration::from_millis(metric.created_at as u64);
        let received_at = metric.received_at;
        let delta = received_at.duration_since(created_at).unwrap();
        total_latency += delta.as_millis();
    }

    Stats {
        average_latency: total_latency as f32 / metric_count as f32,
        messages_received: metric_count,
    }
}

async fn start_consumer(
    environment: Arc<Environment>,
    stream_name: &str,
    sender: UnboundedSender<Metric>,
) {
    let mut consumer = environment
        .consumer()
        .offset(OffsetSpecification::First)
        .build(stream_name)
        .await
        .unwrap();
    while let Some(Ok(delivery)) = consumer.next().await {
        let produced_at = delivery
            .message()
            .data()
            .map(|data| {
                u128::from_be_bytes([
                    data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
                    data[8], data[9], data[10], data[11], data[12], data[13], data[14],
                    data[15],
                ])
            })
            .unwrap();
        let metric = Metric {
            created_at: produced_at,
            received_at: SystemTime::now(),
        };
        sender.send(metric).unwrap();
    }
}

async fn start_producer(
    environment: Arc<Environment>,
    stream_name: &str,
    produced_messages: &AtomicU32,
) {
    let message_per_second = 50_usize;
    let producer = environment.producer().build(stream_name).await.unwrap();


    loop {
        let start = Instant::now();
        let messages = create_messages(message_per_second);
        let messages_sent = messages.len() as u32;
        for message in messages {
            producer.send(message, |_| async {}).await.unwrap();
        }
        produced_messages.fetch_add(messages_sent, Ordering::Relaxed);

        let elapsed = start.elapsed();

        if ONE_SECOND > elapsed {
            sleep(ONE_SECOND - elapsed).await;
        }
    }
}

fn create_messages(message_count_per_batch: usize) -> Vec<Message> {
    (0..message_count_per_batch)
        .map(|_| {
            let start = SystemTime::now();
            let since_the_epoch = start
                .duration_since(UNIX_EPOCH)
                .expect("Time went backwards");
            let since_the_epoch = since_the_epoch.as_millis();
            Message::builder()
                .body(since_the_epoch.to_be_bytes())
                .build()
        })
        .collect()
}

2 changes: 1 addition & 1 deletion src/client/mod.rs
@@ -622,7 +622,7 @@
M: FnOnce(u32) -> R,
{
let Some((correlation_id, mut receiver)) = self.dispatcher.response_channel() else {
trace!("Connection si closed here");
trace!("Connection is closed here");

return Err(ClientError::ConnectionClosed);
};

2 changes: 0 additions & 2 deletions src/environment.rs
@@ -1,6 +1,5 @@
use std::marker::PhantomData;
use std::sync::Arc;
- use std::time::Duration;

use crate::types::OffsetSpecification;
use crate::{client::TlsConfiguration, producer::NoDedup};
@@ -197,7 +196,6 @@ impl Environment {
environment: self.clone(),
name: None,
batch_size: 100,
- batch_publishing_delay: Duration::from_millis(100),
data: PhantomData,
filter_value_extractor: None,
client_provided_name: String::from("rust-stream-producer"),