Skip to content

Commit 0297933

Browse files
authored
refactor: update log handling to use LogLineInfo struct (#778)
1 parent b601dab commit 0297933

File tree

8 files changed

+544
-67
lines changed

8 files changed

+544
-67
lines changed

core/src/ten_manager/src/designer/log_watcher/mod.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use tokio::sync::Mutex;
1717

1818
use crate::designer::DesignerState;
1919
use crate::fs::log_file_watcher::{LogFileContentStream, LogFileWatchOptions};
20+
use crate::log::LogLineInfo;
2021
use crate::pkg_info::property::get_log_file_path;
2122

2223
// Message types for WebSocket communication
@@ -32,7 +33,7 @@ pub struct SetAppBaseDir {
3233

3334
#[derive(Message, Debug, Serialize, Deserialize)]
3435
#[rtype(result = "()")]
35-
pub struct FileContent(pub String);
36+
pub struct FileContent(pub LogLineInfo);
3637

3738
#[derive(Message, Debug, Serialize, Deserialize)]
3839
#[rtype(result = "()")]
@@ -160,8 +161,19 @@ impl Handler<FileContent> for WsLogWatcher {
160161
msg: FileContent,
161162
ctx: &mut Self::Context,
162163
) -> Self::Result {
163-
// Send the file content as text to the WebSocket client.
164-
ctx.text(msg.0);
164+
// Send the entire LogLineInfo as JSON to the WebSocket client.
165+
match serde_json::to_string(&msg.0) {
166+
Ok(json_str) => {
167+
ctx.text(json_str);
168+
}
169+
Err(e) => {
170+
// Log the serialization error.
171+
eprintln!("Error serializing LogLineInfo to JSON: {}", e);
172+
173+
// Fallback to just the line content if serialization fails.
174+
ctx.text(msg.0.line);
175+
}
176+
}
165177
}
166178
}
167179

core/src/ten_manager/src/fs/log_file_watcher.rs

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
// Refer to the "LICENSE" file in the root directory for more information.
66
//
77
use std::fs::{File, Metadata};
8-
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
8+
use std::io::{BufRead, BufReader, Seek, SeekFrom};
99
use std::path::{Path, PathBuf};
1010
use std::time::{Duration, Instant};
1111

@@ -19,14 +19,17 @@ use anyhow::{anyhow, Result};
1919
use tokio::sync::mpsc::{self, Receiver, Sender};
2020
use tokio::sync::oneshot;
2121

22+
use crate::log::{extract_extension_from_log_line, parse_graph_resources_log};
23+
use crate::log::{GraphResourcesLog, LogLineInfo, LogLineMetadata};
24+
2225
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60); // 1 minute timeout.
2326
const DEFAULT_BUFFER_SIZE: usize = 4096; // Default read buffer size.
2427
const DEFAULT_CHECK_INTERVAL: Duration = Duration::from_millis(100);
2528

