@@ -30,12 +30,17 @@ pub const DISABLE_OTEL_METRICS: &str = "DISABLE_OTEL_METRICS";
30
30
/// Set to "1" to disable the recorder output.
31
31
pub const DISABLE_RECORDER_TRACING : & str = "DISABLE_RECORDER_TRACING" ;
32
32
33
+ /// Environment variable to additionally enable stderr logging in Local environment.
34
+ /// Set to "1" to also dump logs to stderr when running in Local environment.
35
+ pub const MONARCH_DEBUG : & str = "MONARCH_DEBUG" ;
36
+
33
37
#[ cfg( fbcode_build) ]
34
38
mod meta;
35
39
mod otel;
36
40
mod pool;
37
41
pub mod recorder;
38
42
mod spool;
43
+ use std:: cell:: RefCell ;
39
44
use std:: io:: IsTerminal ;
40
45
use std:: io:: Write ;
41
46
use std:: str:: FromStr ;
@@ -61,6 +66,12 @@ use tracing_subscriber::Layer;
61
66
use tracing_subscriber:: filter:: LevelFilter ;
62
67
use tracing_subscriber:: filter:: Targets ;
63
68
use tracing_subscriber:: fmt;
69
+ use tracing_subscriber:: fmt:: FormatEvent ;
70
+ use tracing_subscriber:: fmt:: FormatFields ;
71
+ use tracing_subscriber:: fmt:: MakeWriter ;
72
+ use tracing_subscriber:: fmt:: format:: Format ;
73
+ use tracing_subscriber:: fmt:: format:: Writer ;
74
+ use tracing_subscriber:: registry:: LookupSpan ;
64
75
65
76
use crate :: recorder:: Recorder ;
66
77
@@ -81,32 +92,30 @@ impl TelemetryClock for DefaultTelemetryClock {
81
92
}
82
93
}
83
94
84
- // Need to keep this around so that the tracing subscriber doesn't drop the writer.
95
+ fn file_appender ( dir : & str , filename : & str ) -> Box < dyn Write + Send > {
96
+ match RollingFileAppender :: builder ( )
97
+ . rotation ( Rotation :: NEVER )
98
+ . filename_prefix ( filename)
99
+ . filename_suffix ( "log" )
100
+ . build ( dir)
101
+ {
102
+ Ok ( file) => Box :: new ( file) ,
103
+ Err ( e) => {
104
+ tracing:: warn!( "unable to create custom log file: {}" , e) ;
105
+ Box :: new ( std:: io:: stderr ( ) )
106
+ }
107
+ }
108
+ }
109
+
110
+ fn writer ( filename : & str ) -> Box < dyn Write + Send > {
111
+ match env:: Env :: current ( ) {
112
+ env:: Env :: Test | env:: Env :: MastEmulator => Box :: new ( std:: io:: stderr ( ) ) ,
113
+ env:: Env :: Local => file_appender ( "/tmp/" , & format ! ( "monarch_log{}" , filename) ) ,
114
+ env:: Env :: Mast => file_appender ( "/logs/" , & format ! ( "dedicated_log_monarch{}" , filename) ) ,
115
+ }
116
+ }
117
+
85
118
lazy_static ! {
86
- static ref WRITER_GUARD : Arc <( NonBlocking , WorkerGuard ) > = {
87
- let writer: Box <dyn Write + Send > = match env:: Env :: current( ) {
88
- env:: Env :: Local | env:: Env :: Test | env:: Env :: MastEmulator => {
89
- Box :: new( std:: io:: stderr( ) )
90
- }
91
- env:: Env :: Mast => match RollingFileAppender :: builder( )
92
- . rotation( Rotation :: HOURLY )
93
- . filename_prefix( "dedicated_log_monarch" )
94
- . filename_suffix( "log" )
95
- . build( "/logs/" )
96
- {
97
- Ok ( file) => Box :: new( file) ,
98
- Err ( e) => {
99
- tracing:: warn!( "unable to create custom log file: {}" , e) ;
100
- Box :: new( std:: io:: stderr( ) )
101
- }
102
- } ,
103
- } ;
104
- return Arc :: new(
105
- tracing_appender:: non_blocking:: NonBlockingBuilder :: default ( )
106
- . lossy( false )
107
- . finish( writer) ,
108
- ) ;
109
- } ;
110
119
static ref TELEMETRY_CLOCK : Arc <Mutex <Box <dyn TelemetryClock + Send >>> =
111
120
{ Arc :: new( Mutex :: new( Box :: new( DefaultTelemetryClock { } ) ) ) } ;
112
121
}
@@ -415,6 +424,161 @@ macro_rules! declare_static_histogram {
415
424
} ;
416
425
}
417
426
427
+ static WRITER_GUARD : std:: sync:: OnceLock < Arc < ( NonBlocking , WorkerGuard ) > > =
428
+ std:: sync:: OnceLock :: new ( ) ;
429
+
430
+ tokio:: task_local! {
431
+ static TASK_LOG_FILENAME : RefCell <Option <Box <dyn Write + Send >>>;
432
+ static TASK_LOG_PREFIX : RefCell <Option <String >>;
433
+ }
434
+
435
+ /// Set both logging file and prefix for the current tokio task using a scope.
436
+ /// This creates a scope where all log messages will be written to the specified file with the given prefix.
437
+ pub async fn with_task_scoped_logging < F , Fut > ( filename : & str , prefix : & str , f : F ) -> Fut :: Output
438
+ where
439
+ F : FnOnce ( ) -> Fut ,
440
+ Fut : std:: future:: Future ,
441
+ {
442
+ let writer_cell = RefCell :: new ( Some ( writer ( filename) ) ) ;
443
+ let prefix_cell = RefCell :: new ( Some ( prefix. to_string ( ) ) ) ;
444
+
445
+ TASK_LOG_FILENAME
446
+ . scope ( writer_cell, async move {
447
+ TASK_LOG_PREFIX
448
+ . scope ( prefix_cell, async move { f ( ) . await } )
449
+ . await
450
+ } )
451
+ . await
452
+ }
453
+
454
+ /// A custom formatter that prepends task-local prefixes to log messages.
455
+ /// Uses either Glog formatter or default formatter based on environment.
456
+ struct PrefixedFormatter {
457
+ formatter : FormatterType ,
458
+ }
459
+
460
+ enum FormatterType {
461
+ Glog ( Glog < LocalTime > ) ,
462
+ Default ( Format < tracing_subscriber:: fmt:: format:: Full , ( ) > ) ,
463
+ }
464
+
465
+ impl PrefixedFormatter {
466
+ fn new ( formatter : FormatterType ) -> Self {
467
+ Self { formatter }
468
+ }
469
+ }
470
+
471
+ impl < S , N > FormatEvent < S , N > for PrefixedFormatter
472
+ where
473
+ S : tracing:: Subscriber + for < ' a > LookupSpan < ' a > ,
474
+ N : for < ' a > FormatFields < ' a > + ' static ,
475
+ {
476
+ fn format_event (
477
+ & self ,
478
+ ctx : & tracing_subscriber:: fmt:: FmtContext < ' _ , S , N > ,
479
+ mut writer : Writer < ' _ > ,
480
+ event : & tracing:: Event < ' _ > ,
481
+ ) -> std:: fmt:: Result {
482
+ let prefix = TASK_LOG_PREFIX
483
+ . try_with ( |log_prefix| log_prefix. borrow ( ) . clone ( ) )
484
+ . unwrap_or ( None ) ;
485
+ if let Some ( prefix) = prefix {
486
+ write ! ( writer, "[{}]" , prefix) ?;
487
+ }
488
+
489
+ match & self . formatter {
490
+ FormatterType :: Glog ( inner) => inner. format_event ( ctx, writer, event) ,
491
+ FormatterType :: Default ( inner) => inner. format_event ( ctx, writer, event) ,
492
+ }
493
+ }
494
+ }
495
+
496
+ /// A custom writer that routes log events to task-specific files with fallback.
497
+ /// Checks if the current task has a specific log file configured,
498
+ /// and if not, falls back to the provided fallback writer.
499
+ #[ derive( Debug , Clone ) ]
500
+ struct TaskRoutingWriter < W > {
501
+ fallback_writer : W ,
502
+ }
503
+
504
+ impl < W > TaskRoutingWriter < W > {
505
+ fn new ( fallback_writer : W ) -> Self {
506
+ Self { fallback_writer }
507
+ }
508
+ }
509
+
510
+ /// Wrapper that implements Write and routes to custom sink if defined, otherwise to fallback.
511
+ struct CustomWriter < W > {
512
+ sink : Sink < W > ,
513
+ }
514
+
515
+ enum Sink < W > {
516
+ Custom ,
517
+ Fallback ( W ) ,
518
+ }
519
+
520
+ impl < W : std:: io:: Write > std:: io:: Write for CustomWriter < W > {
521
+ fn write ( & mut self , buf : & [ u8 ] ) -> std:: io:: Result < usize > {
522
+ match & mut self . sink {
523
+ Sink :: Custom => TASK_LOG_FILENAME
524
+ . try_with ( |writer_ref| {
525
+ let mut writer_borrow = writer_ref. borrow_mut ( ) ;
526
+ if let Some ( ref mut writer) = * writer_borrow {
527
+ writer. write ( buf)
528
+ } else {
529
+ Err ( std:: io:: Error :: new (
530
+ std:: io:: ErrorKind :: NotFound ,
531
+ "Expected custom taks writer but found none." ,
532
+ ) )
533
+ }
534
+ } )
535
+ . unwrap_or_else ( |_| {
536
+ Err ( std:: io:: Error :: new (
537
+ std:: io:: ErrorKind :: NotFound ,
538
+ "Unable to access task writer." ,
539
+ ) )
540
+ } ) ,
541
+ Sink :: Fallback ( writer) => writer. write ( buf) ,
542
+ }
543
+ }
544
+
545
+ fn flush ( & mut self ) -> std:: io:: Result < ( ) > {
546
+ match & mut self . sink {
547
+ Sink :: Custom => TASK_LOG_FILENAME
548
+ . try_with ( |log| {
549
+ let mut log_borrow = log. borrow_mut ( ) ;
550
+ if let Some ( ref mut writer) = * log_borrow {
551
+ writer. flush ( )
552
+ } else {
553
+ Ok ( ( ) )
554
+ }
555
+ } )
556
+ . unwrap_or ( Ok ( ( ) ) ) ,
557
+ Sink :: Fallback ( writer) => writer. flush ( ) ,
558
+ }
559
+ }
560
+ }
561
+
562
+ impl < ' a , W > MakeWriter < ' a > for TaskRoutingWriter < W >
563
+ where
564
+ W : for < ' writer > MakeWriter < ' writer > + Clone ,
565
+ {
566
+ type Writer = CustomWriter < <W as MakeWriter < ' a > >:: Writer > ;
567
+
568
+ fn make_writer ( & ' a self ) -> Self :: Writer {
569
+ match TASK_LOG_FILENAME . try_with ( |log| log. borrow ( ) . is_some ( ) ) {
570
+ Ok ( _) => CustomWriter { sink : Sink :: Custom } ,
571
+ Err ( _) => CustomWriter {
572
+ sink : Sink :: Fallback ( self . fallback_writer . make_writer ( ) ) ,
573
+ } ,
574
+ }
575
+ }
576
+
577
+ fn make_writer_for ( & ' a self , _: & tracing:: Metadata < ' _ > ) -> Self :: Writer {
578
+ self . make_writer ( )
579
+ }
580
+ }
581
+
418
582
/// Set up logging based on the given execution environment. We specialize logging based on how the
419
583
/// logs are consumed. The destination scuba table is specialized based on the execution environment.
420
584
/// mast -> monarch_tracing/prod
@@ -432,10 +596,22 @@ pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
432
596
env:: Env :: Test => "debug" ,
433
597
} ;
434
598
435
- let writer: & NonBlocking = & WRITER_GUARD . 0 ;
599
+ // Create writer and store the guard globally to prevent it from being dropped
600
+ let writer_box: Box < dyn Write + Send > = writer ( & "" ) ;
601
+ let ( non_blocking, guard) = tracing_appender:: non_blocking:: NonBlockingBuilder :: default ( )
602
+ . lossy ( false )
603
+ . finish ( writer_box) ;
604
+
605
+ let writer_guard = Arc :: new ( ( non_blocking, guard) ) ;
606
+ let _ = WRITER_GUARD . set ( writer_guard. clone ( ) ) ;
607
+
608
+ let formatter = match env:: Env :: current ( ) {
609
+ env:: Env :: Mast => FormatterType :: Glog ( Glog :: default ( ) . with_timer ( LocalTime :: default ( ) ) ) ,
610
+ _ => FormatterType :: Default ( Format :: default ( ) . without_time ( ) . with_target ( false ) ) ,
611
+ } ;
436
612
let glog = fmt:: Layer :: default ( )
437
- . with_writer ( writer . clone ( ) )
438
- . event_format ( Glog :: default ( ) . with_timer ( LocalTime :: default ( ) ) )
613
+ . with_writer ( TaskRoutingWriter :: new ( writer_guard . 0 . clone ( ) ) )
614
+ . event_format ( PrefixedFormatter :: new ( formatter ) )
439
615
. fmt_fields ( GlogFields :: default ( ) . compact ( ) )
440
616
. with_ansi ( std:: io:: stderr ( ) . is_terminal ( ) )
441
617
. with_filter (
@@ -449,6 +625,22 @@ pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
449
625
. with_target ( "opentelemetry" , LevelFilter :: OFF ) , // otel has some log span under debug that we don't care about
450
626
) ;
451
627
628
+ let debug_layer = fmt:: Layer :: default ( )
629
+ . with_writer ( std:: io:: stderr)
630
+ . event_format ( Format :: default ( ) . without_time ( ) . with_target ( false ) )
631
+ . fmt_fields ( GlogFields :: default ( ) . compact ( ) )
632
+ . with_ansi ( std:: io:: stderr ( ) . is_terminal ( ) )
633
+ . with_filter (
634
+ Targets :: new ( )
635
+ . with_default ( LevelFilter :: from_level (
636
+ tracing:: Level :: from_str (
637
+ & std:: env:: var ( "RUST_LOG" ) . unwrap_or ( glog_level. to_string ( ) ) ,
638
+ )
639
+ . expect ( "Invalid log level" ) ,
640
+ ) )
641
+ . with_target ( "opentelemetry" , LevelFilter :: OFF ) ,
642
+ ) ;
643
+
452
644
use tracing_subscriber:: Registry ;
453
645
use tracing_subscriber:: layer:: SubscriberExt ;
454
646
use tracing_subscriber:: util:: SubscriberInitExt ;
@@ -465,6 +657,11 @@ pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
465
657
} else {
466
658
None
467
659
} )
660
+ . with ( if is_layer_enabled ( MONARCH_DEBUG ) {
661
+ Some ( debug_layer)
662
+ } else {
663
+ None
664
+ } )
468
665
. with ( if is_layer_enabled ( DISABLE_GLOG_TRACING ) {
469
666
Some ( glog)
470
667
} else {
0 commit comments