Skip to content

Commit bcf431a

Browse files
authored
queue: support priority queue (#72)
Signed-off-by: glorv <glorvs@163.com>
1 parent 0bdf475 commit bcf431a

11 files changed

+664
-35
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ repository = "https://github.com/tikv/yatp/"
1010

1111
[dependencies]
1212
crossbeam-deque = "0.8"
13+
crossbeam-skiplist = "0.1"
14+
crossbeam-utils = "0.8"
1315
dashmap = "5.1"
1416
fail = "0.5"
1517
lazy_static = "1"

benches/chained_spawn.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ mod yatp_callback {
3535

3636
mod yatp_future {
3737
use criterion::*;
38-
use std::sync::mpsc;
38+
use std::sync::{mpsc, Arc};
39+
use yatp::queue::priority::TaskPriorityProvider;
3940
use yatp::task::future::TaskCell;
4041
use yatp::Remote;
4142

@@ -73,6 +74,20 @@ mod yatp_future {
7374
let pool = yatp::Builder::new("chained_spawn").build_multilevel_future_pool();
7475
chained_spawn(b, pool, iter_count)
7576
}
77+
78+
pub fn chained_spawn_priority(b: &mut Bencher<'_>, iter_count: usize) {
79+
struct ConstantPriorityPrivider;
80+
81+
impl TaskPriorityProvider for ConstantPriorityPrivider {
82+
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
83+
// return a constant value so the queue workes the same as FIFO queue.
84+
0
85+
}
86+
}
87+
let pool = yatp::Builder::new("chained_spawn")
88+
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
89+
chained_spawn(b, pool, iter_count)
90+
}
7691
}
7792

7893
mod tokio {
@@ -153,6 +168,9 @@ pub fn chained_spawn(b: &mut Criterion) {
153168
i,
154169
|b, i| yatp_future::chained_spawn_multilevel(b, *i),
155170
);
171+
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
172+
yatp_future::chained_spawn_priority(b, *i)
173+
});
156174
group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| {
157175
tokio::chained_spawn(b, *i)
158176
});

benches/ping_pong.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ mod yatp_future {
5353
use std::sync::atomic::*;
5454
use std::sync::*;
5555
use tokio::sync::oneshot;
56+
use yatp::queue::priority::TaskPriorityProvider;
5657
use yatp::task::future::TaskCell;
5758

5859
fn ping_pong(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, ping_count: usize) {
@@ -105,6 +106,19 @@ mod yatp_future {
105106
let pool = yatp::Builder::new("ping_pong").build_multilevel_future_pool();
106107
ping_pong(b, pool, ping_count)
107108
}
109+
110+
pub fn ping_pong_priority(b: &mut Bencher<'_>, ping_count: usize) {
111+
struct ConstantPriorityPrivider;
112+
impl TaskPriorityProvider for ConstantPriorityPrivider {
113+
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
114+
// return a constant value so the queue workes the same as FIFO queue.
115+
0
116+
}
117+
}
118+
let pool = yatp::Builder::new("ping_pong")
119+
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
120+
ping_pong(b, pool, ping_count)
121+
}
108122
}
109123

110124
mod tokio {
@@ -219,6 +233,9 @@ pub fn ping_pong(b: &mut Criterion) {
219233
i,
220234
|b, i| yatp_future::ping_pong_multilevel(b, *i),
221235
);
236+
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
237+
yatp_future::ping_pong_priority(b, *i)
238+
});
222239
group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| {
223240
tokio::ping_pong(b, *i)
224241
});

benches/spawn_many.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ mod yatp_future {
3535
use criterion::*;
3636
use std::sync::atomic::*;
3737
use std::sync::*;
38+
use yatp::queue::priority::TaskPriorityProvider;
3839
use yatp::task::future::TaskCell;
3940

4041
fn spawn_many(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, spawn_count: usize) {
@@ -68,6 +69,19 @@ mod yatp_future {
6869
let pool = yatp::Builder::new("spawn_many").build_multilevel_future_pool();
6970
spawn_many(b, pool, spawn_count)
7071
}
72+
73+
pub fn spawn_many_priority(b: &mut Bencher<'_>, spawn_count: usize) {
74+
struct ConstantPriorityPrivider;
75+
impl TaskPriorityProvider for ConstantPriorityPrivider {
76+
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
77+
// return a constant value so the queue workes the same as FIFO queue.
78+
0
79+
}
80+
}
81+
let pool = yatp::Builder::new("spawn_many")
82+
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
83+
spawn_many(b, pool, spawn_count)
84+
}
7185
}
7286

7387
mod threadpool {
@@ -174,6 +188,9 @@ pub fn spawn_many(b: &mut Criterion) {
174188
i,
175189
|b, i| yatp_future::spawn_many_multilevel(b, *i),
176190
);
191+
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
192+
yatp_future::spawn_many_priority(b, *i)
193+
});
177194
group.bench_with_input(BenchmarkId::new("threadpool", i), i, |b, i| {
178195
threadpool::spawn_many(b, *i)
179196
});

