Skip to content

Commit 7dea641

Browse files
committed
add test for delays, add twmq namespace
1 parent 3496205 commit 7dea641

File tree

6 files changed

+517
-23
lines changed

6 files changed

+517
-23
lines changed

twmq/src/lib.rs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ pub mod job;
44
pub mod queue;
55

66
use std::marker::PhantomData;
7-
use std::process::Output;
87
use std::sync::Arc;
98
use std::time::{Duration, SystemTime, UNIX_EPOCH};
109

@@ -156,43 +155,43 @@ where
156155
}
157156

158157
pub fn pending_list_name(&self) -> String {
159-
format!("{}:pending", self.name())
158+
format!("twmq:{}:pending", self.name())
160159
}
161160

162161
pub fn active_hash_name(&self) -> String {
163-
format!("{}:active", self.name)
162+
format!("twmq:{}:active", self.name)
164163
}
165164

166165
pub fn delayed_zset_name(&self) -> String {
167-
format!("{}:delayed", self.name)
166+
format!("twmq:{}:delayed", self.name)
168167
}
169168

170169
pub fn success_list_name(&self) -> String {
171-
format!("{}:success", self.name)
170+
format!("twmq:{}:success", self.name)
172171
}
173172

174173
pub fn failed_list_name(&self) -> String {
175-
format!("{}:failed", self.name)
174+
format!("twmq:{}:failed", self.name)
176175
}
177176

178177
pub fn job_data_hash_name(&self) -> String {
179-
format!("{}:jobs:data", self.name)
178+
format!("twmq:{}:jobs:data", self.name)
180179
}
181180

182181
pub fn job_meta_hash_name(&self, job_id: &str) -> String {
183-
format!("{}:job:{}:meta", self.name, job_id)
182+
format!("twmq:{}:job:{}:meta", self.name, job_id)
184183
}
185184

186185
pub fn job_errors_list_name(&self, job_id: &str) -> String {
187-
format!("{}:job:{}:errors", self.name, job_id)
186+
format!("twmq:{}:job:{}:errors", self.name, job_id)
188187
}
189188

190189
pub fn job_result_hash_name(&self) -> String {
191-
format!("{}:jobs:result", self.name)
190+
format!("twmq:{}:jobs:result", self.name)
192191
}
193192

194193
pub fn dedupe_set_name(&self) -> String {
195-
format!("{}:dedup", self.name)
194+
format!("twmq:{}:dedup", self.name)
196195
}
197196

198197
pub async fn push(&self, job_options: JobOptions<T>) -> Result<Job<T>, TwmqError> {
@@ -533,8 +532,8 @@ where
533532
local delayed_zset_name = KEYS[2]
534533
local pending_list_name = KEYS[3]
535534
local active_hash_name = KEYS[4]
535+
local job_data_hash_name = KEYS[5]
536536
537-
local job_data_hash_name = queue_id .. ':jobs:data'
538537
539538
local result_jobs = {}
540539
@@ -548,7 +547,7 @@ where
548547
for i = 1, #active_jobs, 2 do
549548
local job_id = active_jobs[i]
550549
local lease_expiry = tonumber(active_jobs[i + 1])
551-
local job_meta_hash_name = queue_id .. ':job:' .. job_id .. ':meta'
550+
local job_meta_hash_name = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':meta'
552551
553552
-- Check if lease has expired
554553
if lease_expiry < now then
@@ -568,7 +567,7 @@ where
568567
for i, job_id in ipairs(delayed_jobs) do
569568
-- Check position information
570569
571-
local job_meta_hash_name = queue_id .. ':job:' .. job_id .. ':meta'
570+
local job_meta_hash_name = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':meta'
572571
local reentry_position = redis.call('HGET', job_meta_hash_name, 'reentry_position') or 'last'
573572
574573
-- Remove from delayed
@@ -605,7 +604,7 @@ where
605604
-- Only process if we have data
606605
if job_data then
607606
-- Update metadata
608-
local job_meta_hash_name = queue_id .. ':job:' .. job_id .. ':meta'
607+
local job_meta_hash_name = 'twmq:' .. queue_id .. ':job:' .. job_id .. ':meta'
609608
610609
611610
redis.call('HSET', job_meta_hash_name, 'processed_at', now)
@@ -637,6 +636,7 @@ where
637636
.key(self.delayed_zset_name())
638637
.key(self.pending_list_name())
639638
.key(self.active_hash_name())
639+
.key(self.job_data_hash_name())
640640
.arg(now)
641641
.arg(batch_size)
642642
.arg(self.options.lease_duration.as_secs())
@@ -714,8 +714,8 @@ where
714714
715715
if #job_ids_to_delete > 0 then
716716
for _, j_id in ipairs(job_ids_to_delete) do
717-
local job_meta_hash = queue_id .. ':job:' .. j_id .. ':meta'
718-
local errors_list_name = queue_id .. ':job:' .. j_id .. ':errors'
717+
local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta'
718+
local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors'
719719
720720
redis.call('HDEL', job_data_hash, j_id)
721721
redis.call('DEL', job_meta_hash)
@@ -841,8 +841,8 @@ where
841841
842842
if #job_ids_to_delete > 0 then
843843
for _, j_id in ipairs(job_ids_to_delete) do
844-
local errors_list_name = queue_id .. ':job:' .. j_id .. ':errors'
845-
local job_meta_hash = queue_id .. ':job:' .. j_id .. ':meta'
844+
local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors'
845+
local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta'
846846
847847
redis.call('HDEL', job_data_hash, j_id)
848848
redis.call('DEL', job_meta_hash)

twmq/tests/basic.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/";
1818
// Helper to clean up Redis keys for a given queue name pattern
1919
async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) {
2020
let mut conn = conn_manager.clone();
21-
let keys_pattern = format!("{}:*", queue_name);
21+
let keys_pattern = format!("twmq:{}:*", queue_name);
22+
2223
let keys: Vec<String> = redis::cmd("KEYS")
2324
.arg(&keys_pattern)
2425
.query_async(&mut conn)

twmq/tests/basic_hook.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ use twmq::{
2323
// Helper to clean up Redis keys for a given queue name pattern
2424
async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) {
2525
let mut conn = conn_manager.clone();
26-
let keys_pattern = format!("{}:*", queue_name);
26+
let keys_pattern = format!("twmq:{}:*", queue_name);
27+
2728
let keys: Vec<String> = redis::cmd("KEYS")
2829
.arg(&keys_pattern)
2930
.query_async(&mut conn)

0 commit comments

Comments
 (0)