Skip to content

Commit 368d0d5

Browse files
pablorfb-metafacebook-github-bot
authored andcommitted
Split Actor logs in their own files (#495)
Summary: Pull Request resolved: #495 - Add per proc suffix to file logger - Gate STDERR logging with flag Differential Revision: D78101221
1 parent 442a810 commit 368d0d5

File tree

5 files changed

+135
-30
lines changed

5 files changed

+135
-30
lines changed

hyperactor/src/init.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,24 @@ pub(crate) fn get_runtime() -> tokio::runtime::Handle {
3131
/// - Initialize logging defaults.
3232
/// - Store the provided tokio runtime handle for use by the hyperactor system.
3333
pub fn initialize(handle: tokio::runtime::Handle) {
34+
initialize_with_log_prefix(handle, Option::None);
35+
}
36+
37+
/// Initialize the Hyperactor runtime. Specifically:
38+
/// - Set up panic handling, so that we get consistent panic stack traces in Actors.
39+
/// - Initialize logging defaults.
40+
/// - Store the provided tokio runtime handle for use by the hyperactor system.
41+
/// - Set the env var whose value should be used to prefix log messages.
42+
pub fn initialize_with_log_prefix(
43+
handle: tokio::runtime::Handle,
44+
env_var_log_prefix: Option<String>,
45+
) {
3446
RUNTIME
3547
.set(handle)
3648
.expect("hyperactor::initialize must only be called once");
3749

3850
panic_handler::set_panic_hook();
39-
hyperactor_telemetry::initialize_logging(ClockKind::default());
51+
hyperactor_telemetry::initialize_logging(ClockKind::default(), env_var_log_prefix);
4052
}
4153

4254
/// Initialize the Hyperactor runtime using the current tokio runtime handle.

hyperactor/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ pub use hyperactor_telemetry::kv_pairs;
137137
pub use init::initialize;
138138
#[doc(inline)]
139139
pub use init::initialize_with_current_runtime;
140+
#[doc(inline)]
141+
pub use init::initialize_with_log_prefix;
140142
// Re-exported to make this available to callers of the `register!` macro.
141143
#[doc(hidden)]
142144
pub use inventory::submit;

hyperactor_mesh/src/bootstrap.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ use signal_hook::consts::signal::SIGTERM;
2626

2727
use crate::proc_mesh::mesh_agent::MeshAgent;
2828

29-
pub(crate) const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR";
30-
pub(crate) const BOOTSTRAP_INDEX_ENV: &str = "HYPERACTOR_MESH_INDEX";
29+
pub const BOOTSTRAP_ADDR_ENV: &str = "HYPERACTOR_MESH_BOOTSTRAP_ADDR";
30+
pub const BOOTSTRAP_INDEX_ENV: &str = "HYPERACTOR_MESH_INDEX";
3131
/// A channel used by each process to receive its own stdout and stderr
3232
/// Because stdout and stderr can only be obtained by the parent process,
3333
/// they need to be streamed back to the process.

hyperactor_telemetry/src/lib.rs

Lines changed: 114 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ pub const DISABLE_OTEL_METRICS: &str = "DISABLE_OTEL_METRICS";
2626
/// Set to "1" to disable the recorder output.
2727
pub const DISABLE_RECORDER_TRACING: &str = "DISABLE_RECORDER_TRACING";
2828

29+
/// Environment variable to additionally enable stderr logging in Local environment.
30+
/// Set to "1" to also dump logs to stderr when running in Local environment.
31+
pub const MONARCH_DEBUG: &str = "MONARCH_DEBUG";
32+
2933
#[cfg(fbcode_build)]
3034
mod meta;
3135
mod otel;
@@ -57,6 +61,11 @@ use tracing_subscriber::Layer;
5761
use tracing_subscriber::filter::LevelFilter;
5862
use tracing_subscriber::filter::Targets;
5963
use tracing_subscriber::fmt;
64+
use tracing_subscriber::fmt::FormatEvent;
65+
use tracing_subscriber::fmt::FormatFields;
66+
use tracing_subscriber::fmt::format::Format;
67+
use tracing_subscriber::fmt::format::Writer;
68+
use tracing_subscriber::registry::LookupSpan;
6069

6170
use crate::recorder::Recorder;
6271

@@ -77,27 +86,30 @@ impl TelemetryClock for DefaultTelemetryClock {
7786
}
7887
}
7988

80-
// Need to keep this around so that the tracing subscriber doesn't drop the writer.
89+
fn file_appender(dir: &str, filename: &str) -> Box<dyn Write + Send> {
90+
match RollingFileAppender::builder()
91+
.rotation(Rotation::DAILY)
92+
.filename_prefix(filename)
93+
.filename_suffix("log")
94+
.build(dir)
95+
{
96+
Ok(file) => Box::new(file),
97+
Err(e) => {
98+
tracing::warn!("unable to create custom log file: {}", e);
99+
Box::new(std::io::stderr())
100+
}
101+
}
102+
}
103+
104+
fn writer() -> Box<dyn Write + Send> {
105+
match env::Env::current() {
106+
env::Env::Test | env::Env::MastEmulator => Box::new(std::io::stderr()),
107+
env::Env::Local => file_appender("/tmp/", &format!("monarch_log")),
108+
env::Env::Mast => file_appender("/logs/", &format!("dedicated_log_monarch")),
109+
}
110+
}
111+
81112
lazy_static! {
82-
static ref FILE_WRITER_GUARD: Arc<(NonBlocking, WorkerGuard)> = {
83-
let writer: Box<dyn Write + Send> = match RollingFileAppender::builder()
84-
.rotation(Rotation::DAILY)
85-
.filename_prefix("dedicated_log_monarch")
86-
.filename_suffix("log")
87-
.build("/logs/")
88-
{
89-
Ok(file) => Box::new(file),
90-
Err(e) => {
91-
tracing::warn!("unable to create custom log file: {}", e);
92-
Box::new(std::io::stderr())
93-
}
94-
};
95-
return Arc::new(
96-
tracing_appender::non_blocking::NonBlockingBuilder::default()
97-
.lossy(false)
98-
.finish(writer),
99-
);
100-
};
101113
static ref TELEMETRY_CLOCK: Arc<Mutex<Box<dyn TelemetryClock + Send>>> =
102114
{ Arc::new(Mutex::new(Box::new(DefaultTelemetryClock {}))) };
103115
}
@@ -406,6 +418,60 @@ macro_rules! declare_static_histogram {
406418
};
407419
}
408420

