Skip to content

Commit f965df6

Browse files
committed
update local concurrency interface
1 parent 0097548 commit f965df6

File tree

6 files changed

+35
-14
lines changed

6 files changed

+35
-14
lines changed

twmq/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -327,13 +327,13 @@ where
327327
Ok(count)
328328
}
329329

330-
pub async fn work(self: Arc<Self>, local_concurrency: usize) -> Result<(), TwmqError> {
330+
pub async fn work(self: Arc<Self>) -> Result<(), TwmqError> {
331331
// Local semaphore to limit concurrency per instance
332-
let semaphore = Arc::new(Semaphore::new(local_concurrency));
332+
let semaphore = Arc::new(Semaphore::new(self.options.local_concurrency));
333333

334334
// Start worker
335335
tokio::spawn(async move {
336-
let mut interval = tokio::time::interval(Duration::from_millis(100));
336+
let mut interval = tokio::time::interval(self.options.polling_interval);
337337

338338
loop {
339339
interval.tick().await;

twmq/src/queue.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
use std::time::Duration;
22

3+
#[derive(Clone, Debug)]
34
pub struct QueueOptions {
45
pub max_success: usize,
56
pub max_failed: usize,
67
pub lease_duration: Duration,
8+
pub local_concurrency: usize,
9+
10+
pub polling_interval: Duration,
711

812
/// If true, always poll for jobs even if there are no available permits
913
/// This is important, because polling is how delayed and timed out jobs are handled
@@ -17,6 +21,8 @@ impl Default for QueueOptions {
1721
Self {
1822
max_success: 1000,
1923
max_failed: 10000,
24+
local_concurrency: 100,
25+
polling_interval: Duration::from_millis(100),
2026
lease_duration: Duration::from_secs(30),
2127
always_poll: false,
2228
}

twmq/tests/basic.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ async fn test_queue_push_and_process_job() {
9595
let worker_queue_ref = Arc::clone(&queue);
9696
let worker_handle = tokio::spawn(async move {
9797
// The worker will loop internally, so we expect it to pick up the job
98-
if let Err(e) = worker_queue_ref.work(1).await {
98+
if let Err(e) = worker_queue_ref.work().await {
9999
eprintln!("Worker for queue {} failed: {:?}", queue_name_clone, e);
100100
}
101101
});

twmq/tests/basic_hook.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use twmq::{
1717
DurableExecution, Queue,
1818
hooks::TransactionContext,
1919
job::{Job, JobResult, JobStatus, RequeuePosition},
20+
queue::QueueOptions,
2021
};
2122

2223
// Helper to clean up Redis keys for a given queue name pattern
@@ -203,12 +204,15 @@ async fn test_cross_queue_job_scheduling() {
203204
println!("Creating main queue: {}", main_queue_name);
204205
println!("Creating webhook queue: {}", webhook_queue_name);
205206

207+
let mut queue_options = QueueOptions::default();
208+
queue_options.local_concurrency = 1;
209+
206210
// Create webhook queue
207211
let webhook_queue = Arc::new(
208212
Queue::<WebhookJobPayload, WebhookJobOutput, WebhookJobErrorData, ()>::new(
209213
REDIS_URL,
210214
&webhook_queue_name,
211-
None,
215+
Some(queue_options.clone()),
212216
(),
213217
)
214218
.await
@@ -220,7 +224,7 @@ async fn test_cross_queue_job_scheduling() {
220224
Queue::<MainJobPayload, TestJobOutput, TestJobErrorData, Arc<WebhookQueue>>::new(
221225
REDIS_URL,
222226
&main_queue_name,
223-
None,
227+
Some(queue_options),
224228
webhook_queue.clone(),
225229
)
226230
.await
@@ -252,7 +256,7 @@ async fn test_cross_queue_job_scheduling() {
252256
let main_worker = {
253257
let queue = main_queue.clone();
254258
tokio::spawn(async move {
255-
if let Err(e) = queue.work(1).await {
259+
if let Err(e) = queue.work().await {
256260
eprintln!("Main worker failed: {:?}", e);
257261
}
258262
})
@@ -261,7 +265,7 @@ async fn test_cross_queue_job_scheduling() {
261265
let webhook_worker = {
262266
let queue = webhook_queue.clone();
263267
tokio::spawn(async move {
264-
if let Err(e) = queue.work(1).await {
268+
if let Err(e) = queue.work().await {
265269
eprintln!("Webhook worker failed: {:?}", e);
266270
}
267271
})

twmq/tests/lease_expiry.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@ async fn test_job_lease_expiry() {
141141
max_success: 1000,
142142
max_failed: 1000,
143143
lease_duration,
144+
polling_interval: Duration::from_millis(100),
145+
local_concurrency: 1,
144146
always_poll: true,
145147
};
146148

@@ -189,7 +191,7 @@ async fn test_job_lease_expiry() {
189191
let worker = {
190192
let queue = queue.clone();
191193
tokio::spawn(async move {
192-
if let Err(e) = queue.work(1).await {
194+
if let Err(e) = queue.work().await {
193195
tracing::error!("Lease worker failed: {:?}", e);
194196
}
195197
})
@@ -290,7 +292,9 @@ async fn test_multiple_job_lease_expiry() {
290292
let queue_options = QueueOptions {
291293
max_success: 1000,
292294
max_failed: 1000,
295+
local_concurrency: 3,
293296
lease_duration,
297+
polling_interval: Duration::from_millis(100),
294298
always_poll: true,
295299
};
296300

@@ -328,7 +332,7 @@ async fn test_multiple_job_lease_expiry() {
328332
let worker = {
329333
let queue = queue.clone();
330334
tokio::spawn(async move {
331-
if let Err(e) = queue.work(3).await {
335+
if let Err(e) = queue.work().await {
332336
tracing::error!("Multi-lease worker failed: {:?}", e);
333337
}
334338
})

twmq/tests/nack.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use twmq::{
1414
DurableExecution, Queue,
1515
hooks::TransactionContext,
1616
job::{Job, JobResult, JobStatus, RequeuePosition},
17+
queue::QueueOptions,
1718
redis::aio::ConnectionManager,
1819
};
1920

@@ -168,11 +169,14 @@ async fn test_job_retry_attempts() {
168169
tracing::info!("Creating retry queue: {}", queue_name);
169170
tracing::info!("Job should succeed after {} attempts", desired_attempts);
170171

172+
let mut queue_options = QueueOptions::default();
173+
queue_options.local_concurrency = 1;
174+
171175
let queue = Arc::new(
172176
Queue::<RetryJobPayload, RetryJobOutput, RetryJobErrorData, ()>::new(
173177
REDIS_URL,
174178
&queue_name,
175-
None,
179+
Some(queue_options),
176180
(),
177181
)
178182
.await
@@ -202,7 +206,7 @@ async fn test_job_retry_attempts() {
202206
let worker = {
203207
let queue = queue.clone();
204208
tokio::spawn(async move {
205-
if let Err(e) = queue.work(1).await {
209+
if let Err(e) = queue.work().await {
206210
tracing::error!("Retry worker failed: {:?}", e);
207211
}
208212
})
@@ -297,11 +301,14 @@ async fn test_different_retry_counts() {
297301
// Reset counters
298302
RETRY_JOB_FINAL_SUCCESS.store(false, Ordering::SeqCst);
299303

304+
let mut queue_options = QueueOptions::default();
305+
queue_options.local_concurrency = 1;
306+
300307
let queue = Arc::new(
301308
Queue::<RetryJobPayload, RetryJobOutput, RetryJobErrorData, ()>::new(
302309
REDIS_URL,
303310
&queue_name,
304-
None,
311+
Some(queue_options),
305312
(),
306313
)
307314
.await
@@ -326,7 +333,7 @@ async fn test_different_retry_counts() {
326333
let worker = {
327334
let queue = queue.clone();
328335
tokio::spawn(async move {
329-
if let Err(e) = queue.work(1).await {
336+
if let Err(e) = queue.work().await {
330337
tracing::error!("Worker failed: {:?}", e);
331338
}
332339
})

0 commit comments

Comments
 (0)