Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
823 changes: 428 additions & 395 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 9 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ version = "0.2.0"
edition = "2024"

[dependencies]
libc = "0.2.170"
anyhow = { version = "1.0.99", features = ["backtrace"] }
libc = "0.2.175"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
# reqwest instead of libcurl, rustls instead of expecting openssl, the rest are Sentry features we want
sentry = { version = "0.36.0", default-features = false, features = ["tracing", "reqwest", "rustls", "backtrace", "panic"] }
sentry-tracing = "0.36.0"
uuid = { version = "1.15.0", features = ["v4"] }
serde = { version = "1.0.218", features = ["derive"] }
serde_json = "1.0.140"
sentry = { version = "0.42.0", default-features = false, features = ["tracing", "reqwest", "rustls", "backtrace", "panic"] }
sentry-tracing = "0.42.0"
uuid = { version = "1.18.1", features = ["v4"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.143"

[dev-dependencies]
mockall = "0.13.1"
tempfile = "3.19.0"
tempfile = "3.22.0"

[profile.release]
opt-level = 3
Expand Down
27 changes: 22 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,33 @@
Acolyte is a lightweight resource monitoring tool designed to collect statistics in containerized environments,
particularly Kubernetes.

Acolyte monitors CPU, memory, and GPU utilization and writes the data to JSON files for easy consumption by other
services. It's designed to run alongside your application in the same container and built with compatibility in mind.
Acolyte monitors CPU, memory, and GPU utilization and writes the data as JSON to rotated files or standard output for
easy consumption by other services.
It's designed to run alongside your application in the same container and built with compatibility in mind.

Acolyte is configured through environment variables:
Acolyte is configured through environment variables – by default, you don't necessarily have to configure anything.

### General

* `RUST_LOG`: log level e.g. debug; default: info
* `ACOLYTE_STATS_DIR`: directory where stat files are written; default: /tmp/acolyte/stats
* `ACOLYTE_NO_RESTART`: if set, Acolyte will not restart itself if it encounters an error; default: false

### Collection

* `ACOLYTE_STAT_INTERVAL_MS`: interval between stats collection in milliseconds; default: 5000
* `ACOLYTE_MAX_STATS_ENTRIES`: maximum number of stat files to keep; default: 12
* `ACOLYTE_CPU_SAMPLE_RATE_MS`: sample window for CPU usage in milliseconds; default: 100

### Output

* `ACOLYTE_OUTPUT_MODE`: `dir` (default): write to files in `ACOLYTE_STATS_DIR`, `stdout`: write to standard output

#### Stats directory

* `ACOLYTE_STATS_DIR`: directory where stat files are written; default: /tmp/acolyte/stats
* `ACOLYTE_MAX_STATS_ENTRIES`: maximum number of stat files to keep; default: 12

### Sentry

* `SENTRY_DSN`: optional Sentry DSN for error reporting
* `CLUSTER_NAME`: optional cluster identification for Sentry

Expand Down
100 changes: 100 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use crate::consts::ID_ENV_VAR;
use std::env;
use std::path::PathBuf;
use std::time::Duration;
use uuid::Uuid;

pub struct JsonlToStdoutConfig {
pub prefix: String,
}

pub struct StatsDirConfig {
pub dir: PathBuf,
pub max_stats_entries: usize,
}

pub enum OutputMode {
JsonlToStdout(JsonlToStdoutConfig),
StatsDir(StatsDirConfig),
}
pub struct Config {
pub sentry_dsn: Option<String>,
pub acolyte_id: Uuid,
pub cpu_sample_interval: Duration,
pub stat_interval: Duration,
pub cluster_name: String,
pub output_mode: OutputMode,
}

impl Config {
pub fn from_env() -> anyhow::Result<Self> {
Ok(Config {
sentry_dsn: get_sentry_dsn(),
acolyte_id: get_or_create_acolyte_id(),
cpu_sample_interval: get_cpu_sample_interval(),
stat_interval: get_stat_interval(),
output_mode: get_output_mode()?,
cluster_name: get_cluster_name(),
})
}
}

fn get_output_mode() -> anyhow::Result<OutputMode> {
let output_mode = env::var("ACOLYTE_OUTPUT_MODE").ok();
match output_mode.as_deref() {
Some("stdout") => {
let prefix = env::var("ACOLYTE_OUTPUT_PREFIX").unwrap_or_else(|_| "".to_string());
Ok(OutputMode::JsonlToStdout(JsonlToStdoutConfig { prefix }))
}
Some("dir") | None => Ok(OutputMode::StatsDir(StatsDirConfig {
dir: get_stats_dir(),
max_stats_entries: get_max_stats_entries(),
})),
Some(other) => Err(anyhow::anyhow!("Invalid ACOLYTE_OUTPUT_MODE: {other}.")),
}
}

fn get_sentry_dsn() -> Option<String> {
env::var("SENTRY_DSN").ok()
}

fn get_or_create_acolyte_id() -> Uuid {
match env::var(ID_ENV_VAR) {
Ok(id) => Uuid::parse_str(&id).unwrap_or_else(|_| Uuid::new_v4()),
Err(_) => Uuid::new_v4(),
}
}

fn get_stat_interval() -> Duration {
let secs = env::var("ACOLYTE_STAT_INTERVAL_MS")
.ok()
.and_then(|val| val.parse::<u64>().ok())
.unwrap_or(5000);
Duration::from_millis(secs)
}

fn get_cpu_sample_interval() -> Duration {
let ms = env::var("ACOLYTE_CPU_SAMPLE_RATE_MS")
.ok()
.and_then(|val| val.parse::<u64>().ok())
// 100 ms seems like a common interval to sample CPU usage
.unwrap_or(100);
Duration::from_millis(ms)
}

fn get_stats_dir() -> PathBuf {
env::var("ACOLYTE_STATS_DIR")
.unwrap_or_else(|_| "/tmp/acolyte/stats".to_string())
.into()
}

fn get_max_stats_entries() -> usize {
env::var("ACOLYTE_MAX_STATS_ENTRIES")
.unwrap_or_else(|_| "12".to_string())
.parse::<usize>()
.unwrap_or(12)
}

fn get_cluster_name() -> String {
env::var("CLUSTER_NAME").unwrap_or_else(|_| "Unknown".to_string())
}
3 changes: 3 additions & 0 deletions src/consts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub const ID_ENV_VAR: &str = "ACOLYTE_ID";
pub const MAX_RUN_ATTEMPTS: u8 = 5;
pub const RESTART_DELAY_SECS: u64 = 10;
9 changes: 7 additions & 2 deletions src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use std::path::PathBuf;
use std::time::Duration;
use uuid::Uuid;

pub const MAX_RUN_ATTEMPTS: u8 = 5;
pub const RESTART_DELAY_SECS: u64 = 10;
const NO_RESTART_ENV_VAR: &str = "ACOLYTE_NO_RESTART";

pub const RESTART_ENV_VAR: &str = "ACOLYTE_RESTART";
pub const ID_ENV_VAR: &str = "ACOLYTE_ID";
Expand All @@ -17,6 +16,12 @@ pub fn get_cluster_name() -> String {
env::var("CLUSTER_NAME").unwrap_or_else(|_| "Unknown".to_string())
}

pub fn is_no_restart() -> bool {
env::var(NO_RESTART_ENV_VAR)
.map(|v| !v.is_empty())
.unwrap_or(false)
}

pub fn get_restart_count() -> u8 {
env::var(RESTART_ENV_VAR)
.ok()
Expand Down
14 changes: 7 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
pub mod env;
pub mod config;
pub mod consts;
pub mod stats;
pub mod store;
pub mod utils;

use crate::config::Config;
use crate::stats::cgroup_v1::CgroupV1Source;
use crate::stats::cgroup_v2::CgroupV2Source;
use crate::stats::proc::ProcSource;
Expand All @@ -14,9 +16,7 @@ use std::path::PathBuf;
use std::thread;
use tracing::{debug, error};

pub fn run_acolyte() {
let stat_interval = env::get_stat_interval();

pub fn run_acolyte(config: &Config) {
let sources = get_sources();

loop {
Expand All @@ -28,7 +28,7 @@ pub fn run_acolyte() {

if let Some(cpu_usage) = sources
.iter()
.find_map(|source| source.get_cpu_usage().ok())
.find_map(|source| source.get_cpu_usage(config.cpu_sample_interval).ok())
{
stats_entry.cpu_usage = cpu_usage.normalize(stats_entry.num_cpus);
}
Expand All @@ -55,11 +55,11 @@ pub fn run_acolyte() {
}

debug!("New stats entry: {:?}", stats_entry);
if let Err(e) = store::write_stats_entry(stats_entry) {
if let Err(e) = store::write_stats_entry(stats_entry, config) {
error!("Failed to write stats entry: {}", e);
}

thread::sleep(stat_interval);
thread::sleep(config.stat_interval);
}
}

Expand Down
67 changes: 46 additions & 21 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,62 +1,88 @@
use acolyte::env;
use acolyte::config::Config;
use acolyte::consts::{ID_ENV_VAR, MAX_RUN_ATTEMPTS, RESTART_DELAY_SECS};
use anyhow::Context;
use libc::{SIG_IGN, SIGHUP};
use std::time::Duration;
use std::{os::unix::process::CommandExt, panic, process, thread};
use std::{env, os::unix::process::CommandExt, panic, process, thread};
use tracing::{error, info, warn};
use tracing_subscriber::EnvFilter;
use tracing_subscriber::prelude::*;
use uuid::Uuid;

const NO_RESTART_ENV_VAR: &str = "ACOLYTE_NO_RESTART";
const RESTART_ENV_VAR: &str = "ACOLYTE_RESTART";
fn is_no_restart() -> bool {
env::var(NO_RESTART_ENV_VAR)
.map(|v| !v.is_empty())
.unwrap_or(false)
}

fn get_restart_count() -> u8 {
env::var(RESTART_ENV_VAR)
.ok()
.and_then(|count_str| count_str.parse::<u8>().ok())
.unwrap_or(0)
}

fn main() {
nohup();

init_logging();

let acolyte_id = env::get_or_create_acolyte_id();
let sentry_guard = init_sentry(&acolyte_id);
let config = Config::from_env().context("Failed to load config").unwrap();
let sentry_guard = init_sentry(&config);
if sentry_guard.is_some() {
info!("Sentry initialized");
} else {
warn!("Sentry NOT initialized");
}

let restart_count = env::get_restart_count();
if is_no_restart() {
info!("No-restart mode enabled; running Acolyte without restart logic");
acolyte::run_acolyte(&config);
process::exit(0);
} else {
run_with_restart(&config);
}
}

fn run_with_restart(config: &Config) {
let restart_count = get_restart_count();
if restart_count > 0 {
info!(
"Restarting Acolyte - waiting {} seconds (attempt {}/{})",
env::RESTART_DELAY_SECS,
RESTART_DELAY_SECS,
restart_count + 1,
env::MAX_RUN_ATTEMPTS
MAX_RUN_ATTEMPTS
);
thread::sleep(Duration::from_secs(env::RESTART_DELAY_SECS));
thread::sleep(Duration::from_secs(RESTART_DELAY_SECS));
}

info!(
"Starting Acolyte {} (attempt {}/{})",
acolyte_id,
config.acolyte_id,
restart_count + 1,
env::MAX_RUN_ATTEMPTS
MAX_RUN_ATTEMPTS
);

let run_result = panic::catch_unwind(acolyte::run_acolyte);
let run_result = panic::catch_unwind(|| acolyte::run_acolyte(config));
if run_result.is_ok() {
process::exit(0);
} else {
let next_count = restart_count + 1;

if next_count >= env::MAX_RUN_ATTEMPTS {
if next_count >= MAX_RUN_ATTEMPTS {
error!(
"Maximum run attempts ({}) reached after crash. Exiting.",
env::MAX_RUN_ATTEMPTS
MAX_RUN_ATTEMPTS
);
process::exit(1);
}

warn!("Acolyte crashed. Executing new process...");
let current_exe = std::env::current_exe().expect("Failed to get current executable path");
let err = process::Command::new(current_exe)
.env(env::RESTART_ENV_VAR, next_count.to_string())
.env(env::ID_ENV_VAR, acolyte_id.to_string())
.env(RESTART_ENV_VAR, next_count.to_string())
.env(ID_ENV_VAR, config.acolyte_id.to_string())
.args(std::env::args().skip(1))
.exec();

Expand All @@ -82,9 +108,9 @@ fn init_logging() {
.init();
}

fn init_sentry(acolyte_id: &Uuid) -> Option<sentry::ClientInitGuard> {
let dsn = env::get_sentry_dsn()?;
fn init_sentry(config: &Config) -> Option<sentry::ClientInitGuard> {
let release = sentry::release_name!();
let dsn = config.sentry_dsn.clone();
let guard = sentry::init((
dsn,
sentry::ClientOptions {
Expand All @@ -94,9 +120,8 @@ fn init_sentry(acolyte_id: &Uuid) -> Option<sentry::ClientInitGuard> {
));

sentry::configure_scope(|scope| {
scope.set_tag("acolyte_id", acolyte_id);
let cluster_name = env::get_cluster_name();
scope.set_tag("cluster.name", cluster_name);
scope.set_tag("acolyte_id", config.acolyte_id);
scope.set_tag("cluster.name", &config.cluster_name);
});

Some(guard)
Expand Down
Loading
Loading