Skip to content

Commit c629e34

Browse files
committed
fix comments and add benchmark
Signed-off-by: glorv <glorvs@163.com>
1 parent c9c1050 commit c629e34

File tree

6 files changed

+77
-17
lines changed

6 files changed

+77
-17
lines changed

benches/chained_spawn.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ mod yatp_callback {
3535

3636
mod yatp_future {
3737
use criterion::*;
38-
use std::sync::mpsc;
38+
use std::sync::{mpsc, Arc};
39+
use yatp::queue::priority::TaskPriorityProvider;
3940
use yatp::task::future::TaskCell;
4041
use yatp::Remote;
4142

@@ -73,6 +74,20 @@ mod yatp_future {
7374
let pool = yatp::Builder::new("chained_spawn").build_multilevel_future_pool();
7475
chained_spawn(b, pool, iter_count)
7576
}
77+
78+
pub fn chained_spawn_priority(b: &mut Bencher<'_>, iter_count: usize) {
79+
struct ConstantPriorityPrivider;
80+
81+
impl TaskPriorityProvider for ConstantPriorityPrivider {
82+
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
83+
// return a constant value so the queue workes the same as FIFO queue.
84+
0
85+
}
86+
}
87+
let pool = yatp::Builder::new("chained_spawn")
88+
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
89+
chained_spawn(b, pool, iter_count)
90+
}
7691
}
7792

7893
mod tokio {
@@ -153,6 +168,9 @@ pub fn chained_spawn(b: &mut Criterion) {
153168
i,
154169
|b, i| yatp_future::chained_spawn_multilevel(b, *i),
155170
);
171+
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
172+
yatp_future::chained_spawn_priority(b, *i)
173+
});
156174
group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| {
157175
tokio::chained_spawn(b, *i)
158176
});

benches/ping_pong.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ mod yatp_future {
5353
use std::sync::atomic::*;
5454
use std::sync::*;
5555
use tokio::sync::oneshot;
56+
use yatp::queue::priority::TaskPriorityProvider;
5657
use yatp::task::future::TaskCell;
5758

5859
fn ping_pong(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, ping_count: usize) {
@@ -105,6 +106,19 @@ mod yatp_future {
105106
let pool = yatp::Builder::new("ping_pong").build_multilevel_future_pool();
106107
ping_pong(b, pool, ping_count)
107108
}
109+
110+
pub fn ping_pong_priority(b: &mut Bencher<'_>, ping_count: usize) {
111+
struct ConstantPriorityPrivider;
112+
impl TaskPriorityProvider for ConstantPriorityPrivider {
113+
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
114+
// return a constant value so the queue workes the same as FIFO queue.
115+
0
116+
}
117+
}
118+
let pool = yatp::Builder::new("ping_pong")
119+
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
120+
ping_pong(b, pool, ping_count)
121+
}
108122
}
109123

110124
mod tokio {
@@ -219,6 +233,9 @@ pub fn ping_pong(b: &mut Criterion) {
219233
i,
220234
|b, i| yatp_future::ping_pong_multilevel(b, *i),
221235
);
236+
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
237+
yatp_future::ping_pong_priority(b, *i)
238+
});
222239
group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| {
223240
tokio::ping_pong(b, *i)
224241
});

benches/spawn_many.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ mod yatp_future {
3535
use criterion::*;
3636
use std::sync::atomic::*;
3737
use std::sync::*;
38+
use yatp::queue::priority::TaskPriorityProvider;
3839
use yatp::task::future::TaskCell;
3940

4041
fn spawn_many(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, spawn_count: usize) {
@@ -68,6 +69,19 @@ mod yatp_future {
6869
let pool = yatp::Builder::new("spawn_many").build_multilevel_future_pool();
6970
spawn_many(b, pool, spawn_count)
7071
}
72+
73+
pub fn spawn_many_priority(b: &mut Bencher<'_>, spawn_count: usize) {
74+
struct ConstantPriorityPrivider;
75+
impl TaskPriorityProvider for ConstantPriorityPrivider {
76+
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
77+
// return a constant value so the queue workes the same as FIFO queue.
78+
0
79+
}
80+
}
81+
let pool = yatp::Builder::new("spawn_many")
82+
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
83+
spawn_many(b, pool, spawn_count)
84+
}
7185
}
7286

