Skip to content

Commit 738559d

Browse files
authored
Merge pull request #15 from Norio4/add_perform_in
Add public function perform_in() for Client
2 parents 7888661 + 8c1d0e8 commit 738559d

File tree

4 files changed

+135
-18
lines changed

4 files changed

+135
-18
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,4 @@ serde = "1.0"
2121
serde_json = "1.0"
2222
r2d2 = "0.8"
2323
r2d2_redis = "0.14"
24+
chrono = "0.4.19"

README.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,43 @@ sidekiq = "0.9"
2727
* <https://github.com/jkcclemens/paste>
2828
* <https://github.com/spk/maman>
2929

30+
31+
## Examples
32+
33+
```
34+
use sidekiq::{Job, Value};
35+
use sidekiq::{Client, ClientOpts, create_redis_pool};
36+
use chrono::Duration;
37+
38+
let ns = "test";
39+
let client_opts = ClientOpts {
40+
namespace: Some(ns.to_string()),
41+
..Default::default()
42+
};
43+
let pool = create_redis_pool().unwrap();
44+
let client = Client::new(pool, client_opts);
45+
let class = "MyClass".to_string();
46+
47+
// basic job
48+
let job = Job::new(class, vec![sidekiq::Value::Null], Default::default());
49+
match client.push(job) {
50+
Ok(_) => {},
51+
Err(err) => {
52+
println!("Sidekiq push failed: {}", err);
53+
},
54+
}
55+
56+
// scheduled-jobs (perform_in)
57+
let job = Job::new(class, vec![sidekiq::Value::Null], Default::default());
58+
let interval = Duration::hours(1);
59+
match client.perform_in(interval, job) {
60+
Ok(_) => {},
61+
Err(err) => {
62+
println!("Sidekiq push failed: {}", err);
63+
},
64+
}
65+
```
66+
3067
## REFERENCES
3168

3269
* <http://julienblanchard.com/2015/using-resque-with-rust/>

src/sidekiq/mod.rs

Lines changed: 68 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use rand::{thread_rng, Rng};
1010
use serde::ser::SerializeStruct;
1111
use serde::{Serialize, Serializer};
1212

13+
use chrono::{Duration, Local};
14+
1315
const REDIS_URL_ENV: &str = "REDIS_URL";
1416
const REDIS_URL_DEFAULT: &str = "redis://127.0.0.1/";
1517
pub type RedisPooledConnection = r2d2::PooledConnection<RedisConnectionManager>;
@@ -204,35 +206,83 @@ impl Client {
204206
}
205207
}
206208

209+
fn calc_at(&self, interval: Duration) -> Option<f64> {
210+
let div: f64 = 1_000_f64;
211+
let maximum_interval: f64 = 1_000_000_000_f64;
212+
let interval_millsec: f64 = interval.num_milliseconds() as f64 / div;
213+
let now_millisec: f64 = Local::now().timestamp_millis() as f64 / div;
214+
215+
let start_at: f64 = if interval_millsec < maximum_interval {
216+
now_millisec + interval_millsec
217+
} else {
218+
interval_millsec
219+
};
220+
221+
if start_at <= now_millisec {
222+
None
223+
} else {
224+
Some(start_at)
225+
}
226+
}
227+
228+
pub fn perform_in(&self, interval: Duration, job: Job) -> Result<(), ClientError> {
229+
self.raw_push(&[job], self.calc_at(interval))
230+
}
231+
207232
pub fn push(&self, job: Job) -> Result<(), ClientError> {
208-
self.raw_push(&[job])
233+
self.raw_push(&[job], None)
209234
}
210235

211236
pub fn push_bulk(&self, jobs: &[Job]) -> Result<(), ClientError> {
212-
self.raw_push(jobs)
237+
self.raw_push(jobs, None)
213238
}
214239

215-
fn raw_push(&self, payloads: &[Job]) -> Result<(), ClientError> {
240+
fn raw_push(&self, payloads: &[Job], at: Option<f64>) -> Result<(), ClientError> {
216241
let payload = &payloads[0];
217242
let to_push = payloads
218243
.iter()
219244
.map(|entry| serde_json::to_string(&entry).unwrap())
220245
.collect::<Vec<_>>();
221-
match self.connect() {
222-
Ok(mut conn) => redis::pipe()
223-
.atomic()
224-
.cmd("SADD")
225-
.arg("queues")
226-
.arg(payload.queue.to_string())
227-
.ignore()
228-
.cmd("LPUSH")
229-
.arg(self.queue_name(&payload.queue))
230-
.arg(to_push)
231-
.query(&mut *conn)
232-
.map_err(|err| ClientError {
233-
kind: ErrorKind::Redis(err),
234-
}),
235-
Err(err) => Err(err),
246+
247+
if at.is_none() {
248+
match self.connect() {
249+
Ok(mut conn) => redis::pipe()
250+
.atomic()
251+
.cmd("SADD")
252+
.arg("queues")
253+
.arg(payload.queue.to_string())
254+
.ignore()
255+
.cmd("LPUSH")
256+
.arg(self.queue_name(&payload.queue))
257+
.arg(to_push)
258+
.query(&mut *conn)
259+
.map_err(|err| ClientError {
260+
kind: ErrorKind::Redis(err),
261+
}),
262+
Err(err) => Err(err),
263+
}
264+
} else {
265+
match self.connect() {
266+
Ok(mut conn) => redis::pipe()
267+
.atomic()
268+
.cmd("ZADD")
269+
.arg(self.schedule_queue_name())
270+
.arg(at.unwrap().to_string())
271+
.arg(to_push)
272+
.query(&mut *conn)
273+
.map_err(|err| ClientError {
274+
kind: ErrorKind::Redis(err),
275+
}),
276+
Err(err) => Err(err),
277+
}
278+
}
279+
}
280+
281+
fn schedule_queue_name(&self) -> String {
282+
if let Some(ref ns) = self.namespace {
283+
format!("{}:schedule", ns)
284+
} else {
285+
format!("schedule")
236286
}
237287
}
238288

tests/lib.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use std::time::{SystemTime, UNIX_EPOCH};
88
use serde_json::value::Value;
99
use sidekiq::{create_redis_pool, Client, ClientOpts, Job};
1010

11+
use chrono::Duration;
12+
1113
fn args() -> Vec<Value> {
1214
let value = json!({
1315
"code": 200,
@@ -82,3 +84,30 @@ fn test_client_push_bulk() {
8284
}
8385
};
8486
}
87+
88+
#[test]
89+
fn test_client_perform_in() {
90+
let class = "MyClass".to_string();
91+
let job = Job::new(class, args(), Default::default());
92+
let client = get_client();
93+
let interval = Duration::hours(1);
94+
match client.perform_in(interval, job) {
95+
Ok(_) => {}
96+
Err(err) => {
97+
println!("Sidekiq push failed: {}", err);
98+
unreachable!()
99+
}
100+
}
101+
102+
let class = "MyClass".to_string();
103+
let job = Job::new(class, args(), Default::default());
104+
let client = get_client();
105+
let interval = Duration::hours(0);
106+
match client.perform_in(interval, job) {
107+
Ok(_) => {}
108+
Err(err) => {
109+
println!("Sidekiq push failed: {}", err);
110+
unreachable!()
111+
}
112+
}
113+
}

0 commit comments

Comments
 (0)