Skip to content

Commit ab83a22

Browse files
committed
fix throughput benchmark
1 parent 1ced39d commit ab83a22

File tree

1 file changed

+15
-3
lines changed

1 file changed

+15
-3
lines changed

twmq/benches/throughput.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
1010
use tokio::runtime::Runtime;
1111
use twmq::error::TwmqError;
1212
use twmq::job::JobError;
13+
use twmq::{BorrowedJob, UserCancellable};
1314

1415
use twmq::{
1516
DurableExecution, Queue,
@@ -44,6 +45,14 @@ impl From<TwmqError> for BenchmarkErrorData {
4445
}
4546
}
4647

48+
impl UserCancellable for BenchmarkErrorData {
49+
fn user_cancelled() -> Self {
50+
BenchmarkErrorData {
51+
reason: "Transaction cancelled by user".to_string(),
52+
}
53+
}
54+
}
55+
4756
// Shared metrics across all benchmark jobs
4857
#[derive(Clone)]
4958
pub struct BenchmarkMetrics {
@@ -100,7 +109,10 @@ impl DurableExecution for BenchmarkJobHandler {
100109
type ErrorData = BenchmarkErrorData;
101110
type JobData = BenchmarkJobData;
102111

103-
async fn process(&self, job: &Job<Self::JobData>) -> JobResult<Self::Output, Self::ErrorData> {
112+
async fn process(
113+
&self,
114+
job: &BorrowedJob<Self::JobData>,
115+
) -> JobResult<Self::Output, Self::ErrorData> {
104116
let start_time = SystemTime::now()
105117
.duration_since(UNIX_EPOCH)
106118
.unwrap()
@@ -119,7 +131,7 @@ impl DurableExecution for BenchmarkJobHandler {
119131
.fetch_add(processing_time, Ordering::SeqCst);
120132

121133
// Fresh random decision each processing attempt
122-
if rand::thread_rng().gen_bool(job.data.nack_probability) {
134+
if rand::thread_rng().gen_bool(job.job.data.nack_probability) {
123135
self.metrics.jobs_nacked.fetch_add(1, Ordering::SeqCst);
124136

125137
// Random position for nacks as requested
@@ -140,7 +152,7 @@ impl DurableExecution for BenchmarkJobHandler {
140152
self.metrics.jobs_succeeded.fetch_add(1, Ordering::SeqCst);
141153

142154
Ok(BenchmarkOutput {
143-
job_id: job.id.clone(),
155+
job_id: job.id().to_string(),
144156
processed_at: end_time,
145157
})
146158
}

0 commit comments

Comments
 (0)