Skip to content

Commit 5142a60

Browse files
jbrhawkw
andauthored
feat(subscriber) add an environment-configured console_subscriber::init (console-rs#53)
Co-authored-by: Eliza Weisman <eliza@buoyant.io>
1 parent b345084 commit 5142a60

File tree

7 files changed

+155
-20
lines changed

7 files changed

+155
-20
lines changed

console-subscriber/Cargo.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,10 @@ tonic = { version = "0.4", features = ["transport"] }
1414
console-api = { path = "../console-api", features = ["transport"]}
1515
tracing-core = "0.1.18"
1616
tracing = "0.1.26"
17-
tracing-subscriber = { version = "0.2.17", default-features = false, features = ["fmt", "registry"] }
17+
tracing-subscriber = { version = "0.2.17", default-features = false, features = ["fmt", "registry", "env-filter"] }
1818
futures = { version = "0.3", default-features = false }
1919

2020
[dev-dependencies]
2121

2222
tokio = { version = "^1.7", features = ["full", "rt-multi-thread"]}
2323
futures = "0.3"
24-
25-
tracing-subscriber = { version = "0.2.17", features = ["fmt", "registry", "env-filter"] }

console-subscriber/examples/app.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,14 @@
11
use std::time::Duration;
2-
use tracing_subscriber::prelude::*;
32

43
#[tokio::main]
54
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
6-
let (layer, server) = console_subscriber::TasksLayer::builder()
7-
.retention(Duration::from_secs(60))
8-
.build();
9-
let filter =
10-
tracing_subscriber::EnvFilter::from_default_env().add_directive("tokio=trace".parse()?);
11-
tracing_subscriber::registry()
12-
.with(tracing_subscriber::fmt::layer())
13-
.with(filter)
14-
.with(layer)
15-
.init();
5+
console_subscriber::init();
166

17-
let serve = tokio::spawn(async move { server.serve().await.expect("server failed") });
187
let task1 = tokio::spawn(spawn_tasks(1, 10));
198
let task2 = tokio::spawn(spawn_tasks(10, 100));
209
let result = tokio::try_join! {
2110
task1,
2211
task2,
23-
serve
2412
};
2513
result?;
2614

console-subscriber/src/aggregator.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use tokio::sync::{mpsc, Notify};
55
use futures::FutureExt;
66
use std::{
77
collections::HashMap,
8-
mem,
98
ops::{Deref, DerefMut},
109
sync::{
1110
atomic::{AtomicBool, Ordering::*},
@@ -201,7 +200,7 @@ impl Aggregator {
201200
fn publish(&mut self) {
202201
let new_metadata = if !self.new_metadata.is_empty() {
203202
Some(proto::RegisterMetadata {
204-
metadata: mem::replace(&mut self.new_metadata, Vec::new()),
203+
metadata: std::mem::take(&mut self.new_metadata),
205204
})
206205
} else {
207206
None

console-subscriber/src/builder.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use super::{Server, TasksLayer};
2-
use std::{net::SocketAddr, time::Duration};
2+
use std::{
3+
net::{SocketAddr, ToSocketAddrs},
4+
time::Duration,
5+
};
36

47
/// Builder for configuring [`TasksLayer`]s.
58
#[derive(Clone, Debug)]
@@ -101,4 +104,39 @@ impl Builder {
101104
pub fn build(self) -> (TasksLayer, Server) {
102105
TasksLayer::build(self)
103106
}
107+
108+
/// Configures this builder from a standard set of environment variables:
109+
///
110+
/// | **Environment Variable** | **Purpose** | **Default Value** |
111+
/// |-------------------------------------|---------------------------------------------------------------------------|-------------------|
112+
/// | `TOKIO_CONSOLE_RETENTION_SECS` | The number of seconds to accumulate completed tracing data | 3600s (1h) |
113+
/// | `TOKIO_CONSOLE_BIND` | a HOST:PORT description, such as `localhost:1234` | `127.0.0.1:6669` |
114+
/// | `TOKIO_CONSOLE_PUBLISH_INTERVAL_MS` | The number of milliseconds to wait between sending updates to the console | 1000ms (1s) |
115+
pub fn with_default_env(mut self) -> Self {
116+
if let Ok(retention) = std::env::var("TOKIO_CONSOLE_RETENTION_SECS") {
117+
self.retention = Duration::from_secs(
118+
retention
119+
.parse()
120+
.expect("TOKIO_CONSOLE_RETENTION_SECS must be an integer"),
121+
);
122+
}
123+
124+
if let Ok(bind) = std::env::var("TOKIO_CONSOLE_BIND") {
125+
self.server_addr = bind
126+
.to_socket_addrs()
127+
.expect("TOKIO_CONSOLE_BIND must be formatted as HOST:PORT, such as localhost:4321")
128+
.next()
129+
.expect("tokio console could not resolve TOKIO_CONSOLE_BIND");
130+
}
131+
132+
if let Ok(interval) = std::env::var("TOKIO_CONSOLE_PUBLISH_INTERVAL_MS") {
133+
self.publish_interval = Duration::from_millis(
134+
interval
135+
.parse()
136+
.expect("TOKIO_CONSOLE_PUBLISH_INTERVAL_MS must be an integer"),
137+
);
138+
}
139+
140+
self
141+
}
104142
}

console-subscriber/src/init.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
use crate::TasksLayer;
2+
use std::thread;
3+
use tokio::runtime;
4+
use tracing_subscriber::{fmt, layer::Layered, prelude::*, EnvFilter, Registry};
5+
6+
type ConsoleSubscriberLayer = Layered<TasksLayer, Layered<EnvFilter, Registry>>;
7+
8+
/// Starts the console subscriber server on its own thread.
9+
///
10+
/// This function represents the easiest way to get started using
11+
/// tokio-console.
12+
///
13+
/// **Note**: this function sets the [default `tracing` subscriber][default]
14+
/// for your application. If you need to add additional layers to this
15+
/// subscriber, see [`build`].
16+
///
17+
/// [default]: https://docs.rs/tracing/latest/tracing/dispatcher/index.html#setting-the-default-subscriber
18+
///
19+
/// ## Configuration
20+
///
21+
/// Tokio console subscriber is configured with sensible defaults for most
22+
/// use cases. If you need to tune these parameters, several environmental
23+
/// configuration variables are available:
24+
///
25+
/// | **Environment Variable** | **Purpose** | **Default Value** |
26+
/// |-------------------------------------|---------------------------------------------------------------------------|-------------------|
27+
/// | `TOKIO_CONSOLE_RETENTION_SECS` | The number of seconds to accumulate completed tracing data | 3600s (1h) |
28+
/// | `TOKIO_CONSOLE_BIND` | A HOST:PORT description, such as `localhost:1234` | `127.0.0.1:6669` |
29+
/// | `TOKIO_CONSOLE_PUBLISH_INTERVAL_MS` | The number of milliseconds to wait between sending updates to the console | 1000ms (1s) |
30+
/// | `RUST_LOG` | Configure the tracing filter. See [`EnvFilter`] for further information | `tokio=trace` |
31+
///
32+
/// ## Further customization
33+
///
34+
/// To add additional layers or replace the format layer, replace
35+
/// `console_subscriber::init` with:
36+
///
37+
/// ```rust
38+
/// use tracing_subscriber::prelude::*;
39+
/// console_subscriber::build()
40+
/// .with(tracing_subscriber::fmt::layer())
41+
/// // .with(..potential additional layer..)
42+
/// .init();
43+
/// ```
44+
45+
pub fn init() {
46+
build().with(fmt::layer()).init()
47+
}
48+
49+
/// Returns a new `tracing` [subscriber] configured with a [`TasksLayer`]
50+
/// and a [filter] that enables the spans and events required by the console.
51+
///
52+
/// Unlike [`init`], this function does not set the default subscriber, allowing
53+
/// additional [`Layer`s] to be added.
54+
///
55+
/// [subscriber]: https://docs.rs/tracing/latest/tracing/subscriber/trait.Subscriber.html
56+
/// [filter]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html
57+
///
58+
/// ## Configuration
59+
///
60+
/// `console_subscriber::build` supports all of the environmental
61+
/// configuration described at [`console_subscriber::init`][init]
62+
///
63+
/// ## Differences from `init`
64+
///
65+
/// **Note**: In order to support customizing the format `build` does
66+
/// not attach a [`tracing_subscriber::fmt::layer`], unlike [`init`].
67+
///
68+
/// Additionally, you must call
69+
/// [`init`][tracing_subscriber::util::SubscriberInitExt::init] on the
70+
/// final layer in order to register the subscriber.
71+
///
72+
/// ## Examples
73+
///
74+
/// ```rust
75+
/// use tracing_subscriber::prelude::*;
76+
/// console_subscriber::build()
77+
/// .with(tracing_subscriber::fmt::layer())
78+
/// // .with(...)
79+
/// .init();
80+
/// ```
81+
82+
#[must_use = "build() without init() will not set the default tracing subscriber"]
83+
pub fn build() -> ConsoleSubscriberLayer {
84+
let (layer, server) = TasksLayer::builder().with_default_env().build();
85+
86+
let filter = EnvFilter::from_default_env().add_directive("tokio=trace".parse().unwrap());
87+
88+
let console_subscriber = tracing_subscriber::registry().with(filter).with(layer);
89+
90+
thread::Builder::new()
91+
.name("console_subscriber".into())
92+
.spawn(move || {
93+
let runtime = runtime::Builder::new_current_thread()
94+
.enable_io()
95+
.enable_time()
96+
.build()
97+
.expect("console subscriber runtime initialization failed");
98+
99+
runtime.block_on(async move {
100+
server
101+
.serve()
102+
.await
103+
.expect("console subscriber server failed")
104+
});
105+
})
106+
.expect("console subscriber could not spawn thread");
107+
108+
console_subscriber
109+
}

console-subscriber/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ use aggregator::Aggregator;
2323
mod builder;
2424
pub use builder::Builder;
2525

26+
mod init;
27+
pub use init::{build, init};
28+
2629
pub struct TasksLayer {
2730
task_meta: AtomicPtr<Metadata<'static>>,
2831
blocking_meta: AtomicPtr<Metadata<'static>>,

console/src/tasks.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ impl State {
198198

199199
impl Task {
200200
pub(crate) fn kind(&self) -> &str {
201-
&self.kind
201+
self.kind
202202
}
203203

204204
pub(crate) fn id_hex(&self) -> &str {

0 commit comments

Comments
 (0)