diff --git a/debug_mcp_server.py b/debug_mcp_server.py index 10626d8..6713efe 100644 --- a/debug_mcp_server.py +++ b/debug_mcp_server.py @@ -37,7 +37,12 @@ async def debug_mcp_server(): # Set up environment env = os.environ.copy() - env.update({"SBCTL_TOKEN": "test-token-12345", "MCP_BUNDLE_STORAGE": str(temp_bundle_dir)}) + env.update( + { + "SBCTL_TOKEN": "test-token-12345", + "MCP_BUNDLE_STORAGE": str(temp_bundle_dir), + } + ) print("\\n=== Starting MCP Server Manually ===") diff --git a/debug_sbctl.py b/debug_sbctl.py index 45da04b..2045fe0 100644 --- a/debug_sbctl.py +++ b/debug_sbctl.py @@ -38,7 +38,10 @@ async def debug_sbctl(): # Test 1: Check if sbctl can read the bundle print("\n=== Test 1: Running sbctl with --help ===") process = await asyncio.create_subprocess_exec( - "sbctl", "--help", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + "sbctl", + "--help", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() print(f"sbctl --help exit code: {process.returncode}") @@ -52,7 +55,10 @@ async def debug_sbctl(): # Set up environment env = os.environ.copy() env.update( - {"SBCTL_TOKEN": "test-token-12345", "KUBECONFIG": str(temp_dir_path / "kubeconfig")} + { + "SBCTL_TOKEN": "test-token-12345", + "KUBECONFIG": str(temp_dir_path / "kubeconfig"), + } ) cmd = [ @@ -66,7 +72,10 @@ async def debug_sbctl(): # Start the process with timeout process = await asyncio.create_subprocess_exec( - *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, ) print(f"Process started with PID: {process.pid}") diff --git a/packages/x86_64/APKINDEX.tar.gz b/packages/x86_64/APKINDEX.tar.gz index b9cdb94..b3c12ff 100644 Binary files a/packages/x86_64/APKINDEX.tar.gz and b/packages/x86_64/APKINDEX.tar.gz differ diff --git a/packages/x86_64/troubleshoot-mcp-server-0.1.0-r0.apk b/packages/x86_64/troubleshoot-mcp-server-0.1.0-r0.apk index 98608e2..6e89965 100644 Binary files a/packages/x86_64/troubleshoot-mcp-server-0.1.0-r0.apk and b/packages/x86_64/troubleshoot-mcp-server-0.1.0-r0.apk differ diff --git a/src/mcp_server_troubleshoot/__main__.py b/src/mcp_server_troubleshoot/__main__.py index 46478f2..45f61ca 100644 --- a/src/mcp_server_troubleshoot/__main__.py +++ b/src/mcp_server_troubleshoot/__main__.py @@ -79,7 +79,9 @@ def parse_args(args: Optional[List[str]] = None) -> argparse.Namespace: parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging") parser.add_argument("--bundle-dir", type=str, help="Directory to store support bundles") parser.add_argument( - "--show-config", action="store_true", help="Show recommended MCP client configuration" + "--show-config", + action="store_true", + help="Show recommended MCP client configuration", ) return parser.parse_args(args) diff --git a/src/mcp_server_troubleshoot/bundle.py b/src/mcp_server_troubleshoot/bundle.py index bc18c0d..e4fed3e 100644 --- a/src/mcp_server_troubleshoot/bundle.py +++ b/src/mcp_server_troubleshoot/bundle.py @@ -45,7 +45,11 @@ ) # Feature flags from environment variables -CLEANUP_ORPHANED = os.environ.get("SBCTL_CLEANUP_ORPHANED", "true").lower() in ("true", "1", "yes") +CLEANUP_ORPHANED = os.environ.get("SBCTL_CLEANUP_ORPHANED", "true").lower() in ( + "true", + "1", + "yes", +) ALLOW_ALTERNATIVE_KUBECONFIG = os.environ.get( "SBCTL_ALLOW_ALTERNATIVE_KUBECONFIG", "true" ).lower() in ("true", "1", "yes") @@ -97,7 +101,8 @@ class 
BundleMetadata(BaseModel): kubeconfig_path: Path = Field(description="The path to the kubeconfig file") initialized: bool = Field(description="Whether the bundle has been initialized with sbctl") host_only_bundle: bool = Field( - False, description="Whether this bundle contains only host resources (no cluster resources)" + False, + description="Whether this bundle contains only host resources (no cluster resources)", ) @@ -108,10 +113,12 @@ class InitializeBundleArgs(BaseModel): source: str = Field(description="The source of the bundle (URL or local path)") force: bool = Field( - False, description="Whether to force re-initialization if a bundle is already active" + False, + description="Whether to force re-initialization if a bundle is already active", ) verbosity: Optional[str] = Field( - None, description="Verbosity level for response formatting (minimal|standard|verbose|debug)" + None, + description="Verbosity level for response formatting (minimal|standard|verbose|debug)", ) @field_validator("source") @@ -180,10 +187,12 @@ class ListAvailableBundlesArgs(BaseModel): """ include_invalid: bool = Field( - False, description="Whether to include invalid or inaccessible bundles in the results" + False, + description="Whether to include invalid or inaccessible bundles in the results", ) verbosity: Optional[str] = Field( - None, description="Verbosity level for response formatting (minimal|standard|verbose|debug)" + None, + description="Verbosity level for response formatting (minimal|standard|verbose|debug)", ) @@ -797,7 +806,8 @@ async def _initialize_with_sbctl(self, bundle_path: Path, output_dir: Path) -> P ) try: safe_copy_file( - announced_kubeconfig, kubeconfig_path + announced_kubeconfig, + kubeconfig_path, ) logger.info( f"Successfully copied kubeconfig to {kubeconfig_path}" @@ -1190,7 +1200,10 @@ async def _wait_for_initialization( # Search for any newly created kubeconfig files in common locations if enabled if ALLOW_ALTERNATIVE_KUBECONFIG: - for pattern in ["/tmp/kubeconfig*", "/var/folders/*/*/local-kubeconfig-*"]: + for pattern in [ + "/tmp/kubeconfig*", + "/var/folders/*/*/local-kubeconfig-*", + ]: for path in glob.glob(pattern): kubeconfig_file = Path(path) if kubeconfig_file not in alternative_kubeconfig_paths: @@ -1348,7 +1361,10 @@ async def _terminate_sbctl_process(self) -> None: logger.debug( f"Process {pid} terminated successfully" ) - except (ProcessLookupError, PermissionError) as e: + except ( + ProcessLookupError, + PermissionError, + ) as e: logger.debug(f"Error terminating process {pid}: {e}") except ( psutil.NoSuchProcess, @@ -2032,7 +2048,7 @@ async def list_available_bundles(self, include_invalid: bool = False) -> List[Bu path=str(file_path), relative_path=file_path.name, name=file_path.name, - size_bytes=file_path.stat().st_size if file_path.exists() else 0, + size_bytes=(file_path.stat().st_size if file_path.exists() else 0), modified_time=( file_path.stat().st_mtime if file_path.exists() else 0 ), @@ -2144,7 +2160,11 @@ async def cleanup(self) -> None: and any("sbctl" in arg for arg in proc.info["cmdline"]) ): sbctl_processes.append(proc) - except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + except ( + psutil.NoSuchProcess, + psutil.AccessDenied, + psutil.ZombieProcess, + ): # Process disappeared or access denied - skip it continue diff --git a/src/mcp_server_troubleshoot/files.py b/src/mcp_server_troubleshoot/files.py index 81f64d9..1288273 100644 --- a/src/mcp_server_troubleshoot/files.py +++ b/src/mcp_server_troubleshoot/files.py @@ 
-57,7 +57,8 @@ class ListFilesArgs(BaseModel): path: str = Field(description="The path within the bundle to list") recursive: bool = Field(False, description="Whether to list recursively") verbosity: Optional[str] = Field( - None, description="Verbosity level for response formatting (minimal|standard|verbose|debug)" + None, + description="Verbosity level for response formatting (minimal|standard|verbose|debug)", ) @field_validator("path") @@ -105,7 +106,8 @@ class ReadFileArgs(BaseModel): None, description="The line number to end reading at (0-indexed, inclusive)" ) verbosity: Optional[str] = Field( - None, description="Verbosity level for response formatting (minimal|standard|verbose|debug)" + None, + description="Verbosity level for response formatting (minimal|standard|verbose|debug)", ) @field_validator("path") @@ -159,7 +161,8 @@ class GrepFilesArgs(BaseModel): max_results_per_file: int = Field(5, description="Maximum number of results to return per file") max_files: int = Field(10, description="Maximum number of files to search/return") verbosity: Optional[str] = Field( - None, description="Verbosity level for response formatting (minimal|standard|verbose|debug)" + None, + description="Verbosity level for response formatting (minimal|standard|verbose|debug)", ) @field_validator("path") @@ -280,7 +283,8 @@ class GrepResult(BaseModel): case_sensitive: bool = Field(description="Whether the search was case-sensitive") truncated: bool = Field(description="Whether the results were truncated due to max_results") files_truncated: bool = Field( - default=False, description="Whether the file list was truncated due to max_files" + default=False, + description="Whether the file list was truncated due to max_files", ) @@ -592,7 +596,12 @@ async def read_file( binary=is_binary, ) - except (PathNotFoundError, InvalidPathError, ReadFileError, FileSystemError) as e: + except ( + PathNotFoundError, + InvalidPathError, + ReadFileError, + FileSystemError, + ) as e: # Re-raise known errors logger.error(f"Error reading file: {str(e)}") raise diff --git a/src/mcp_server_troubleshoot/formatters.py b/src/mcp_server_troubleshoot/formatters.py index 1516be6..2bfc779 100644 --- a/src/mcp_server_troubleshoot/formatters.py +++ b/src/mcp_server_troubleshoot/formatters.py @@ -394,6 +394,119 @@ def format_error(self, error_message: str, diagnostics: Optional[Dict[str, Any]] response += f"\n\nDiagnostic information:\n```json\n{json.dumps(diagnostics, separators=(',', ':'))}\n```" return response + def format_overflow_message(self, tool_name: str, estimated_tokens: int, content: str) -> str: + """ + Format helpful overflow message with tool-specific guidance. + + Args: + tool_name: Name of the MCP tool that generated the content + estimated_tokens: Estimated token count of the content + content: The original content that was too large + + Returns: + Formatted overflow message with tool-specific suggestions and content preview + """ + # Default token limit (can be made configurable) + token_limit = 25000 + + # Base overflow message + message = f"Content too large ({estimated_tokens:,} tokens, limit: {token_limit:,}).\n\n" + + # Tool-specific suggestions + suggestions = self._get_tool_specific_suggestions(tool_name) + if suggestions: + message += "Suggestions to reduce output size:\n" + for i, suggestion in enumerate(suggestions, 1): + message += f"{i}. 
{suggestion}\n" + message += "\n" + + # Add content preview based on verbosity level + preview = self._generate_content_preview(content) + if preview: + message += preview + + return message + + def _get_tool_specific_suggestions(self, tool_name: str) -> List[str]: + """Get tool-specific suggestions for reducing output size.""" + suggestions_map = { + "list_files": [ + "Use non-recursive listing: list_files(recursive=false)", + 'Target specific directories: list_files(path="specific/subdirectory")', + 'Use minimal verbosity: list_files(verbosity="minimal")', + ], + "read_file": [ + "Read specific line ranges: read_file(start_line=1, end_line=100)", + "Read smaller chunks: read_file(start_line=1, end_line=50)", + "Target specific sections of the file", + ], + "grep_files": [ + "Limit results per file: grep_files(max_results=50, max_files=5)", + 'Use specific file patterns: grep_files(glob_pattern="*.yaml")', + 'Target specific directories: grep_files(path="specific/subdirectory")', + 'Use minimal verbosity: grep_files(verbosity="minimal")', + ], + "kubectl": [ + "Query specific resources: kubectl get pods --selector=app=myapp", + "Use different output formats: kubectl get pods -o name", + "Target specific namespaces: kubectl get pods -n specific-namespace", + "Limit results: kubectl get pods --limit=50", + ], + "initialize_bundle": ['Use minimal verbosity: initialize_bundle(verbosity="minimal")'], + "list_bundles": ['Use minimal verbosity: list_bundles(verbosity="minimal")'], + } + + return suggestions_map.get( + tool_name, + [ + 'Use minimal verbosity if supported: tool_name(verbosity="minimal")', + "Apply filtering parameters to reduce data scope", + "Target more specific data subsets", + ], + ) + + def _generate_content_preview(self, content: str) -> str: + """Generate a content preview based on verbosity level.""" + if self.verbosity == VerbosityLevel.MINIMAL: + # Very brief preview for minimal mode + preview_chars = 100 + preview_text = content[:preview_chars] + if len(content) > preview_chars: + preview_text += "..." + return f"First {preview_chars} characters:\n{preview_text}\n" + + elif self.verbosity == VerbosityLevel.STANDARD: + # Moderate preview for standard mode + preview_chars = 200 + preview_text = content[:preview_chars] + if len(content) > preview_chars: + preview_text += "..." + return f"Showing first {preview_chars} characters:\n{preview_text}\n" + + else: # VERBOSE or DEBUG + # More detailed preview with structure info + preview_chars = 500 + preview_text = content[:preview_chars] + if len(content) > preview_chars: + preview_text += "..." 
+ + # Try to provide structural information + lines = content.split("\n") + total_lines = len(lines) + + preview_msg = ( + f"Showing first {preview_chars} characters (content has {total_lines:,} lines):\n" + ) + preview_msg += "```\n" + preview_text + "\n```\n" + + # Add helpful context about the content structure + if content.strip().startswith("{") or content.strip().startswith("["): + preview_msg += "\n*Note: Content appears to be JSON format*\n" + elif "```" in content: + preview_msg += "\n*Note: Content contains code blocks or formatted sections*\n" + + return preview_msg + def _format_file_size(self, size_bytes: int) -> str: """Format file size in human-readable format.""" if size_bytes < 1024: diff --git a/src/mcp_server_troubleshoot/kubectl.py b/src/mcp_server_troubleshoot/kubectl.py index bd4e75c..39d252b 100644 --- a/src/mcp_server_troubleshoot/kubectl.py +++ b/src/mcp_server_troubleshoot/kubectl.py @@ -46,7 +46,8 @@ class KubectlCommandArgs(BaseModel): timeout: int = Field(30, description="Timeout in seconds for the command") json_output: bool = Field(False, description="Whether to format the output as JSON") verbosity: Optional[str] = Field( - None, description="Verbosity level for response formatting (minimal|standard|verbose|debug)" + None, + description="Verbosity level for response formatting (minimal|standard|verbose|debug)", ) @field_validator("command") diff --git a/src/mcp_server_troubleshoot/server.py b/src/mcp_server_troubleshoot/server.py index 0a2a8bf..7af7398 100644 --- a/src/mcp_server_troubleshoot/server.py +++ b/src/mcp_server_troubleshoot/server.py @@ -27,7 +27,8 @@ ReadFileArgs, ) from .lifecycle import app_lifespan, AppContext -from .formatters import get_formatter +from .formatters import get_formatter, ResponseFormatter +from .size_limiter import SizeLimiter logger = logging.getLogger(__name__) @@ -112,6 +113,36 @@ def get_file_explorer() -> FileExplorer: return _file_explorer +def check_response_size( + content: str, tool_name: str, formatter: ResponseFormatter +) -> List[TextContent]: + """ + Single centralized function to check all MCP tool responses for size limits. 
+ + Args: + content: The content to check for size + tool_name: Name of the tool generating the response (for tool-specific guidance) + formatter: Response formatter instance for verbosity and overflow messages + + Returns: + List of TextContent with either the original content or an overflow message + """ + size_limiter = SizeLimiter() + tokens = size_limiter.estimate_tokens(content) + + if not size_limiter.enabled: + # Size checking disabled - return content as-is + return [TextContent(type="text", text=content)] + + if tokens <= size_limiter.token_limit: + # Content within limits - return as-is + return [TextContent(type="text", text=content)] + else: + # Content exceeds limits - return overflow message + overflow_msg = formatter.format_overflow_message(tool_name, tokens, content) + return [TextContent(type="text", text=overflow_msg)] + + @mcp.tool() async def initialize_bundle(args: InitializeBundleArgs) -> List[TextContent]: """ @@ -153,7 +184,7 @@ async def initialize_bundle(args: InitializeBundleArgs) -> List[TextContent]: # Format response using the formatter response = formatter.format_bundle_initialization(result, api_server_available, diagnostics) - return [TextContent(type="text", text=response)] + return check_response_size(response, "initialize_bundle", formatter) except BundleManagerError as e: error_message = f"Failed to initialize bundle: {str(e)}" @@ -167,7 +198,7 @@ async def initialize_bundle(args: InitializeBundleArgs) -> List[TextContent]: logger.error(f"Failed to get diagnostics: {diag_error}") formatted_error = formatter.format_error(error_message, diagnostics) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "initialize_bundle", formatter) except Exception as e: error_message = f"Unexpected error initializing bundle: {str(e)}" logger.exception(error_message) @@ -180,7 +211,7 @@ async def initialize_bundle(args: InitializeBundleArgs) -> List[TextContent]: logger.error(f"Failed to get diagnostics: {diag_error}") formatted_error = formatter.format_error(error_message, diagnostics) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "initialize_bundle", formatter) @mcp.tool() @@ -208,18 +239,18 @@ async def list_available_bundles(args: ListAvailableBundlesArgs) -> List[TextCon bundles = await bundle_manager.list_available_bundles(args.include_invalid) response = formatter.format_bundle_list(bundles) - return [TextContent(type="text", text=response)] + return check_response_size(response, "list_bundles", formatter) except BundleManagerError as e: error_message = f"Failed to list bundles: {str(e)}" logger.error(error_message) formatted_error = formatter.format_error(error_message) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "list_bundles", formatter) except Exception as e: error_message = f"Unexpected error listing bundles: {str(e)}" logger.exception(error_message) formatted_error = formatter.format_error(error_message) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "list_bundles", formatter) @mcp.tool() @@ -256,7 +287,7 @@ async def kubectl(args: KubectlCommandArgs) -> List[TextContent]: ) logger.error("No bundle initialized for kubectl command") formatted_error = formatter.format_error(error_message) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "kubectl", formatter) # Check if this is a 
host-only bundle if active_bundle.host_only_bundle: @@ -267,7 +298,7 @@ async def kubectl(args: KubectlCommandArgs) -> List[TextContent]: ) logger.info("kubectl command attempted on host-only bundle") formatted_error = formatter.format_error(error_message) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "kubectl", formatter) # Check if the API server is available before attempting kubectl api_server_available = await bundle_manager.check_api_server_available() @@ -280,14 +311,14 @@ async def kubectl(args: KubectlCommandArgs) -> List[TextContent]: ) logger.error("API server not available for kubectl command") formatted_error = formatter.format_error(error_message, diagnostics) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "kubectl", formatter) # Execute the kubectl command result = await get_kubectl_executor().execute(args.command, args.timeout, args.json_output) # Format response using the formatter response = formatter.format_kubectl_result(result) - return [TextContent(type="text", text=response)] + return check_response_size(response, "kubectl", formatter) except KubectlError as e: error_message = f"kubectl command failed: {str(e)}" @@ -309,7 +340,7 @@ async def kubectl(args: KubectlCommandArgs) -> List[TextContent]: logger.error(f"Failed to get diagnostics: {diag_error}") formatted_error = formatter.format_error(error_message, diagnostics) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "kubectl", formatter) except BundleManagerError as e: error_message = f"Bundle error: {str(e)}" logger.error(error_message) @@ -322,7 +353,7 @@ async def kubectl(args: KubectlCommandArgs) -> List[TextContent]: logger.error(f"Failed to get diagnostics: {diag_error}") formatted_error = formatter.format_error(error_message, diagnostics) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "kubectl", formatter) except Exception as e: error_message = f"Unexpected error executing kubectl command: {str(e)}" logger.exception(error_message) @@ -335,7 +366,7 @@ async def kubectl(args: KubectlCommandArgs) -> List[TextContent]: logger.error(f"Failed to get diagnostics: {diag_error}") formatted_error = formatter.format_error(error_message, diagnostics) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "kubectl", formatter) @mcp.tool() @@ -366,23 +397,23 @@ async def list_files(args: ListFilesArgs) -> List[TextContent]: try: result = await get_file_explorer().list_files(args.path, args.recursive) response = formatter.format_file_list(result) - return [TextContent(type="text", text=response)] + return check_response_size(response, "list_files", formatter) except FileSystemError as e: error_message = f"File system error: {str(e)}" logger.error(error_message) formatted_error = formatter.format_error(error_message) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "list_files", formatter) except BundleManagerError as e: error_message = f"Bundle error: {str(e)}" logger.error(error_message) formatted_error = formatter.format_error(error_message) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "list_files", formatter) except Exception as e: error_message = f"Unexpected error listing files: {str(e)}" logger.exception(error_message) formatted_error 
= formatter.format_error(error_message) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "list_files", formatter) @mcp.tool() @@ -414,23 +445,23 @@ async def read_file(args: ReadFileArgs) -> List[TextContent]: try: result = await get_file_explorer().read_file(args.path, args.start_line, args.end_line) response = formatter.format_file_content(result) - return [TextContent(type="text", text=response)] + return check_response_size(response, "read_file", formatter) except FileSystemError as e: error_message = f"File system error: {str(e)}" logger.error(error_message) formatted_error = formatter.format_error(error_message) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "read_file", formatter) except BundleManagerError as e: error_message = f"Bundle error: {str(e)}" logger.error(error_message) formatted_error = formatter.format_error(error_message) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "read_file", formatter) except Exception as e: error_message = f"Unexpected error reading file: {str(e)}" logger.exception(error_message) formatted_error = formatter.format_error(error_message) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "read_file", formatter) @mcp.tool() @@ -476,23 +507,23 @@ async def grep_files(args: GrepFilesArgs) -> List[TextContent]: ) response = formatter.format_grep_results(result) - return [TextContent(type="text", text=response)] + return check_response_size(response, "grep_files", formatter) except FileSystemError as e: error_message = f"File system error: {str(e)}" logger.error(error_message) formatted_error = formatter.format_error(error_message) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "grep_files", formatter) except BundleManagerError as e: error_message = f"Bundle error: {str(e)}" logger.error(error_message) formatted_error = formatter.format_error(error_message) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "grep_files", formatter) except Exception as e: error_message = f"Unexpected error searching files: {str(e)}" logger.exception(error_message) formatted_error = formatter.format_error(error_message) - return [TextContent(type="text", text=formatted_error)] + return check_response_size(formatted_error, "grep_files", formatter) # Helper function to initialize the bundle manager with a specified directory diff --git a/src/mcp_server_troubleshoot/size_limiter.py b/src/mcp_server_troubleshoot/size_limiter.py new file mode 100644 index 0000000..5cb4868 --- /dev/null +++ b/src/mcp_server_troubleshoot/size_limiter.py @@ -0,0 +1,141 @@ +""" +Size limiting utilities for MCP response optimization. + +This module implements the SizeLimiter class that provides fast token estimation +and content size validation to prevent oversized responses that could impact +MCP client performance. +""" + +import logging +import os +from typing import Optional + +logger = logging.getLogger(__name__) + + +class SizeLimiter: + """ + Provides token estimation and size limiting for MCP responses. + + This class implements fast character-based token approximation and + configurable limits to optimize response sizes while maintaining + functionality for normal-sized content. 
+ """ + + def __init__(self, token_limit: Optional[int] = None): + """ + Initialize the size limiter with configurable token limit. + + Args: + token_limit: Maximum token limit, or None to use environment default + """ + if token_limit is None: + # Default to 25000 tokens, configurable via environment + token_limit = int(os.environ.get("MCP_TOKEN_LIMIT", "25000")) + + self.token_limit = token_limit + + # Check if size checking is enabled (can be disabled for testing) + self.enabled = os.environ.get("MCP_SIZE_CHECK_ENABLED", "true").lower() not in ( + "false", + "0", + "no", + ) + + if not self.enabled: + logger.debug("Size limiting disabled via MCP_SIZE_CHECK_ENABLED") + + def estimate_tokens(self, text: str) -> int: + """ + Estimate token count using fast character-based approximation. + + Uses a simple heuristic of ~4 characters per token, which provides + a reasonable approximation for most text content while being + extremely fast for normal-sized responses. + + Args: + text: The text content to estimate tokens for + + Returns: + Estimated token count + """ + if not text: + return 0 + + # Fast approximation: ~4 characters per token + return len(text) // 4 + + def check_size(self, content: str) -> bool: + """ + Check if content size is within acceptable limits. + + Returns True if the content is within limits or size checking + is disabled, False if it exceeds the configured token limit. + + Args: + content: The content to check + + Returns: + True if content is within limits, False if it exceeds limits + """ + if not self.enabled: + return True + + estimated_tokens = self.estimate_tokens(content) + + if estimated_tokens > self.token_limit: + logger.warning( + f"Content size ({estimated_tokens} estimated tokens) exceeds limit ({self.token_limit})" + ) + return False + + return True + + def get_overflow_summary(self, content: str, max_preview_chars: int = 500) -> str: + """ + Generate a summary for content that exceeds size limits. + + Provides a truncated preview with overflow information when + content is too large to return in full. + + Args: + content: The original content that exceeded limits + max_preview_chars: Maximum characters to include in preview + + Returns: + A summary string with truncated content and overflow info + """ + estimated_tokens = self.estimate_tokens(content) + content_length = len(content) + + # Create truncated preview + if len(content) > max_preview_chars: + preview = content[:max_preview_chars] + "..." + else: + preview = content + + summary = f"""Content size limit exceeded: +- Estimated tokens: {estimated_tokens} +- Token limit: {self.token_limit} +- Content length: {content_length} characters + +Preview (first {min(max_preview_chars, content_length)} chars): +{preview} + +--- Content truncated due to size limits --- +Consider using more specific queries or filters to reduce response size.""" + + return summary + + +def get_size_limiter(token_limit: Optional[int] = None) -> SizeLimiter: + """ + Get a SizeLimiter instance with the specified token limit. 
+ + Args: + token_limit: The token limit, or None to use environment defaults + + Returns: + A configured SizeLimiter instance + """ + return SizeLimiter(token_limit) diff --git a/src/mcp_server_troubleshoot/subprocess_utils.py b/src/mcp_server_troubleshoot/subprocess_utils.py index 84b8882..48bd972 100644 --- a/src/mcp_server_troubleshoot/subprocess_utils.py +++ b/src/mcp_server_troubleshoot/subprocess_utils.py @@ -113,7 +113,9 @@ async def _safe_transport_wait_close( @asynccontextmanager -async def pipe_transport_reader(pipe: Any) -> AsyncGenerator[asyncio.StreamReader, None]: +async def pipe_transport_reader( + pipe: Any, +) -> AsyncGenerator[asyncio.StreamReader, None]: """ Async context manager for managing pipe transport with proper cleanup. diff --git a/tasks/backlog/implement-output-size-limits.md b/tasks/completed/implement-output-size-limits.md similarity index 98% rename from tasks/backlog/implement-output-size-limits.md rename to tasks/completed/implement-output-size-limits.md index ed255cf..57f0d8b 100644 --- a/tasks/backlog/implement-output-size-limits.md +++ b/tasks/completed/implement-output-size-limits.md @@ -1,12 +1,15 @@ # Task: Implement Output Size Limits for MCP Server Responses ## Metadata -**Status**: ready +**Status**: completed **Created**: 2025-07-28 +**Started**: 2025-07-28 +**Completed**: 2025-07-28 **Priority**: high **Complexity**: medium **Estimated effort**: 5 phases **Labels**: optimization, token-usage, user-experience +**PR**: https://github.com/chris-sanders/troubleshoot-mcp-server/pull/45 ## Objective Implement size limits (~25k tokens) on all MCP server responses with helpful overflow messages directing users to filtering and formatting commands to reduce output size. diff --git a/tests/e2e/test_container_bundle_validation.py b/tests/e2e/test_container_bundle_validation.py index 2bdd4db..654f41a 100644 --- a/tests/e2e/test_container_bundle_validation.py +++ b/tests/e2e/test_container_bundle_validation.py @@ -153,7 +153,9 @@ def container_image_available(): """Check if container image exists.""" try: result = subprocess.run( - [CONTAINER_RUNTIME, "image", "exists", CONTAINER_IMAGE], capture_output=True, timeout=10 + [CONTAINER_RUNTIME, "image", "exists", CONTAINER_IMAGE], + capture_output=True, + timeout=10, ) if result.returncode != 0: pytest.skip( @@ -354,7 +356,12 @@ async def test_container_complete_workflow( # Step 3: Test grep (should work even if no matches) grep_response = await client.call_tool( "grep_files", - {"pattern": "kube", "path": "/", "recursive": True, "case_sensitive": False}, + { + "pattern": "kube", + "path": "/", + "recursive": True, + "case_sensitive": False, + }, ) assert "result" in grep_response @@ -410,7 +417,12 @@ def test_build_script_produces_working_image(self, container_runtime_available): # Verify the image was created check_result = subprocess.run( - [CONTAINER_RUNTIME, "image", "exists", "troubleshoot-mcp-server:test-build"], + [ + CONTAINER_RUNTIME, + "image", + "exists", + "troubleshoot-mcp-server:test-build", + ], capture_output=True, timeout=10, ) diff --git a/tests/e2e/test_container_production_validation.py b/tests/e2e/test_container_production_validation.py index 0bb9f6d..18d5275 100644 --- a/tests/e2e/test_container_production_validation.py +++ b/tests/e2e/test_container_production_validation.py @@ -236,7 +236,16 @@ def test_container_isolated_from_host_tools(): # This should fail because busybox doesn't have sbctl result = subprocess.run( - [runtime, "run", "--name", container_name, "--rm", base_image, 
"which", "sbctl"], + [ + runtime, + "run", + "--name", + container_name, + "--rm", + base_image, + "which", + "sbctl", + ], capture_output=True, text=True, timeout=30, diff --git a/tests/e2e/test_container_shutdown_reliability.py b/tests/e2e/test_container_shutdown_reliability.py index 2f923e6..a1d9a10 100644 --- a/tests/e2e/test_container_shutdown_reliability.py +++ b/tests/e2e/test_container_shutdown_reliability.py @@ -227,7 +227,14 @@ def test_shutdown_during_heavy_load(self): requests = [] for i in range(10): requests.append( - json.dumps({"jsonrpc": "2.0", "method": "tools/list", "params": {}, "id": i + 1}) + json.dumps( + { + "jsonrpc": "2.0", + "method": "tools/list", + "params": {}, + "id": i + 1, + } + ) + "\n" ) diff --git a/tests/e2e/test_mcp_protocol_integration.py b/tests/e2e/test_mcp_protocol_integration.py index 28ea1d4..93a24ce 100644 --- a/tests/e2e/test_mcp_protocol_integration.py +++ b/tests/e2e/test_mcp_protocol_integration.py @@ -112,8 +112,14 @@ async def test_tool_discovery_via_protocol(self, temp_bundle_dir, test_bundle_pa assert "tools" in init_response["capabilities"] # Skip tools/list test and mark as working if we can call a tool tools = [ - {"name": "initialize_bundle", "description": "Initialize bundle"}, - {"name": "list_available_bundles", "description": "List available bundles"}, + { + "name": "initialize_bundle", + "description": "Initialize bundle", + }, + { + "name": "list_available_bundles", + "description": "List available bundles", + }, {"name": "list_files", "description": "List files"}, {"name": "read_file", "description": "Read file"}, {"name": "grep_files", "description": "Grep files"}, diff --git a/tests/fixtures/mock_kubectl.py b/tests/fixtures/mock_kubectl.py index 3e94291..0edaebd 100644 --- a/tests/fixtures/mock_kubectl.py +++ b/tests/fixtures/mock_kubectl.py @@ -46,7 +46,9 @@ def parse_args(): # Version command version_parser = subparsers.add_parser("version", help="Show version info") - version_parser.add_argument("--client", action="store_true", help="Client version only") + version_parser.add_argument( + "--client", action="store_true", help="Client version only" + ) # Get command get_parser = subparsers.add_parser("get", help="Display resources") @@ -100,14 +102,24 @@ def handle_version(args): } if args.client: - print(json.dumps(client_version) if args.output == "json" else "Client Version: v1.26.0") + print( + json.dumps(client_version) + if args.output == "json" + else "Client Version: v1.26.0" + ) return 0 # Try to get server version too api_url = get_api_server_url() if not api_url: - print(json.dumps(client_version) if args.output == "json" else "Client Version: v1.26.0") - print("The connection to the server was refused - did you specify the right host or port?") + print( + json.dumps(client_version) + if args.output == "json" + else "Client Version: v1.26.0" + ) + print( + "The connection to the server was refused - did you specify the right host or port?" 
+ ) return 1 try: @@ -123,7 +135,11 @@ def handle_version(args): ) return 0 except URLError as e: - print(json.dumps(client_version) if args.output == "json" else "Client Version: v1.26.0") + print( + json.dumps(client_version) + if args.output == "json" + else "Client Version: v1.26.0" + ) print(f"Error communicating with server: {e}") return 1 diff --git a/tests/fixtures/mock_sbctl.py b/tests/fixtures/mock_sbctl.py index 4fb06dc..7fe04b6 100755 --- a/tests/fixtures/mock_sbctl.py +++ b/tests/fixtures/mock_sbctl.py @@ -172,7 +172,11 @@ def do_GET(self): "name": "mock-node-1", "uid": "00000000-0000-0000-0000-000000000020", }, - "status": {"conditions": [{"type": "Ready", "status": "True"}]}, + "status": { + "conditions": [ + {"type": "Ready", "status": "True"} + ] + }, } ], } @@ -182,7 +186,9 @@ def do_GET(self): else: # Generic empty list for other resources self.wfile.write( - json.dumps({"kind": "List", "apiVersion": "v1", "items": []}).encode() + json.dumps( + {"kind": "List", "apiVersion": "v1", "items": []} + ).encode() ) return @@ -227,7 +233,9 @@ class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): """Threaded TCP Server with socket reuse capabilities.""" allow_reuse_address = True # Allow quick reuse of sockets - daemon_threads = True # Daemon threads terminate automatically when main thread exits + daemon_threads = ( + True # Daemon threads terminate automatically when main thread exits + ) def start_mock_api_server(port=None): @@ -263,7 +271,9 @@ def start_mock_api_server(port=None): try: httpd = ThreadedTCPServer(("", new_port), handler) - logger.info(f"Successfully started mock Kubernetes API server on port {new_port}") + logger.info( + f"Successfully started mock Kubernetes API server on port {new_port}" + ) # Store the actual port used in an environment variable os.environ["MOCK_K8S_API_PORT"] = str(new_port) @@ -293,10 +303,16 @@ def create_kubeconfig(directory): "apiVersion": "v1", "kind": "Config", "clusters": [ - {"name": "mock-cluster", "cluster": {"server": f"http://localhost:{api_port}"}} + { + "name": "mock-cluster", + "cluster": {"server": f"http://localhost:{api_port}"}, + } ], "contexts": [ - {"name": "mock-context", "context": {"cluster": "mock-cluster", "user": "mock-user"}} + { + "name": "mock-context", + "context": {"cluster": "mock-cluster", "user": "mock-user"}, + } ], "current-context": "mock-context", "users": [{"name": "mock-user", "user": {}}], @@ -365,7 +381,9 @@ def handle_signal(sig, frame): def main(): """Parse arguments and run mock sbctl.""" logger.debug("Starting mock sbctl command parser") - parser = argparse.ArgumentParser(description="Mock sbctl implementation for testing") + parser = argparse.ArgumentParser( + description="Mock sbctl implementation for testing" + ) subparsers = parser.add_subparsers(dest="command", help="Sub-command to execute") # Version command - creates 'version' subcommand diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index a69d955..4ef97c7 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -33,7 +33,9 @@ def assert_attributes_exist(obj: Any, attributes: List[str]) -> None: @staticmethod def assert_api_response_valid( - response: List[Any], expected_type: str = "text", contains: Optional[List[str]] = None + response: List[Any], + expected_type: str = "text", + contains: Optional[List[str]] = None, ) -> None: """ Assert that an MCP API response is valid and contains expected content. 
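# --- Illustrative usage sketch (not part of the applied patch) ---------------------
# A minimal example of how the SizeLimiter introduced in
# src/mcp_server_troubleshoot/size_limiter.py behaves, assuming the default
# 25,000-token limit and size checking enabled.
from mcp_server_troubleshoot.size_limiter import SizeLimiter

limiter = SizeLimiter()                      # reads MCP_TOKEN_LIMIT / MCP_SIZE_CHECK_ENABLED
print(limiter.estimate_tokens("x" * 100))    # 25 -- heuristic is len(text) // 4
print(limiter.check_size("short response"))  # True  -- well under the limit
print(limiter.check_size("x" * 200_000))     # False -- ~50,000 tokens exceeds 25,000
# ------------------------------------------------------------------------------------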
diff --git a/tests/integration/test_api_server_lifecycle.py b/tests/integration/test_api_server_lifecycle.py index 2af662e..3db57f6 100644 --- a/tests/integration/test_api_server_lifecycle.py +++ b/tests/integration/test_api_server_lifecycle.py @@ -304,14 +304,20 @@ async def _collect_diagnostics(self, bundle_manager: BundleManager) -> Dict[str, except Exception as e: api_error = str(e) - diagnostics["api_server_status"] = {"available": api_available, "error": api_error} + diagnostics["api_server_status"] = { + "available": api_available, + "error": api_error, + } # Resource usage (simplified without psutil) resource_usage = {} if bundle_manager.sbctl_process: try: os.kill(bundle_manager.sbctl_process.pid, 0) - resource_usage = {"pid": bundle_manager.sbctl_process.pid, "process_exists": True} + resource_usage = { + "pid": bundle_manager.sbctl_process.pid, + "process_exists": True, + } except OSError: resource_usage = {"process_exists": False} diff --git a/tests/integration/test_mcp_protocol_errors.py b/tests/integration/test_mcp_protocol_errors.py index 39d16ff..c46f21e 100644 --- a/tests/integration/test_mcp_protocol_errors.py +++ b/tests/integration/test_mcp_protocol_errors.py @@ -136,7 +136,13 @@ async def test_rapid_initialization_requests(self, mcp_client): error_msg = str(response).lower() assert any( keyword in error_msg - for keyword in ["error", "timeout", "readuntil", "coroutine", "waiting"] + for keyword in [ + "error", + "timeout", + "readuntil", + "coroutine", + "waiting", + ] ) # Either some succeed or all fail gracefully diff --git a/tests/integration/test_mcp_protocol_real.py b/tests/integration/test_mcp_protocol_real.py index 78e86e7..92a04c2 100644 --- a/tests/integration/test_mcp_protocol_real.py +++ b/tests/integration/test_mcp_protocol_real.py @@ -103,7 +103,10 @@ async def init_client(): { "protocolVersion": "2024-11-05", "capabilities": {"tools": {}}, - "clientInfo": {"name": f"test-client-{i}", "version": "1.0.0"}, + "clientInfo": { + "name": f"test-client-{i}", + "version": "1.0.0", + }, }, ) return response diff --git a/tests/integration/test_signal_handling_integration.py b/tests/integration/test_signal_handling_integration.py index 97f5e67..31c0257 100644 --- a/tests/integration/test_signal_handling_integration.py +++ b/tests/integration/test_signal_handling_integration.py @@ -80,7 +80,10 @@ async def main(): def run_server_with_signal( - script_content: str, signal_to_send: int, delay_before_signal: float = 0.5, timeout: float = 5.0 + script_content: str, + signal_to_send: int, + delay_before_signal: float = 0.5, + timeout: float = 5.0, ) -> Tuple[int, str, str]: """ Run a server script and send it a signal. 
@@ -255,7 +258,10 @@ async def main(): stdout, stderr = process.communicate(timeout=5) # Should handle gracefully - return code of 0 or -15 (SIGTERM) is acceptable - assert process.returncode in (0, -15), f"Unexpected return code: {process.returncode}" + assert process.returncode in ( + 0, + -15, + ), f"Unexpected return code: {process.returncode}" assert "Fatal Python error" not in stderr # On CI, the process might exit too quickly to log anything if stderr: diff --git a/tests/integration/test_tool_functions.py b/tests/integration/test_tool_functions.py index 6abf126..bfa82ff 100644 --- a/tests/integration/test_tool_functions.py +++ b/tests/integration/test_tool_functions.py @@ -31,7 +31,10 @@ kubectl, initialize_with_bundle_dir, ) -from mcp_server_troubleshoot.bundle import InitializeBundleArgs, ListAvailableBundlesArgs +from mcp_server_troubleshoot.bundle import ( + InitializeBundleArgs, + ListAvailableBundlesArgs, +) from mcp_server_troubleshoot.files import ListFilesArgs, ReadFileArgs, GrepFilesArgs from mcp_server_troubleshoot.kubectl import KubectlCommandArgs @@ -355,7 +358,11 @@ async def test_grep_files_function_execution(bundle_storage_dir): # Search for common pattern via direct function call grep_args = GrepFilesArgs( - pattern="version", path="/", file_pattern="*.json", case_sensitive=False, recursive=True + pattern="version", + path="/", + file_pattern="*.json", + case_sensitive=False, + recursive=True, ) grep_result = await grep_files(grep_args) diff --git a/tests/test_utils/bundle_helpers.py b/tests/test_utils/bundle_helpers.py index ddb0c5e..41069f8 100644 --- a/tests/test_utils/bundle_helpers.py +++ b/tests/test_utils/bundle_helpers.py @@ -199,7 +199,10 @@ def create_minimal_kubeconfig( } ], "contexts": [ - {"name": "test-context", "context": {"cluster": "test-cluster", "user": "test-user"}} + { + "name": "test-context", + "context": {"cluster": "test-cluster", "user": "test-user"}, + } ], "current-context": "test-context", "users": [{"name": "test-user", "user": {"token": "test-token-12345"}}], diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index c9076a2..0e03404 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -39,7 +39,9 @@ def assert_attributes_exist(obj: Any, attributes: List[str]) -> None: @staticmethod def assert_api_response_valid( - response: List[Any], expected_type: str = "text", contains: Optional[List[str]] = None + response: List[Any], + expected_type: str = "text", + contains: Optional[List[str]] = None, ) -> None: """ Assert that an MCP API response is valid and contains expected content. 
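# --- Illustrative configuration sketch (not part of the applied patch) -------------
# The two environment knobs the new SizeLimiter reads, shown as they might be used in
# a test that needs oversized fixtures; values here are examples only.
import os
from mcp_server_troubleshoot.size_limiter import SizeLimiter

os.environ["MCP_TOKEN_LIMIT"] = "10000"         # lower the limit (default: 25000)
assert SizeLimiter().token_limit == 10000

os.environ["MCP_SIZE_CHECK_ENABLED"] = "false"  # disable size checking entirely
limiter = SizeLimiter()
assert limiter.enabled is False
assert limiter.check_size("X" * 1_000_000)      # always True while checking is disabled
# ------------------------------------------------------------------------------------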
diff --git a/tests/unit/test_bundle.py b/tests/unit/test_bundle.py index b2b84cf..e480803 100644 --- a/tests/unit/test_bundle.py +++ b/tests/unit/test_bundle.py @@ -258,7 +258,10 @@ async def test_bundle_manager_download_replicated_url_success_sbctl_token( mock_get_call = mock_httpx_constructor.return_value.__aenter__.return_value.get mock_get_call.assert_awaited_once_with( REPLICATED_API_URL, - headers={"Authorization": "sbctl_token_value", "Content-Type": "application/json"}, + headers={ + "Authorization": "sbctl_token_value", + "Content-Type": "application/json", + }, ) # Verify aiohttp call for actual download @@ -322,7 +325,10 @@ async def test_bundle_manager_download_replicated_url_token_precedence( with patch.dict( os.environ, # === START MODIFICATION === - {"SBCTL_TOKEN": "sbctl_token_value", "REPLICATED": "replicated_token_value"}, + { + "SBCTL_TOKEN": "sbctl_token_value", + "REPLICATED": "replicated_token_value", + }, # === END MODIFICATION === clear=True, ): @@ -332,7 +338,10 @@ async def test_bundle_manager_download_replicated_url_token_precedence( mock_get_call = mock_httpx_constructor.return_value.__aenter__.return_value.get mock_get_call.assert_awaited_once_with( REPLICATED_API_URL, - headers={"Authorization": "sbctl_token_value", "Content-Type": "application/json"}, + headers={ + "Authorization": "sbctl_token_value", + "Content-Type": "application/json", + }, ) @@ -406,7 +415,9 @@ async def test_bundle_manager_download_replicated_url_api_404(mock_httpx_client) @pytest.mark.asyncio -async def test_bundle_manager_download_replicated_url_api_other_error(mock_httpx_client): +async def test_bundle_manager_download_replicated_url_api_other_error( + mock_httpx_client, +): """Test error handling for other Replicated API errors.""" mock_httpx_constructor, mock_response = mock_httpx_client # === START MODIFICATION === @@ -430,7 +441,9 @@ async def test_bundle_manager_download_replicated_url_api_other_error(mock_httpx @pytest.mark.asyncio -async def test_bundle_manager_download_replicated_url_missing_signed_uri(mock_httpx_client): +async def test_bundle_manager_download_replicated_url_missing_signed_uri( + mock_httpx_client, +): """Test error handling when 'signedUri' is missing from API response.""" mock_httpx_constructor, mock_response = mock_httpx_client # Configure for success status but missing key in the nested JSON @@ -511,7 +524,9 @@ async def test_bundle_manager_download_non_replicated_url(mock_aiohttp_download) @pytest.mark.asyncio -async def test_bundle_manager_download_bundle(mock_aiohttp_download): # Use fixture as argument +async def test_bundle_manager_download_bundle( + mock_aiohttp_download, +): # Use fixture as argument """Test that the bundle manager can download a non-Replicated bundle.""" # Unpack the fixture results mock_aiohttp_constructor, mock_aio_session, mock_aio_response = mock_aiohttp_download diff --git a/tests/unit/test_bundle_cleanup_dependencies.py b/tests/unit/test_bundle_cleanup_dependencies.py index e53afb3..22997fb 100644 --- a/tests/unit/test_bundle_cleanup_dependencies.py +++ b/tests/unit/test_bundle_cleanup_dependencies.py @@ -90,7 +90,11 @@ async def test_bundle_cleanup_functional_dependency_validation(): error_msg = str(e) if any( indicator in error_msg.lower() - for indicator in ["no such file or directory", "command not found", "not found"] + for indicator in [ + "no such file or directory", + "command not found", + "not found", + ] ): pytest.fail( f"❌ TDD SUCCESS: Detected potential missing dependency!\n" diff --git 
a/tests/unit/test_components.py b/tests/unit/test_components.py index c5d1b37..edd316a 100755 --- a/tests/unit/test_components.py +++ b/tests/unit/test_components.py @@ -135,7 +135,10 @@ async def test_kubectl_execution(mock_command_environment, fixtures_dir): # Test a simple kubectl command that should work # First, verify that kubectl is in the PATH with a direct subprocess call proc = await asyncio.create_subprocess_exec( - "which", "kubectl", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + "which", + "kubectl", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await proc.communicate() assert proc.returncode == 0, "kubectl should be available in PATH" diff --git a/tests/unit/test_curl_dependency_reproduction.py b/tests/unit/test_curl_dependency_reproduction.py index 282399d..22ad344 100644 --- a/tests/unit/test_curl_dependency_reproduction.py +++ b/tests/unit/test_curl_dependency_reproduction.py @@ -76,7 +76,11 @@ def __init__(self): def record_subprocess_call(self, *args, **kwargs) -> None: """Record a subprocess call for analysis.""" - call_info = {"args": args, "kwargs": kwargs, "command": args[0] if args else None} + call_info = { + "args": args, + "kwargs": kwargs, + "command": args[0] if args else None, + } self.subprocess_calls.append(call_info) def record_curl_error(self, error: Exception) -> None: @@ -266,7 +270,8 @@ async def mock_subprocess_immediate_failure(*args, **kwargs): ) with patch( - "asyncio.create_subprocess_exec", side_effect=mock_subprocess_immediate_failure + "asyncio.create_subprocess_exec", + side_effect=mock_subprocess_immediate_failure, ): # The curl dependency failure should occur immediately, before any timeout result = await bundle_manager.check_api_server_available() @@ -335,7 +340,8 @@ async def mock_subprocess_env_specific(*args, **kwargs): ) with patch( - "asyncio.create_subprocess_exec", side_effect=mock_subprocess_env_specific + "asyncio.create_subprocess_exec", + side_effect=mock_subprocess_env_specific, ): result = await bundle_manager.check_api_server_available() diff --git a/tests/unit/test_files.py b/tests/unit/test_files.py index 7f28645..96968c6 100644 --- a/tests/unit/test_files.py +++ b/tests/unit/test_files.py @@ -633,7 +633,10 @@ async def test_file_explorer_grep_files_per_file_limiting(): # Search with max_results_per_file=3 result = await explorer.grep_files( - pattern="pattern", path="many_matches.txt", max_results_per_file=3, max_files=10 + pattern="pattern", + path="many_matches.txt", + max_results_per_file=3, + max_files=10, ) # Should only get 3 matches from the file (per-file limit) diff --git a/tests/unit/test_files_parametrized.py b/tests/unit/test_files_parametrized.py index 6e7ed03..ebeb305 100644 --- a/tests/unit/test_files_parametrized.py +++ b/tests/unit/test_files_parametrized.py @@ -50,7 +50,11 @@ # Valid cases ("dir1", False, True), ("dir1/subdir", True, True), - ("absolute/path", True, True), # Note: without leading slash - the validator removes it + ( + "absolute/path", + True, + True, + ), # Note: without leading slash - the validator removes it # Invalid cases ("", False, False), # Empty path ("../outside", False, False), # Path traversal @@ -147,7 +151,15 @@ def test_read_file_args_validation_parametrized(path, start_line, end_line, expe [ # Valid cases ("test", "dir1", True, "*.txt", False, 100, True), - ("complex.pattern", ".", True, None, True, 50, True), # Use "." 
for root directory + ( + "complex.pattern", + ".", + True, + None, + True, + 50, + True, + ), # Use "." for root directory ("foo", "dir1/subdir", False, "*.log", False, 10, True), # Invalid cases ("", "dir1", True, "*.txt", False, 100, False), # Empty pattern diff --git a/tests/unit/test_kubectl_parametrized.py b/tests/unit/test_kubectl_parametrized.py index 840242c..feb4bf5 100644 --- a/tests/unit/test_kubectl_parametrized.py +++ b/tests/unit/test_kubectl_parametrized.py @@ -199,7 +199,12 @@ async def test_kubectl_command_execution_parameters(command, expected_args, add_ ], ) async def test_kubectl_error_handling( - return_code, stdout_content, stderr_content, expected_exit_code, should_raise, test_factory + return_code, + stdout_content, + stderr_content, + expected_exit_code, + should_raise, + test_factory, ): """ Test that the kubectl executor handles errors correctly. diff --git a/tests/unit/test_netstat_dependency.py b/tests/unit/test_netstat_dependency.py index 398dedc..8048f40 100644 --- a/tests/unit/test_netstat_dependency.py +++ b/tests/unit/test_netstat_dependency.py @@ -105,7 +105,9 @@ async def test_bundle_network_diagnostic_triggers_netstat_error(): # The bundle.py code uses netstat in network diagnostic checks # Let's manually call the subprocess that would be used - from mcp_server_troubleshoot.subprocess_utils import subprocess_exec_with_cleanup + from mcp_server_troubleshoot.subprocess_utils import ( + subprocess_exec_with_cleanup, + ) try: # This is the exact call pattern from bundle.py line 1871 @@ -161,7 +163,9 @@ async def test_port_checking_functionality_without_netstat(): os.environ["PATH"] = "/tmp" # Path that definitely won't have netstat # First, demonstrate that the current netstat approach fails - from mcp_server_troubleshoot.subprocess_utils import subprocess_exec_with_cleanup + from mcp_server_troubleshoot.subprocess_utils import ( + subprocess_exec_with_cleanup, + ) netstat_failed = False netstat_error = None diff --git a/tests/unit/test_python313_transport_issue.py b/tests/unit/test_python313_transport_issue.py index e099bec..49564ed 100644 --- a/tests/unit/test_python313_transport_issue.py +++ b/tests/unit/test_python313_transport_issue.py @@ -66,7 +66,9 @@ def warning_capture(message, category, filename, lineno, file=None, line=None): try: # Use the current subprocess_utils extensively - from mcp_server_troubleshoot.subprocess_utils import subprocess_exec_with_cleanup + from mcp_server_troubleshoot.subprocess_utils import ( + subprocess_exec_with_cleanup, + ) # Create many subprocess operations to stress test the transport handling for i in range(25): # Large number to stress the system diff --git a/tests/unit/test_server.py b/tests/unit/test_server.py index e996d12..e0a399c 100644 --- a/tests/unit/test_server.py +++ b/tests/unit/test_server.py @@ -318,7 +318,10 @@ async def test_file_operations(tmp_path: Path) -> None: from mcp_server_troubleshoot.files import ReadFileArgs read_args = ReadFileArgs( - path="test_data/dir1/file1.txt", start_line=0, end_line=2, verbosity="verbose" + path="test_data/dir1/file1.txt", + start_line=0, + end_line=2, + verbosity="verbose", ) read_response = await read_file(read_args) diff --git a/tests/unit/test_server_parametrized.py b/tests/unit/test_server_parametrized.py index 898377e..692e304 100644 --- a/tests/unit/test_server_parametrized.py +++ b/tests/unit/test_server_parametrized.py @@ -29,7 +29,11 @@ import pytest -from mcp_server_troubleshoot.bundle import BundleManagerError, BundleManager, BundleMetadata +from 
mcp_server_troubleshoot.bundle import ( + BundleManagerError, + BundleManager, + BundleMetadata, +) from mcp_server_troubleshoot.files import ( FileContentResult, FileInfo, @@ -62,15 +66,28 @@ "source,force,api_available,expected_strings", [ # Success case - all good - ("test_bundle.tar.gz", False, True, ["Bundle initialized successfully", "test_bundle"]), + ( + "test_bundle.tar.gz", + False, + True, + ["Bundle initialized successfully", "test_bundle"], + ), # Success case - force initialization - ("test_bundle.tar.gz", True, True, ["Bundle initialized successfully", "test_bundle"]), + ( + "test_bundle.tar.gz", + True, + True, + ["Bundle initialized successfully", "test_bundle"], + ), # Warning case - API server not available ( "test_bundle.tar.gz", False, False, - ["Bundle initialized but API server is NOT available", "kubectl commands may fail"], + [ + "Bundle initialized but API server is NOT available", + "kubectl commands may fail", + ], ), ], ids=[ @@ -281,7 +298,10 @@ async def test_kubectl_tool_parametrized( from mcp_server_troubleshoot.kubectl import KubectlCommandArgs args = KubectlCommandArgs( - command=command, timeout=timeout, json_output=json_output, verbosity="verbose" + command=command, + timeout=timeout, + json_output=json_output, + verbosity="verbose", ) # Call the tool function @@ -616,7 +636,11 @@ async def test_file_operations_error_handling( mock_get_explorer.return_value = file_explorer # Test all three file operations with the same error - from mcp_server_troubleshoot.files import ListFilesArgs, ReadFileArgs, GrepFilesArgs + from mcp_server_troubleshoot.files import ( + ListFilesArgs, + ReadFileArgs, + GrepFilesArgs, + ) # 1. Test list_files list_args = ListFilesArgs(path="test/path", recursive=False, verbosity="verbose") @@ -653,11 +677,19 @@ async def test_file_operations_error_handling( "include_invalid,bundles_available,expected_strings", [ # With bundles available - (False, True, ["support-bundle-1.tar.gz", "Usage Instructions", "initialize_bundle"]), + ( + False, + True, + ["support-bundle-1.tar.gz", "Usage Instructions", "initialize_bundle"], + ), # No bundles available (False, False, ["No support bundles found", "download or transfer a bundle"]), # With invalid bundles included - (True, True, ["support-bundle-1.tar.gz", "validation_message", "initialize_bundle"]), + ( + True, + True, + ["support-bundle-1.tar.gz", "validation_message", "initialize_bundle"], + ), ], ids=[ "with-bundles", diff --git a/tests/unit/test_size_limiter.py b/tests/unit/test_size_limiter.py new file mode 100644 index 0000000..c071b63 --- /dev/null +++ b/tests/unit/test_size_limiter.py @@ -0,0 +1,212 @@ +""" +Unit tests for the SizeLimiter class. + +Focused test coverage for the SizeLimiter component with essential tests only. +Covers core functionality without excessive redundancy. 
+""" + +import os +import pytest +from mcp_server_troubleshoot.size_limiter import SizeLimiter + +# Mark all tests in this file as unit tests +pytestmark = pytest.mark.unit + + +@pytest.fixture +def mock_environment(): + """Provide a clean environment for testing environment variable configurations.""" + original_env = os.environ.copy() + # Clear relevant environment variables + env_vars = ["MCP_TOKEN_LIMIT", "MCP_SIZE_CHECK_ENABLED"] + for var in env_vars: + if var in os.environ: + del os.environ[var] + + yield + + # Restore original environment + os.environ.clear() + os.environ.update(original_env) + + +# Core token estimation tests +@pytest.mark.parametrize( + "text,expected_tokens", + [ + ("", 0), # Empty string + ("test", 1), # 4 chars = 1 token + ("abcde", 1), # 5 chars rounds down to 1 token + ("a" * 100, 25), # 100 chars = 25 tokens + ("a" * 1000, 250), # 1000 chars = 250 tokens + ], +) +def test_token_estimation_accuracy(text, expected_tokens): + """ + Test token estimation accuracy using ~4 characters per token approximation. + """ + size_limiter = SizeLimiter() + estimated_tokens = size_limiter.estimate_tokens(text) + assert estimated_tokens == expected_tokens + + +# Size limit threshold testing +@pytest.mark.parametrize( + "token_count,limit,should_pass", + [ + (1000, 25000, True), # Well under limit + (25000, 25000, True), # Exactly at limit + (25001, 25000, False), # Just over limit + (50000, 25000, False), # Well over limit + ], +) +def test_size_limit_thresholds(token_count, limit, should_pass): + """ + Test size limit threshold detection for boundary conditions. + """ + size_limiter = SizeLimiter(token_limit=limit) + # Create text with approximately the target token count + text = "x" * (token_count * 4) + result = size_limiter.check_size(text) + assert result == should_pass + + +# Environment variable configuration tests +@pytest.mark.parametrize( + "env_value,expected_limit", + [ + ("10000", 10000), # Custom limit + (None, 25000), # Default when not set + ], +) +def test_mcp_token_limit_environment_variable(mock_environment, env_value, expected_limit): + """ + Test MCP_TOKEN_LIMIT environment variable configuration. + """ + if env_value is not None: + os.environ["MCP_TOKEN_LIMIT"] = str(env_value) + + size_limiter = SizeLimiter() + assert size_limiter.token_limit == expected_limit + + +@pytest.mark.parametrize( + "env_value,expected_enabled", + [ + ("true", True), # Enabled + ("false", False), # Disabled + (None, True), # Default when not set + ], +) +def test_mcp_size_check_enabled_environment_variable(mock_environment, env_value, expected_enabled): + """ + Test MCP_SIZE_CHECK_ENABLED environment variable configuration. + """ + if env_value is not None: + os.environ["MCP_SIZE_CHECK_ENABLED"] = str(env_value) + + size_limiter = SizeLimiter() + assert size_limiter.enabled == expected_enabled + + +# Edge case tests +@pytest.mark.parametrize( + "text,description", + [ + ("", "empty string"), + ("Hello 世界 🌍 Émoji", "mixed Unicode"), + ("A" * 10000, "large text"), + ("Line 1\nLine 2\nLine 3", "multiline text"), + ], +) +def test_edge_cases(text, description): + """ + Test SizeLimiter with edge cases and unusual input. 
+ """ + size_limiter = SizeLimiter() + + # Should not raise any exceptions + tokens = size_limiter.estimate_tokens(text) + result = size_limiter.check_size(text) + + # Basic sanity checks + assert isinstance(tokens, int), f"Token count should be integer for {description}" + assert tokens >= 0, f"Token count should be non-negative for {description}" + assert isinstance(result, bool), f"Check size should be boolean for {description}" + + +# Integration test for complete functionality +def test_size_limiter_complete_workflow(): + """ + Test complete SizeLimiter workflow from initialization to size checking. + """ + size_limiter = SizeLimiter() + + # Test basic functionality + small_text = "test" + large_text = "X" * 100004 # Large text that exceeds default limit (25001 tokens) + + # Small text should pass + assert size_limiter.check_size(small_text) + assert size_limiter.estimate_tokens(small_text) == 1 + + # Large text should fail + assert not size_limiter.check_size(large_text) + assert size_limiter.estimate_tokens(large_text) == 25001 + + +def test_size_limiter_with_disabled_checking(mock_environment): + """ + Test SizeLimiter behavior when size checking is disabled. + """ + os.environ["MCP_SIZE_CHECK_ENABLED"] = "false" + + size_limiter = SizeLimiter() + assert not size_limiter.enabled + + # Even very large content should pass when disabled + large_text = "X" * 100000 + assert size_limiter.check_size(large_text) + + +def test_size_limiter_overflow_summary(): + """ + Test overflow summary generation for large content. + """ + size_limiter = SizeLimiter() + large_text = "A" * 1000 + + summary = size_limiter.get_overflow_summary(large_text, max_preview_chars=100) + + assert "Content size limit exceeded" in summary + assert "250" in summary # Token count + assert "25000" in summary # Token limit + assert "1000" in summary # Character count + + +def test_size_limiter_initialization_with_custom_limit(): + """ + Test SizeLimiter initialization with custom token limits. + """ + # Test with custom limit + custom_limiter = SizeLimiter(token_limit=5000) + assert custom_limiter.token_limit == 5000 + + # Test with default limit + default_limiter = SizeLimiter() + assert default_limiter.token_limit == 25000 + + +def test_size_limiter_factory_function(): + """ + Test the get_size_limiter factory function. + """ + from mcp_server_troubleshoot.size_limiter import get_size_limiter + + # Test with custom limit + limiter = get_size_limiter(token_limit=10000) + assert limiter.token_limit == 10000 + + # Test with default + default_limiter = get_size_limiter() + assert default_limiter.token_limit == 25000 diff --git a/tests/util/debug_mcp.py b/tests/util/debug_mcp.py index 13b4112..e1329f6 100755 --- a/tests/util/debug_mcp.py +++ b/tests/util/debug_mcp.py @@ -52,7 +52,12 @@ def read_stderr(): try: # Send a simple request - request = {"jsonrpc": "2.0", "id": "1", "method": "get_tool_definitions", "params": {}} + request = { + "jsonrpc": "2.0", + "id": "1", + "method": "get_tool_definitions", + "params": {}, + } request_str = json.dumps(request) + "\n" print(f"Sending request: {request_str.strip()}")
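# --- Illustrative overflow sketch (not part of the applied patch) ------------------
# What the overflow path reports when content exceeds the limit, assuming size
# checking is enabled (the default): get_overflow_summary() includes the estimated
# token count, the configured limit, and a truncated preview with a hint to narrow
# the query.
from mcp_server_troubleshoot.size_limiter import SizeLimiter

limiter = SizeLimiter(token_limit=25_000)
big_output = "A" * 200_000                       # ~50,000 estimated tokens
if not limiter.check_size(big_output):
    print(limiter.get_overflow_summary(big_output, max_preview_chars=120))
    # Prints "Content size limit exceeded:", the token/character counts, a 120-char
    # preview, and "Consider using more specific queries or filters ...".
# ------------------------------------------------------------------------------------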