Skip to content

Commit fe0969d

Browse files
authored
Merge pull request #19 from BinChengZhao/feature/stable
Version 0.8.0
2 parents b6ad9db + 3b0a335 commit fe0969d

File tree

11 files changed

+192
-70
lines changed

11 files changed

+192
-70
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
/target/
44
/examples/target/
55
/examples/*.lock
6-
6+
.DS_Store
77

88
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
99
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
# Version 0.8.0
2+
3+
v0.8.0 New features:
4+
5+
1. Optimized the api for custom tokio runtime, better api definition, better experience.
6+
7+
Api changes.
8+
1. Added: `tokio_runtime_by_default` & `tokio_runtime_by_custom` & `tokio_runtime_shared_by_custom` .
9+
10+
2. Private: `tokio_runtime`.
11+
12+
2. Optimized the method of canceling task instances, no error log is recorded when canceling a task failure after timeout.
13+
14+
3. Fix the compile error about `quit_one_task_handler` under nightly version.
115
# Version 0.7.0
216

317
v0.7.0 New features:

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "delay_timer"
3-
version = "0.7.2"
3+
version = "0.8.0"
44
authors = ["binchengZhao <binchengZhao@outlook.com>"]
55
edition = "2018"
66
repository = "https://github.com/BinChengZhao/delay-timer"

examples/cycle_tokio_task.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ use std::thread::{current, park, Thread};
88

99
fn main() -> Result<()> {
1010
// Build an DelayTimer that uses the default configuration of the Tokio runtime internally.
11-
let delay_timer = DelayTimerBuilder::default().tokio_runtime(None).build();
11+
let delay_timer = DelayTimerBuilder::default()
12+
.tokio_runtime_by_default()
13+
.build();
1214

1315
// Develop a task that runs in an asynchronous cycle (using a custom asynchronous template).
1416
delay_timer.add_task(build_task_customized_async_task()?)?;

examples/dynamic_cancel.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ fn async_cancel() -> Result<()> {
3737

3838
// Build an DelayTimer that uses the Customize a tokio runtime.
3939
let delay_timer = DelayTimerBuilder::default()
40-
.tokio_runtime(Some(tokio_rt.clone()))
40+
.tokio_runtime_shared_by_custom(tokio_rt.clone())
4141
.build();
4242

4343
tokio_rt.block_on(async {

examples/generic.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use anyhow::Result;
2+
use delay_timer::prelude::*;
3+
use smol::Timer;
4+
use std::any::{type_name, Any};
5+
use std::thread::park_timeout;
6+
use std::time::Duration;
7+
use surf;
8+
9+
// cargo run --package delay_timer --example generic --features=full
10+
11+
fn main() -> Result<()> {
12+
let delay_timer = DelayTimerBuilder::default().enable_status_report().build();
13+
14+
// Develop tasks with generic parameters. that runs in an asynchronous cycle.
15+
delay_timer.add_task(build_generic_task_async_request(Dog)?)?;
16+
17+
// Give a little time to observe the output.
18+
park_timeout(Duration::from_secs(30));
19+
20+
// No new tasks are accepted; running tasks are not affected.
21+
delay_timer.stop_delay_timer()?;
22+
23+
Ok(())
24+
}
25+
26+
fn build_generic_task_async_request<T: Animal>(animal: T) -> Result<Task, TaskError> {
27+
let mut task_builder = TaskBuilder::default();
28+
29+
let other_animal = Cat;
30+
let int_animal = 1;
31+
32+
let body = create_async_fn_body!((animal, other_animal, int_animal){
33+
if let Ok(mut res) = surf::get("https://httpbin.org/get").await {
34+
dbg!(res.body_string().await.unwrap_or_default());
35+
animal_ref.call();
36+
other_animal_ref.call();
37+
<i32 as Animal>::call(&int_animal_ref);
38+
39+
40+
Timer::after(Duration::from_secs(3)).await;
41+
dbg!("Task2 is done.");
42+
}
43+
});
44+
45+
task_builder
46+
.set_frequency(Frequency::CountDown(15, "* * * * * * *"))
47+
.set_task_id(2)
48+
.set_maximum_running_time(5)
49+
.spawn(body)
50+
}
51+
52+
trait ThreadSafe: Any + Sized + Clone + Send + Sync + 'static {}
53+
impl<T: Any + Sized + Clone + Send + Sync + 'static> ThreadSafe for T {}
54+
55+
trait Animal: ThreadSafe {
56+
fn call(&self);
57+
}
58+
59+
impl<T: ThreadSafe> Animal for T {
60+
fn call(&self) {
61+
println!("this is {}", type_name::<T>());
62+
}
63+
}
64+
65+
#[derive(Clone, Copy)]
66+
struct Dog;
67+
68+
#[derive(Clone, Copy)]
69+
struct Cat;

src/entity.rs

Lines changed: 59 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ impl fmt::Debug for SharedHeader {
116116

117117
#[derive(Clone, Default, Debug)]
118118
pub(crate) struct RuntimeInstance {
119-
//smol have no instance.
119+
// smol have no instance.
120120
#[cfg(feature = "tokio-support")]
121121
pub(crate) inner: Option<Arc<Runtime>>,
122122
pub(crate) kind: RuntimeKind,
@@ -335,29 +335,27 @@ cfg_tokio_support!(
335335
///
336336
/// This function requires the `tokio-support` feature of the `delay_timer`
337337
/// crate to be enabled.
338-
impl DelayTimerBuilder{
339-
340-
fn assign_task_by_tokio(&mut self, timer: Timer,event_handle: EventHandle) {
338+
impl DelayTimerBuilder {
339+
fn assign_task_by_tokio(&mut self, timer: Timer, event_handle: EventHandle) {
341340
self.run_async_schedule_by_tokio(timer);
342341
self.run_event_handle_by_tokio(event_handle);
343342
}
344343

345-
fn run_async_schedule_by_tokio(&self, mut timer: Timer){
346-
if let Some(ref tokio_runtime_ref) = self.shared_header.runtime_instance.inner{
347-
let tokio_runtime = tokio_runtime_ref.clone();
348-
Builder::new()
349-
.name("async_schedule_tokio".into())
350-
.spawn(move || {
351-
tokio_runtime.block_on(async {
352-
timer.async_schedule().await;
353-
})
354-
})
355-
.expect("async_schedule can't start.");
356-
}
357-
344+
fn run_async_schedule_by_tokio(&self, mut timer: Timer) {
345+
if let Some(ref tokio_runtime_ref) = self.shared_header.runtime_instance.inner {
346+
let tokio_runtime = tokio_runtime_ref.clone();
347+
Builder::new()
348+
.name("async_schedule_tokio".into())
349+
.spawn(move || {
350+
tokio_runtime.block_on(async {
351+
timer.async_schedule().await;
352+
})
353+
})
354+
.expect("async_schedule can't start.");
355+
}
358356
}
359357

360-
fn run_event_handle_by_tokio(&self, mut event_handle: EventHandle){
358+
fn run_event_handle_by_tokio(&self, mut event_handle: EventHandle) {
361359
if let Some(ref tokio_runtime_ref) = self.shared_header.runtime_instance.inner {
362360
let tokio_runtime = tokio_runtime_ref.clone();
363361
Builder::new()
@@ -369,44 +367,58 @@ cfg_tokio_support!(
369367
})
370368
.expect("event_handle_handle_by_tokio can't start.");
371369
}
370+
}
372371

373-
}
374-
375-
/// With this API, let DelayTimer internally use the user custom TokioRuntime.
376-
/// If None is given, a custom TokioRuntime is generated internally.
377-
pub fn tokio_runtime(mut self, rt:Option<Arc<Runtime>>) ->Self {
378-
self.shared_header.register_tokio_runtime(rt);
379-
self
372+
/// With this API, `DelayTimer` use default `TokioRuntime` is generated internally.
373+
pub fn tokio_runtime_by_default(self) -> Self {
374+
self.tokio_runtime(None)
380375
}
381-
}
382376

383-
impl SharedHeader {
384-
pub(crate) fn tokio_support() -> Option<Runtime> {
385-
TokioBuilder::new_multi_thread().enable_all()
386-
.thread_name_fn(|| {
387-
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
388-
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
389-
format!("tokio-{}", id)
390-
})
391-
.on_thread_start(|| {
392-
info!("tokio-thread started");
393-
})
394-
.build()
395-
.ok()
396-
}
377+
/// With this API, `DelayTimer` internally use the user customized and independent `TokioRuntime`.
378+
pub fn tokio_runtime_by_custom(self, rt: Runtime) -> Self {
379+
self.tokio_runtime(Some(Arc::new(rt)))
380+
}
397381

398-
pub(crate) fn register_tokio_runtime(&mut self,mut rt:Option<Arc<Runtime>>) {
382+
/// With this api, `DelayTimer` internal will share a `TokioRuntime` with the user .
383+
pub fn tokio_runtime_shared_by_custom(self, rt: Arc<Runtime>) -> Self {
384+
self.tokio_runtime(Some(rt))
385+
}
399386

400-
if rt.is_none(){
401-
rt = Some(Arc::new(Self::tokio_support().expect("init tokioRuntime is fail.")));
402-
}
387+
/// With this API, `DelayTimer` internally use the user custom TokioRuntime.
388+
/// If None is given, a default TokioRuntime is generated internally.
389+
pub(crate) fn tokio_runtime(mut self, rt: Option<Arc<Runtime>>) -> Self {
390+
self.shared_header.register_tokio_runtime(rt);
391+
self
392+
}
393+
}
403394

404-
self.runtime_instance.inner = rt;
405-
self.runtime_instance.kind = RuntimeKind::Tokio;
406-
}
395+
impl SharedHeader {
396+
pub(crate) fn tokio_support() -> Option<Runtime> {
397+
TokioBuilder::new_multi_thread()
398+
.enable_all()
399+
.thread_name_fn(|| {
400+
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
401+
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
402+
format!("tokio-{}", id)
403+
})
404+
.on_thread_start(|| {
405+
info!("tokio-thread started");
406+
})
407+
.build()
408+
.ok()
407409
}
408410

411+
pub(crate) fn register_tokio_runtime(&mut self, mut rt: Option<Arc<Runtime>>) {
412+
if rt.is_none() {
413+
rt = Some(Arc::new(
414+
Self::tokio_support().expect("init tokioRuntime is fail."),
415+
));
416+
}
409417

418+
self.runtime_instance.inner = rt;
419+
self.runtime_instance.kind = RuntimeKind::Tokio;
420+
}
421+
}
410422

411423
);
412424

src/timer/event_handle.rs

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ impl EventHandle {
148148
}
149149
);
150150

151-
//handle all event.
152-
//TODO:Add TestUnit.
151+
// handle all event.
152+
// TODO: Add TestUnit.
153153
pub(crate) async fn lauch(&mut self) {
154154
self.init_sub_workers();
155155
self.handle_event().await;
@@ -231,11 +231,11 @@ impl EventHandle {
231231
remove_result
232232
}
233233
TimerEvent::CancelTask(task_id, record_id) => {
234-
self.cancel_task(task_id, record_id, state::instance::CANCELLED)
234+
self.cancel_task::<true>(task_id, record_id, state::instance::CANCELLED)
235235
}
236236

237237
TimerEvent::TimeoutTask(task_id, record_id) => {
238-
self.cancel_task(task_id, record_id, state::instance::TIMEOUT)
238+
self.cancel_task::<false>(task_id, record_id, state::instance::TIMEOUT)
239239
}
240240

241241
TimerEvent::AppendTaskHandle(task_id, delay_task_handler_box) => {
@@ -384,13 +384,37 @@ impl EventHandle {
384384
))
385385
}
386386

387-
pub(crate) fn cancel_task(&mut self, task_id: u64, record_id: i64, state: usize) -> Result<()> {
387+
// The `INITIATIVE` mark indicates whether the cancellation was initiated by an outside party.
388+
389+
// `INITIATIVE` = true
390+
// External initiative to cancel the action,
391+
// If the cancellation fails then the error log record needs to be kept.
392+
393+
// `INITIATIVE` = false
394+
// Passive cancellation at runtime (e.g., timeout) indicates that
395+
// The task instance has completed or has been actively cancelled,
396+
// And no error logging is required.
397+
pub(crate) fn cancel_task<const INITIATIVE: bool>(
398+
&mut self,
399+
task_id: u64,
400+
record_id: i64,
401+
state: usize,
402+
) -> Result<()> {
388403
if let Some(mut task_mark_ref_mut) = self.shared_header.task_flag_map.get_mut(&task_id) {
389404
let task_mark = task_mark_ref_mut.value_mut();
390405

391406
// The cancellation operation is executed first, and then the outside world is notified of the cancellation event.
392407
// If the operation object does not exist in the middle, it should return early.
393-
self.task_trace.quit_one_task_handler(task_id, record_id)?;
408+
409+
let quit_result = self.task_trace.quit_one_task_handler(task_id, record_id);
410+
411+
if quit_result.is_err() {
412+
if INITIATIVE {
413+
quit_result?;
414+
} else {
415+
return Ok(());
416+
}
417+
}
394418

395419
if task_mark.task_instances_chain_maintainer.is_some() {
396420
// Here the user can be notified that the task instance has disappeared via `Instance`.
@@ -401,6 +425,7 @@ impl EventHandle {
401425

402426
return Ok(());
403427
}
428+
404429
Err(anyhow!(
405430
"Fn : `cancel_task`, Without the `task_mark_ref_mut` for task_id :{}, record_id : {}, state : {}",
406431
task_id,

src/timer/runtime_trace/sweeper.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ impl RecyclingBins {
145145
match self.recycle_unit_sources.recv().await {
146146
Ok(recycle_unit) => {
147147
let mut recycle_unit_heap = self.recycle_unit_heap.lock().await;
148+
148149
(&mut recycle_unit_heap).push(Reverse(recycle_unit));
149150
}
150151

src/timer/runtime_trace/task_handle.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ impl TaskTrace {
4848
task_id, record_id
4949
)
5050
})?;
51-
52-
let mut list_mut_cursor = task_handler_list.cursor_back_mut();
51+
52+
let mut list_mut_cursor = task_handler_list.cursor_front_mut();
5353

5454
let mut task_handler_box_ref: &mut DelayTaskHandlerBox;
5555
loop {
@@ -74,7 +74,7 @@ impl TaskTrace {
7474
list_mut_cursor.move_next();
7575
}
7676

77-
//remove current task_handler_box.
77+
// remove current task_handler_box.
7878
list_mut_cursor
7979
.remove_current()
8080
.map(|mut task_handler_box| task_handler_box.quit())

0 commit comments

Comments
 (0)