diff --git a/hyperactor/src/init.rs b/hyperactor/src/init.rs index 4884b654..43a114fe 100644 --- a/hyperactor/src/init.rs +++ b/hyperactor/src/init.rs @@ -31,12 +31,24 @@ pub(crate) fn get_runtime() -> tokio::runtime::Handle { /// - Initialize logging defaults. /// - Store the provided tokio runtime handle for use by the hyperactor system. pub fn initialize(handle: tokio::runtime::Handle) { + initialize_with_log_prefix(handle, Option::None); +} + +/// Initialize the Hyperactor runtime. Specifically: +/// - Set up panic handling, so that we get consistent panic stack traces in Actors. +/// - Initialize logging defaults. +/// - Store the provided tokio runtime handle for use by the hyperactor system. +/// - Set the env var whose value should be used to prefix log messages. +pub fn initialize_with_log_prefix( + handle: tokio::runtime::Handle, + env_var_log_prefix: Option, +) { RUNTIME .set(handle) .expect("hyperactor::initialize must only be called once"); panic_handler::set_panic_hook(); - hyperactor_telemetry::initialize_logging(ClockKind::default()); + hyperactor_telemetry::initialize_logging(ClockKind::default(), env_var_log_prefix); } /// Initialize the Hyperactor runtime using the current tokio runtime handle. diff --git a/hyperactor/src/lib.rs b/hyperactor/src/lib.rs index 149efd81..876a529e 100644 --- a/hyperactor/src/lib.rs +++ b/hyperactor/src/lib.rs @@ -137,6 +137,8 @@ pub use hyperactor_telemetry::kv_pairs; pub use init::initialize; #[doc(inline)] pub use init::initialize_with_current_runtime; +#[doc(inline)] +pub use init::initialize_with_log_prefix; // Re-exported to make this available to callers of the `register!` macro. #[doc(hidden)] pub use inventory::submit; diff --git a/hyperactor_mesh/src/bootstrap.rs b/hyperactor_mesh/src/bootstrap.rs index a0914e64..899deebb 100644 --- a/hyperactor_mesh/src/bootstrap.rs +++ b/hyperactor_mesh/src/bootstrap.rs @@ -26,8 +26,8 @@ use signal_hook::consts::signal::SIGTERM; use crate::proc_mesh::mesh_agent::MeshAgent; -pub(crate) const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR"; -pub(crate) const BOOTSTRAP_INDEX_ENV: &str = "HYPERACTOR_MESH_INDEX"; +pub const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR"; +pub const BOOTSTRAP_INDEX_ENV: &str = "HYPERACTOR_MESH_INDEX"; /// A channel used by each process to receive its own stdout and stderr /// Because stdout and stderr can only be obtained by the parent process, /// they need to be streamed back to the process. diff --git a/hyperactor_telemetry/src/lib.rs b/hyperactor_telemetry/src/lib.rs index ba3fad91..07e57eb0 100644 --- a/hyperactor_telemetry/src/lib.rs +++ b/hyperactor_telemetry/src/lib.rs @@ -26,6 +26,10 @@ pub const DISABLE_OTEL_METRICS: &str = "DISABLE_OTEL_METRICS"; /// Set to "1" to disable the recorder output. pub const DISABLE_RECORDER_TRACING: &str = "DISABLE_RECORDER_TRACING"; +/// Environment variable to additionally enable stderr logging in Local environment. +/// Set to "1" to also dump logs to stderr when running in Local environment. +pub const MONARCH_DEBUG: &str = "MONARCH_DEBUG"; + #[cfg(fbcode_build)] mod meta; mod otel; @@ -57,6 +61,11 @@ use tracing_subscriber::Layer; use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::filter::Targets; use tracing_subscriber::fmt; +use tracing_subscriber::fmt::FormatEvent; +use tracing_subscriber::fmt::FormatFields; +use tracing_subscriber::fmt::format::Format; +use tracing_subscriber::fmt::format::Writer; +use tracing_subscriber::registry::LookupSpan; use crate::recorder::Recorder; @@ -77,27 +86,30 @@ impl TelemetryClock for DefaultTelemetryClock { } } -// Need to keep this around so that the tracing subscriber doesn't drop the writer. +fn file_appender(dir: &str, filename: &str) -> Box { + match RollingFileAppender::builder() + .rotation(Rotation::DAILY) + .filename_prefix(filename) + .filename_suffix("log") + .build(dir) + { + Ok(file) => Box::new(file), + Err(e) => { + tracing::warn!("unable to create custom log file: {}", e); + Box::new(std::io::stderr()) + } + } +} + +fn writer() -> Box { + match env::Env::current() { + env::Env::Test | env::Env::MastEmulator => Box::new(std::io::stderr()), + env::Env::Local => file_appender("/tmp/", "monarch_log"), + env::Env::Mast => file_appender("/logs/", "dedicated_log_monarch"), + } +} + lazy_static! { - static ref FILE_WRITER_GUARD: Arc<(NonBlocking, WorkerGuard)> = { - let writer: Box = match RollingFileAppender::builder() - .rotation(Rotation::DAILY) - .filename_prefix("dedicated_log_monarch") - .filename_suffix("log") - .build("/logs/") - { - Ok(file) => Box::new(file), - Err(e) => { - tracing::warn!("unable to create custom log file: {}", e); - Box::new(std::io::stderr()) - } - }; - return Arc::new( - tracing_appender::non_blocking::NonBlockingBuilder::default() - .lossy(false) - .finish(writer), - ); - }; static ref TELEMETRY_CLOCK: Arc>> = { Arc::new(Mutex::new(Box::new(DefaultTelemetryClock {}))) }; } @@ -406,6 +418,60 @@ macro_rules! declare_static_histogram { }; } +static FILE_WRITER_GUARD: std::sync::OnceLock> = + std::sync::OnceLock::new(); + +/// A custom formatter that prepends prefix from env_var to log messages. +/// Uses either Glog formatter or default formatter based on environment. +struct PrefixedFormatter { + formatter: FormatterType, + prefix_env_var: Option, +} + +enum FormatterType { + Glog(Glog), + Default(Format), +} + +impl PrefixedFormatter { + fn new(formatter: FormatterType, prefix_env_var: Option) -> Self { + Self { + formatter, + prefix_env_var, + } + } +} + +impl FormatEvent for PrefixedFormatter +where + S: tracing::Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> FormatFields<'a> + 'static, +{ + fn format_event( + &self, + ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>, + mut writer: Writer<'_>, + event: &tracing::Event<'_>, + ) -> std::fmt::Result { + let prefix: String = if self.prefix_env_var.is_some() { + std::env::var(self.prefix_env_var.clone().unwrap()).unwrap_or_default() + } else { + "".to_string() + }; + + if prefix.is_empty() { + write!(writer, "[-]")?; + } else { + write!(writer, "[{}]", prefix)?; + } + + match &self.formatter { + FormatterType::Glog(inner) => inner.format_event(ctx, writer, event), + FormatterType::Default(inner) => inner.format_event(ctx, writer, event), + } + } +} + /// Set up logging based on the given execution environment. We specialize logging based on how the /// logs are consumed. The destination scuba table is specialized based on the execution environment. /// mast -> monarch_tracing/prod @@ -414,7 +480,10 @@ macro_rules! declare_static_histogram { /// scuba logging won't normally be enabled for a unit test unless we are specifically testing logging, so /// you don't need to worry about your tests being flakey due to scuba logging. You have to manually call initialize_logging() /// to get this behavior. -pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) { +pub fn initialize_logging( + clock: impl TelemetryClock + Send + 'static, + prefix_env_var: Option, +) { swap_telemetry_clock(clock); let file_log_level = match env::Env::current() { env::Env::Local => "info", @@ -422,10 +491,22 @@ pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) { env::Env::Mast => "info", env::Env::Test => "debug", }; - let file_writer: &NonBlocking = &FILE_WRITER_GUARD.0; + let (non_blocking, guard) = tracing_appender::non_blocking::NonBlockingBuilder::default() + .lossy(false) + .finish(writer()); + let writer_guard = Arc::new((non_blocking, guard)); + let _ = FILE_WRITER_GUARD.set(writer_guard.clone()); + + let file_formatter = match env::Env::current() { + env::Env::Mast => FormatterType::Glog(Glog::default().with_timer(LocalTime::default())), + _ => FormatterType::Default(Format::default().without_time().with_target(false)), + }; let file_layer = fmt::Layer::default() - .with_writer(file_writer.clone()) - .event_format(Glog::default().with_timer(LocalTime::default())) + .with_writer(writer_guard.0.clone()) + .event_format(PrefixedFormatter::new( + file_formatter, + prefix_env_var.clone(), + )) .fmt_fields(GlogFields::default().compact()) .with_ansi(false) .with_filter( @@ -447,7 +528,10 @@ pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) { }; let stderr_layer = fmt::Layer::default() .with_writer(std::io::stderr) - .event_format(Glog::default().with_timer(LocalTime::default())) + .event_format(PrefixedFormatter::new( + FormatterType::Default(Format::default().without_time().with_target(false)), + prefix_env_var, + )) .fmt_fields(GlogFields::default().compact()) .with_ansi(std::io::stderr().is_terminal()) .with_filter( @@ -478,13 +562,17 @@ pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) { } else { None }) + .with(if is_layer_enabled(MONARCH_DEBUG) { + Some(stderr_layer) + } else { + None + }) .with(if is_layer_enabled(DISABLE_RECORDER_TRACING) { Some(recorder().layer()) } else { None }) .with(file_layer) - .with(stderr_layer) .try_init() { tracing::debug!("logging already initialized for this process: {}", err); diff --git a/monarch_extension/src/lib.rs b/monarch_extension/src/lib.rs index b4621d7d..4f94c662 100644 --- a/monarch_extension/src/lib.rs +++ b/monarch_extension/src/lib.rs @@ -67,7 +67,10 @@ fn get_or_add_new_module<'py>( pub fn mod_init(module: &Bound<'_, PyModule>) -> PyResult<()> { monarch_hyperactor::runtime::initialize(module.py())?; let runtime = monarch_hyperactor::runtime::get_tokio_runtime(); - ::hyperactor::initialize(runtime.handle().clone()); + ::hyperactor::initialize_with_log_prefix( + runtime.handle().clone(), + Some(::hyperactor_mesh::bootstrap::BOOTSTRAP_INDEX_ENV.to_string()), + ); monarch_hyperactor::shape::register_python_bindings(&get_or_add_new_module( module,