Skip to content

Commit bd0b04c

Browse files
committed
chore: fix all tests case.
1 parent cdcdd2a commit bd0b04c

File tree

4 files changed

+39
-19
lines changed

4 files changed

+39
-19
lines changed

src/entity.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,32 @@ cfg_status_report!(
3838
pub(crate) type SharedMotivation = Arc<AtomicBool>;
3939
// Global IdGenerator.
4040
pub(crate) type SharedIdGenerator = Arc<AsyncMutex<SnowflakeIdGenerator>>;
41-
// Global sencond hand.
42-
pub(crate) type SencondHand = Arc<AtomicU64>;
4341
// Global Timestamp.
4442
pub(crate) type GlobalTime = Arc<AtomicU64>;
4543
// Shared task-wheel for operate.
4644
pub(crate) type SharedTaskWheel = Arc<DashMap<u64, Slot>>;
4745
// The slot currently used for storing global tasks.
4846
pub(crate) type SharedTaskFlagMap = Arc<DashMap<u64, TaskMark>>;
4947

48+
/// Global sencond hand.
49+
#[derive(Debug, Clone, Default)]
50+
pub(crate) struct SencondHand {
51+
pub(crate) inner: Arc<AtomicU64>,
52+
}
53+
54+
impl SencondHand {
55+
pub(crate) fn current_second_hand(&self) -> u64 {
56+
self.inner.load(Ordering::Acquire)
57+
}
58+
59+
pub(crate) fn next(&self) -> Result<u64, u64> {
60+
self.inner
61+
.fetch_update(Ordering::Release, Ordering::Relaxed, |x| {
62+
Some((x + 1) % DEFAULT_TIMER_SLOT_COUNT)
63+
})
64+
}
65+
}
66+
5067
/// Builds DelayTimer with custom configuration values.
5168
///
5269
/// Methods can be chained in order to set the configuration values. The
@@ -176,7 +193,7 @@ impl Default for SharedHeader {
176193
fn default() -> Self {
177194
let wheel_queue = EventHandle::init_task_wheel(DEFAULT_TIMER_SLOT_COUNT);
178195
let task_flag_map = Arc::new(DashMap::new());
179-
let second_hand = Arc::new(AtomicU64::new(0));
196+
let second_hand = SencondHand::default();
180197
let global_time = Arc::new(AtomicU64::new(timestamp()));
181198
let shared_motivation = Arc::new(AtomicBool::new(true));
182199
let runtime_instance = RuntimeInstance::default();
@@ -234,7 +251,7 @@ impl DelayTimerBuilder {
234251
}
235252

236253
fn assign_task(&mut self, event_handle: EventHandle, shared_header: SharedHeader) {
237-
let timer = Timer::new(self.get_timer_event_sender(), shared_header.clone());
254+
let timer = Timer::new(self.get_timer_event_sender(), shared_header);
238255

239256
self.run_async_schedule(timer);
240257

src/timer/event_handle.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,17 +273,20 @@ impl EventHandle {
273273

274274
// Add task to wheel_queue slot
275275
fn add_task(&mut self, mut task: Box<Task>) -> AnyResult<TaskMark> {
276-
let second_hand = self.shared_header.second_hand.load(Acquire);
276+
let second_hand = self.shared_header.second_hand.current_second_hand();
277277

278278
let exec_time: u64 = task
279279
.get_next_exec_timestamp()
280280
.ok_or_else(|| anyhow!("can't get_next_exec_timestamp in {}", &task.task_id))?;
281281

282282
let timestamp = self.shared_header.global_time.load(Acquire);
283+
284+
// Put task on next slot.
283285
let time_seed: u64 = exec_time
284286
.checked_sub(timestamp)
285287
.unwrap_or(task.task_id % DEFAULT_TIMER_SLOT_COUNT)
286-
+ second_hand;
288+
+ second_hand
289+
+ 1;
287290
let slot_seed: u64 = time_seed % DEFAULT_TIMER_SLOT_COUNT;
288291

289292
let cylinder_line = time_seed / DEFAULT_TIMER_SLOT_COUNT;
@@ -357,7 +360,7 @@ impl EventHandle {
357360
};
358361
task.clear_cylinder_line();
359362

360-
let slot_seed = self.shared_header.second_hand.load(Acquire) + 1;
363+
let slot_seed = self.shared_header.second_hand.current_second_hand() + 1;
361364

362365
if let Some(mut slot) = self.shared_header.wheel_queue.get_mut(&slot_seed) {
363366
slot.value_mut().add_task(task);

src/timer/timer_core.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::entity::timestamp;
66
use crate::entity::RuntimeKind;
77

88
use std::mem::replace;
9-
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
9+
use std::sync::atomic::Ordering::{Acquire, Release};
1010
use std::time::Duration;
1111
use std::time::Instant;
1212

@@ -180,12 +180,7 @@ impl Timer {
180180
/// Offset the current slot by one when reading it,
181181
/// so event_handle can be easily inserted into subsequent slots.
182182
pub(crate) fn next_position(&mut self) -> u64 {
183-
self.shared_header
184-
.second_hand
185-
.fetch_update(Release, Relaxed, |x| {
186-
Some((x + 1) % DEFAULT_TIMER_SLOT_COUNT)
187-
})
188-
.unwrap_or_else(|e| e)
183+
self.shared_header.second_hand.next().unwrap_or_else(|e| e)
189184
}
190185

191186
/// Time goes on, the clock ticks.
@@ -270,7 +265,7 @@ impl Timer {
270265

271266
/// Access to the second-hand
272267
pub(crate) fn second_hand(&self) -> u64 {
273-
self.shared_header.second_hand.load(Acquire)
268+
self.shared_header.second_hand.current_second_hand()
274269
}
275270

276271
/// Send timer-event to event-handle.
@@ -440,6 +435,7 @@ mod tests {
440435
timer
441436
.shared_header
442437
.second_hand
438+
.inner
443439
.store(3599, Ordering::SeqCst);
444440
assert_eq!(timer.next_position(), 3599);
445441
assert_eq!(timer.next_position(), 0);

tests/simulation.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ use std::time::Duration;
1212
use smol::Timer;
1313

1414
// TODO: Please turn on `--features=full` before test.
15-
#[tokio::test]
16-
async fn test_instance_state() -> anyhow::Result<()> {
15+
#[test]
16+
fn test_instance_state() -> anyhow::Result<()> {
1717
let delay_timer = DelayTimer::new();
1818

1919
let body = || async {
20+
println!("create_async_fn_body:i'success");
2021
Timer::after(Duration::from_millis(100)).await;
2122
};
2223

@@ -25,16 +26,19 @@ async fn test_instance_state() -> anyhow::Result<()> {
2526
.set_task_id(1)
2627
.set_maximum_parallel_runnable_num(3)
2728
.spawn_async_routine(body)?;
29+
2830
let task_instance_chain = delay_timer.insert_task(task)?;
2931

3032
// Get the first task instance.
3133
let instance = task_instance_chain.next_with_wait()?;
3234

35+
3336
// The task was still running when the instance was first obtained.
3437
assert_eq!(instance.get_state(), instance::RUNNING);
3538

3639
// Unsolicited mission cancellation.
3740
instance.cancel_with_wait()?;
41+
3842
assert_eq!(instance.get_state(), instance::CANCELLED);
3943

4044
// Get the second task instance.
@@ -267,8 +271,8 @@ async fn test_maximum_parallel_runnable_num() -> AnyResult<()> {
267271
Ok(())
268272
}
269273

270-
#[tokio::test]
271-
async fn tests_countdown() -> AnyResult<()> {
274+
#[test]
275+
fn tests_countdown() -> AnyResult<()> {
272276
let delay_timer = DelayTimer::new();
273277
let share_num = Arc::new(AtomicI32::new(3));
274278
let share_num_bunshin = share_num.clone();

0 commit comments

Comments
 (0)