Skip to content

Commit c0fb40b

Browse files
pablorfb-meta authored and facebook-github-bot committed
Split Actor logs in their own files (#495)
Summary:
- Each actor type will create a new file suffixed with the actor name.
- Each log will contain the proc and rank id of the actor. This will simplify filtering to logs of a single actor.
- Logs not coming from an actor will be dumped in a common file suffixed 'monarch'.
- Additionally stream all logs to stdout when `MONARCH_STDOUT=1`. (Note: in the diff below the variable is named `MONARCH_DEBUG` and the extra layer writes to stderr.)

TODO:
- Support Python Actors

Differential Revision: D78101221
1 parent 985ed47 commit c0fb40b

File tree

2 files changed

+234
-30
lines changed

2 files changed

+234
-30
lines changed

hyperactor/src/proc.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -919,7 +919,14 @@ impl<A: Actor> Instance<A> {
919919
}
920920

921921
async fn serve(mut self, mut actor: A) {
922-
let result = self.run_actor_tree(&mut actor).await;
922+
let actor_id = self.cell.actor_id().clone();
923+
let filename = actor_id.name().to_string();
924+
let prefix = format!("{}", actor_id);
925+
926+
let result = hyperactor_telemetry::with_task_scoped_logging(&filename, &prefix, || async {
927+
self.run_actor_tree(&mut actor).await
928+
})
929+
.await;
923930

924931
let actor_status = match result {
925932
Ok(_) => ActorStatus::Stopped,
@@ -1026,7 +1033,7 @@ impl<A: Actor> Instance<A> {
10261033
/// Initialize and run the actor until it fails or is stopped.
10271034
async fn run(&mut self, actor: &mut A) -> Result<(), ActorError> {
10281035
hyperactor_telemetry::declare_static_counter!(MESSAGES_RECEIVED, "actor.messages_received");
1029-
tracing::debug!("entering actor loop: {}", self.self_id());
1036+
tracing::debug!("entering actor loop for {}", self.cell.actor_id());
10301037

10311038
self.change_status(ActorStatus::Initializing);
10321039
actor

hyperactor_telemetry/src/lib.rs

Lines changed: 225 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,17 @@ pub const DISABLE_OTEL_METRICS: &str = "DISABLE_OTEL_METRICS";
3030
/// Set to "1" to disable the recorder output.
3131
pub const DISABLE_RECORDER_TRACING: &str = "DISABLE_RECORDER_TRACING";
3232

33+
/// Environment variable to additionally enable stderr logging in Local environment.
34+
/// Set to "1" to also dump logs to stderr when running in Local environment.
35+
pub const MONARCH_DEBUG: &str = "MONARCH_DEBUG";
36+
3337
#[cfg(fbcode_build)]
3438
mod meta;
3539
mod otel;
3640
mod pool;
3741
pub mod recorder;
3842
mod spool;
43+
use std::cell::RefCell;
3944
use std::io::IsTerminal;
4045
use std::io::Write;
4146
use std::str::FromStr;
@@ -61,6 +66,12 @@ use tracing_subscriber::Layer;
6166
use tracing_subscriber::filter::LevelFilter;
6267
use tracing_subscriber::filter::Targets;
6368
use tracing_subscriber::fmt;
69+
use tracing_subscriber::fmt::FormatEvent;
70+
use tracing_subscriber::fmt::FormatFields;
71+
use tracing_subscriber::fmt::MakeWriter;
72+
use tracing_subscriber::fmt::format::Format;
73+
use tracing_subscriber::fmt::format::Writer;
74+
use tracing_subscriber::registry::LookupSpan;
6475

6576
use crate::recorder::Recorder;
6677

@@ -81,32 +92,30 @@ impl TelemetryClock for DefaultTelemetryClock {
8192
}
8293
}
8394

84-
// Need to keep this around so that the tracing subscriber doesn't drop the writer.
95+
fn file_appender(dir: &str, filename: &str) -> Box<dyn Write + Send> {
96+
match RollingFileAppender::builder()
97+
.rotation(Rotation::NEVER)
98+
.filename_prefix(filename)
99+
.filename_suffix("log")
100+
.build(dir)
101+
{
102+
Ok(file) => Box::new(file),
103+
Err(e) => {
104+
tracing::warn!("unable to create custom log file: {}", e);
105+
Box::new(std::io::stderr())
106+
}
107+
}
108+
}
109+
110+
fn writer(filename: &str) -> Box<dyn Write + Send> {
111+
match env::Env::current() {
112+
env::Env::Test | env::Env::MastEmulator => Box::new(std::io::stderr()),
113+
env::Env::Local => file_appender("/tmp/", &format!("monarch_log{}", filename)),
114+
env::Env::Mast => file_appender("/logs/", &format!("dedicated_log_monarch{}", filename)),
115+
}
116+
}
117+
85118
lazy_static! {
86-
static ref WRITER_GUARD: Arc<(NonBlocking, WorkerGuard)> = {
87-
let writer: Box<dyn Write + Send> = match env::Env::current() {
88-
env::Env::Local | env::Env::Test | env::Env::MastEmulator => {
89-
Box::new(std::io::stderr())
90-
}
91-
env::Env::Mast => match RollingFileAppender::builder()
92-
.rotation(Rotation::HOURLY)
93-
.filename_prefix("dedicated_log_monarch")
94-
.filename_suffix("log")
95-
.build("/logs/")
96-
{
97-
Ok(file) => Box::new(file),
98-
Err(e) => {
99-
tracing::warn!("unable to create custom log file: {}", e);
100-
Box::new(std::io::stderr())
101-
}
102-
},
103-
};
104-
return Arc::new(
105-
tracing_appender::non_blocking::NonBlockingBuilder::default()
106-
.lossy(false)
107-
.finish(writer),
108-
);
109-
};
110119
static ref TELEMETRY_CLOCK: Arc<Mutex<Box<dyn TelemetryClock + Send>>> =
111120
{ Arc::new(Mutex::new(Box::new(DefaultTelemetryClock {}))) };
112121
}
@@ -415,6 +424,161 @@ macro_rules! declare_static_histogram {
415424
};
416425
}
417426

427+
static WRITER_GUARD: std::sync::OnceLock<Arc<(NonBlocking, WorkerGuard)>> =
428+
std::sync::OnceLock::new();
429+
430+
tokio::task_local! {
    /// Task-scoped log sink installed by `with_task_scoped_logging`.
    /// Despite the name, this holds the writer itself rather than a file name.
    static TASK_LOG_FILENAME: RefCell<Option<Box<dyn Write + Send>>>;
    /// Task-scoped prefix that `PrefixedFormatter` writes in front of each event.
    static TASK_LOG_PREFIX: RefCell<Option<String>>;
}
434+
435+
/// Set both logging file and prefix for the current tokio task using a scope.
436+
/// This creates a scope where all log messages will be written to the specified file with the given prefix.
437+
pub async fn with_task_scoped_logging<F, Fut>(filename: &str, prefix: &str, f: F) -> Fut::Output
438+
where
439+
F: FnOnce() -> Fut,
440+
Fut: std::future::Future,
441+
{
442+
let writer_cell = RefCell::new(Some(writer(filename)));
443+
let prefix_cell = RefCell::new(Some(prefix.to_string()));
444+
445+
TASK_LOG_FILENAME
446+
.scope(writer_cell, async move {
447+
TASK_LOG_PREFIX
448+
.scope(prefix_cell, async move { f().await })
449+
.await
450+
})
451+
.await
452+
}
453+
454+
/// A custom formatter that prepends task-local prefixes to log messages.
455+
/// Uses either Glog formatter or default formatter based on environment.
456+
struct PrefixedFormatter {
457+
formatter: FormatterType,
458+
}
459+
460+
enum FormatterType {
461+
Glog(Glog<LocalTime>),
462+
Default(Format<tracing_subscriber::fmt::format::Full, ()>),
463+
}
464+
465+
impl PrefixedFormatter {
466+
fn new(formatter: FormatterType) -> Self {
467+
Self { formatter }
468+
}
469+
}
470+
471+
impl<S, N> FormatEvent<S, N> for PrefixedFormatter
472+
where
473+
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
474+
N: for<'a> FormatFields<'a> + 'static,
475+
{
476+
fn format_event(
477+
&self,
478+
ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
479+
mut writer: Writer<'_>,
480+
event: &tracing::Event<'_>,
481+
) -> std::fmt::Result {
482+
let prefix = TASK_LOG_PREFIX
483+
.try_with(|log_prefix| log_prefix.borrow().clone())
484+
.unwrap_or(None);
485+
if let Some(prefix) = prefix {
486+
write!(writer, "[{}]", prefix)?;
487+
}
488+
489+
match &self.formatter {
490+
FormatterType::Glog(inner) => inner.format_event(ctx, writer, event),
491+
FormatterType::Default(inner) => inner.format_event(ctx, writer, event),
492+
}
493+
}
494+
}
495+
496+
/// `MakeWriter` front-end that sends each log event either to the sink
/// configured for the current task or, when there is none, to
/// `fallback_writer`.
#[derive(Debug, Clone)]
struct TaskRoutingWriter<W> {
    /// Used for events emitted outside of a task-scoped logging context.
    fallback_writer: W,
}

impl<W> TaskRoutingWriter<W> {
    /// Creates a routing writer that falls back to `fallback_writer`.
    fn new(fallback_writer: W) -> Self {
        TaskRoutingWriter { fallback_writer }
    }
}
509+
510+
/// Per-event writer handed out by `TaskRoutingWriter`; its `Write`
/// implementation forwards to whichever destination `sink` selects.
struct CustomWriter<W> {
    /// Destination chosen when this writer was created.
    sink: Sink<W>,
}

/// Destination of a `CustomWriter`.
enum Sink<W> {
    /// Write to the task-local sink stored in `TASK_LOG_FILENAME`.
    Custom,
    /// Write to the wrapped fallback writer.
    Fallback(W),
}
519+
520+
impl<W: std::io::Write> std::io::Write for CustomWriter<W> {
521+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
522+
match &mut self.sink {
523+
Sink::Custom => TASK_LOG_FILENAME
524+
.try_with(|writer_ref| {
525+
let mut writer_borrow = writer_ref.borrow_mut();
526+
if let Some(ref mut writer) = *writer_borrow {
527+
writer.write(buf)
528+
} else {
529+
Err(std::io::Error::new(
530+
std::io::ErrorKind::NotFound,
531+
"Expected custom taks writer but found none.",
532+
))
533+
}
534+
})
535+
.unwrap_or_else(|_| {
536+
Err(std::io::Error::new(
537+
std::io::ErrorKind::NotFound,
538+
"Unable to access task writer.",
539+
))
540+
}),
541+
Sink::Fallback(writer) => writer.write(buf),
542+
}
543+
}
544+
545+
fn flush(&mut self) -> std::io::Result<()> {
546+
match &mut self.sink {
547+
Sink::Custom => TASK_LOG_FILENAME
548+
.try_with(|log| {
549+
let mut log_borrow = log.borrow_mut();
550+
if let Some(ref mut writer) = *log_borrow {
551+
writer.flush()
552+
} else {
553+
Ok(())
554+
}
555+
})
556+
.unwrap_or(Ok(())),
557+
Sink::Fallback(writer) => writer.flush(),
558+
}
559+
}
560+
}
561+
562+
impl<'a, W> MakeWriter<'a> for TaskRoutingWriter<W>
563+
where
564+
W: for<'writer> MakeWriter<'writer> + Clone,
565+
{
566+
type Writer = CustomWriter<<W as MakeWriter<'a>>::Writer>;
567+
568+
fn make_writer(&'a self) -> Self::Writer {
569+
match TASK_LOG_FILENAME.try_with(|log| log.borrow().is_some()) {
570+
Ok(_) => CustomWriter { sink: Sink::Custom },
571+
Err(_) => CustomWriter {
572+
sink: Sink::Fallback(self.fallback_writer.make_writer()),
573+
},
574+
}
575+
}
576+
577+
fn make_writer_for(&'a self, _: &tracing::Metadata<'_>) -> Self::Writer {
578+
self.make_writer()
579+
}
580+
}
581+
418582
/// Set up logging based on the given execution environment. We specialize logging based on how the
419583
/// logs are consumed. The destination scuba table is specialized based on the execution environment.
420584
/// mast -> monarch_tracing/prod
@@ -432,10 +596,22 @@ pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
432596
env::Env::Test => "debug",
433597
};
434598

435-
let writer: &NonBlocking = &WRITER_GUARD.0;
599+
// Create writer and store the guard globally to prevent it from being dropped
600+
let writer_box: Box<dyn Write + Send> = writer(&"");
601+
let (non_blocking, guard) = tracing_appender::non_blocking::NonBlockingBuilder::default()
602+
.lossy(false)
603+
.finish(writer_box);
604+
605+
let writer_guard = Arc::new((non_blocking, guard));
606+
let _ = WRITER_GUARD.set(writer_guard.clone());
607+
608+
let formatter = match env::Env::current() {
609+
env::Env::Mast => FormatterType::Glog(Glog::default().with_timer(LocalTime::default())),
610+
_ => FormatterType::Default(Format::default().without_time().with_target(false)),
611+
};
436612
let glog = fmt::Layer::default()
437-
.with_writer(writer.clone())
438-
.event_format(Glog::default().with_timer(LocalTime::default()))
613+
.with_writer(TaskRoutingWriter::new(writer_guard.0.clone()))
614+
.event_format(PrefixedFormatter::new(formatter))
439615
.fmt_fields(GlogFields::default().compact())
440616
.with_ansi(std::io::stderr().is_terminal())
441617
.with_filter(
@@ -449,6 +625,22 @@ pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
449625
.with_target("opentelemetry", LevelFilter::OFF), // otel has some log span under debug that we don't care about
450626
);
451627

628+
let debug_layer = fmt::Layer::default()
629+
.with_writer(std::io::stderr)
630+
.event_format(Format::default().without_time().with_target(false))
631+
.fmt_fields(GlogFields::default().compact())
632+
.with_ansi(std::io::stderr().is_terminal())
633+
.with_filter(
634+
Targets::new()
635+
.with_default(LevelFilter::from_level(
636+
tracing::Level::from_str(
637+
&std::env::var("RUST_LOG").unwrap_or(glog_level.to_string()),
638+
)
639+
.expect("Invalid log level"),
640+
))
641+
.with_target("opentelemetry", LevelFilter::OFF),
642+
);
643+
452644
use tracing_subscriber::Registry;
453645
use tracing_subscriber::layer::SubscriberExt;
454646
use tracing_subscriber::util::SubscriberInitExt;
@@ -465,6 +657,11 @@ pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
465657
} else {
466658
None
467659
})
660+
.with(if is_layer_enabled(MONARCH_DEBUG) {
661+
Some(debug_layer)
662+
} else {
663+
None
664+
})
468665
.with(if is_layer_enabled(DISABLE_GLOG_TRACING) {
469666
Some(glog)
470667
} else {

0 commit comments

Comments
 (0)