7387
mod threadpool {
@@ -174,6 +188,9 @@ pub fn spawn_many(b: &mut Criterion) {
174188
i,
175189
|b, i| yatp_future::spawn_many_multilevel(b, *i),
176190
);
191+
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
192+
yatp_future::spawn_many_priority(b, *i)
193+
});
177194
group.bench_with_input(BenchmarkId::new("threadpool", i), i, |b, i| {
178195
threadpool::spawn_many(b, *i)
179196
});

benches/yield_many.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ mod yatp_callback {
6161

6262
mod yatp_future {
6363
use criterion::*;
64-
use std::sync::mpsc;
64+
use std::sync::{mpsc, Arc};
65+
use yatp::queue::priority::TaskPriorityProvider;
6566
use yatp::task::future::TaskCell;
6667

6768
fn yield_many(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, yield_count: usize) {
@@ -94,6 +95,19 @@ mod yatp_future {
9495
let pool = yatp::Builder::new("yield_many").build_multilevel_future_pool();
9596
yield_many(b, pool, yield_count)
9697
}
98+
99+
pub fn yield_many_priority(b: &mut Bencher<'_>, yield_count: usize) {
100+
struct ConstantPriorityPrivider;
101+
impl TaskPriorityProvider for ConstantPriorityPrivider {
102+
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
103+
// return a constant value so the queue workes the same as FIFO queue.
104+
0
105+
}
106+
}
107+
let pool = yatp::Builder::new("yield_count")
108+
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
109+
spawn_many(b, pool, yield_count)
110+
}
97111
}
98112

99113
mod tokio {
@@ -167,6 +181,9 @@ pub fn yield_many(b: &mut Criterion) {
167181
i,
168182
|b, i| yatp_future::yield_many_multilevel(b, *i),
169183
);
184+
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
185+
yatp_future::yield_many_priority(b, *i)
186+
});
170187
group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| {
171188
tokio::yield_many(b, *i)
172189
});

src/queue/extras.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub struct Extras {
2828
pub(crate) fixed_level: Option<u8>,
2929
/// Number of execute times
3030
pub(crate) exec_times: u32,
31-
/// Extra metadata of this task. User can use this field to store arbitrary data. It is useful
31+
/// Extra metadata of this task. User can use this field to store arbitrary data. It is useful
3232
/// in some case to implement more complext `TaskPriorityProvider` in the priority task queue.
3333
pub(crate) metadata: Vec<u8>,
3434
}

src/queue/priority.rs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,22 +59,13 @@ where
5959
}
6060
}
6161

62-
/// The local queue is just a proxy of the global queue.
63-
pub(crate) struct LocalQueue<T> {
64-
queue: Arc<QueueCore<T>>,
65-
task_manager: PriorityTaskManager,
66-
}
62+
/// priority queue does not have local queue, all tasks are always put in the global queue.
63+
pub(crate) type LocalQueue<T> = TaskInjector<T>;
6764

6865
impl<T> LocalQueue<T>
6966
where
7067
T: TaskCell + Send,
7168
{
72-
pub(super) fn push(&mut self, mut task_cell: T) {
73-
let priority = self.task_manager.prepare_before_push(&mut task_cell);
74-
set_schedule_time(&mut task_cell);
75-
self.queue.push(task_cell, priority);
76-
}
77-
7869
pub(super) fn pop(&mut self) -> Option<Pop<T>> {
7970
self.queue.pop()
8071
}
@@ -88,7 +79,7 @@ where
8879
pub trait TaskPriorityProvider: Send + Sync + 'static {
8980
/// Return a priority value of this task, all tasks in the priority
9081
/// queue is ordered by this value.
91-
fn get_priority(&self, extras: &Extras) -> u64;
82+
fn priority_of(&self, extras: &Extras) -> u64;
9283
}
9384

9485
#[derive(Clone)]
@@ -103,7 +94,7 @@ impl PriorityTaskManager {
10394
T: TaskCell,
10495
{
10596
self.level_manager.adjust_task_level(task_cell);
106-
self.priority_manager.get_priority(task_cell.mut_extras())
97+
self.priority_manager.priority_of(task_cell.mut_extras())
10798
}
10899
}
109100

@@ -415,7 +406,7 @@ mod tests {
415406
struct OrderByIdProvider;
416407

417408
impl TaskPriorityProvider for OrderByIdProvider {
418-
fn get_priority(&self, extras: &Extras) -> u64 {
409+
fn priority_of(&self, extras: &Extras) -> u64 {
419410
return extras.task_id();
420411
}
421412
}

0 commit comments

Comments
 (0)