Skip to content

Commit 4749c18

Browse files
Update
1 parent d8f51d1 commit 4749c18

File tree

2 files changed

+287
-0
lines changed

2 files changed

+287
-0
lines changed
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
mod metrics;
2+
3+
use futures::channel::mpsc::Sender;
4+
use futures::prelude::*;
5+
use slog::{debug, o, trace};
6+
use std::sync::Weak;
7+
use tokio::runtime::Runtime;
8+
9+
/// A wrapper over a runtime handle which can spawn async and blocking tasks.
///
/// The runtime is held through a `Weak` reference, so an executor does not keep the runtime
/// alive by itself; the spawn methods upgrade the reference before spawning and log at debug
/// level when the upgrade fails.
10+
#[derive(Clone)]
11+
pub struct TaskExecutor {
12+
/// The handle to the runtime on which tasks are spawned
13+
runtime: Weak<Runtime>,
14+
/// The receiver exit future which on receiving shuts down the task
15+
exit: exit_future::Exit,
16+
/// Sender given to tasks, so that if they encounter a state in which execution cannot
17+
/// continue they can request that everything shuts down.
18+
///
19+
/// The task must provide a reason for shutting down.
20+
signal_tx: Sender<&'static str>,
21+
22+
/// Logger used for the executor's own messages. `clone_with_name` derives a child logger
/// carrying a `service` key from it.
log: slog::Logger,
23+
}
24+
25+
impl TaskExecutor {
26+
/// Create a new task executor.
27+
///
28+
/// Note: this function is mainly useful in tests. A `TaskExecutor` should be normally obtained from
29+
/// a [`RuntimeContext`](struct.RuntimeContext.html)
30+
pub fn new(
31+
runtime: Weak<Runtime>,
32+
exit: exit_future::Exit,
33+
log: slog::Logger,
34+
signal_tx: Sender<&'static str>,
35+
) -> Self {
36+
Self {
37+
runtime,
38+
exit,
39+
signal_tx,
40+
log,
41+
}
42+
}
43+
44+
/// Clones the task executor adding a service name.
45+
pub fn clone_with_name(&self, service_name: String) -> Self {
46+
TaskExecutor {
47+
runtime: self.runtime.clone(),
48+
exit: self.exit.clone(),
49+
signal_tx: self.signal_tx.clone(),
50+
log: self.log.new(o!("service" => service_name)),
51+
}
52+
}
53+
54+
/// Spawn a future on the tokio runtime wrapped in an `exit_future::Exit`. The task is canceled
55+
/// when the corresponding exit_future `Signal` is fired/dropped.
56+
///
57+
/// This function generates prometheus metrics on number of tasks and task duration.
58+
pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static, name: &'static str) {
59+
let exit = self.exit.clone();
60+
let log = self.log.clone();
61+
62+
if let Some(int_gauge) = metrics::get_int_gauge(&metrics::ASYNC_TASKS_COUNT, &[name]) {
63+
// Task is shutdown before it completes if `exit` receives
64+
let int_gauge_1 = int_gauge.clone();
65+
let future = future::select(Box::pin(task), exit).then(move |either| {
66+
match either {
67+
future::Either::Left(_) => trace!(log, "Async task completed"; "task" => name),
68+
future::Either::Right(_) => {
69+
debug!(log, "Async task shutdown, exit received"; "task" => name)
70+
}
71+
}
72+
int_gauge_1.dec();
73+
futures::future::ready(())
74+
});
75+
76+
int_gauge.inc();
77+
if let Some(runtime) = self.runtime.upgrade() {
78+
runtime.spawn(future);
79+
} else {
80+
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
81+
}
82+
}
83+
}
84+
85+
/// Spawn a future on the tokio runtime. This function does not wrap the task in an `exit_future::Exit`
86+
/// like [spawn](#method.spawn).
87+
/// The caller of this function is responsible for wrapping up the task with an `exit_future::Exit` to
88+
/// ensure that the task gets canceled appropriately.
89+
/// This function generates prometheus metrics on number of tasks and task duration.
90+
///
91+
/// This is useful in cases where the future to be spawned needs to do additional cleanup work when
92+
/// the task is completed/canceled (e.g. writing local variables to disk) or the task is created from
93+
/// some framework which does its own cleanup (e.g. a hyper server).
94+
pub fn spawn_without_exit(
95+
&self,
96+
task: impl Future<Output = ()> + Send + 'static,
97+
name: &'static str,
98+
) {
99+
if let Some(int_gauge) = metrics::get_int_gauge(&metrics::ASYNC_TASKS_COUNT, &[name]) {
100+
let int_gauge_1 = int_gauge.clone();
101+
let future = task.then(move |_| {
102+
int_gauge_1.dec();
103+
futures::future::ready(())
104+
});
105+
106+
int_gauge.inc();
107+
if let Some(runtime) = self.runtime.upgrade() {
108+
runtime.spawn(future);
109+
} else {
110+
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
111+
}
112+
}
113+
}
114+
115+
/// Spawn a blocking task on a dedicated tokio thread pool wrapped in an exit future.
116+
/// This function generates prometheus metrics on number of tasks and task duration.
117+
pub fn spawn_blocking<F>(&self, task: F, name: &'static str)
118+
where
119+
F: FnOnce() + Send + 'static,
120+
{
121+
let log = self.log.clone();
122+
123+
if let Some(metric) = metrics::get_histogram(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]) {
124+
if let Some(int_gauge) = metrics::get_int_gauge(&metrics::BLOCKING_TASKS_COUNT, &[name])
125+
{
126+
let int_gauge_1 = int_gauge.clone();
127+
let timer = metric.start_timer();
128+
let join_handle = if let Some(runtime) = self.runtime.upgrade() {
129+
runtime.spawn_blocking(task)
130+
} else {
131+
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
132+
return;
133+
};
134+
135+
let future = async move {
136+
match join_handle.await {
137+
Ok(_) => trace!(log, "Blocking task completed"; "task" => name),
138+
Err(e) => debug!(log, "Blocking task failed"; "error" => %e),
139+
};
140+
timer.observe_duration();
141+
int_gauge_1.dec();
142+
};
143+
144+
int_gauge.inc();
145+
if let Some(runtime) = self.runtime.upgrade() {
146+
runtime.spawn(future);
147+
} else {
148+
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
149+
}
150+
}
151+
}
152+
}
153+
/// Spawn a future on the tokio runtime wrapped in an `exit_future::Exit` returning an optional
154+
/// join handle to the future.
155+
/// The task is canceled when the corresponding exit_future `Signal` is fired/dropped.
156+
///
157+
/// This function generates prometheus metrics on number of tasks and task duration.
158+
pub fn spawn_handle<R: Send + 'static>(
159+
&self,
160+
task: impl Future<Output = R> + Send + 'static,
161+
name: &'static str,
162+
) -> Option<tokio::task::JoinHandle<Option<R>>> {
163+
let exit = self.exit.clone();
164+
let log = self.log.clone();
165+
166+
if let Some(int_gauge) = metrics::get_int_gauge(&metrics::ASYNC_TASKS_COUNT, &[name]) {
167+
// Task is shutdown before it completes if `exit` receives
168+
let int_gauge_1 = int_gauge.clone();
169+
let future = future::select(Box::pin(task), exit).then(move |either| {
170+
let result = match either {
171+
future::Either::Left((task, _)) => {
172+
trace!(log, "Async task completed"; "task" => name);
173+
Some(task)
174+
}
175+
future::Either::Right(_) => {
176+
debug!(log, "Async task shutdown, exit received"; "task" => name);
177+
None
178+
}
179+
};
180+
int_gauge_1.dec();
181+
futures::future::ready(result)
182+
});
183+
184+
int_gauge.inc();
185+
if let Some(runtime) = self.runtime.upgrade() {
186+
Some(runtime.spawn(future))
187+
} else {
188+
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
189+
None
190+
}
191+
} else {
192+
None
193+
}
194+
}
195+
196+
/// Spawn a blocking task on a dedicated tokio thread pool wrapped in an exit future returning
197+
/// a join handle to the future.
198+
/// If the runtime doesn't exist, this will return None.
199+
/// The Future returned behaves like the standard JoinHandle which can return an error if the
200+
/// task failed.
201+
/// This function generates prometheus metrics on number of tasks and task duration.
202+
pub fn spawn_blocking_handle<F, R>(
203+
&self,
204+
task: F,
205+
name: &'static str,
206+
) -> Option<impl Future<Output = Result<R, tokio::task::JoinError>>>
207+
where
208+
F: FnOnce() -> R + Send + 'static,
209+
R: Send + 'static,
210+
{
211+
let log = self.log.clone();
212+
213+
if let Some(metric) = metrics::get_histogram(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]) {
214+
if let Some(int_gauge) = metrics::get_int_gauge(&metrics::BLOCKING_TASKS_COUNT, &[name])
215+
{
216+
let int_gauge_1 = int_gauge;
217+
let timer = metric.start_timer();
218+
let join_handle = if let Some(runtime) = self.runtime.upgrade() {
219+
runtime.spawn_blocking(task)
220+
} else {
221+
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
222+
return None;
223+
};
224+
225+
Some(async move {
226+
let result = match join_handle.await {
227+
Ok(result) => {
228+
trace!(log, "Blocking task completed"; "task" => name);
229+
Ok(result)
230+
}
231+
Err(e) => {
232+
debug!(log, "Blocking task ended unexpectedly"; "error" => %e);
233+
Err(e)
234+
}
235+
};
236+
timer.observe_duration();
237+
int_gauge_1.dec();
238+
result
239+
})
240+
} else {
241+
None
242+
}
243+
} else {
244+
None
245+
}
246+
}
247+
248+
/// Returns a new weak handle to the runtime this executor spawns on.
///
/// The handle must be upgraded before use; upgrading fails once the runtime has been dropped.
pub fn runtime(&self) -> Weak<Runtime> {
    Weak::clone(&self.runtime)
}
251+
252+
/// Returns a copy of the `exit_future::Exit`.
253+
pub fn exit(&self) -> exit_future::Exit {
254+
self.exit.clone()
255+
}
256+
257+
/// Get a channel to request shutting down.
258+
pub fn shutdown_sender(&self) -> Sender<&'static str> {
259+
self.signal_tx.clone()
260+
}
261+
262+
/// Returns a reference to the logger.
263+
pub fn log(&self) -> &slog::Logger {
264+
&self.log
265+
}
266+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
//! Handles async task metrics.
2+
use lazy_static::lazy_static;
3+
pub use lighthouse_metrics::*;
4+
5+
lazy_static! {
    /// Gauge of async tasks, labelled per task name via the `async_task_count` label.
    pub static ref ASYNC_TASKS_COUNT: Result<IntGaugeVec> = try_create_int_gauge_vec(
        "async_tasks_count",
        "Total number of async tasks spawned using spawn",
        &["async_task_count"]
    );
    /// Gauge of blocking tasks, labelled per task name via the `blocking_task_count` label.
    // The help text previously said "async tasks", a copy-paste slip from the gauge above.
    pub static ref BLOCKING_TASKS_COUNT: Result<IntGaugeVec> = try_create_int_gauge_vec(
        "blocking_tasks_count",
        "Total number of blocking tasks spawned using spawn_blocking",
        &["blocking_task_count"]
    );
    /// Histogram of blocking-task durations, labelled per task name via the
    /// `blocking_task_hist` label.
    pub static ref BLOCKING_TASKS_HISTOGRAM: Result<HistogramVec> = try_create_histogram_vec(
        "blocking_tasks_histogram",
        "Time taken by blocking tasks",
        &["blocking_task_hist"]
    );
}

0 commit comments

Comments
 (0)