Skip to content

Commit 048a940

Browse files
pablorfb-metafacebook-github-bot
authored andcommitted
Split Actor logs in their own files
Summary: - Each actor type will create a new file suffixed with the actor name - Each log will contain the proc and rank id of the actor. This will simplify filtering to logs of a single actor - Logs not coming from an actor will be dumped in a common file suffixed 'monarch' - Additionally stream all logs to stdout when `MONARCH_STDOUT=1` TODO: - Support Python Actors Differential Revision: D78101221
1 parent 06d65a5 commit 048a940

File tree

2 files changed

+245
-19
lines changed

2 files changed

+245
-19
lines changed

hyperactor/src/proc.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -919,7 +919,14 @@ impl<A: Actor> Instance<A> {
919919
}
920920

921921
async fn serve(mut self, mut actor: A) {
922-
let result = self.run_actor_tree(&mut actor).await;
922+
let actor_id = self.cell.actor_id().clone();
923+
let filename = actor_id.name().to_string();
924+
let prefix = format!("{}_actor[{}]", actor_id.proc_id(), actor_id.rank());
925+
926+
let result = hyperactor_telemetry::with_task_scoped_logging(&filename, &prefix, || async {
927+
self.run_actor_tree(&mut actor).await
928+
})
929+
.await;
923930

924931
let actor_status = match result {
925932
Ok(_) => ActorStatus::Stopped,
@@ -1026,7 +1033,7 @@ impl<A: Actor> Instance<A> {
10261033
/// Initialize and run the actor until it fails or is stopped.
10271034
async fn run(&mut self, actor: &mut A) -> Result<(), ActorError> {
10281035
hyperactor_telemetry::declare_static_counter!(MESSAGES_RECEIVED, "actor.messages_received");
1029-
tracing::debug!("entering actor loop: {}", self.self_id());
1036+
tracing::debug!("entering actor loop for {}", self.cell.actor_id());
10301037

10311038
self.change_status(ActorStatus::Initializing);
10321039
actor

hyperactor_telemetry/src/lib.rs

Lines changed: 236 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,17 @@ pub const DISABLE_OTEL_METRICS: &str = "DISABLE_OTEL_METRICS";
3030
/// Set to "1" to disable the recorder output.
3131
pub const DISABLE_RECORDER_TRACING: &str = "DISABLE_RECORDER_TRACING";
3232

33+
/// Environment variable to additionally enable stdout logging in Local environment.
34+
/// Set to "1" to also dump logs to stdout when running in Local environment.
35+
pub const MONARCH_STDOUT: &str = "MONARCH_STDOUT";
36+
3337
#[cfg(fbcode_build)]
3438
mod meta;
3539
mod otel;
3640
mod pool;
3741
pub mod recorder;
3842
mod spool;
43+
use std::cell::RefCell;
3944
use std::io::IsTerminal;
4045
use std::io::Write;
4146
use std::str::FromStr;
@@ -61,6 +66,12 @@ use tracing_subscriber::Layer;
6166
use tracing_subscriber::filter::LevelFilter;
6267
use tracing_subscriber::filter::Targets;
6368
use tracing_subscriber::fmt;
69+
use tracing_subscriber::fmt::FormatEvent;
70+
use tracing_subscriber::fmt::FormatFields;
71+
use tracing_subscriber::fmt::MakeWriter;
72+
use tracing_subscriber::fmt::format::Format;
73+
use tracing_subscriber::fmt::format::Writer;
74+
use tracing_subscriber::registry::LookupSpan;
6475

6576
use crate::recorder::Recorder;
6677

@@ -81,26 +92,79 @@ impl TelemetryClock for DefaultTelemetryClock {
8192
}
8293
}
8394

84-
// Need to keep this around so that the tracing subscriber doesn't drop the writer.
85-
lazy_static! {
86-
static ref WRITER_GUARD: Arc<(NonBlocking, WorkerGuard)> = {
87-
let writer: Box<dyn Write + Send> = match env::Env::current() {
88-
env::Env::Local | env::Env::Test | env::Env::MastEmulator => {
89-
Box::new(std::io::stderr())
90-
}
91-
env::Env::Mast => match RollingFileAppender::builder()
92-
.rotation(Rotation::HOURLY)
93-
.filename_prefix("dedicated_log_monarch")
95+
/// A writer that can write to both a primary writer and stdout simultaneously.
96+
struct DualWriter<W: Write> {
97+
primary: W,
98+
stdout: std::io::Stdout,
99+
}
100+
101+
impl<W: Write> DualWriter<W> {
102+
fn new(primary: W) -> Self {
103+
Self {
104+
primary,
105+
stdout: std::io::stdout(),
106+
}
107+
}
108+
}
109+
110+
impl<W: Write> Write for DualWriter<W> {
111+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
112+
let result = self.primary.write(buf);
113+
let _ = self.stdout.write(buf);
114+
result
115+
}
116+
117+
fn flush(&mut self) -> std::io::Result<()> {
118+
let primary_result = self.primary.flush();
119+
let _ = self.stdout.flush();
120+
primary_result
121+
}
122+
}
123+
124+
fn writer(filename: &str) -> Box<dyn Write + Send> {
125+
match env::Env::current() {
126+
env::Env::Test | env::Env::MastEmulator => Box::new(std::io::stderr()),
127+
env::Env::Local => {
128+
let enable_stdout = std::env::var(MONARCH_STDOUT).unwrap_or_default() == "1";
129+
130+
match RollingFileAppender::builder()
131+
.rotation(Rotation::NEVER)
132+
.filename_prefix(format!("monarch_log_{}", filename))
94133
.filename_suffix("log")
95-
.build("/logs/")
134+
.build("/tmp/")
96135
{
97-
Ok(file) => Box::new(file),
136+
Ok(file) => {
137+
if enable_stdout {
138+
Box::new(DualWriter::new(file))
139+
} else {
140+
Box::new(file)
141+
}
142+
}
98143
Err(e) => {
99144
tracing::warn!("unable to create custom log file: {}", e);
100145
Box::new(std::io::stderr())
101146
}
102-
},
103-
};
147+
}
148+
}
149+
env::Env::Mast => match RollingFileAppender::builder()
150+
.rotation(Rotation::HOURLY)
151+
.filename_prefix(format!("dedicated_log_{}", filename))
152+
.filename_suffix("log")
153+
.build("/logs/")
154+
{
155+
Ok(file) => Box::new(file),
156+
Err(e) => {
157+
tracing::warn!("unable to create custom log file: {}", e);
158+
Box::new(std::io::stderr())
159+
}
160+
},
161+
}
162+
}
163+
164+
// Need to keep this around so that the tracing subscriber doesn't drop the writer.
165+
lazy_static! {
166+
static ref DEFAULT_WRITER_GUARD: Arc<(NonBlocking, WorkerGuard)> = {
167+
let writer: Box<dyn Write + Send> = writer("monarch");
104168
return Arc::new(
105169
tracing_appender::non_blocking::NonBlockingBuilder::default()
106170
.lossy(false)
@@ -415,6 +479,158 @@ macro_rules! declare_static_histogram {
415479
};
416480
}
417481

482+
tokio::task_local! {
483+
static TASK_LOG_FILENAME: RefCell<Option<Box<dyn Write + Send>>>;
484+
static TASK_LOG_PREFIX: RefCell<Option<String>>;
485+
}
486+
487+
/// Set both logging file and prefix for the current tokio task using a scope.
488+
/// This creates a scope where all log messages will be written to the specified file with the given prefix.
489+
pub async fn with_task_scoped_logging<F, Fut>(filename: &str, prefix: &str, f: F) -> Fut::Output
490+
where
491+
F: FnOnce() -> Fut,
492+
Fut: std::future::Future,
493+
{
494+
let writer_cell = RefCell::new(Some(writer(filename)));
495+
let prefix_cell = RefCell::new(Some(prefix.to_string()));
496+
497+
TASK_LOG_FILENAME
498+
.scope(writer_cell, async move {
499+
TASK_LOG_PREFIX
500+
.scope(prefix_cell, async move { f().await })
501+
.await
502+
})
503+
.await
504+
}
505+
506+
/// A custom formatter that prepends task-local prefixes to log messages.
507+
/// Uses either Glog formatter or default formatter based on environment.
508+
struct PrefixedFormatter {
509+
formatter: FormatterType,
510+
}
511+
512+
enum FormatterType {
513+
Glog(Glog<LocalTime>),
514+
Default(Format<tracing_subscriber::fmt::format::Full, ()>),
515+
}
516+
517+
impl PrefixedFormatter {
518+
fn new(formatter: FormatterType) -> Self {
519+
Self { formatter }
520+
}
521+
}
522+
523+
impl<S, N> FormatEvent<S, N> for PrefixedFormatter
524+
where
525+
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
526+
N: for<'a> FormatFields<'a> + 'static,
527+
{
528+
fn format_event(
529+
&self,
530+
ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
531+
mut writer: Writer<'_>,
532+
event: &tracing::Event<'_>,
533+
) -> std::fmt::Result {
534+
let prefix = TASK_LOG_PREFIX
535+
.try_with(|log_prefix| log_prefix.borrow().clone())
536+
.unwrap_or(None);
537+
if let Some(prefix) = prefix {
538+
write!(writer, "[{}]", prefix)?;
539+
}
540+
541+
match &self.formatter {
542+
FormatterType::Glog(inner) => inner.format_event(ctx, writer, event),
543+
FormatterType::Default(inner) => inner.format_event(ctx, writer, event),
544+
}
545+
}
546+
}
547+
548+
/// A custom writer that routes log events to task-specific files with fallback.
549+
/// Checks if the current task has a specific log file configured,
550+
/// and if not, falls back to the provided fallback writer.
551+
#[derive(Debug, Clone)]
552+
struct TaskRoutingWriter<W> {
553+
fallback_writer: W,
554+
}
555+
556+
impl<W> TaskRoutingWriter<W> {
557+
fn new(fallback_writer: W) -> Self {
558+
Self { fallback_writer }
559+
}
560+
}
561+
562+
/// Wrapper that implements Write and routes to custom sink if defined, otherwise to fallback.
563+
struct CustomWriter<W> {
564+
sink: Sink<W>,
565+
}
566+
567+
enum Sink<W> {
568+
Custom,
569+
Fallback(W),
570+
}
571+
572+
impl<W: std::io::Write> std::io::Write for CustomWriter<W> {
573+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
574+
match &mut self.sink {
575+
Sink::Custom => TASK_LOG_FILENAME
576+
.try_with(|writer_ref| {
577+
let mut writer_borrow = writer_ref.borrow_mut();
578+
if let Some(ref mut writer) = *writer_borrow {
579+
writer.write(buf)
580+
} else {
581+
Err(std::io::Error::new(
582+
std::io::ErrorKind::NotFound,
583+
"Expected custom taks writer but found none.",
584+
))
585+
}
586+
})
587+
.unwrap_or_else(|_| {
588+
Err(std::io::Error::new(
589+
std::io::ErrorKind::NotFound,
590+
"Unable to access task writer.",
591+
))
592+
}),
593+
Sink::Fallback(writer) => writer.write(buf),
594+
}
595+
}
596+
597+
fn flush(&mut self) -> std::io::Result<()> {
598+
match &mut self.sink {
599+
Sink::Custom => TASK_LOG_FILENAME
600+
.try_with(|log| {
601+
let mut log_borrow = log.borrow_mut();
602+
if let Some(ref mut writer) = *log_borrow {
603+
writer.flush()
604+
} else {
605+
Ok(())
606+
}
607+
})
608+
.unwrap_or(Ok(())),
609+
Sink::Fallback(writer) => writer.flush(),
610+
}
611+
}
612+
}
613+
614+
impl<'a, W> MakeWriter<'a> for TaskRoutingWriter<W>
615+
where
616+
W: for<'writer> MakeWriter<'writer> + Clone,
617+
{
618+
type Writer = CustomWriter<<W as MakeWriter<'a>>::Writer>;
619+
620+
fn make_writer(&'a self) -> Self::Writer {
621+
match TASK_LOG_FILENAME.try_with(|log| log.borrow().is_some()) {
622+
Ok(_) => CustomWriter { sink: Sink::Custom },
623+
Err(_) => CustomWriter {
624+
sink: Sink::Fallback(self.fallback_writer.make_writer()),
625+
},
626+
}
627+
}
628+
629+
fn make_writer_for(&'a self, _: &tracing::Metadata<'_>) -> Self::Writer {
630+
self.make_writer()
631+
}
632+
}
633+
418634
/// Set up logging based on the given execution environment. We specialize logging based on how the
419635
/// logs are consumed. The destination scuba table is specialized based on the execution environment.
420636
/// mast -> monarch_tracing/prod
@@ -432,10 +648,13 @@ pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
432648
env::Env::Test => "debug",
433649
};
434650

435-
let writer: &NonBlocking = &WRITER_GUARD.0;
651+
let formatter = match env::Env::current() {
652+
env::Env::Mast => FormatterType::Glog(Glog::default().with_timer(LocalTime::default())),
653+
_ => FormatterType::Default(Format::default().without_time().with_target(false)),
654+
};
436655
let glog = fmt::Layer::default()
437-
.with_writer(writer.clone())
438-
.event_format(Glog::default().with_timer(LocalTime::default()))
656+
.with_writer(TaskRoutingWriter::new(DEFAULT_WRITER_GUARD.0.clone()))
657+
.event_format(PrefixedFormatter::new(formatter))
439658
.fmt_fields(GlogFields::default().compact())
440659
.with_ansi(std::io::stderr().is_terminal())
441660
.with_filter(

0 commit comments

Comments
 (0)