diff --git a/Cargo.lock b/Cargo.lock index be1f06cbbad..76a5ba064f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4525,11 +4525,10 @@ name = "tedge_actors" version = "1.4.2" dependencies = [ "async-trait", - "env_logger", "futures", - "log", "thiserror 1.0.69", "tokio", + "tracing", ] [[package]] @@ -4593,6 +4592,7 @@ dependencies = [ "tokio", "toml 0.8.8", "tracing", + "tracing-appender", "tracing-subscriber", "url", "which", @@ -5285,6 +5285,18 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" +dependencies = [ + "crossbeam-channel", + "thiserror 1.0.69", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.27" diff --git a/Cargo.toml b/Cargo.toml index 1c7a095f55a..86d36a50442 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -191,6 +191,7 @@ toml = "0.8" tower = "0.4" tower-http = "0.5" tracing = { version = "0.1", features = ["attributes", "log"] } +tracing-appender = "0.2" tracing-subscriber = { version = "0.3", features = ["time", "env-filter"] } try-traits = "0.1" url = "2.3" diff --git a/crates/common/tedge_config/Cargo.toml b/crates/common/tedge_config/Cargo.toml index baf4d918ecc..3510f78dc9c 100644 --- a/crates/common/tedge_config/Cargo.toml +++ b/crates/common/tedge_config/Cargo.toml @@ -35,6 +35,7 @@ thiserror = { workspace = true } tokio = { workspace = true } toml = { workspace = true } tracing = { workspace = true } +tracing-appender = { workspace = true } tracing-subscriber = { workspace = true } url = { workspace = true } which = { workspace = true } diff --git a/crates/common/tedge_config/src/system_toml/log_level.rs b/crates/common/tedge_config/src/system_toml/log_level.rs index 39f03a8ef00..76abbb7e079 100644 --- a/crates/common/tedge_config/src/system_toml/log_level.rs +++ b/crates/common/tedge_config/src/system_toml/log_level.rs @@ -5,6 +5,12 @@ use super::SystemTomlError; use crate::cli::LogConfigArgs; use std::io::IsTerminal; use std::str::FromStr; +use tracing::metadata::LevelFilter; +use tracing_appender::rolling::*; +use tracing_subscriber::filter::filter_fn; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::Layer; /// Configures and enables logging taking into account flags, env variables and file config. /// @@ -19,7 +25,8 @@ pub fn log_init( flags: &LogConfigArgs, config_dir: &Utf8Path, ) -> Result<(), SystemTomlError> { - let subscriber = tracing_subscriber::fmt() + // General logging + let log_layer = tracing_subscriber::fmt::layer() .with_writer(std::io::stderr) .with_ansi(std::io::stderr().is_terminal()) .with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339()); @@ -28,22 +35,60 @@ pub fn log_init( .log_level .or(flags.debug.then_some(tracing::Level::DEBUG)); - if let Some(log_level) = log_level { - subscriber.with_max_level(log_level).init(); - return Ok(()); - } - - if std::env::var("RUST_LOG").is_ok() { - subscriber - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + let log_layer = if let Some(log_level) = log_level { + log_layer + .with_filter(LevelFilter::from_level(log_level)) + .boxed() + } else if std::env::var("RUST_LOG").is_ok() { + log_layer .with_file(true) .with_line_number(true) - .init(); - return Ok(()); - } - - let log_level = get_log_level(sname, config_dir)?; - subscriber.with_max_level(log_level).init(); + .with_filter(tracing_subscriber::EnvFilter::from_default_env()) + .boxed() + } else { + let log_level = get_log_level(sname, config_dir)?; + log_layer + .with_filter(LevelFilter::from_level(log_level)) + .boxed() + }; + + // Audit journal + let audit_appender = RollingFileAppender::builder() + .rotation(Rotation::DAILY) + .filename_prefix("tedge.audit.log") + .max_log_files(7); + let audit_layer = audit_appender + .build("/var/log/tedge") + .ok() + .map(|audit_appender| { + tracing_subscriber::fmt::layer() + .with_writer(audit_appender) + .with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339()) + .with_filter(LevelFilter::INFO) + .with_filter(filter_fn(|metadata| metadata.target() == "Audit")) + }); + + // Actor traces + let trace_appender = RollingFileAppender::builder() + .rotation(Rotation::DAILY) + .filename_prefix("tedge.actors.log") + .max_log_files(2); + let trace_layer = trace_appender + .build("/var/log/tedge") + .ok() + .map(|trace_appender| { + tracing_subscriber::fmt::layer() + .with_writer(trace_appender) + .with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339()) + .with_filter(LevelFilter::DEBUG) + .with_filter(filter_fn(|metadata| metadata.target() == "Actors")) + }); + + tracing_subscriber::registry() + .with(audit_layer) + .with(trace_layer) + .with(log_layer) + .init(); Ok(()) } diff --git a/crates/core/tedge/src/cli/config/commands/add.rs b/crates/core/tedge/src/cli/config/commands/add.rs index 8345dbfa4ef..2ef63543693 100644 --- a/crates/core/tedge/src/cli/config/commands/add.rs +++ b/crates/core/tedge/src/cli/config/commands/add.rs @@ -27,6 +27,7 @@ impl Command for AddConfigCommand { }) .await .map_err(anyhow::Error::new)?; + tracing::info!(target: "Audit", "tedge config add {} {}", &self.key, &self.value); Ok(()) } } diff --git a/crates/core/tedge/src/cli/config/commands/remove.rs b/crates/core/tedge/src/cli/config/commands/remove.rs index bc4a048dc6e..d839590c76c 100644 --- a/crates/core/tedge/src/cli/config/commands/remove.rs +++ b/crates/core/tedge/src/cli/config/commands/remove.rs @@ -23,6 +23,7 @@ impl Command for RemoveConfigCommand { }) .await .map_err(anyhow::Error::new)?; + tracing::info!(target: "Audit", "tedge config remove {} {}", &self.key, &self.value); Ok(()) } } diff --git a/crates/core/tedge/src/cli/config/commands/set.rs b/crates/core/tedge/src/cli/config/commands/set.rs index bb1b1bb100d..fb9b15148d1 100644 --- a/crates/core/tedge/src/cli/config/commands/set.rs +++ b/crates/core/tedge/src/cli/config/commands/set.rs @@ -27,6 +27,7 @@ impl Command for SetConfigCommand { }) .await .map_err(anyhow::Error::new)?; + tracing::info!(target: "Audit", "tedge config set {} {}", &self.key, &self.value); Ok(()) } } diff --git a/crates/core/tedge/src/cli/config/commands/unset.rs b/crates/core/tedge/src/cli/config/commands/unset.rs index 40512fa4dfd..599dd3aaaf9 100644 --- a/crates/core/tedge/src/cli/config/commands/unset.rs +++ b/crates/core/tedge/src/cli/config/commands/unset.rs @@ -19,6 +19,7 @@ impl Command for UnsetConfigCommand { .update_toml(&|dto, _reader| Ok(dto.try_unset_key(&self.key)?)) .await .map_err(anyhow::Error::new)?; + tracing::info!(target: "Audit", "tedge config unset {}", &self.key); Ok(()) } } diff --git a/crates/core/tedge_actors/Cargo.toml b/crates/core/tedge_actors/Cargo.toml index c25fb351823..8be2ab11646 100644 --- a/crates/core/tedge_actors/Cargo.toml +++ b/crates/core/tedge_actors/Cargo.toml @@ -17,7 +17,6 @@ test-helpers = [] [dependencies] async-trait = { workspace = true } futures = { workspace = true } -log = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, default_features = false, features = [ "sync", @@ -25,9 +24,9 @@ tokio = { workspace = true, default_features = false, features = [ "macros", "time", ] } +tracing = { workspace = true } [dev-dependencies] -env_logger = { workspace = true } # TODO: remove me tokio = { workspace = true, default_features = false, features = [ "rt-multi-thread", ] } diff --git a/crates/core/tedge_actors/src/message_boxes.rs b/crates/core/tedge_actors/src/message_boxes.rs index 8bd4028a56e..15f0cd501bd 100644 --- a/crates/core/tedge_actors/src/message_boxes.rs +++ b/crates/core/tedge_actors/src/message_boxes.rs @@ -93,7 +93,6 @@ use crate::RuntimeRequest; use async_trait::async_trait; use futures::channel::mpsc; use futures::StreamExt; -use log::debug; use std::fmt::Debug; #[async_trait] @@ -160,22 +159,22 @@ impl LoggingReceiver { } #[async_trait] -impl MessageReceiver for LoggingReceiver { +impl MessageReceiver for LoggingReceiver { async fn try_recv(&mut self) -> Result, RuntimeRequest> { let message = self.receiver.try_recv().await; - debug!(target: &self.name, "recv {:?}", message); + log_message_received(&self.name, &message); message } async fn recv(&mut self) -> Option { let message = self.receiver.recv().await; - debug!(target: &self.name, "recv {:?}", message); + log_message_received(&self.name, &message); message } async fn recv_signal(&mut self) -> Option { let message = self.receiver.recv_signal().await; - debug!(target: &self.name, "recv {:?}", message); + log_message_received(&self.name, &message); message } } @@ -208,8 +207,14 @@ impl Sender for LoggingSender { } } -pub fn log_message_sent(target: &str, message: I) { - debug!(target: target, "send {message:?}"); +#[inline] +pub fn log_message_received(actor: &str, message: &I) { + tracing::debug!(target: "Actors", actor, recv = ?message); +} + +#[inline] +pub fn log_message_sent(actor: &str, message: &I) { + tracing::debug!(target: "Actors", actor, send = ?message); } /// An unbounded receiver @@ -251,10 +256,10 @@ impl UnboundedLoggingReceiver { } #[async_trait] -impl MessageReceiver for UnboundedLoggingReceiver { +impl MessageReceiver for UnboundedLoggingReceiver { async fn try_recv(&mut self) -> Result, RuntimeRequest> { let message = self.next_message().await; - debug!(target: &self.name, "recv {:?}", message); + log_message_received(&self.name, &message); message } @@ -263,13 +268,13 @@ impl MessageReceiver for UnboundedLoggingReceiver Some(message), _ => None, }; - debug!(target: &self.name, "recv {:?}", message); + log_message_received(&self.name, &message); message } async fn recv_signal(&mut self) -> Option { let message = self.signal_receiver.next().await; - debug!(target: &self.name, "recv {:?}", message); + log_message_received(&self.name, &message); message } } diff --git a/crates/core/tedge_actors/src/runtime.rs b/crates/core/tedge_actors/src/runtime.rs index 91505d5bb24..f39eceae358 100644 --- a/crates/core/tedge_actors/src/runtime.rs +++ b/crates/core/tedge_actors/src/runtime.rs @@ -11,14 +11,14 @@ use crate::RuntimeRequestSink; use futures::channel::mpsc; use futures::prelude::*; use futures::stream::FuturesUnordered; -use log::debug; -use log::error; -use log::info; use std::collections::HashMap; use std::panic; use std::time::Duration; use tokio::task::JoinError; use tokio::task::JoinHandle; +use tracing::debug; +use tracing::error; +use tracing::info; /// Actions sent by actors to the runtime #[derive(Debug)] @@ -95,7 +95,7 @@ impl Runtime { /// and all the running tasks have reach completion (successfully or not). pub async fn run_to_completion(self) -> Result<(), RuntimeError> { if let Err(err) = Runtime::wait_for_completion(self.bg_task).await { - error!("Aborted due to {err}"); + error!(target: "Actors", "Aborted due to {err}"); std::process::exit(1) } @@ -138,7 +138,7 @@ impl RuntimeHandle { /// Send an action to the runtime async fn send(&mut self, action: RuntimeAction) -> Result<(), ChannelError> { - debug!(target: "Runtime", "schedule {:?}", action); + debug!(target: "Actors", "schedule {:?}", action); self.actions_sender.send(action).await?; Ok(()) } @@ -175,7 +175,7 @@ impl RuntimeActor { } async fn run(mut self) -> Result<(), RuntimeError> { - info!(target: "Runtime", "Started"); + info!(target: "Actors", "Started"); let mut aborting_error = None; let mut actors_count: usize = 0; loop { @@ -186,7 +186,7 @@ impl RuntimeActor { match action { RuntimeAction::Spawn(actor) => { let running_name = format!("{}-{}", actor.name(), actors_count); - info!(target: "Runtime", "Running {running_name}"); + info!(target: "Actors", "Running {running_name}"); self.send_event(RuntimeEvent::Started { task: running_name.clone(), }) @@ -196,14 +196,14 @@ impl RuntimeActor { actors_count += 1; } RuntimeAction::Shutdown => { - info!(target: "Runtime", "Shutting down"); + info!(target: "Actors", "Shutting down"); shutdown_actors(&mut self.running_actors).await; break; } } } None => { - info!(target: "Runtime", "Runtime actions channel closed, runtime stopping"); + info!(target: "Actors", "Runtime actions channel closed, runtime stopping"); shutdown_actors(&mut self.running_actors).await; break; } @@ -211,7 +211,7 @@ impl RuntimeActor { }, Some(finished_actor) = self.futures.next() => { if let Err(error) = self.handle_actor_finishing(finished_actor).await { - info!(target: "Runtime", "Shutting down on error: {error}"); + info!(target: "Actors", "Shutting down on error: {error}"); aborting_error = Some(error); shutdown_actors(&mut self.running_actors).await; break @@ -222,12 +222,12 @@ impl RuntimeActor { tokio::select! { _ = tokio::time::sleep(self.cleanup_duration) => { - error!(target: "Runtime", "Timeout waiting for all actors to shutdown"); + error!(target: "Actors", "Timeout waiting for all actors to shutdown"); for still_running in self.running_actors.keys() { - error!(target: "Runtime", "Failed to shutdown: {still_running}") + error!(target: "Actors", "Failed to shutdown: {still_running}") } } - _ = self.wait_for_actors_to_finish() => info!(target: "Runtime", "All actors have finished") + _ = self.wait_for_actors_to_finish() => info!(target: "Actors", "All actors have finished") } match aborting_error { @@ -248,18 +248,18 @@ impl RuntimeActor { ) -> Result<(), RuntimeError> { match finished_actor { Err(e) => { - error!(target: "Runtime", "Failed to execute actor: {e}"); + error!(target: "Actors", "Failed to execute actor: {e}"); Err(RuntimeError::JoinError(e)) } Ok(Ok(actor)) => { self.running_actors.remove(&actor); - info!(target: "Runtime", "Actor has finished: {actor}"); + info!(target: "Actors", "Actor has finished: {actor}"); self.send_event(RuntimeEvent::Stopped { task: actor }).await; Ok(()) } Ok(Err((actor, error))) => { self.running_actors.remove(&actor); - error!(target: "Runtime", "Actor {actor} has finished unsuccessfully: {error:?}"); + error!(target: "Actors", "Actor {actor} has finished unsuccessfully: {error:?}"); self.send_event(RuntimeEvent::Aborted { task: actor.clone(), error: format!("{error}"), @@ -273,7 +273,7 @@ impl RuntimeActor { async fn send_event(&mut self, event: RuntimeEvent) { if let Some(events) = &mut self.events { if let Err(e) = events.send(event).await { - error!(target: "Runtime", "Failed to send RuntimeEvent: {e}"); + error!(target: "Actors", "Failed to send RuntimeEvent: {e}"); } } } @@ -286,10 +286,10 @@ where for (running_as, sender) in a { match sender.send(RuntimeRequest::Shutdown).await { Ok(()) => { - debug!(target: "Runtime", "Successfully sent shutdown request to {running_as}") + debug!(target: "Actors", "Successfully sent shutdown request to {running_as}") } Err(e) => { - error!(target: "Runtime", "Failed to send shutdown request to {running_as}: {e:?}") + error!(target: "Actors", "Failed to send shutdown request to {running_as}: {e:?}") } } } @@ -421,11 +421,6 @@ mod tests { mpsc::Receiver, RuntimeActor, ) { - // TODO: remove logging or add something smarter because logging is useful - let _ = env_logger::builder() - .is_test(true) - .filter_level(log::LevelFilter::Trace) - .try_init(); let (actions_sender, actions_receiver) = mpsc::channel(16); let (events_sender, events_receiver) = mpsc::channel::(16); let ra = RuntimeActor::new( diff --git a/crates/core/tedge_actors/src/servers/message_boxes.rs b/crates/core/tedge_actors/src/servers/message_boxes.rs index 0f54201cc48..c289be97bc2 100644 --- a/crates/core/tedge_actors/src/servers/message_boxes.rs +++ b/crates/core/tedge_actors/src/servers/message_boxes.rs @@ -55,7 +55,7 @@ impl ConcurrentServerMessageBox { if let Err(err) = result { - log::error!("Fail to run a request to completion: {err}"); + tracing::error!(target: "Actors", "Fail to run a request to completion: {err}"); } } else => { @@ -73,7 +73,7 @@ impl ConcurrentServerMessageBox { if let Err(err) = result { - log::error!("Fail to run a request to completion: {err}"); + tracing::error!(target: "Actors", "Fail to run a request to completion: {err}"); } ControlFlow::Continue(()) }, diff --git a/crates/core/tedge_agent/src/operation_workflows/actor.rs b/crates/core/tedge_agent/src/operation_workflows/actor.rs index 9dbbe50a1f7..10d81ff0a32 100644 --- a/crates/core/tedge_agent/src/operation_workflows/actor.rs +++ b/crates/core/tedge_agent/src/operation_workflows/actor.rs @@ -65,6 +65,7 @@ impl Actor for WorkflowActor { } async fn run(mut self) -> Result<(), RuntimeError> { + tracing::info!(target: "Audit", "tedge-agent started"); self.workflow_repository.load().await; self.publish_operation_capabilities().await?; self.load_command_board().await?; @@ -95,11 +96,15 @@ impl Actor for WorkflowActor { ) .await { + tracing::info!(target: "Audit", "Updated capability {}", + updated_capability.topic.as_ref(), + ); self.mqtt_publisher.send(updated_capability).await? } } } } + tracing::info!(target: "Audit", "tedge-agent stopped"); Ok(()) } } @@ -159,6 +164,7 @@ impl WorkflowActor { Ok(Some(new_state)) => { self.persist_command_board().await?; if new_state.is_init() { + tracing::info!(target: "Audit", "Execute {operation} command, log = {}", log_file.path); self.process_command_update(new_state.with_log_path(&log_file.path)) .await?; } @@ -211,6 +217,11 @@ impl WorkflowActor { match action { OperationAction::Clear => { + tracing::info!( + target: "Audit", + "{} {operation} command", + if state.is_successful() {"Executed"} else { "Failed"}, + ); if let Some(invoking_command) = self.workflow_repository.invoking_command_state(&state) {