-
Notifications
You must be signed in to change notification settings - Fork 21
Dynamic send #276
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Dynamic send #276
Changes from 6 commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
5245b14
Implement dynamic send. no timeout
allevo bca3425
fmt. stop loop if producer is closed. Fix integration test
allevo a165190
-
allevo e6ceb90
Use accumulator for batch_send
allevo 974ce40
Remove println
allevo e171dcc
Add bin to test latency performance
allevo 2818257
Add test. fmt & clippy
allevo 88519ce
Fix typo in method name
allevo 4dd577c
Test drop connection
allevo ce6d1df
Add timeout to send_with_confirm
allevo a8752b9
Fix fmt & clippy
allevo 38b956a
Ignore bin on coverage
allevo File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
#![allow(dead_code)] | ||
|
||
use std::{ | ||
sync::{ | ||
atomic::{AtomicU32, Ordering}, | ||
Arc, | ||
}, | ||
time::{Duration, Instant, SystemTime, UNIX_EPOCH}, | ||
}; | ||
|
||
use rabbitmq_stream_client::{ | ||
types::{ByteCapacity, Message, OffsetSpecification}, | ||
Environment, | ||
}; | ||
use tokio::{sync::mpsc::UnboundedSender, time::sleep}; | ||
use tokio_stream::StreamExt; | ||
|
||
static ONE_SECOND: Duration = Duration::from_secs(1); | ||
static ONE_MINUTE: Duration = Duration::from_secs(60); | ||
|
||
struct Metric { | ||
created_at: u128, | ||
received_at: SystemTime, | ||
} | ||
|
||
#[derive(Debug)] | ||
struct Stats { | ||
average_latency: f32, | ||
messages_received: usize, | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() { | ||
let stream_name = "perf-stream"; | ||
|
||
let environment = Environment::builder().build().await.unwrap(); | ||
let _ = environment.delete_stream(stream_name).await; | ||
environment | ||
.stream_creator() | ||
.max_length(ByteCapacity::GB(5)) | ||
.create(stream_name) | ||
.await | ||
.unwrap(); | ||
|
||
let environment = Arc::new(environment); | ||
|
||
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel(); | ||
let consumer_env = environment.clone(); | ||
let consumer_handler = tokio::spawn(async move { | ||
start_consumer(consumer_env, stream_name, sender).await; | ||
}); | ||
|
||
let produced_messages = AtomicU32::new(0); | ||
let producer_env = environment.clone(); | ||
let producer_handler = tokio::spawn(async move { | ||
start_producer(producer_env, stream_name, &produced_messages).await; | ||
}); | ||
|
||
let run_for = Duration::from_secs(5 * 60); | ||
|
||
tokio::spawn(async move { | ||
sleep(run_for).await; | ||
producer_handler.abort(); | ||
sleep(Duration::from_secs(1)).await; | ||
consumer_handler.abort(); | ||
}); | ||
|
||
let minutes = run_for.as_secs() / 60; | ||
|
||
let mut now = Instant::now(); | ||
// 5 minutes of metrics | ||
let mut metrics = Vec::with_capacity(50 * 60 * minutes as usize); | ||
while let Some(metric) = receiver.recv().await { | ||
if now.elapsed() > ONE_MINUTE { | ||
now = Instant::now(); | ||
|
||
let last_metrics = metrics; | ||
metrics = Vec::with_capacity(50 * 60 * minutes as usize); | ||
tokio::spawn(async move { | ||
let stats = calculate_stats(last_metrics).await; | ||
println!("stats: {:?}", stats); | ||
}); | ||
} | ||
metrics.push(metric); | ||
} | ||
|
||
let stats = calculate_stats(metrics).await; | ||
println!("stats: {:?}", stats); | ||
} | ||
|
||
async fn calculate_stats(metrics: Vec<Metric>) -> Stats { | ||
let mut total_latency = 0; | ||
let metric_count = metrics.len(); | ||
for metric in metrics { | ||
let created_at = SystemTime::UNIX_EPOCH + Duration::from_millis(metric.created_at as u64); | ||
let received_at = metric.received_at; | ||
let delta = received_at.duration_since(created_at).unwrap(); | ||
total_latency += delta.as_millis(); | ||
} | ||
|
||
Stats { | ||
average_latency: total_latency as f32 / metric_count as f32, | ||
messages_received: metric_count, | ||
} | ||
} | ||
|
||
async fn start_consumer( | ||
environment: Arc<Environment>, | ||
stream_name: &str, | ||
sender: UnboundedSender<Metric>, | ||
) { | ||
let mut consumer = environment | ||
.consumer() | ||
.offset(OffsetSpecification::First) | ||
.build(stream_name) | ||
.await | ||
.unwrap(); | ||
while let Some(Ok(delivery)) = consumer.next().await { | ||
let produced_at = delivery | ||
.message() | ||
.data() | ||
.map(|data| { | ||
u128::from_be_bytes([ | ||
data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7], | ||
data[8], data[9], data[10], data[11], data[12], data[13], data[14], data[15], | ||
]) | ||
}) | ||
.unwrap(); | ||
let metric = Metric { | ||
created_at: produced_at, | ||
received_at: SystemTime::now(), | ||
}; | ||
sender.send(metric).unwrap(); | ||
} | ||
} | ||
|
||
async fn start_producer( | ||
environment: Arc<Environment>, | ||
stream_name: &str, | ||
produced_messages: &AtomicU32, | ||
) { | ||
let message_per_second = 50_usize; | ||
let producer = environment.producer().build(stream_name).await.unwrap(); | ||
|
||
loop { | ||
let start = Instant::now(); | ||
let messages = create_messages(message_per_second); | ||
let messages_sent = messages.len() as u32; | ||
for message in messages { | ||
producer.send(message, |_| async {}).await.unwrap(); | ||
} | ||
produced_messages.fetch_add(messages_sent, Ordering::Relaxed); | ||
|
||
let elapsed = start.elapsed(); | ||
|
||
if ONE_SECOND > elapsed { | ||
sleep(ONE_SECOND - elapsed).await; | ||
} | ||
} | ||
} | ||
|
||
fn create_messages(message_count_per_batch: usize) -> Vec<Message> { | ||
(0..message_count_per_batch) | ||
.map(|_| { | ||
let start = SystemTime::now(); | ||
let since_the_epoch = start | ||
.duration_since(UNIX_EPOCH) | ||
.expect("Time went backwards"); | ||
let since_the_epoch = since_the_epoch.as_millis(); | ||
Message::builder() | ||
.body(since_the_epoch.to_be_bytes()) | ||
.build() | ||
}) | ||
.collect() | ||
} | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As this file is doing perf-tests any possibility to move it in the tests folder? (maybe creating a folder like performances)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have different possibilities:
bench
folder where, typically, you put performance tests, commonly developed withcriterion
crate. Anyway, this actually isn't a benchmark in the same sense ofcriterion
which invokes the function multiple time and measure the duration.tests
folder and threat it as a integration test. Anyway, as a test, the output is not shown to the developer without the appropriate flag (--nocapture
).bin
and threat is as a binary, as in this PR. You can run it usingcargo run --release --bin perf-producer
and print the output easily.Anyway, I don't have a strong opinion on that and I can follow a different solution. let me know how you want to proceed.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have a strong opinion too. As a preference I would choose the bench folder. It can be also useful in case in the future we want to add other performance tests and group them together