Skip to content

Commit f0385b1

Browse files
pablorfb-meta authored and facebook-github-bot committed
Split Actor logs in their own files
Summary:
- Each actor type will create a new file suffixed with the actor name.
- Each log will contain the proc and rank id of the actor. This will simplify filtering to logs of a single actor.
- Logs not coming from an actor will be dumped in a common file suffixed 'monarch'.

TODO:
- Support Python Actors
- Support multiple configurable streams for Local

Differential Revision: D78101221
1 parent e9fef0d commit f0385b1

File tree

2 files changed

+191
-23
lines changed

2 files changed

+191
-23
lines changed

hyperactor/src/proc.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -919,7 +919,15 @@ impl<A: Actor> Instance<A> {
919919
}
920920

921921
async fn serve(mut self, mut actor: A) {
922-
let result = self.run_actor_tree(&mut actor).await;
922+
let actor_id = self.cell.actor_id().clone();
923+
let filename = actor_id.name().to_string();
924+
let prefix = format!("{}_actor[{}]", actor_id.proc_id(), actor_id.rank());
925+
926+
let result =
927+
hyperactor_telemetry::with_logging_file_and_prefix(&filename, &prefix, || async {
928+
self.run_actor_tree(&mut actor).await
929+
})
930+
.await;
923931

924932
let actor_status = match result {
925933
Ok(_) => ActorStatus::Stopped,
@@ -1026,7 +1034,7 @@ impl<A: Actor> Instance<A> {
10261034
/// Initialize and run the actor until it fails or is stopped.
10271035
async fn run(&mut self, actor: &mut A) -> Result<(), ActorError> {
10281036
hyperactor_telemetry::declare_static_counter!(MESSAGES_RECEIVED, "actor.messages_received");
1029-
tracing::debug!("entering actor loop: {}", self.self_id());
1037+
tracing::debug!("entering actor loop for {}", self.cell.actor_id());
10301038

10311039
self.change_status(ActorStatus::Initializing);
10321040
actor

hyperactor_telemetry/src/lib.rs

Lines changed: 181 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ mod otel;
3636
mod pool;
3737
pub mod recorder;
3838
mod spool;
39+
use std::cell::RefCell;
3940
use std::io::IsTerminal;
4041
use std::io::Write;
4142
use std::str::FromStr;
@@ -61,6 +62,11 @@ use tracing_subscriber::Layer;
6162
use tracing_subscriber::filter::LevelFilter;
6263
use tracing_subscriber::filter::Targets;
6364
use tracing_subscriber::fmt;
65+
use tracing_subscriber::fmt::FormatEvent;
66+
use tracing_subscriber::fmt::FormatFields;
67+
use tracing_subscriber::fmt::MakeWriter;
68+
use tracing_subscriber::fmt::format::Writer;
69+
use tracing_subscriber::registry::LookupSpan;
6470

6571
use crate::recorder::Recorder;
6672

@@ -81,26 +87,28 @@ impl TelemetryClock for DefaultTelemetryClock {
8187
}
8288
}
8389

84-
// Need to keep this around so that the tracing subscriber doesn't drop the writer.
85-
lazy_static! {
86-
static ref WRITER_GUARD: Arc<(NonBlocking, WorkerGuard)> = {
87-
let writer: Box<dyn Write + Send> = match env::Env::current() {
88-
env::Env::Local | env::Env::Test | env::Env::MastEmulator => {
90+
fn writer(filename: &str) -> Box<dyn Write + Send> {
91+
match env::Env::current() {
92+
env::Env::Test | env::Env::MastEmulator => Box::new(std::io::stderr()),
93+
env::Env::Local | env::Env::Mast => match RollingFileAppender::builder()
94+
.rotation(Rotation::HOURLY)
95+
.filename_prefix(format!("dedicated_log_{}", filename))
96+
.filename_suffix("log")
97+
.build("/logs/")
98+
{
99+
Ok(file) => Box::new(file),
100+
Err(e) => {
101+
tracing::warn!("unable to create custom log file: {}", e);
89102
Box::new(std::io::stderr())
90103
}
91-
env::Env::Mast => match RollingFileAppender::builder()
92-
.rotation(Rotation::HOURLY)
93-
.filename_prefix("dedicated_log_monarch")
94-
.filename_suffix("log")
95-
.build("/logs/")
96-
{
97-
Ok(file) => Box::new(file),
98-
Err(e) => {
99-
tracing::warn!("unable to create custom log file: {}", e);
100-
Box::new(std::io::stderr())
101-
}
102-
},
103-
};
104+
},
105+
}
106+
}
107+
108+
// Need to keep this around so that the tracing subscriber doesn't drop the writer.
109+
lazy_static! {
110+
static ref DEFAULT_WRITER_GUARD: Arc<(NonBlocking, WorkerGuard)> = {
111+
let writer: Box<dyn Write + Send> = writer("monarch");
104112
return Arc::new(
105113
tracing_appender::non_blocking::NonBlockingBuilder::default()
106114
.lossy(false)
@@ -415,6 +423,159 @@ macro_rules! declare_static_histogram {
415423
};
416424
}
417425

426+
tokio::task_local! {
427+
static TASK_LOG_FILENAME: RefCell<Option<Box<dyn Write + Send>>>;
428+
static TASK_LOG_PREFIX: RefCell<Option<String>>;
429+
}
430+
431+
/// Set both logging file and prefix for the current tokio task using a scope.
432+
/// This creates a scope where all log messages will be written to the specified file with the given prefix.
433+
pub async fn with_logging_file_and_prefix<F, Fut>(filename: &str, prefix: &str, f: F) -> Fut::Output
434+
where
435+
F: FnOnce() -> Fut,
436+
Fut: std::future::Future,
437+
{
438+
let writer_cell = RefCell::new(Some(writer(filename)));
439+
let prefix_cell = RefCell::new(Some(prefix.to_string()));
440+
441+
TASK_LOG_FILENAME
442+
.scope(writer_cell, async move {
443+
TASK_LOG_PREFIX
444+
.scope(prefix_cell, async move { f().await })
445+
.await
446+
})
447+
.await
448+
}
449+
450+
/// A custom formatter that prepends task-local prefixes to log messages.
451+
/// Wraps the Glog formatter and adds task-specific prefix.
452+
pub struct PrefixedGlogFormatter {
453+
inner: Glog<LocalTime>,
454+
}
455+
456+
impl PrefixedGlogFormatter {
457+
pub fn new() -> Self {
458+
Self {
459+
inner: Glog::default().with_timer(LocalTime::default()),
460+
}
461+
}
462+
}
463+
464+
impl Default for PrefixedGlogFormatter {
465+
fn default() -> Self {
466+
Self::new()
467+
}
468+
}
469+
470+
impl<S, N> FormatEvent<S, N> for PrefixedGlogFormatter
471+
where
472+
S: tracing::Subscriber + for<'a> LookupSpan<'a>,
473+
N: for<'a> FormatFields<'a> + 'static,
474+
{
475+
fn format_event(
476+
&self,
477+
ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
478+
mut writer: Writer<'_>,
479+
event: &tracing::Event<'_>,
480+
) -> std::fmt::Result {
481+
let prefix = TASK_LOG_PREFIX
482+
.try_with(|log_prefix| log_prefix.borrow().clone())
483+
.unwrap_or(None);
484+
if let Some(prefix) = prefix {
485+
write!(writer, "[{}]", prefix)?;
486+
}
487+
488+
self.inner.format_event(ctx, writer, event)
489+
}
490+
}
491+
492+
/// `MakeWriter` front-end that sends log events to a task-specific writer when
/// the current task has one configured, and to `fallback_writer` otherwise.
#[derive(Debug, Clone)]
pub struct TaskRoutingWriter<W> {
    fallback_writer: W,
}

impl<W> TaskRoutingWriter<W> {
    /// Wraps `fallback_writer`, which is used whenever no task-specific writer applies.
    pub fn new(fallback_writer: W) -> Self {
        TaskRoutingWriter { fallback_writer }
    }
}
506+
507+
/// Wrapper that implements Write and routes to custom sink if defined, otherwise to fallback.
508+
pub struct CustomWriter<W> {
509+
sink: Sink<W>,
510+
}
511+
512+
enum Sink<W> {
513+
Custom,
514+
Fallback(W),
515+
}
516+
517+
impl<W: std::io::Write> std::io::Write for CustomWriter<W> {
518+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
519+
match &mut self.sink {
520+
Sink::Custom => TASK_LOG_FILENAME
521+
.try_with(|writer_ref| {
522+
let mut writer_borrow = writer_ref.borrow_mut();
523+
if let Some(ref mut writer) = *writer_borrow {
524+
writer.write(buf)
525+
} else {
526+
Err(std::io::Error::new(
527+
std::io::ErrorKind::NotFound,
528+
"Expected custom taks writer but found none.",
529+
))
530+
}
531+
})
532+
.unwrap_or_else(|_| {
533+
Err(std::io::Error::new(
534+
std::io::ErrorKind::NotFound,
535+
"Unable to access task writer.",
536+
))
537+
}),
538+
Sink::Fallback(writer) => writer.write(buf),
539+
}
540+
}
541+
542+
fn flush(&mut self) -> std::io::Result<()> {
543+
match &mut self.sink {
544+
Sink::Custom => TASK_LOG_FILENAME
545+
.try_with(|log| {
546+
let mut log_borrow = log.borrow_mut();
547+
if let Some(ref mut writer) = *log_borrow {
548+
writer.flush()
549+
} else {
550+
Ok(())
551+
}
552+
})
553+
.unwrap_or(Ok(())),
554+
Sink::Fallback(writer) => writer.flush(),
555+
}
556+
}
557+
}
558+
559+
impl<'a, W> MakeWriter<'a> for TaskRoutingWriter<W>
560+
where
561+
W: for<'writer> MakeWriter<'writer> + Clone,
562+
{
563+
type Writer = CustomWriter<<W as MakeWriter<'a>>::Writer>;
564+
565+
fn make_writer(&'a self) -> Self::Writer {
566+
match TASK_LOG_FILENAME.try_with(|log| log.borrow().is_some()) {
567+
Ok(_) => CustomWriter { sink: Sink::Custom },
568+
Err(_) => CustomWriter {
569+
sink: Sink::Fallback(self.fallback_writer.make_writer()),
570+
},
571+
}
572+
}
573+
574+
fn make_writer_for(&'a self, _: &tracing::Metadata<'_>) -> Self::Writer {
575+
self.make_writer()
576+
}
577+
}
578+
418579
/// Set up logging based on the given execution environment. We specialize logging based on how the
419580
/// logs are consumed. The destination scuba table is specialized based on the execution environment.
420581
/// mast -> monarch_tracing/prod
@@ -432,10 +593,9 @@ pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
432593
env::Env::Test => "debug",
433594
};
434595

435-
let writer: &NonBlocking = &WRITER_GUARD.0;
436596
let glog = fmt::Layer::default()
437-
.with_writer(writer.clone())
438-
.event_format(Glog::default().with_timer(LocalTime::default()))
597+
.with_writer(TaskRoutingWriter::new(DEFAULT_WRITER_GUARD.0.clone()))
598+
.event_format(PrefixedGlogFormatter::new())
439599
.fmt_fields(GlogFields::default().compact())
440600
.with_ansi(std::io::stderr().is_terminal())
441601
.with_filter(

0 commit comments

Comments
 (0)