1515import re
1616from datetime import datetime
1717from pathlib import Path
18- from typing import List , Optional , Tuple , Union
18+ from typing import Dict , List , Optional , Tuple , Union
1919
2020from camel .logger import get_logger
2121from camel .toolkits .base import BaseToolkit
2626
2727
2828@MCPServer ()
29- class FileWriteToolkit (BaseToolkit ):
30- r"""A toolkit for creating, writing, and modifying text in files.
31-
32- This class provides cross-platform (macOS, Linux, Windows) support for
33- writing to various file formats (Markdown, DOCX, PDF, and plaintext),
34- replacing text in existing files, automatic filename uniquification to
35- prevent overwrites, custom encoding and enhanced formatting options for
36- specialized formats.
29+ class FileToolkit (BaseToolkit ):
30+ r"""A comprehensive toolkit for file operations including reading,
31+ writing, and editing files.
32+
33+ This class provides cross-platform (macOS, Linux, Windows) support for:
34+ - Reading various file formats (text, JSON, YAML, PDF, DOCX)
35+ - Writing to multiple formats (Markdown, DOCX, PDF, plaintext, JSON,
36+ YAML, CSV, HTML)
37+ - Editing and modifying existing files with content replacement
38+ - Automatic backup creation before modifications
39+ - Custom encoding and enhanced formatting options
3740 """
3841
3942 def __init__ (
@@ -126,36 +129,32 @@ def _write_text_file(
126129 with file_path .open ("w" , encoding = encoding ) as f :
127130 f .write (content )
128131
129- def _generate_unique_filename (self , file_path : Path ) -> Path :
130- r"""Generate a unique filename if the target file already exists.
132+ def _create_backup (self , file_path : Path ) -> Optional [ Path ] :
133+ r"""Create a backup of the file if it exists and backup is enabled .
131134
132135 Args:
133- file_path (Path): The original file path.
136+ file_path (Path): The file path to backup .
134137
135138 Returns:
136- Path: A unique file path that doesn't exist yet .
139+ Optional[ Path]: Path to the backup file if created, None otherwise .
137140 """
138- if not file_path .exists ():
139- return file_path
141+ if not self . backup_enabled or not file_path .exists ():
142+ return None
140143
141- # Generate unique filename with timestamp and counter
144+ # Generate backup filename with .bak extension and timestamp
142145 timestamp = datetime .now ().strftime ("%Y%m%d_%H%M%S" )
143- stem = file_path .stem
144- suffix = file_path .suffix
145- parent = file_path .parent
146-
147- # First try with timestamp
148- new_path = parent / f"{ stem } _{ timestamp } { suffix } "
149- if not new_path .exists ():
150- return new_path
151-
152- # If timestamp version exists, add counter
153- counter = 1
154- while True :
155- new_path = parent / f"{ stem } _{ timestamp } _{ counter } { suffix } "
156- if not new_path .exists ():
157- return new_path
158- counter += 1
146+ backup_path = file_path .parent / f"{ file_path .name } .{ timestamp } .bak"
147+
148+ # Copy the file to backup location
149+ import shutil
150+
151+ try :
152+ shutil .copy2 (file_path , backup_path )
153+ logger .info (f"Created backup: { backup_path } " )
154+ return backup_path
155+ except Exception as e :
156+ logger .warning (f"Failed to create backup: { e } " )
157+ return None
159158
160159 def _write_docx_file (self , file_path : Path , content : str ) -> None :
161160 r"""Write text content to a DOCX file with default formatting.
@@ -1006,8 +1005,9 @@ def write_to_file(
10061005 file_path = self ._resolve_filepath (filename )
10071006 file_path .parent .mkdir (parents = True , exist_ok = True )
10081007
1009- # Generate unique filename if file exists
1010- file_path = self ._generate_unique_filename (file_path )
1008+ # Create backup of existing file if backup is enabled
1009+ if file_path .exists () and self .backup_enabled :
1010+ self ._create_backup (file_path )
10111011
10121012 extension = file_path .suffix .lower ()
10131013
@@ -1062,6 +1062,144 @@ def write_to_file(
10621062 logger .error (error_msg )
10631063 return error_msg
10641064
1065+ # ----------------------------------------------
1066+ # Read File Functions
1067+ # ----------------------------------------------
1068+ def read_file (
1069+ self , file_paths : Union [str , List [str ]]
1070+ ) -> Union [str , Dict [str , str ]]:
1071+ r"""Read and return content of one or more files using MarkItDown
1072+ for better format support.
1073+
1074+ This method uses MarkItDownLoader to convert various file formats
1075+ to Markdown. It supports a wide range of formats including:
1076+ - PDF (.pdf)
1077+ - Microsoft Office: Word (.doc, .docx), Excel (.xls, .xlsx),
1078+ PowerPoint (.ppt, .pptx)
1079+ - EPUB (.epub)
1080+ - HTML (.html, .htm)
1081+ - Images (.jpg, .jpeg, .png) for OCR
1082+ - Audio (.mp3, .wav) for transcription
1083+ - Text-based formats (.csv, .json, .xml, .txt, .md)
1084+ - ZIP archives (.zip)
1085+
1086+ Args:
1087+ file_paths (Union[str, List[str]]): A single file path or a list
1088+ of file paths to read. Paths can be relative or absolute.
1089+ If relative, they will be resolved relative to the working
1090+ directory.
1091+
1092+ Returns:
1093+ Union[str, Dict[str, str]]:
1094+ - If a single file path is provided: Returns the content as
1095+ a string.
1096+ - If multiple file paths are provided: Returns a dictionary
1097+ where keys are file paths and values are the corresponding
1098+ content in Markdown format.
1099+ If conversion fails, returns an error message.
1100+ """
1101+ from camel .loaders .markitdown import MarkItDownLoader
1102+
1103+ try :
1104+ # Handle single file path for backward compatibility
1105+ if isinstance (file_paths , str ):
1106+ resolved_path = self ._resolve_filepath (file_paths )
1107+
1108+ # Use MarkItDownLoader to convert the file
1109+ result = MarkItDownLoader ().convert_files (
1110+ file_paths = [str (resolved_path )], parallel = False
1111+ )
1112+
1113+ # Return the converted content or error message
1114+ return result .get (
1115+ str (resolved_path ), f"Failed to read file: { resolved_path } "
1116+ )
1117+
1118+ # Handle multiple file paths
1119+ else :
1120+ resolved_paths = [
1121+ str (self ._resolve_filepath (fp )) for fp in file_paths
1122+ ]
1123+
1124+ # Use MarkItDownLoader to convert files in parallel
1125+ result = MarkItDownLoader ().convert_files (
1126+ file_paths = resolved_paths , parallel = True
1127+ )
1128+
1129+ # Map back to original paths if needed
1130+ return_dict = {}
1131+ for original , resolved in zip (file_paths , resolved_paths ):
1132+ return_dict [original ] = result .get (
1133+ resolved , f"Failed to read file: { resolved } "
1134+ )
1135+
1136+ return return_dict
1137+
1138+ except Exception as e :
1139+ return f"Error reading file(s): { e } "
1140+
1141+ # ----------------------------------------------
1142+ # Edit File Functions
1143+ # ----------------------------------------------
1144+ def edit_file (
1145+ self , file_path : str , old_content : str , new_content : str
1146+ ) -> str :
1147+ r"""Edit a file by replacing specified content.
1148+
1149+ This method performs simple text replacement in files. It reads
1150+ the file, replaces all occurrences of old_content with new_content,
1151+ and writes the result back.
1152+
1153+ Args:
1154+ file_path (str): The path to the file to edit. Can be
1155+ relative or absolute. If relative, it will be resolved
1156+ relative to the working directory.
1157+ old_content (str): The exact text to find and replace.
1158+ new_content (str): The text to replace old_content with.
1159+
1160+ Returns:
1161+ str: A success message if the edit was successful, or an
1162+ error message if the content wasn't found or an error occurred.
1163+ """
1164+ try :
1165+ working_path = self ._resolve_filepath (file_path )
1166+
1167+ if not working_path .exists ():
1168+ return f"Error: File { working_path } does not exist"
1169+
1170+ # Create backup before editing if enabled
1171+ self ._create_backup (working_path )
1172+
1173+ # Read the file content
1174+ try :
1175+ file_text = working_path .read_text (
1176+ encoding = self .default_encoding
1177+ )
1178+ except Exception as e :
1179+ return f"Error reading file: { e } "
1180+
1181+ # Check if the old_content exists in the file
1182+ if old_content not in file_text :
1183+ return (
1184+ f"No replacement performed: '{ old_content } ' not found in "
1185+ f"{ working_path } ."
1186+ )
1187+
1188+ # Replace the content
1189+ new_file_text = file_text .replace (old_content , new_content )
1190+
1191+ # Write back to file
1192+ try :
1193+ working_path .write_text (
1194+ new_file_text , encoding = self .default_encoding
1195+ )
1196+ return f"Successfully edited { working_path } "
1197+ except Exception as e :
1198+ return f"Error writing file: { e } "
1199+
1200+ except Exception as e :
1201+ return f"Error editing file: { e } "
1202+
10651203 def get_tools (self ) -> List [FunctionTool ]:
10661204 r"""Return a list of FunctionTool objects representing the functions
10671205 in the toolkit.
@@ -1072,4 +1210,26 @@ def get_tools(self) -> List[FunctionTool]:
10721210 """
10731211 return [
10741212 FunctionTool (self .write_to_file ),
1213+ FunctionTool (self .read_file ),
1214+ FunctionTool (self .edit_file ),
10751215 ]
1216+
1217+
1218+ # Backward compatibility: FileWriteToolkit as deprecated alias
1219+ class FileWriteToolkit (FileToolkit ):
1220+ r"""Deprecated: Use FileToolkit instead.
1221+
1222+ This class is maintained for backward compatibility only.
1223+ Please use FileToolkit for new code.
1224+ """
1225+
1226+ def __init__ (self , * args , ** kwargs ):
1227+ import warnings
1228+
1229+ warnings .warn (
1230+ "FileWriteToolkit is deprecated and will be removed in a "
1231+ "future version. Please use FileToolkit instead." ,
1232+ DeprecationWarning ,
1233+ stacklevel = 2 ,
1234+ )
1235+ super ().__init__ (* args , ** kwargs )
0 commit comments