Skip to content

Commit bddefb0

Browse files
committed
fix: issue #36.
1 parent 9ec9f9c commit bddefb0

File tree

11 files changed

+72
-42
lines changed

11 files changed

+72
-42
lines changed

benches/body.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,6 @@ fn bench_try_wait(b: &mut Bencher) {
6363
}
6464

6565
#[bench]
66-
fn bench_get_timestamp(b: &mut Bencher) {
67-
b.iter(|| get_timestamp());
66+
fn bench_timestamp(b: &mut Bencher) {
67+
b.iter(|| timestamp());
6868
}

examples/cycle_tokio_task.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ fn build_wake_task() -> Result<Task> {
5757

5858
pub async fn generate_closure_template() {
5959
let name: String = "'delay_timer-is-easy-to-use.'".into();
60-
let future_inner = async_template(get_timestamp() as i32, name.clone());
60+
let future_inner = async_template(timestamp() as i32, name.clone());
6161
future_inner.await.ok();
6262
}
6363

examples/demo.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ fn build_task_customized_async_task() -> Result<Task, TaskError> {
103103
let body = move || {
104104
let name = name.clone();
105105
async move {
106-
async_template(get_timestamp() as i32, name).await;
106+
async_template(timestamp() as i32, name).await;
107107
}
108108
};
109109
#[allow(deprecated)]

examples/increase.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ fn get_wake_fn(
9090
unsafe {
9191
println!(
9292
"end time {}, result {}",
93-
get_timestamp(),
93+
timestamp(),
9494
(*local_run_flag).load(SeqCst)
9595
);
9696
}

src/entity.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ impl Default for SharedHeader {
177177
let wheel_queue = EventHandle::init_task_wheel(DEFAULT_TIMER_SLOT_COUNT);
178178
let task_flag_map = Arc::new(DashMap::new());
179179
let second_hand = Arc::new(AtomicU64::new(0));
180-
let global_time = Arc::new(AtomicU64::new(get_timestamp()));
180+
let global_time = Arc::new(AtomicU64::new(timestamp()));
181181
let shared_motivation = Arc::new(AtomicBool::new(true));
182182
let runtime_instance = RuntimeInstance::default();
183183
let id_generator = Arc::new(AsyncMutex::new(SnowflakeIdGenerator::new(1, 1)));
@@ -510,7 +510,7 @@ cfg_status_report!(
510510
// to get rid of system interference,
511511
// and this change can also be applied to snowflake-rs.
512512
/// get current OS SystemTime.
513-
pub fn get_timestamp() -> u64 {
513+
pub fn timestamp() -> u64 {
514514
match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
515515
Ok(n) => n.as_secs(),
516516
Err(_) => panic!("SystemTime before UNIX EPOCH!"),
@@ -522,7 +522,7 @@ pub fn get_timestamp() -> u64 {
522522
// to get rid of system interference,
523523
// and this change can also be applied to snowflake-rs.
524524
/// get current OS SystemTime.
525-
pub fn get_timestamp_micros() -> u128 {
525+
pub fn timestamp_micros() -> u128 {
526526
match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
527527
Ok(n) => n.as_micros(),
528528
Err(_) => panic!("SystemTime before UNIX EPOCH!"),

src/prelude.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
//!
1111
//! The prelude may grow over time as additional items see ubiquitous use.
1212
13-
pub use crate::entity::{get_timestamp, get_timestamp_micros, DelayTimer, DelayTimerBuilder};
13+
pub use crate::entity::{timestamp, timestamp_micros, DelayTimer, DelayTimerBuilder};
1414
pub use crate::error::*;
1515
pub use crate::timer::runtime_trace::state::instance;
1616
pub use crate::timer::runtime_trace::task_handle::DelayTaskHandler;

src/timer/runtime_trace/sweeper.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ impl RecyclingBins {
9898
loop {
9999
let mut recycle_unit_heap = self.recycle_unit_heap.lock().await;
100100

101-
let now: u64 = get_timestamp();
101+
let now: u64 = timestamp();
102102
let mut duration: Option<Duration> = None;
103103
for _ in 0..200 {
104104
if let Some(recycle_flag) = (&recycle_unit_heap).peek().map(|r| r.0.deadline <= now)
@@ -185,7 +185,7 @@ mod tests {
185185

186186
#[test]
187187
fn test_task_valid() -> AnyResult<()> {
188-
use super::{get_timestamp, RecycleUnit, RecyclingBins, RuntimeKind, TimerEvent};
188+
use super::{timestamp, RecycleUnit, RecyclingBins, RuntimeKind, TimerEvent};
189189
use smol::{
190190
block_on,
191191
channel::{unbounded, TryRecvError},
@@ -216,7 +216,7 @@ mod tests {
216216
})
217217
});
218218

219-
let deadline = get_timestamp() + 5;
219+
let deadline = timestamp() + 5;
220220

221221
for i in 1..10 {
222222
recycle_unit_sender.try_send(RecycleUnit::new(deadline, i, (i * i) as i64))?;

src/timer/task.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -228,15 +228,15 @@ impl<'a> TryFrom<(FrequencyUnify<'a>, ScheduleIteratorTimeZone)> for FrequencyIn
228228
return Err(FrequencyAnalyzeError::DisInitTime);
229229
}
230230

231-
let seconds_state: SecondsState = (get_timestamp()..).step_by(seconds as usize);
231+
let seconds_state: SecondsState = (timestamp()..).step_by(seconds as usize);
232232
FrequencyInner::SecondsCountDown(1, seconds_state)
233233
}
234234
FrequencyUnify::FrequencySeconds(FrequencySeconds::Repeated(seconds)) => {
235235
if seconds == 0 {
236236
return Err(FrequencyAnalyzeError::DisInitTime);
237237
}
238238

239-
let seconds_state: SecondsState = (get_timestamp()..).step_by(seconds as usize);
239+
let seconds_state: SecondsState = (timestamp()..).step_by(seconds as usize);
240240

241241
FrequencyInner::SecondsRepeated(seconds_state)
242242
}
@@ -245,7 +245,7 @@ impl<'a> TryFrom<(FrequencyUnify<'a>, ScheduleIteratorTimeZone)> for FrequencyIn
245245
return Err(FrequencyAnalyzeError::DisInitTime);
246246
}
247247

248-
let seconds_state: SecondsState = (get_timestamp()..).step_by(seconds as usize);
248+
let seconds_state: SecondsState = (timestamp()..).step_by(seconds as usize);
249249
FrequencyInner::SecondsCountDown(count_down, seconds_state)
250250
}
251251
};
@@ -513,7 +513,7 @@ impl TaskContext {
513513
.send(TimerEvent::FinishTask(FinishTaskBody {
514514
task_id: self.task_id,
515515
record_id: self.record_id,
516-
finish_time: get_timestamp(),
516+
finish_time: timestamp(),
517517
finish_output,
518518
}))
519519
.await
@@ -1008,7 +1008,7 @@ impl<'a> TaskBuilder<'a> {
10081008
10091009
pub fn set_frequency_once_by_timestamp_seconds(&mut self, timestamp_seconds: u64) -> &mut Self {
10101010
let duration = timestamp_seconds
1011-
.checked_sub(get_timestamp())
1011+
.checked_sub(timestamp())
10121012
.unwrap_or(ONE_SECOND);
10131013

10141014
self.frequency = FrequencyUnify::FrequencySeconds(FrequencySeconds::Once(duration));
@@ -1148,7 +1148,7 @@ mod tests {
11481148
.map(|i| {
11491149
debug_assert_eq!(
11501150
task.get_next_exec_timestamp().unwrap(),
1151-
get_timestamp() + (init_seconds * (i - 1))
1151+
timestamp() + (init_seconds * (i - 1))
11521152
);
11531153
})
11541154
.for_each(drop);
@@ -1180,7 +1180,7 @@ mod tests {
11801180
.map(|i| {
11811181
debug_assert_eq!(
11821182
task.get_next_exec_timestamp().unwrap(),
1183-
get_timestamp() + (init_minutes * (i - 1) * ONE_MINUTE)
1183+
timestamp() + (init_minutes * (i - 1) * ONE_MINUTE)
11841184
);
11851185
})
11861186
.for_each(drop);
@@ -1201,7 +1201,7 @@ mod tests {
12011201
.map(|i| {
12021202
debug_assert_eq!(
12031203
task.get_next_exec_timestamp().unwrap(),
1204-
get_timestamp() + (init_hours * (i - 1) * ONE_HOUR)
1204+
timestamp() + (init_hours * (i - 1) * ONE_HOUR)
12051205
);
12061206
})
12071207
.for_each(drop);
@@ -1222,7 +1222,7 @@ mod tests {
12221222
.map(|i| {
12231223
debug_assert_eq!(
12241224
task.get_next_exec_timestamp().unwrap(),
1225-
get_timestamp() + (init_days * (i - 1) * ONE_DAY)
1225+
timestamp() + (init_days * (i - 1) * ONE_DAY)
12261226
);
12271227
})
12281228
.for_each(drop);

src/timer/timer_core.rs

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//! It is the core of the entire cycle scheduling task.
33
use crate::prelude::*;
44

5-
use crate::entity::get_timestamp;
5+
use crate::entity::timestamp;
66
use crate::entity::RuntimeKind;
77

88
use std::mem::replace;
@@ -15,10 +15,12 @@ use smol::Timer as smolTimer;
1515
pub(crate) const DEFAULT_TIMER_SLOT_COUNT: u64 = 3600;
1616

1717
/// The clock of timer core.
18+
#[derive(Debug)]
1819
struct Clock {
1920
inner: ClockInner,
2021
}
2122

23+
#[derive(Debug)]
2224
enum ClockInner {
2325
Sc(SmolClock),
2426

@@ -52,7 +54,7 @@ impl Clock {
5254
ClockInner::Sc(ref mut smol_clock) => smol_clock.tick().await,
5355

5456
ClockInner::Tc(ref mut tokio_clock) => tokio_clock.tick().await,
55-
}
57+
};
5658
}
5759
}
5860
use tokio::time::{self, interval_at, Interval};
@@ -142,14 +144,15 @@ pub enum TimerEvent {
142144
/// Take the initiative to perform once Task.
143145
AdvanceTask(u64),
144146
}
145-
#[derive(Clone, Debug)]
147+
#[derive(Debug)]
146148
/// delay-timer internal timer wheel core.
147149
pub struct Timer {
148150
/// Event sender that provides events to `EventHandle` processing.
149-
pub(crate) timer_event_sender: TimerEventSender,
151+
timer_event_sender: TimerEventSender,
150152
#[allow(dead_code)]
151153
status_report_sender: Option<AsyncSender<i32>>,
152-
pub(crate) shared_header: SharedHeader,
154+
shared_header: SharedHeader,
155+
clock: Clock,
153156
}
154157

155158
// In any case, the task is not executed in the Scheduler,
@@ -158,10 +161,14 @@ pub struct Timer {
158161
impl Timer {
159162
/// Initialize a timer wheel core.
160163
pub fn new(timer_event_sender: TimerEventSender, shared_header: SharedHeader) -> Self {
164+
let runtime_kind = shared_header.runtime_instance.kind;
165+
let clock = Clock::new(runtime_kind);
166+
161167
Timer {
162168
timer_event_sender,
163169
status_report_sender: None,
164170
shared_header,
171+
clock,
165172
}
166173
}
167174

@@ -181,25 +188,29 @@ impl Timer {
181188
.unwrap_or_else(|e| e)
182189
}
183190

191+
/// Time goes on, the clock ticks.
192+
pub(crate) async fn lapse(&mut self) {
193+
self.clock.tick().await;
194+
self.next_position();
195+
}
196+
184197
/// Return a future can pool it for Schedule all cycles task.
185198
pub(crate) async fn async_schedule(&mut self) {
186-
// if that overtime , i run it not block
187-
let mut second_hand;
188-
let mut next_second_hand;
189-
let mut timestamp;
199+
// if that overtime , run it not block
190200

191-
let runtime_kind = self.shared_header.runtime_instance.kind;
192-
let mut clock = Clock::new(runtime_kind);
201+
let mut second_hand = self.second_hand();
202+
let mut next_second_hand = second_hand + 1;
203+
let mut current_timestamp = timestamp();
193204

194205
loop {
195206
//TODO: replenish ending single, for stop current jod and thread.
196207
if !self.shared_header.shared_motivation.load(Acquire) {
197208
return;
198209
}
199210

200-
second_hand = self.next_position();
201-
timestamp = get_timestamp();
202-
self.shared_header.global_time.store(timestamp, Release);
211+
self.shared_header
212+
.global_time
213+
.store(current_timestamp, Release);
203214
let task_ids;
204215

205216
{
@@ -217,7 +228,7 @@ impl Timer {
217228
}
218229
}
219230

220-
trace!("timestamp: {}, task_ids: {:?}", timestamp, task_ids);
231+
trace!("timestamp: {}, task_ids: {:?}", current_timestamp, task_ids);
221232

222233
// Centralize task processing to avoid duplicate lock requests and releases.
223234
// FIXME: https://github.com/BinChengZhao/delay-timer/issues/29
@@ -234,8 +245,7 @@ impl Timer {
234245
}
235246

236247
if let Some(task) = task_option {
237-
next_second_hand = (second_hand + 1) % DEFAULT_TIMER_SLOT_COUNT;
238-
self.maintain_task(task, timestamp, next_second_hand)
248+
self.maintain_task(task, current_timestamp, next_second_hand)
239249
.await
240250
.map_err(|e| error!("{}", e))
241251
.ok();
@@ -250,10 +260,20 @@ impl Timer {
250260
}
251261
}
252262

253-
clock.tick().await;
263+
self.lapse().await;
264+
265+
second_hand = self.second_hand();
266+
next_second_hand = (second_hand + 1) % DEFAULT_TIMER_SLOT_COUNT;
267+
current_timestamp = timestamp();
254268
}
255269
}
256270

271+
/// Access to the second-hand
272+
pub(crate) fn second_hand(&self) -> u64 {
273+
self.shared_header.second_hand.load(Acquire)
274+
}
275+
276+
/// Send timer-event to event-handle.
257277
pub(crate) async fn send_timer_event(
258278
&mut self,
259279
task_id: u64,
@@ -347,6 +367,11 @@ impl Timer {
347367
.get_next_exec_timestamp()
348368
.ok_or_else(|| anyhow!("can't get_next_exec_timestamp in task :{}", task.task_id))?;
349369

370+
// cylinder_line = 24
371+
// slot_seed = 60
372+
// when-init: slot_seed+=1 == 61
373+
// when-on-slot61-exec: (task_excute_timestamp - timestamp + next_second_hand) % slot_seed == 61
374+
350375
// Time difference + next second hand % DEFAULT_TIMER_SLOT_COUNT
351376
let step = task_excute_timestamp.checked_sub(timestamp).unwrap_or(1) + next_second_hand;
352377
let cylinder_line = step / DEFAULT_TIMER_SLOT_COUNT;

src/utils/parse.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ pub mod shell_command {
6363
/// Abstraction of command methods in multiple libraries.
6464
pub trait CommandUnify<Child: ChildUnify>: Sized {
6565
/// Constructs a new Command for launching the program at path program.
66+
// FIXME:
6667
fn new<S: AsRef<OsStr>>(program: S) -> Self {
6768
Self::new(program.as_ref())
6869
}

0 commit comments

Comments
 (0)