Skip to content

feat: add heap profiling to dfdaemon #427

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 107 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions dragonfly-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,8 @@ tokio-rustls = "0.25.0-alpha.4"
http-body-util = "0.1.0"
futures-util = "0.3.30"
termion = "3.0.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { version = "0.5.4", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] }

[target.'cfg(target_os = "linux")'.dependencies]
jemalloc_pprof = "0.1.0"
8 changes: 8 additions & 0 deletions dragonfly-client/src/bin/dfdaemon/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{error, info, Level};

// ALLOC installs jemalloc as the process-wide global allocator. It is only
// compiled when the target environment is not MSVC, matching the cfg condition
// under which the tikv-jemallocator dependency is declared in Cargo.toml.
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

// malloc_conf is exported under the unmangled symbol name `malloc_conf`, so the
// lowercase identifier is deliberate (hence the non_upper_case_globals allow).
// The value is a NUL-terminated option string.
// NOTE(review): presumably jemalloc reads this symbol at startup to enable and
// activate heap profiling with a sampling exponent of 19 — confirm the option
// meanings against the jemalloc documentation.
#[allow(non_upper_case_globals)]
#[export_name = "malloc_conf"]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";

#[derive(Debug, Parser)]
#[command(
name = dfdaemon::NAME,
Expand Down
86 changes: 55 additions & 31 deletions dragonfly-client/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ const DEFAULT_PROFILER_SECONDS: u64 = 10;
// DEFAULT_PROFILER_FREQUENCY is the default frequency to start profiling.
const DEFAULT_PROFILER_FREQUENCY: i32 = 1000;

// PProfQueryParams is the query params to start profiling.
// PProfProfileQueryParams is the query params to start profiling.
#[derive(Deserialize, Serialize)]
#[serde(default)]
pub struct PProfQueryParams {
pub struct PProfProfileQueryParams {
// seconds is the seconds to start profiling.
pub seconds: u64,

// frequency is the frequency to start profiling.
pub frequency: i32,
}

// PProfQueryParams implements the default.
impl Default for PProfQueryParams {
// PProfProfileQueryParams implements the default.
impl Default for PProfProfileQueryParams {
fn default() -> Self {
Self {
seconds: DEFAULT_PROFILER_SECONDS,
Expand Down Expand Up @@ -84,16 +84,24 @@ impl Stats {
// Clone the shutdown channel.
let mut shutdown = self.shutdown.clone();

// Create the stats route.
let stats_route = warp::path!("debug" / "pprof" / "profile")
// Create the pprof profile route.
let pprof_profile_route = warp::path!("debug" / "pprof" / "profile")
.and(warp::get())
.and(warp::query::<PProfQueryParams>())
.and_then(Self::stats_handler);
.and(warp::query::<PProfProfileQueryParams>())
.and_then(Self::pprof_profile_handler);

// Create the pprof heap route.
let pprof_heap_route = warp::path!("debug" / "pprof" / "heap")
.and(warp::get())
.and_then(Self::pprof_heap_handler);

// Create the pprof routes.
let pprof_routes = pprof_profile_route.or(pprof_heap_route);

// Start the stats server and wait for it to finish.
info!("stats server listening on {}", self.addr);
tokio::select! {
_ = warp::serve(stats_route).run(self.addr) => {
_ = warp::serve(pprof_routes).run(self.addr) => {
// Stats server ended.
info!("stats server ended");
}
Expand All @@ -105,35 +113,29 @@ impl Stats {
}

// pprof_profile_handler handles the pprof profile request.
async fn stats_handler(query_params: PProfQueryParams) -> Result<impl Reply, Rejection> {
async fn pprof_profile_handler(
query_params: PProfProfileQueryParams,
) -> Result<impl Reply, Rejection> {
info!(
"start profiling for {} seconds with {} frequency",
query_params.seconds, query_params.frequency
);
let guard = match ProfilerGuard::new(query_params.frequency) {
Ok(guard) => guard,
Err(err) => {
error!("failed to create profiler guard: {}", err);
return Err(warp::reject::reject());
}
};

let guard = ProfilerGuard::new(query_params.frequency).map_err(|err| {
error!("failed to create profiler guard: {}", err);
warp::reject::reject()
})?;

tokio::time::sleep(Duration::from_secs(query_params.seconds)).await;
let report = match guard.report().build() {
Ok(report) => report,
Err(err) => {
error!("failed to build profiler report: {}", err);
return Err(warp::reject::reject());
}
};
let report = guard.report().build().map_err(|err| {
error!("failed to build profiler report: {}", err);
warp::reject::reject()
})?;

let profile = match report.pprof() {
Ok(profile) => profile,
Err(err) => {
error!("failed to get pprof profile: {}", err);
return Err(warp::reject::reject());
}
};
let profile = report.pprof().map_err(|err| {
error!("failed to get pprof profile: {}", err);
warp::reject::reject()
})?;

let mut body: Vec<u8> = Vec::new();
profile.write_to_vec(&mut body).map_err(|err| {
Expand All @@ -143,4 +145,26 @@ impl Stats {

Ok(body)
}

// pprof_heap_handler handles the pprof heap request.
async fn pprof_heap_handler() -> Result<impl Reply, Rejection> {
info!("start heap profiling");
#[cfg(target_os = "linux")]
{
let mut prof_ctl = jemalloc_pprof::PROF_CTL.as_ref().unwrap().lock().await;
if !prof_ctl.activated() {
return Err(warp::reject::reject());
}

let pprof = prof_ctl.dump_pprof().map_err(|err| {
error!("failed to dump pprof: {}", err);
warp::reject::reject()
})?;

Ok(pprof)
}

#[cfg(not(target_os = "linux"))]
Err::<warp::http::Error, Rejection>(warp::reject::reject())
}
}