2629
/// Stream of UTF-8 text file content changes.
2730
pub struct LogFileContentStream {
2831
// Channel for receiving file content as UTF-8 text.
29-
content_rx: Receiver<Result<String>>,
32+
content_rx: Receiver<Result<LogLineInfo>>,
3033

3134
// Sender to signal stop request.
3235
stop_tx: Option<oneshot::Sender<()>>,
@@ -35,14 +38,14 @@ pub struct LogFileContentStream {
3538
impl LogFileContentStream {
3639
/// Create a new FileContentStream.
3740
fn new(
38-
content_rx: Receiver<Result<String>>,
41+
content_rx: Receiver<Result<LogLineInfo>>,
3942
stop_tx: oneshot::Sender<()>,
4043
) -> Self {
4144
Self { content_rx, stop_tx: Some(stop_tx) }
4245
}
4346

4447
/// Get the next chunk of text from the file.
45-
pub async fn next(&mut self) -> Option<Result<String>> {
48+
pub async fn next(&mut self) -> Option<Result<LogLineInfo>> {
4649
self.content_rx.recv().await
4750
}
4851

@@ -132,10 +135,19 @@ pub async fn watch_log_file<P: AsRef<Path>>(
132135
/// Actual file watch task running in the background.
133136
async fn watch_log_file_task(
134137
path: PathBuf,
135-
content_tx: Sender<Result<String>>,
138+
content_tx: Sender<Result<LogLineInfo>>,
136139
mut stop_rx: oneshot::Receiver<()>,
137140
options: LogFileWatchOptions,
138141
) {
142+
// Create a GraphResourcesLog instance that will be used throughout this
143+
// task's lifetime.
144+
let mut graph_resources_log = GraphResourcesLog {
145+
graph_id: String::new(),
146+
graph_name: String::new(),
147+
app_uri: None,
148+
extension_threads: std::collections::HashMap::new(),
149+
};
150+
139151
// Open the file.
140152
let mut file = match File::open(&path) {
141153
Ok(f) => f,
@@ -158,19 +170,21 @@ async fn watch_log_file_task(
158170
return;
159171
}
160172

161-
let mut init_content = String::new();
162-
match file.read_to_string(&mut init_content) {
163-
Ok(n) => {
164-
if n > 0 {
165-
let _ = content_tx.send(Ok(init_content)).await;
166-
}
167-
}
168-
Err(e) => {
169-
let _ = content_tx
170-
.send(Err(anyhow!("Failed to read file as UTF-8 text: {}", e)))
171-
.await;
172-
return;
173+
// Instead of reading the entire file at once, read it line by line.
174+
let mut reader = BufReader::new(&file);
175+
let mut line = String::new();
176+
while let Ok(bytes_read) = reader.read_line(&mut line) {
177+
if bytes_read == 0 {
178+
break; // End of file.
173179
}
180+
181+
// Process each line.
182+
let metadata = process_log_line(&line, &mut graph_resources_log);
183+
184+
let log_line_info = LogLineInfo { line: line.clone(), metadata };
185+
186+
let _ = content_tx.send(Ok(log_line_info)).await;
187+
line.clear(); // Clear for the next line.
174188
}
175189

176190
// Reset last_pos to the current EOF.
@@ -233,7 +247,15 @@ async fn watch_log_file_task(
233247
match reader.read_line(&mut line) {
234248
Ok(n) if n > 0 => {
235249
last_pos += n as u64;
236-
let _ = content_tx.send(Ok(line)).await;
250+
251+
// Process the new line.
252+
let metadata = process_log_line(&line, &mut graph_resources_log);
253+
254+
let log_line_info = LogLineInfo {
255+
line,
256+
metadata,
257+
};
258+
let _ = content_tx.send(Ok(log_line_info)).await;
237259
last_activity = Instant::now();
238260
}
239261
Ok(_) => {}
@@ -262,3 +284,23 @@ async fn watch_log_file_task(
262284
}
263285
}
264286
}
287+
288+
/// Process a log line: try to parse as graph resources log first, then try to
289+
/// extract extension information.
290+
fn process_log_line(
291+
log_line: &str,
292+
graph_resources_log: &mut GraphResourcesLog,
293+
) -> Option<LogLineMetadata> {
294+
// First try to parse as graph resources log.
295+
match parse_graph_resources_log(log_line, graph_resources_log) {
296+
Ok(_) => {
297+
// Successfully parsed as graph resources log, but no metadata to
298+
// return.
299+
None
300+
}
301+
Err(_) => {
302+
// Not a graph resources log, try to extract extension information.
303+
extract_extension_from_log_line(log_line, graph_resources_log)
304+
}
305+
}
306+
}

core/src/ten_manager/src/log/mod.rs

Lines changed: 102 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,111 @@
44
// Licensed under the Apache License, Version 2.0, with certain conditions.
55
// Refer to the "LICENSE" file in the root directory for more information.
66
//
7+
use std::collections::HashMap;
8+
79
use anyhow::{anyhow, Result};
10+
use serde::{Deserialize, Serialize};
811
use serde_json::Value;
9-
use std::collections::HashMap;
1012

1113
pub struct ExtensionThreadInfo {
1214
pub extensions: Vec<String>,
1315
}
1416

15-
pub struct AppInfo {
16-
pub extension_threads: HashMap<String, ExtensionThreadInfo>,
17-
}
18-
1917
pub struct GraphResourcesLog {
2018
pub graph_id: String,
2119
pub graph_name: String,
22-
pub apps: HashMap<Option<String>, AppInfo>,
20+
pub app_uri: Option<String>,
21+
pub extension_threads: HashMap<String, ExtensionThreadInfo>,
22+
}
23+
24+
#[derive(Debug, Clone, Serialize, Deserialize)]
25+
pub struct LogLineMetadata {
26+
pub extension: Option<String>,
27+
}
28+
29+
#[derive(Debug, Clone, Serialize, Deserialize)]
30+
pub struct LogLineInfo {
31+
pub line: String,
32+
pub metadata: Option<LogLineMetadata>,
33+
}
34+
35+
/// Extracts extension information from a log line.
36+
///
37+
/// This function checks if the log line contains extension information in the
38+
/// format "[extension_name]" and returns metadata about the extension if found
39+
/// in the graph resources.
40+
pub fn extract_extension_from_log_line(
41+
log_message: &str,
42+
graph_resources_log: &GraphResourcesLog,
43+
) -> Option<LogLineMetadata> {
44+
// Split the log message by whitespace for initial parsing.
45+
let parts: Vec<&str> = log_message.split_whitespace().collect();
46+
if parts.len() < 5 {
47+
// Need at least date, time, process/thread IDs, log level, and content.
48+
return None;
49+
}
50+
51+
// Extract the process ID and thread ID.
52+
// Format expected: "processID(threadID)".
53+
let process_thread_part = parts[2];
54+
if !process_thread_part.contains('(') || !process_thread_part.contains(')')
55+
{
56+
return None;
57+
}
58+
59+
let thread_id = process_thread_part
60+
.split('(')
61+
.nth(1)? // Get the part after '('.
62+
.trim_end_matches(')'); // Remove the trailing ')'.
63+
64+
// Check if the thread ID exists in graph_resources_log.extension_threads.
65+
if !graph_resources_log.extension_threads.contains_key(thread_id) {
66+
return None;
67+
}
68+
69+
// Check if the log level is 'M', if so, return None.
70+
let log_level_pos = 3;
71+
if parts[log_level_pos] == "M" {
72+
return None;
73+
}
74+
75+
// Find the content part (everything after the function name part).
76+
let function_part = parts[log_level_pos + 1]; // This is the function@file:line part.
77+
if !function_part.contains('@') {
78+
return None;
79+
}
80+
81+
// The content starts from the position after the function part.
82+
let content_index = log_message.find(function_part)? + function_part.len();
83+
let content = log_message[content_index..].trim();
84+
85+
// Check if content begins with [...], if not, return None.
86+
if !content.starts_with('[') || !content.contains(']') {
87+
return None;
88+
}
89+
90+
// Extract the extension name from [...].
91+
let end_pos = content.find(']')?;
92+
if end_pos <= 1 {
93+
return None; // No content inside brackets.
94+
}
95+
96+
let extension_name = &content[1..end_pos];
97+
98+
// Check if extension_name exists in the extension_threads for the given
99+
// thread_id.
100+
if let Some(thread_info) =
101+
graph_resources_log.extension_threads.get(thread_id)
102+
{
103+
if thread_info.extensions.contains(&extension_name.to_string()) {
104+
// Create and return LogLineMetadata.
105+
return Some(LogLineMetadata {
106+
extension: Some(extension_name.to_string()),
107+
});
108+
}
109+
}
110+
111+
None
23112
}
24113

25114
pub fn parse_graph_resources_log(
@@ -29,19 +118,19 @@ pub fn parse_graph_resources_log(
29118
// Check if the log level is 'M'.
30119
let parts: Vec<&str> = log_message.split_whitespace().collect();
31120
if parts.len() < 4 {
32-
return Ok(());
121+
return Err(anyhow!("Not a valid graph resources log message"));
33122
}
34123

35124
// Check for log level 'M' - it should be in the fourth position after the
36125
// timestamp and process/thread IDs.
37126
let log_level_pos = 3;
38127
if parts.len() <= log_level_pos || parts[log_level_pos] != "M" {
39-
return Ok(());
128+
return Err(anyhow!("Not a valid graph resources log message"));
40129
}
41130

42131
// Find the "[graph resources]" marker.
43132
if !log_message.contains("[graph resources]") {
44-
return Ok(());
133+
return Err(anyhow!("Not a valid graph resources log message"));
45134
}
46135

47136
// Extract the JSON content after "[graph resources]".
@@ -66,13 +155,9 @@ pub fn parse_graph_resources_log(
66155
// Update graph_resources_log with graph ID and name.
67156
graph_resources_log.graph_id = graph_id.to_string();
68157
graph_resources_log.graph_name = graph_name.to_string();
69-
70-
// Create or get the AppInfo for this app_uri.
71-
let app_key = app_uri.map(|uri| uri.to_string());
72-
let app_info = graph_resources_log
73-
.apps
74-
.entry(app_key)
75-
.or_insert_with(|| AppInfo { extension_threads: HashMap::new() });
158+
if let Some(uri) = app_uri {
159+
graph_resources_log.app_uri = Some(uri.to_string());
160+
}
76161

77162
// Process extension_threads if present.
78163
if let Some(extension_threads) = json_value.get("extension_threads") {
@@ -90,7 +175,7 @@ pub fn parse_graph_resources_log(
90175
let thread_info =
91176
ExtensionThreadInfo { extensions: extension_names };
92177

93-
app_info
178+
graph_resources_log
94179
.extension_threads
95180
.insert(thread_id.to_string(), thread_info);
96181
}

core/src/ten_manager/tests/test_case/designer/log_watcher.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use tempfile::tempdir;
1616
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
1717

1818
use ten_manager::designer::log_watcher::log_watcher_endpoint;
19+
use ten_manager::log::LogLineInfo;
1920

2021
use crate::test_case::common::builtin_server::start_test_server;
2122

@@ -89,16 +90,26 @@ async fn test_ws_log_watcher_endpoint() {
8990
let test_content = "Test log message\n";
9091
append_to_log_file(&log_file_path, test_content);
9192

92-
// Check if we receive the content.
93+
// Check if we receive the content - with timeout of 10 seconds.
9394
let mut received_content = false;
94-
while let Some(msg) = read.next().await {
95+
if let Ok(Some(msg)) =
96+
tokio::time::timeout(Duration::from_secs(10), read.next()).await
97+
{
9598
let msg = msg.unwrap();
9699
if msg.is_text() {
97100
let text = msg.to_text().unwrap();
98101
println!("Received text: {}", text);
99-
if text.contains(test_content) {
102+
103+
// Try to parse the JSON response.
104+
if let Ok(log_line_info) = serde_json::from_str::<LogLineInfo>(text)
105+
{
106+
if log_line_info.line.contains(test_content.trim()) {
107+
received_content = true;
108+
}
109+
} else if text.contains(test_content.trim()) {
110+
// Fallback to direct text comparison (for backward
111+
// compatibility).
100112
received_content = true;
101-
break;
102113
}
103114
}
104115
}

0 commit comments

Comments
 (0)