421+
static FILE_WRITER_GUARD: std::sync::OnceLock<Arc<(NonBlocking, WorkerGuard)>> =
422+
std::sync::OnceLock::new();
423+
424+
/// A custom formatter that prepends prefix from env_var to log messages.
425+
/// Uses either Glog formatter or default formatter based on environment.
426+
struct PrefixedFormatter {
427+
formatter: FormatterType,
428+
prefix_env_var: Option<String>,
429+
}
430+
431+
enum FormatterType {
432+
Glog(Glog<LocalTime>),
433+
Default(Format<tracing_subscriber::fmt::format::Full, ()>),
434+
}
435+
436+
impl PrefixedFormatter {
437+
fn new(formatter: FormatterType, prefix_env_var: Option<String>) -> Self {
438+
Self {
439+
formatter,
440+
prefix_env_var,
441+
}
442+
}
443+
}
444+
445+
impl<S, N> FormatEvent<S, N> for PrefixedFormatter
446+
where
447+
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
448+
N: for<'a> FormatFields<'a> + 'static,
449+
{
450+
fn format_event(
451+
&self,
452+
ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
453+
mut writer: Writer<'_>,
454+
event: &tracing::Event<'_>,
455+
) -> std::fmt::Result {
456+
let prefix: String = if self.prefix_env_var.is_some() {
457+
std::env::var(self.prefix_env_var.clone().unwrap()).unwrap_or_default()
458+
} else {
459+
"".to_string()
460+
};
461+
462+
if prefix.is_empty() {
463+
write!(writer, "[-]")?;
464+
} else {
465+
write!(writer, "[{}]", prefix)?;
466+
}
467+
468+
match &self.formatter {
469+
FormatterType::Glog(inner) => inner.format_event(ctx, writer, event),
470+
FormatterType::Default(inner) => inner.format_event(ctx, writer, event),
471+
}
472+
}
473+
}
474+
409475
/// Set up logging based on the given execution environment. We specialize logging based on how the
410476
/// logs are consumed. The destination scuba table is specialized based on the execution environment.
411477
/// mast -> monarch_tracing/prod
@@ -414,18 +480,33 @@ macro_rules! declare_static_histogram {
414480
/// scuba logging won't normally be enabled for a unit test unless we are specifically testing logging, so
415481
/// you don't need to worry about your tests being flakey due to scuba logging. You have to manually call initialize_logging()
416482
/// to get this behavior.
417-
pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
483+
pub fn initialize_logging(
484+
clock: impl TelemetryClock + Send + 'static,
485+
prefix_env_var: Option<String>,
486+
) {
418487
swap_telemetry_clock(clock);
419488
let file_log_level = match env::Env::current() {
420489
env::Env::Local => "info",
421490
env::Env::MastEmulator => "info",
422491
env::Env::Mast => "info",
423492
env::Env::Test => "debug",
424493
};
425-
let file_writer: &NonBlocking = &FILE_WRITER_GUARD.0;
494+
let (non_blocking, guard) = tracing_appender::non_blocking::NonBlockingBuilder::default()
495+
.lossy(false)
496+
.finish(writer());
497+
let writer_guard = Arc::new((non_blocking, guard));
498+
let _ = FILE_WRITER_GUARD.set(writer_guard.clone());
499+
500+
let file_formatter = match env::Env::current() {
501+
env::Env::Mast => FormatterType::Glog(Glog::default().with_timer(LocalTime::default())),
502+
_ => FormatterType::Default(Format::default().without_time().with_target(false)),
503+
};
426504
let file_layer = fmt::Layer::default()
427-
.with_writer(file_writer.clone())
428-
.event_format(Glog::default().with_timer(LocalTime::default()))
505+
.with_writer(writer_guard.0.clone())
506+
.event_format(PrefixedFormatter::new(
507+
file_formatter,
508+
prefix_env_var.clone(),
509+
))
429510
.fmt_fields(GlogFields::default().compact())
430511
.with_ansi(false)
431512
.with_filter(
@@ -447,7 +528,10 @@ pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
447528
};
448529
let stderr_layer = fmt::Layer::default()
449530
.with_writer(std::io::stderr)
450-
.event_format(Glog::default().with_timer(LocalTime::default()))
531+
.event_format(PrefixedFormatter::new(
532+
FormatterType::Default(Format::default().without_time().with_target(false)),
533+
prefix_env_var,
534+
))
451535
.fmt_fields(GlogFields::default().compact())
452536
.with_ansi(std::io::stderr().is_terminal())
453537
.with_filter(
@@ -478,13 +562,17 @@ pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
478562
} else {
479563
None
480564
})
565+
.with(if is_layer_enabled(MONARCH_DEBUG) {
566+
Some(stderr_layer)
567+
} else {
568+
None
569+
})
481570
.with(if is_layer_enabled(DISABLE_RECORDER_TRACING) {
482571
Some(recorder().layer())
483572
} else {
484573
None
485574
})
486575
.with(file_layer)
487-
.with(stderr_layer)
488576
.try_init()
489577
{
490578
tracing::debug!("logging already initialized for this process: {}", err);

monarch_extension/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ fn get_or_add_new_module<'py>(
6767
pub fn mod_init(module: &Bound<'_, PyModule>) -> PyResult<()> {
6868
monarch_hyperactor::runtime::initialize(module.py())?;
6969
let runtime = monarch_hyperactor::runtime::get_tokio_runtime();
70-
::hyperactor::initialize(runtime.handle().clone());
70+
::hyperactor::initialize_with_log_prefix(
71+
runtime.handle().clone(),
72+
Some(::hyperactor_mesh::bootstrap::BOOTSTRAP_INDEX_ENV.to_string()),
73+
);
7174

7275
monarch_hyperactor::shape::register_python_bindings(&get_or_add_new_module(
7376
module,

0 commit comments

Comments
 (0)