refactor: update log handling to use LogLineInfo struct #778

Merged

18 changes: 15 additions & 3 deletions core/src/ten_manager/src/designer/log_watcher/mod.rs
@@ -17,6 +17,7 @@ use tokio::sync::Mutex;

use crate::designer::DesignerState;
use crate::fs::log_file_watcher::{LogFileContentStream, LogFileWatchOptions};
use crate::log::LogLineInfo;
use crate::pkg_info::property::get_log_file_path;

// Message types for WebSocket communication
@@ -32,7 +33,7 @@ pub struct SetAppBaseDir {

#[derive(Message, Debug, Serialize, Deserialize)]
#[rtype(result = "()")]
pub struct FileContent(pub String);
pub struct FileContent(pub LogLineInfo);

#[derive(Message, Debug, Serialize, Deserialize)]
#[rtype(result = "()")]
@@ -160,8 +161,19 @@ impl Handler<FileContent> for WsLogWatcher {
msg: FileContent,
ctx: &mut Self::Context,
) -> Self::Result {
// Send the file content as text to the WebSocket client.
ctx.text(msg.0);
// Send the entire LogLineInfo as JSON to the WebSocket client.
match serde_json::to_string(&msg.0) {
Ok(json_str) => {
ctx.text(json_str);
}
Err(e) => {
// Log the serialization error.
eprintln!("Error serializing LogLineInfo to JSON: {}", e);

// Fallback to just the line content if serialization fails.
ctx.text(msg.0.line);
}
}
}
}
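
For reference, a minimal sketch (not part of this diff) of the payload shape a WebSocket client now receives, based on the LogLineInfo and LogLineMetadata definitions added in core/src/ten_manager/src/log/mod.rs; the example values are hypothetical, and it assumes the ten_manager crate and serde_json are available as dependencies:

use ten_manager::log::{LogLineInfo, LogLineMetadata};

fn main() {
    let info = LogLineInfo {
        line: "example log line".to_string(),
        metadata: Some(LogLineMetadata {
            extension: Some("my_extension".to_string()),
        }),
    };
    // Prints: {"line":"example log line","metadata":{"extension":"my_extension"}}
    // When no extension can be attributed, metadata serializes as null.
    println!("{}", serde_json::to_string(&info).unwrap());
}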

78 changes: 60 additions & 18 deletions core/src/ten_manager/src/fs/log_file_watcher.rs
@@ -5,7 +5,7 @@
// Refer to the "LICENSE" file in the root directory for more information.
//
use std::fs::{File, Metadata};
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};

@@ -19,14 +19,17 @@ use anyhow::{anyhow, Result};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::oneshot;

use crate::log::{extract_extension_from_log_line, parse_graph_resources_log};
use crate::log::{GraphResourcesLog, LogLineInfo, LogLineMetadata};

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60); // 1 minute timeout.
const DEFAULT_BUFFER_SIZE: usize = 4096; // Default read buffer size.
const DEFAULT_CHECK_INTERVAL: Duration = Duration::from_millis(100);

/// Stream of UTF-8 text file content changes.
pub struct LogFileContentStream {
// Channel for receiving file content as UTF-8 text.
content_rx: Receiver<Result<String>>,
content_rx: Receiver<Result<LogLineInfo>>,

// Sender to signal stop request.
stop_tx: Option<oneshot::Sender<()>>,
@@ -35,14 +38,14 @@ pub struct LogFileContentStream {
impl LogFileContentStream {
/// Create a new FileContentStream.
fn new(
content_rx: Receiver<Result<String>>,
content_rx: Receiver<Result<LogLineInfo>>,
stop_tx: oneshot::Sender<()>,
) -> Self {
Self { content_rx, stop_tx: Some(stop_tx) }
}

/// Get the next chunk of text from the file.
pub async fn next(&mut self) -> Option<Result<String>> {
pub async fn next(&mut self) -> Option<Result<LogLineInfo>> {
self.content_rx.recv().await
}

@@ -132,10 +135,19 @@ pub async fn watch_log_file<P: AsRef<Path>>(
/// Actual file watch task running in the background.
async fn watch_log_file_task(
path: PathBuf,
content_tx: Sender<Result<String>>,
content_tx: Sender<Result<LogLineInfo>>,
mut stop_rx: oneshot::Receiver<()>,
options: LogFileWatchOptions,
) {
// Create a GraphResourcesLog instance that will be used throughout this
// task's lifetime.
let mut graph_resources_log = GraphResourcesLog {
graph_id: String::new(),
graph_name: String::new(),
app_uri: None,
extension_threads: std::collections::HashMap::new(),
};

// Open the file.
let mut file = match File::open(&path) {
Ok(f) => f,
@@ -158,19 +170,21 @@ async fn watch_log_file_task(
return;
}

let mut init_content = String::new();
match file.read_to_string(&mut init_content) {
Ok(n) => {
if n > 0 {
let _ = content_tx.send(Ok(init_content)).await;
}
}
Err(e) => {
let _ = content_tx
.send(Err(anyhow!("Failed to read file as UTF-8 text: {}", e)))
.await;
return;
// Instead of reading the entire file at once, read it line by line.
let mut reader = BufReader::new(&file);
let mut line = String::new();
while let Ok(bytes_read) = reader.read_line(&mut line) {
if bytes_read == 0 {
break; // End of file.
}

// Process each line.
let metadata = process_log_line(&line, &mut graph_resources_log);

let log_line_info = LogLineInfo { line: line.clone(), metadata };

let _ = content_tx.send(Ok(log_line_info)).await;
line.clear(); // Clear for the next line.
}

// Reset last_pos to the current EOF.
@@ -233,7 +247,15 @@ async fn watch_log_file_task(
match reader.read_line(&mut line) {
Ok(n) if n > 0 => {
last_pos += n as u64;
let _ = content_tx.send(Ok(line)).await;

// Process the new line.
let metadata = process_log_line(&line, &mut graph_resources_log);

let log_line_info = LogLineInfo {
line,
metadata,
};
let _ = content_tx.send(Ok(log_line_info)).await;
last_activity = Instant::now();
}
Ok(_) => {}
@@ -262,3 +284,23 @@ async fn watch_log_file_task(
}
}
}

/// Process a log line: try to parse as graph resources log first, then try to
/// extract extension information.
fn process_log_line(
log_line: &str,
graph_resources_log: &mut GraphResourcesLog,
) -> Option<LogLineMetadata> {
// First try to parse as graph resources log.
match parse_graph_resources_log(log_line, graph_resources_log) {
Ok(_) => {
// Successfully parsed as graph resources log, but no metadata to
// return.
None
}
Err(_) => {
// Not a graph resources log, try to extract extension information.
extract_extension_from_log_line(log_line, graph_resources_log)
}
}
}
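
To illustrate the fallback order in process_log_line, here is a minimal in-module test sketch (not part of this PR); the module name and input line are hypothetical, and the line deliberately matches neither parser:

#[cfg(test)]
mod process_log_line_sketch {
    use std::collections::HashMap;

    use super::*;

    #[test]
    fn non_matching_line_yields_no_metadata() {
        let mut graph_resources_log = GraphResourcesLog {
            graph_id: String::new(),
            graph_name: String::new(),
            app_uri: None,
            extension_threads: HashMap::new(),
        };
        // Neither a "[graph resources]" line nor attributable to a known
        // extension thread, so no per-line metadata is produced.
        assert!(process_log_line("short line", &mut graph_resources_log)
            .is_none());
    }
}
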
119 changes: 102 additions & 17 deletions core/src/ten_manager/src/log/mod.rs
@@ -4,22 +4,111 @@
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//
use std::collections::HashMap;

use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;

pub struct ExtensionThreadInfo {
pub extensions: Vec<String>,
}

pub struct AppInfo {
pub extension_threads: HashMap<String, ExtensionThreadInfo>,
}

pub struct GraphResourcesLog {
pub graph_id: String,
pub graph_name: String,
pub apps: HashMap<Option<String>, AppInfo>,
pub app_uri: Option<String>,
pub extension_threads: HashMap<String, ExtensionThreadInfo>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogLineMetadata {
pub extension: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogLineInfo {
pub line: String,
pub metadata: Option<LogLineMetadata>,
}

/// Extracts extension information from a log line.
///
/// This function checks if the log line contains extension information in the
/// format "[extension_name]" and returns metadata about the extension if found
/// in the graph resources.
pub fn extract_extension_from_log_line(
log_message: &str,
graph_resources_log: &GraphResourcesLog,
) -> Option<LogLineMetadata> {
// Split the log message by whitespace for initial parsing.
let parts: Vec<&str> = log_message.split_whitespace().collect();
if parts.len() < 5 {
// Need at least date, time, process/thread IDs, log level, and content.
return None;
}

// Extract the process ID and thread ID.
// Format expected: "processID(threadID)".
let process_thread_part = parts[2];
if !process_thread_part.contains('(') || !process_thread_part.contains(')')
{
return None;
}

let thread_id = process_thread_part
.split('(')
.nth(1)? // Get the part after '('.
.trim_end_matches(')'); // Remove the trailing ')'.

// Check if the thread ID exists in graph_resources_log.extension_threads.
if !graph_resources_log.extension_threads.contains_key(thread_id) {
return None;
}

// Check if the log level is 'M', if so, return None.
let log_level_pos = 3;
if parts[log_level_pos] == "M" {
return None;
}

// Find the content part (everything after the function name part).
let function_part = parts[log_level_pos + 1]; // This is the function@file:line part.
if !function_part.contains('@') {
return None;
}

// The content starts from the position after the function part.
let content_index = log_message.find(function_part)? + function_part.len();
let content = log_message[content_index..].trim();

// Check if content begins with [...], if not, return None.
if !content.starts_with('[') || !content.contains(']') {
return None;
}

// Extract the extension name from [...].
let end_pos = content.find(']')?;
if end_pos <= 1 {
return None; // No content inside brackets.
}

let extension_name = &content[1..end_pos];

// Check if extension_name exists in the extension_threads for the given
// thread_id.
if let Some(thread_info) =
graph_resources_log.extension_threads.get(thread_id)
{
if thread_info.extensions.contains(&extension_name.to_string()) {
// Create and return LogLineMetadata.
return Some(LogLineMetadata {
extension: Some(extension_name.to_string()),
});
}
}

None
}
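
For orientation, a hypothetical log line that would satisfy the checks above; the timestamp format is an assumption, since only the whitespace-separated field positions matter to this function:

2025-01-01 12:00:00.000 1234(5678) D some_func@main.cc:42 [my_extension] hello

Here parts[2] yields the thread ID "5678", parts[3] is the log level (anything other than "M"), parts[4] is the function@file:line field, and the remaining content starts with the bracketed extension name. The line is tagged with extension "my_extension" only if thread "5678" is already present in graph_resources_log.extension_threads and lists that extension.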

pub fn parse_graph_resources_log(
@@ -29,19 +118,19 @@ pub fn parse_graph_resources_log(
// Check if the log level is 'M'.
let parts: Vec<&str> = log_message.split_whitespace().collect();
if parts.len() < 4 {
return Ok(());
return Err(anyhow!("Not a valid graph resources log message"));
}

// Check for log level 'M' - it should be in the fourth position after the
// timestamp and process/thread IDs.
let log_level_pos = 3;
if parts.len() <= log_level_pos || parts[log_level_pos] != "M" {
return Ok(());
return Err(anyhow!("Not a valid graph resources log message"));
}

// Find the "[graph resources]" marker.
if !log_message.contains("[graph resources]") {
return Ok(());
return Err(anyhow!("Not a valid graph resources log message"));
}

// Extract the JSON content after "[graph resources]".
@@ -66,13 +155,9 @@
// Update graph_resources_log with graph ID and name.
graph_resources_log.graph_id = graph_id.to_string();
graph_resources_log.graph_name = graph_name.to_string();

// Create or get the AppInfo for this app_uri.
let app_key = app_uri.map(|uri| uri.to_string());
let app_info = graph_resources_log
.apps
.entry(app_key)
.or_insert_with(|| AppInfo { extension_threads: HashMap::new() });
if let Some(uri) = app_uri {
graph_resources_log.app_uri = Some(uri.to_string());
}

// Process extension_threads if present.
if let Some(extension_threads) = json_value.get("extension_threads") {
@@ -90,7 +175,7 @@
let thread_info =
ExtensionThreadInfo { extensions: extension_names };

app_info
graph_resources_log
.extension_threads
.insert(thread_id.to_string(), thread_info);
}
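
A note on the control flow implied by the changes above: parse_graph_resources_log now returns Err instead of Ok(()) when a line is not a log-level "M" line containing the "[graph resources]" marker, which is what lets process_log_line in log_file_watcher.rs fall back to extract_extension_from_log_line on Err. A hypothetical accepted line would look like the following; the JSON body is elided because its exact shape is not visible in this hunk beyond the "extension_threads" key:

2025-01-01 12:00:00.000 1234(5678) M [graph resources] { ... "extension_threads": { ... } ... }
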
19 changes: 15 additions & 4 deletions core/src/ten_manager/tests/test_case/designer/log_watcher.rs
@@ -16,6 +16,7 @@ use tempfile::tempdir;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};

use ten_manager::designer::log_watcher::log_watcher_endpoint;
use ten_manager::log::LogLineInfo;

use crate::test_case::common::builtin_server::start_test_server;

Expand Down Expand Up @@ -89,16 +90,26 @@ async fn test_ws_log_watcher_endpoint() {
let test_content = "Test log message\n";
append_to_log_file(&log_file_path, test_content);

// Check if we receive the content.
// Check if we receive the content - with timeout of 10 seconds.
let mut received_content = false;
while let Some(msg) = read.next().await {
if let Ok(Some(msg)) =
tokio::time::timeout(Duration::from_secs(10), read.next()).await
{
let msg = msg.unwrap();
if msg.is_text() {
let text = msg.to_text().unwrap();
println!("Received text: {}", text);
if text.contains(test_content) {

// Try to parse the JSON response.
if let Ok(log_line_info) = serde_json::from_str::<LogLineInfo>(text)
{
if log_line_info.line.contains(test_content.trim()) {
received_content = true;
}
} else if text.contains(test_content.trim()) {
// Fallback to direct text comparison (for backward
// compatibility).
received_content = true;
break;
}
}
}