benches/yield_many.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ mod yatp_callback {
6161

6262
mod yatp_future {
6363
use criterion::*;
64-
use std::sync::mpsc;
64+
use std::sync::{mpsc, Arc};
65+
use yatp::queue::priority::TaskPriorityProvider;
6566
use yatp::task::future::TaskCell;
6667

6768
fn yield_many(b: &mut Bencher<'_>, pool: yatp::ThreadPool<TaskCell>, yield_count: usize) {
@@ -94,6 +95,19 @@ mod yatp_future {
9495
let pool = yatp::Builder::new("yield_many").build_multilevel_future_pool();
9596
yield_many(b, pool, yield_count)
9697
}
98+
99+
pub fn yield_many_priority(b: &mut Bencher<'_>, yield_count: usize) {
100+
struct ConstantPriorityPrivider;
101+
impl TaskPriorityProvider for ConstantPriorityPrivider {
102+
fn priority_of(&self, _extras: &yatp::queue::Extras) -> u64 {
103+
// return a constant value so the queue workes the same as FIFO queue.
104+
0
105+
}
106+
}
107+
let pool = yatp::Builder::new("yield_many")
108+
.build_priority_future_pool(Arc::new(ConstantPriorityPrivider));
109+
yield_many(b, pool, yield_count)
110+
}
97111
}
98112

99113
mod tokio {
@@ -167,6 +181,9 @@ pub fn yield_many(b: &mut Criterion) {
167181
i,
168182
|b, i| yatp_future::yield_many_multilevel(b, *i),
169183
);
184+
group.bench_with_input(BenchmarkId::new("yatp::future::priority", i), i, |b, i| {
185+
yatp_future::yield_many_priority(b, *i)
186+
});
170187
group.bench_with_input(BenchmarkId::new("tokio", i), i, |b, i| {
171188
tokio::yield_many(b, *i)
172189
});

src/pool.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
88
mod builder;
99
mod runner;
10-
mod spawn;
10+
pub(crate) mod spawn;
1111
mod worker;
1212

1313
pub use self::builder::{Builder, SchedConfig};

src/pool/builder.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use crate::pool::spawn::QueueCore;
44
use crate::pool::worker::WorkerThread;
55
use crate::pool::{CloneRunnerBuilder, Local, Remote, Runner, RunnerBuilder, ThreadPool};
6-
use crate::queue::{self, multilevel, LocalQueue, QueueType, TaskCell};
6+
use crate::queue::{self, multilevel, priority, LocalQueue, QueueType, TaskCell};
77
use crate::task::{callback, future};
88
use std::sync::{
99
atomic::{AtomicUsize, Ordering},
@@ -290,6 +290,19 @@ impl Builder {
290290
self.build_with_queue_and_runner(QueueType::Multilevel(queue_builder), runner_builder)
291291
}
292292

293+
/// Spawn a priority future pool.
294+
///
295+
/// It setups the pool with priority queue. Caller must provide a `TaskPriorityProvider` implementation to generate the proper priority value for each spawned task.
296+
pub fn build_priority_future_pool(
297+
&self,
298+
priority_provider: Arc<dyn priority::TaskPriorityProvider>,
299+
) -> ThreadPool<future::TaskCell> {
300+
let fb = CloneRunnerBuilder(future::Runner::default());
301+
let queue_builder = priority::Builder::new(priority::Config::default(), priority_provider);
302+
let runner_builder = queue_builder.runner_builder(fb);
303+
self.build_with_queue_and_runner(QueueType::Priority(queue_builder), runner_builder)
304+
}
305+
293306
/// Spawns the thread pool immediately.
294307
///
295308
/// `queue_builder` is a closure that creates a task queue. It accepts the

src/queue.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
//! data structs.
1010
1111
pub mod multilevel;
12+
pub mod priority;
1213

1314
mod extras;
1415
mod single_level;
@@ -18,7 +19,7 @@ pub use self::extras::Extras;
1819
use std::time::Instant;
1920

2021
/// A cell containing a task and needed extra information.
21-
pub trait TaskCell {
22+
pub trait TaskCell: 'static {
2223
/// Gets mutable extra information.
2324
fn mut_extras(&mut self) -> &mut Extras;
2425
}
@@ -42,6 +43,7 @@ pub(crate) struct TaskInjector<T>(InjectorInner<T>);
4243
enum InjectorInner<T> {
4344
SingleLevel(single_level::TaskInjector<T>),
4445
Multilevel(multilevel::TaskInjector<T>),
46+
Priority(priority::TaskInjector<T>),
4547
}
4648

4749
impl<T: TaskCell + Send> TaskInjector<T> {
@@ -50,13 +52,15 @@ impl<T: TaskCell + Send> TaskInjector<T> {
5052
match &self.0 {
5153
InjectorInner::SingleLevel(q) => q.push(task_cell),
5254
InjectorInner::Multilevel(q) => q.push(task_cell),
55+
InjectorInner::Priority(q) => q.push(task_cell),
5356
}
5457
}
5558

5659
pub fn default_extras(&self) -> Extras {
5760
match self.0 {
5861
InjectorInner::SingleLevel(_) => Extras::single_level(),
5962
InjectorInner::Multilevel(_) => Extras::multilevel_default(),
63+
InjectorInner::Priority(_) => Extras::single_level(),
6064
}
6165
}
6266
}
@@ -80,6 +84,7 @@ pub(crate) struct LocalQueue<T>(LocalQueueInner<T>);
8084
enum LocalQueueInner<T> {
8185
SingleLevel(single_level::LocalQueue<T>),
8286
Multilevel(multilevel::LocalQueue<T>),
87+
Priority(priority::LocalQueue<T>),
8388
}
8489

8590
impl<T: TaskCell + Send> LocalQueue<T> {
@@ -88,6 +93,7 @@ impl<T: TaskCell + Send> LocalQueue<T> {
8893
match &mut self.0 {
8994
LocalQueueInner::SingleLevel(q) => q.push(task_cell),
9095
LocalQueueInner::Multilevel(q) => q.push(task_cell),
96+
LocalQueueInner::Priority(q) => q.push(task_cell),
9197
}
9298
}
9399

@@ -97,13 +103,15 @@ impl<T: TaskCell + Send> LocalQueue<T> {
97103
match &mut self.0 {
98104
LocalQueueInner::SingleLevel(q) => q.pop(),
99105
LocalQueueInner::Multilevel(q) => q.pop(),
106+
LocalQueueInner::Priority(q) => q.pop(),
100107
}
101108
}
102109

103110
pub fn default_extras(&self) -> Extras {
104111
match self.0 {
105112
LocalQueueInner::SingleLevel(_) => Extras::single_level(),
106113
LocalQueueInner::Multilevel(_) => Extras::multilevel_default(),
114+
LocalQueueInner::Priority(_) => Extras::single_level(),
107115
}
108116
}
109117

@@ -113,6 +121,7 @@ impl<T: TaskCell + Send> LocalQueue<T> {
113121
match &mut self.0 {
114122
LocalQueueInner::SingleLevel(q) => q.has_tasks_or_pull(),
115123
LocalQueueInner::Multilevel(q) => q.has_tasks_or_pull(),
124+
LocalQueueInner::Priority(q) => q.has_tasks_or_pull(),
116125
}
117126
}
118127
}
@@ -125,6 +134,8 @@ pub enum QueueType {
125134
///
126135
/// More to see: https://en.wikipedia.org/wiki/Multilevel_feedback_queue.
127136
Multilevel(multilevel::Builder),
137+
/// A concurrent prioirty queue.
138+
Priority(priority::Builder),
128139
}
129140

130141
impl Default for QueueType {
@@ -139,10 +150,17 @@ impl From<multilevel::Builder> for QueueType {
139150
}
140151
}
141152

153+
impl From<priority::Builder> for QueueType {
154+
fn from(b: priority::Builder) -> QueueType {
155+
QueueType::Priority(b)
156+
}
157+
}
158+
142159
pub(crate) fn build<T>(ty: QueueType, local_num: usize) -> (TaskInjector<T>, Vec<LocalQueue<T>>) {
143160
match ty {
144161
QueueType::SingleLevel => single_level(local_num),
145162
QueueType::Multilevel(b) => b.build(local_num),
163+
QueueType::Priority(b) => b.build(local_num),
146164
}
147165
}
148166

src/queue/extras.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ pub struct Extras {
2828
pub(crate) fixed_level: Option<u8>,
2929
/// Number of execute times
3030
pub(crate) exec_times: u32,
31+
/// Extra metadata of this task. User can use this field to store arbitrary data. It is useful
32+
/// in some case to implement more complext `TaskPriorityProvider` in the priority task queue.
33+
pub(crate) metadata: Vec<u8>,
3134
}
3235

3336
impl Extras {
@@ -43,6 +46,7 @@ impl Extras {
4346
current_level: 0,
4447
fixed_level: None,
4548
exec_times: 0,
49+
metadata: Vec::new(),
4650
}
4751
}
4852

@@ -62,9 +66,10 @@ impl Extras {
6266
task_id,
6367
running_time: Some(Arc::new(ElapsedTime::default())),
6468
total_running_time: None,
65-
current_level: 0,
69+
current_level: fixed_level.unwrap_or(0),
6670
fixed_level,
6771
exec_times: 0,
72+
metadata: Vec::new(),
6873
}
6974
}
7075

@@ -89,4 +94,19 @@ impl Extras {
8994
pub fn current_level(&self) -> u8 {
9095
self.current_level
9196
}
97+
98+
/// Gets the metadata of this task.
99+
pub fn metadata(&self) -> &[u8] {
100+
&self.metadata
101+
}
102+
103+
/// Gets the mutable metadata of this task.
104+
pub fn metadata_mut(&mut self) -> &mut Vec<u8> {
105+
&mut self.metadata
106+
}
107+
108+
/// Set the metadata of this task.
109+
pub fn set_metadata(&mut self, metadata: Vec<u8>) {
110+
self.metadata = metadata;
111+
}
92112
}

0 commit comments

Comments
 (0)