Skip to content

Commit b601dab

Browse files
authored
refactor: update log handling to support UTF-8 text and enhance related tests (#777)
1 parent 866fc93 commit b601dab

File tree

9 files changed

+297
-89
lines changed

9 files changed

+297
-89
lines changed

core/src/ten_manager/src/designer/log_watcher/mod.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use serde_json::json;
1616
use tokio::sync::Mutex;
1717

1818
use crate::designer::DesignerState;
19-
use crate::fs::file_watcher::{FileContentStream, FileWatchOptions};
19+
use crate::fs::log_file_watcher::{LogFileContentStream, LogFileWatchOptions};
2020
use crate::pkg_info::property::get_log_file_path;
2121

2222
// Message types for WebSocket communication
@@ -32,7 +32,7 @@ pub struct SetAppBaseDir {
3232

3333
#[derive(Message, Debug, Serialize, Deserialize)]
3434
#[rtype(result = "()")]
35-
pub struct FileContent(pub Vec<u8>);
35+
pub struct FileContent(pub String);
3636

3737
#[derive(Message, Debug, Serialize, Deserialize)]
3838
#[rtype(result = "()")]
@@ -48,12 +48,12 @@ pub struct InfoMessage(pub String);
4848

4949
#[derive(Message)]
5050
#[rtype(result = "()")]
51-
pub struct StoreWatcher(pub FileContentStream);
51+
pub struct StoreWatcher(pub LogFileContentStream);
5252

5353
// WebSocket actor for log file watching.
5454
struct WsLogWatcher {
5555
app_base_dir: Option<String>,
56-
file_watcher: Option<Arc<Mutex<FileContentStream>>>,
56+
file_watcher: Option<Arc<Mutex<LogFileContentStream>>>,
5757
}
5858

5959
impl WsLogWatcher {
@@ -125,10 +125,10 @@ impl Handler<SetAppBaseDir> for WsLogWatcher {
125125
};
126126

127127
// Create file watch options.
128-
let options = FileWatchOptions::default();
128+
let options = LogFileWatchOptions::default();
129129

130130
// Start watching the file.
131-
match crate::fs::file_watcher::watch_file(
131+
match crate::fs::log_file_watcher::watch_log_file(
132132
&log_file_path,
133133
Some(options),
134134
)
@@ -160,8 +160,8 @@ impl Handler<FileContent> for WsLogWatcher {
160160
msg: FileContent,
161161
ctx: &mut Self::Context,
162162
) -> Self::Result {
163-
// Send the file content to the WebSocket client.
164-
ctx.binary(msg.0);
163+
// Send the file content as text to the WebSocket client.
164+
ctx.text(msg.0);
165165
}
166166
}
167167

core/src/ten_manager/src/fs/file_watcher.rs renamed to core/src/ten_manager/src/fs/log_file_watcher.rs

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,26 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60); // 1 minute timeout.
2323
const DEFAULT_BUFFER_SIZE: usize = 4096; // Default read buffer size.
2424
const DEFAULT_CHECK_INTERVAL: Duration = Duration::from_millis(100);
2525

26-
/// Stream of file content changes.
27-
pub struct FileContentStream {
28-
// Channel for receiving file content.
29-
content_rx: Receiver<Result<Vec<u8>>>,
26+
/// Stream of UTF-8 text file content changes.
27+
pub struct LogFileContentStream {
28+
// Channel for receiving file content as UTF-8 text.
29+
content_rx: Receiver<Result<String>>,
3030

3131
// Sender to signal stop request.
3232
stop_tx: Option<oneshot::Sender<()>>,
3333
}
3434

35-
impl FileContentStream {
35+
impl LogFileContentStream {
3636
/// Create a new FileContentStream.
3737
fn new(
38-
content_rx: Receiver<Result<Vec<u8>>>,
38+
content_rx: Receiver<Result<String>>,
3939
stop_tx: oneshot::Sender<()>,
4040
) -> Self {
4141
Self { content_rx, stop_tx: Some(stop_tx) }
4242
}
4343

44-
/// Get the next chunk of data from the file.
45-
pub async fn next(&mut self) -> Option<Result<Vec<u8>>> {
44+
/// Get the next chunk of text from the file.
45+
pub async fn next(&mut self) -> Option<Result<String>> {
4646
self.content_rx.recv().await
4747
}
4848

@@ -54,15 +54,15 @@ impl FileContentStream {
5454
}
5555
}
5656

57-
impl Drop for FileContentStream {
57+
impl Drop for LogFileContentStream {
5858
fn drop(&mut self) {
5959
self.stop();
6060
}
6161
}
6262

6363
/// Options for watching a file.
6464
#[derive(Clone)]
65-
pub struct FileWatchOptions {
65+
pub struct LogFileWatchOptions {
6666
/// Timeout for waiting for new content after reaching EOF.
6767
pub timeout: Duration,
6868

@@ -73,7 +73,7 @@ pub struct FileWatchOptions {
7373
pub check_interval: Duration,
7474
}
7575

76-
impl Default for FileWatchOptions {
76+
impl Default for LogFileWatchOptions {
7777
fn default() -> Self {
7878
Self {
7979
timeout: DEFAULT_TIMEOUT,
@@ -97,17 +97,17 @@ fn is_same_file(a: &Metadata, b: &Metadata) -> bool {
9797
}
9898
}
9999

100-
/// Watch a file for changes and stream its content.
100+
/// Watch a UTF-8 text file for changes and stream its content.
101101
///
102102
/// Returns a FileContentStream that can be used to read the content of the file
103103
/// as it changes. The stream will end when either:
104104
/// 1. The caller stops it by calling `stop()` or dropping the stream.
105105
/// 2. No new content is available after reaching EOF and the timeout is
106106
/// reached.
107-
pub async fn watch_file<P: AsRef<Path>>(
107+
pub async fn watch_log_file<P: AsRef<Path>>(
108108
path: P,
109-
options: Option<FileWatchOptions>,
110-
) -> Result<FileContentStream> {
109+
options: Option<LogFileWatchOptions>,
110+
) -> Result<LogFileContentStream> {
111111
let path = path.as_ref().to_path_buf();
112112

113113
// Ensure the file exists before we start watching it.
@@ -123,18 +123,18 @@ pub async fn watch_file<P: AsRef<Path>>(
123123

124124
// Spawn a task to watch the file.
125125
tokio::spawn(async move {
126-
watch_file_task(path, content_tx, stop_rx, options).await;
126+
watch_log_file_task(path, content_tx, stop_rx, options).await;
127127
});
128128

129-
Ok(FileContentStream::new(content_rx, stop_tx))
129+
Ok(LogFileContentStream::new(content_rx, stop_tx))
130130
}
131131

132132
/// Actual file watch task running in the background.
133-
async fn watch_file_task(
133+
async fn watch_log_file_task(
134134
path: PathBuf,
135-
content_tx: Sender<Result<Vec<u8>>>,
135+
content_tx: Sender<Result<String>>,
136136
mut stop_rx: oneshot::Receiver<()>,
137-
options: FileWatchOptions,
137+
options: LogFileWatchOptions,
138138
) {
139139
// Open the file.
140140
let mut file = match File::open(&path) {
@@ -157,15 +157,18 @@ async fn watch_file_task(
157157
let _ = content_tx.send(Err(anyhow!(e))).await;
158158
return;
159159
}
160-
let mut init_buf = Vec::new();
161-
match file.read_to_end(&mut init_buf) {
160+
161+
let mut init_content = String::new();
162+
match file.read_to_string(&mut init_content) {
162163
Ok(n) => {
163164
if n > 0 {
164-
let _ = content_tx.send(Ok(init_buf)).await;
165+
let _ = content_tx.send(Ok(init_content)).await;
165166
}
166167
}
167168
Err(e) => {
168-
let _ = content_tx.send(Err(anyhow!(e))).await;
169+
let _ = content_tx
170+
.send(Err(anyhow!("Failed to read file as UTF-8 text: {}", e)))
171+
.await;
169172
return;
170173
}
171174
}
@@ -226,17 +229,25 @@ async fn watch_file_task(
226229
}
227230

228231
let mut reader = BufReader::with_capacity(options.buffer_size, &file);
229-
let mut buf = Vec::with_capacity(options.buffer_size);
230-
match reader.read_until(0, &mut buf) {
232+
let mut line = String::with_capacity(options.buffer_size);
233+
match reader.read_line(&mut line) {
231234
Ok(n) if n > 0 => {
232235
last_pos += n as u64;
233-
let _ = content_tx.send(Ok(buf)).await;
236+
let _ = content_tx.send(Ok(line)).await;
234237
last_activity = Instant::now();
235238
}
236239
Ok(_) => {}
237240
Err(e) => {
238-
eprintln!("Error reading from file: {}", e);
239-
let _ = content_tx.send(Err(anyhow::anyhow!(e))).await;
241+
// Specific error for UTF-8 decoding failures.
242+
if e.kind() == std::io::ErrorKind::InvalidData {
243+
eprintln!("Error: Invalid UTF-8 data in file");
244+
let _ = content_tx
245+
.send(Err(anyhow!("Invalid UTF-8 data in file")))
246+
.await;
247+
} else {
248+
eprintln!("Error reading from file: {}", e);
249+
let _ = content_tx.send(Err(anyhow::anyhow!(e))).await;
250+
}
240251
break;
241252
}
242253
}

core/src/ten_manager/src/fs/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
// Refer to the "LICENSE" file in the root directory for more information.
66
//
77
pub mod file_type;
8-
pub mod file_watcher;
98
pub mod json;
9+
pub mod log_file_watcher;
1010

1111
use std::env;
1212
use std::path::{Path, PathBuf};

core/src/ten_manager/src/log/mod.rs

Lines changed: 82 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,100 @@
44
// Licensed under the Apache License, Version 2.0, with certain conditions.
55
// Refer to the "LICENSE" file in the root directory for more information.
66
//
7-
use anyhow::Result;
7+
use anyhow::{anyhow, Result};
8+
use serde_json::Value;
89
use std::collections::HashMap;
910

10-
pub struct ExtensionInfo {
11-
pub thread_id: u64,
11+
pub struct ExtensionThreadInfo {
12+
pub extensions: Vec<String>,
13+
}
14+
15+
pub struct AppInfo {
16+
pub extension_threads: HashMap<String, ExtensionThreadInfo>,
1217
}
1318

1419
pub struct GraphResourcesLog {
15-
pub app_uri: Option<String>,
1620
pub graph_id: String,
1721
pub graph_name: String,
18-
pub extensions: HashMap<String, ExtensionInfo>,
22+
pub apps: HashMap<Option<String>, AppInfo>,
1923
}
2024

2125
pub fn parse_graph_resources_log(
2226
log_message: &str,
2327
graph_resources_log: &mut GraphResourcesLog,
2428
) -> Result<()> {
29+
// Check if the log level is 'M'.
30+
let parts: Vec<&str> = log_message.split_whitespace().collect();
31+
if parts.len() < 4 {
32+
return Ok(());
33+
}
34+
35+
// Check for log level 'M' - it should be in the fourth position after the
36+
// timestamp and process/thread IDs.
37+
let log_level_pos = 3;
38+
if parts.len() <= log_level_pos || parts[log_level_pos] != "M" {
39+
return Ok(());
40+
}
41+
42+
// Find the "[graph resources]" marker.
43+
if !log_message.contains("[graph resources]") {
44+
return Ok(());
45+
}
46+
47+
// Extract the JSON content after "[graph resources]".
48+
let json_content = log_message
49+
.split("[graph resources]")
50+
.nth(1)
51+
.ok_or_else(|| anyhow!("Failed to extract JSON content"))?
52+
.trim();
53+
54+
// Parse the JSON content.
55+
let json_value: Value = serde_json::from_str(json_content)?;
56+
57+
// Extract data from the JSON.
58+
let app_uri = json_value.get("app_uri").and_then(|v| v.as_str());
59+
let graph_id = json_value["graph id"]
60+
.as_str()
61+
.ok_or_else(|| anyhow!("Missing graph id"))?;
62+
let graph_name = json_value["graph name"]
63+
.as_str()
64+
.ok_or_else(|| anyhow!("Missing graph name"))?;
65+
66+
// Update graph_resources_log with graph ID and name.
67+
graph_resources_log.graph_id = graph_id.to_string();
68+
graph_resources_log.graph_name = graph_name.to_string();
69+
70+
// Create or get the AppInfo for this app_uri.
71+
let app_key = app_uri.map(|uri| uri.to_string());
72+
let app_info = graph_resources_log
73+
.apps
74+
.entry(app_key)
75+
.or_insert_with(|| AppInfo { extension_threads: HashMap::new() });
76+
77+
// Process extension_threads if present.
78+
if let Some(extension_threads) = json_value.get("extension_threads") {
79+
if let Some(extension_threads_obj) = extension_threads.as_object() {
80+
for (thread_id, thread_info) in extension_threads_obj {
81+
if let Some(extensions_array) = thread_info.get("extensions") {
82+
if let Some(extensions) = extensions_array.as_array() {
83+
let mut extension_names = Vec::new();
84+
for ext in extensions {
85+
if let Some(ext_name) = ext.as_str() {
86+
extension_names.push(ext_name.to_string());
87+
}
88+
}
89+
90+
let thread_info =
91+
ExtensionThreadInfo { extensions: extension_names };
92+
93+
app_info
94+
.extension_threads
95+
.insert(thread_id.to_string(), thread_info);
96+
}
97+
}
98+
}
99+
}
100+
}
101+
25102
Ok(())
26103
}

core/src/ten_manager/tests/test_case/designer/log_watcher.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,10 @@ async fn test_ws_log_watcher_endpoint() {
9393
let mut received_content = false;
9494
while let Some(msg) = read.next().await {
9595
let msg = msg.unwrap();
96-
if msg.is_binary() {
97-
let binary = msg.into_data();
98-
let content = String::from_utf8_lossy(&binary);
99-
println!("Received binary: {}", content);
100-
if content.contains(test_content) {
96+
if msg.is_text() {
97+
let text = msg.to_text().unwrap();
98+
println!("Received text: {}", text);
99+
if text.contains(test_content) {
101100
received_content = true;
102101
break;
103102
}

0 commit comments

Comments
 (0)